/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.listener;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.function.BiConsumer;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.listener.BatchListenerFailedException;
import org.springframework.kafka.listener.ContainerAwareBatchErrorHandler;
import org.springframework.kafka.listener.FailedRecordProcessor;
import org.springframework.kafka.listener.LoggingCommitCallback;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler;
import org.springframework.kafka.listener.SeekUtils;
import org.springframework.lang.Nullable;
import org.springframework.util.backoff.BackOff;

public class RecoveringBatchErrorHandler
extends FailedRecordProcessor
implements ContainerAwareBatchErrorHandler {
    private static final LoggingCommitCallback LOGGING_COMMIT_CALLBACK = new LoggingCommitCallback();
    private final SeekToCurrentBatchErrorHandler fallbackHandler = new SeekToCurrentBatchErrorHandler();

    public RecoveringBatchErrorHandler() {
        this(null, (BackOff)SeekUtils.DEFAULT_BACK_OFF);
    }

    public RecoveringBatchErrorHandler(BackOff backOff) {
        this(null, backOff);
    }

    public RecoveringBatchErrorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer) {
        this(recoverer, (BackOff)SeekUtils.DEFAULT_BACK_OFF);
    }

    public RecoveringBatchErrorHandler(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff) {
        super(recoverer, backOff);
        this.fallbackHandler.setBackOff(backOff);
    }

    @Override
    public void handle(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer, MessageListenerContainer container) {
        Throwable cause = thrownException.getCause();
        if (!(cause instanceof BatchListenerFailedException)) {
            this.logger.debug(cause, (CharSequence)"Expected a BatchListenerFailedException; re-seeking batch");
            this.fallbackHandler.handle(thrownException, data, consumer, container);
        } else {
            int index;
            ConsumerRecord<?, ?> record = ((BatchListenerFailedException)((Object)cause)).getRecord();
            int n = index = record != null ? this.findIndex(data, record) : ((BatchListenerFailedException)((Object)cause)).getIndex();
            if (index < 0 || index >= data.count()) {
                this.logger.warn(cause, () -> String.format("Record not found in batch: %s-%d@%d; re-seeking batch", record.topic(), record.partition(), record.offset()));
                this.fallbackHandler.handle(thrownException, data, consumer, container);
            } else {
                this.seekOrRecover(thrownException, data, consumer, container, index);
            }
        }
    }

    private int findIndex(ConsumerRecords<?, ?> data, ConsumerRecord<?, ?> record) {
        ConsumerRecord candidate;
        if (record == null) {
            return -1;
        }
        int i = 0;
        Iterator iterator = data.iterator();
        while (iterator.hasNext() && (!(candidate = (ConsumerRecord)iterator.next()).topic().equals(record.topic()) || candidate.partition() != record.partition() || candidate.offset() != record.offset())) {
            ++i;
        }
        return i;
    }

    private void seekOrRecover(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer, MessageListenerContainer container, int indexArg) {
        Iterator iterator = data.iterator();
        ArrayList<ConsumerRecord> toCommit = new ArrayList<ConsumerRecord>();
        ArrayList remaining = new ArrayList();
        int index = indexArg;
        while (iterator.hasNext()) {
            ConsumerRecord record = (ConsumerRecord)iterator.next();
            if (index-- > 0) {
                toCommit.add(record);
                continue;
            }
            remaining.add(record);
        }
        HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        toCommit.forEach(rec -> offsets.compute(new TopicPartition(rec.topic(), rec.partition()), (key, val) -> new OffsetAndMetadata(rec.offset() + 1L)));
        if (offsets.size() > 0) {
            this.commit(consumer, container, offsets);
        }
        if (remaining.size() > 0) {
            SeekUtils.seekOrRecover(thrownException, remaining, consumer, container, false, this.getSkipPredicate(remaining, thrownException), this.logger, this.getLogLevel());
            ConsumerRecord recovered = (ConsumerRecord)remaining.get(0);
            this.commit(consumer, container, Collections.singletonMap(new TopicPartition(recovered.topic(), recovered.partition()), new OffsetAndMetadata(recovered.offset() + 1L)));
            if (remaining.size() > 1) {
                throw new KafkaException("Seek to current after exception", this.getLogLevel(), thrownException);
            }
        }
    }

    private void commit(Consumer<?, ?> consumer, MessageListenerContainer container, Map<TopicPartition, OffsetAndMetadata> offsets) {
        boolean syncCommits = container.getContainerProperties().isSyncCommits();
        Duration timeout = container.getContainerProperties().getSyncCommitTimeout();
        if (syncCommits) {
            consumer.commitSync(offsets, timeout);
        } else {
            OffsetCommitCallback commitCallback = container.getContainerProperties().getCommitCallback();
            if (commitCallback == null) {
                commitCallback = LOGGING_COMMIT_CALLBACK;
            }
            consumer.commitAsync(offsets, commitCallback);
        }
    }
}

