/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.listener;

import java.time.Duration;
import java.util.Collection;
import java.util.function.BiConsumer;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.listener.ConsumerRecordRecoverer;
import org.springframework.kafka.listener.ListenerInvokingBatchErrorHandler;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler;
import org.springframework.lang.Nullable;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.BackOffExecution;
import org.springframework.util.backoff.FixedBackOff;

public class RetryingBatchErrorHandler
implements ListenerInvokingBatchErrorHandler {
    private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(RetryingBatchErrorHandler.class));
    private final BackOff backOff;
    private final BiConsumer<ConsumerRecords<?, ?>, Exception> recoverer;
    private final SeekToCurrentBatchErrorHandler seeker = new SeekToCurrentBatchErrorHandler();

    public RetryingBatchErrorHandler() {
        this((BackOff)new FixedBackOff(), null);
    }

    public RetryingBatchErrorHandler(BackOff backOff, @Nullable ConsumerRecordRecoverer recoverer) {
        this.backOff = backOff;
        this.recoverer = (crs, ex) -> {
            if (recoverer == null) {
                LOGGER.error((Throwable)ex, () -> "Records discarded: " + this.tpos((ConsumerRecords<?, ?>)crs));
            } else {
                crs.spliterator().forEachRemaining(rec -> recoverer.accept(rec, ex));
            }
        };
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Loose catch block
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @Override
    public void handle(Exception thrownException, ConsumerRecords<?, ?> records, Consumer<?, ?> consumer, MessageListenerContainer container, Runnable invokeListener) {
        BackOffExecution execution = this.backOff.start();
        long nextBackOff = execution.nextBackOff();
        String failed = null;
        consumer.pause((Collection)consumer.assignment());
        while (nextBackOff != -1L) {
            consumer.poll(Duration.ZERO);
            try {
                Thread.sleep(nextBackOff);
            }
            catch (InterruptedException e1) {
                Thread.currentThread().interrupt();
                this.seeker.handle(thrownException, records, consumer, container);
                throw new KafkaException("Interrupted during retry", e1);
            }
            try {
                invokeListener.run();
                return;
            }
            catch (Exception e) {
                if (failed == null) {
                    failed = this.tpos(records);
                }
                String toLog = failed;
                LOGGER.debug((Throwable)e, () -> "Retry failed for: " + toLog);
                nextBackOff = execution.nextBackOff();
            }
        }
        this.recoverer.accept(records, thrownException);
        return;
        catch (Exception e) {
            LOGGER.error((Throwable)e, () -> "Recoverer threw an exception; re-seeking batch");
            this.seeker.handle(thrownException, records, consumer, container);
            return;
        }
        finally {
            consumer.resume((Collection)consumer.assignment());
        }
    }

    private String tpos(ConsumerRecords<?, ?> records) {
        StringBuffer sb = new StringBuffer();
        records.spliterator().forEachRemaining(rec -> sb.append(rec.topic()).append('-').append(rec.partition()).append('@').append(rec.offset()).append(','));
        sb.deleteCharAt(sb.length() - 1);
        return sb.toString();
    }
}

