/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.kafka.source.reader;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.StringJoiner;
import javax.annotation.Nullable;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializer;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
import org.apache.flink.util.Collector;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaPartitionSplitReader<T>
implements SplitReader<Tuple3<T, Long, Long>, KafkaPartitionSplit> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaPartitionSplitReader.class);
    private static final long POLL_TIMEOUT = 10000L;
    private final KafkaConsumer<byte[], byte[]> consumer;
    private final KafkaRecordDeserializer<T> deserializationSchema;
    private final Map<TopicPartition, Long> stoppingOffsets;
    private final SimpleCollector<T> collector;
    private final String groupId;
    private final int subtaskId;

    public KafkaPartitionSplitReader(Properties props, KafkaRecordDeserializer<T> deserializationSchema, int subtaskId) {
        Properties consumerProps = new Properties();
        consumerProps.putAll((Map<?, ?>)props);
        consumerProps.setProperty("client.id", this.createConsumerClientId(props));
        this.consumer = new KafkaConsumer(consumerProps);
        this.stoppingOffsets = new HashMap<TopicPartition, Long>();
        this.deserializationSchema = deserializationSchema;
        this.collector = new SimpleCollector();
        this.groupId = consumerProps.getProperty("group.id");
        this.subtaskId = subtaskId;
    }

    public RecordsWithSplitIds<Tuple3<T, Long, Long>> fetch() throws IOException {
        ConsumerRecords consumerRecords;
        KafkaPartitionSplitRecords<Tuple3<T, Long, Long>> recordsBySplits = new KafkaPartitionSplitRecords<Tuple3<T, Long, Long>>();
        try {
            consumerRecords = this.consumer.poll(Duration.ofMillis(10000L));
        }
        catch (WakeupException we) {
            ((KafkaPartitionSplitRecords)recordsBySplits).prepareForRead();
            return recordsBySplits;
        }
        ArrayList<TopicPartition> finishedPartitions = new ArrayList<TopicPartition>();
        block7: for (TopicPartition tp : consumerRecords.partitions()) {
            long stoppingOffset = this.getStoppingOffset(tp);
            String splitId = tp.toString();
            Collection recordsForSplit = ((KafkaPartitionSplitRecords)recordsBySplits).recordsForSplit(splitId);
            for (ConsumerRecord consumerRecord : consumerRecords.records(tp)) {
                if (consumerRecord.offset() >= stoppingOffset) {
                    this.finishSplitAtRecord(tp, stoppingOffset, consumerRecord.offset(), finishedPartitions, recordsBySplits);
                    continue block7;
                }
                try {
                    this.deserializationSchema.deserialize((ConsumerRecord<byte[], byte[]>)consumerRecord, this.collector);
                    ((SimpleCollector)this.collector).getRecords().forEach(r -> recordsForSplit.add(new Tuple3(r, (Object)consumerRecord.offset(), (Object)consumerRecord.timestamp())));
                    if (consumerRecord.offset() != stoppingOffset - 1L) continue;
                    this.finishSplitAtRecord(tp, stoppingOffset, consumerRecord.offset(), finishedPartitions, recordsBySplits);
                }
                catch (Exception e) {
                    throw new IOException("Failed to deserialize consumer record due to", e);
                }
                finally {
                    ((SimpleCollector)this.collector).reset();
                }
            }
        }
        if (!finishedPartitions.isEmpty()) {
            this.unassignPartitions(finishedPartitions);
        }
        ((KafkaPartitionSplitRecords)recordsBySplits).prepareForRead();
        return recordsBySplits;
    }

    public void handleSplitsChanges(SplitsChange<KafkaPartitionSplit> splitsChange) {
        if (!(splitsChange instanceof SplitsAddition)) {
            throw new UnsupportedOperationException(String.format("The SplitChange type of %s is not supported.", splitsChange.getClass()));
        }
        ArrayList newPartitionAssignments = new ArrayList();
        HashMap<TopicPartition, Long> partitionsStartingFromSpecifiedOffsets = new HashMap<TopicPartition, Long>();
        ArrayList<TopicPartition> partitionsStartingFromEarliest = new ArrayList<TopicPartition>();
        ArrayList<TopicPartition> partitionsStartingFromLatest = new ArrayList<TopicPartition>();
        ArrayList<TopicPartition> partitionsStoppingAtLatest = new ArrayList<TopicPartition>();
        HashSet<TopicPartition> partitionsStoppingAtCommitted = new HashSet<TopicPartition>();
        splitsChange.splits().forEach(s -> {
            newPartitionAssignments.add(s.getTopicPartition());
            this.parseStartingOffsets((KafkaPartitionSplit)s, (List<TopicPartition>)partitionsStartingFromEarliest, (List<TopicPartition>)partitionsStartingFromLatest, (Map<TopicPartition, Long>)partitionsStartingFromSpecifiedOffsets);
            this.parseStoppingOffsets((KafkaPartitionSplit)s, (List<TopicPartition>)partitionsStoppingAtLatest, (Set<TopicPartition>)partitionsStoppingAtCommitted);
        });
        newPartitionAssignments.addAll(this.consumer.assignment());
        this.consumer.assign(newPartitionAssignments);
        this.seekToStartingOffsets(partitionsStartingFromEarliest, partitionsStartingFromLatest, partitionsStartingFromSpecifiedOffsets);
        this.acquireAndSetStoppingOffsets(partitionsStoppingAtLatest, partitionsStoppingAtCommitted);
        this.removeEmptySplits();
        this.maybeLogSplitChangesHandlingResult(splitsChange);
    }

    public void wakeUp() {
        this.consumer.wakeup();
    }

    public void close() throws Exception {
        this.consumer.close();
    }

    public void notifyCheckpointComplete(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit, OffsetCommitCallback offsetCommitCallback) {
        this.consumer.commitAsync(offsetsToCommit, offsetCommitCallback);
    }

    private void parseStartingOffsets(KafkaPartitionSplit split, List<TopicPartition> partitionsStartingFromEarliest, List<TopicPartition> partitionsStartingFromLatest, Map<TopicPartition, Long> partitionsStartingFromSpecifiedOffsets) {
        TopicPartition tp = split.getTopicPartition();
        if (split.getStartingOffset() == -2L) {
            partitionsStartingFromEarliest.add(tp);
        } else if (split.getStartingOffset() == -1L) {
            partitionsStartingFromLatest.add(tp);
        } else if (split.getStartingOffset() != -3L) {
            partitionsStartingFromSpecifiedOffsets.put(tp, split.getStartingOffset());
        }
    }

    private void parseStoppingOffsets(KafkaPartitionSplit split, List<TopicPartition> partitionsStoppingAtLatest, Set<TopicPartition> partitionsStoppingAtCommitted) {
        TopicPartition tp = split.getTopicPartition();
        split.getStoppingOffset().ifPresent(stoppingOffset -> {
            if (stoppingOffset >= 0L) {
                this.stoppingOffsets.put(tp, (Long)stoppingOffset);
            } else if (stoppingOffset == -1L) {
                partitionsStoppingAtLatest.add(tp);
            } else if (stoppingOffset == -3L) {
                partitionsStoppingAtCommitted.add(tp);
            } else {
                throw new FlinkRuntimeException(String.format("Invalid stopping offset %d for partition %s", stoppingOffset, tp));
            }
        });
    }

    private void seekToStartingOffsets(List<TopicPartition> partitionsStartingFromEarliest, List<TopicPartition> partitionsStartingFromLatest, Map<TopicPartition, Long> partitionsStartingFromSpecifiedOffsets) {
        if (!partitionsStartingFromEarliest.isEmpty()) {
            LOG.trace("Seeking starting offsets to beginning: {}", partitionsStartingFromEarliest);
            this.consumer.seekToBeginning(partitionsStartingFromEarliest);
        }
        if (!partitionsStartingFromLatest.isEmpty()) {
            LOG.trace("Seeking starting offsets to end: {}", partitionsStartingFromLatest);
            this.consumer.seekToEnd(partitionsStartingFromLatest);
        }
        if (!partitionsStartingFromSpecifiedOffsets.isEmpty()) {
            LOG.trace("Seeking starting offsets to specified offsets: {}", partitionsStartingFromSpecifiedOffsets);
            partitionsStartingFromSpecifiedOffsets.forEach((arg_0, arg_1) -> this.consumer.seek(arg_0, arg_1));
        }
    }

    private void acquireAndSetStoppingOffsets(List<TopicPartition> partitionsStoppingAtLatest, Set<TopicPartition> partitionsStoppingAtCommitted) {
        Map endOffset = this.consumer.endOffsets(partitionsStoppingAtLatest);
        this.stoppingOffsets.putAll(endOffset);
        this.consumer.committed(partitionsStoppingAtCommitted).forEach((tp, offsetAndMetadata) -> {
            Preconditions.checkNotNull((Object)offsetAndMetadata, (String)String.format("Partition %s should stop at committed offset. But there is no committed offset of this partition for group %s", tp, this.groupId));
            this.stoppingOffsets.put((TopicPartition)tp, offsetAndMetadata.offset());
        });
    }

    private void removeEmptySplits() {
        ArrayList<TopicPartition> emptySplits = new ArrayList<TopicPartition>();
        for (TopicPartition tp : this.consumer.assignment()) {
            if (this.consumer.position(tp) < this.getStoppingOffset(tp)) continue;
            emptySplits.add(tp);
        }
        if (!emptySplits.isEmpty()) {
            this.unassignPartitions(emptySplits);
        }
    }

    private void maybeLogSplitChangesHandlingResult(SplitsChange<KafkaPartitionSplit> splitsChange) {
        if (LOG.isDebugEnabled()) {
            StringJoiner splitsInfo = new StringJoiner(",");
            for (KafkaPartitionSplit split : splitsChange.splits()) {
                long startingOffset = this.consumer.position(split.getTopicPartition());
                long stoppingOffset = this.getStoppingOffset(split.getTopicPartition());
                splitsInfo.add(String.format("[%s, start:%d, stop: %d]", split.getTopicPartition(), startingOffset, stoppingOffset));
            }
            LOG.debug("SplitsChange handling result: {}", (Object)splitsInfo.toString());
        }
    }

    private void unassignPartitions(Collection<TopicPartition> partitionsToUnassign) {
        HashSet newAssignment = new HashSet(this.consumer.assignment());
        newAssignment.removeAll(partitionsToUnassign);
        this.consumer.assign(newAssignment);
    }

    private String createConsumerClientId(Properties props) {
        String prefix = props.getProperty(KafkaSourceOptions.CLIENT_ID_PREFIX.key());
        return prefix + "-" + this.subtaskId;
    }

    private void finishSplitAtRecord(TopicPartition tp, long stoppingOffset, long currentOffset, List<TopicPartition> finishedPartitions, KafkaPartitionSplitRecords<Tuple3<T, Long, Long>> recordsBySplits) {
        LOG.debug("{} has reached stopping offset {}, current offset is {}", new Object[]{tp, stoppingOffset, currentOffset});
        finishedPartitions.add(tp);
        ((KafkaPartitionSplitRecords)recordsBySplits).addFinishedSplit(KafkaPartitionSplit.toSplitId(tp));
    }

    private long getStoppingOffset(TopicPartition tp) {
        return this.stoppingOffsets.getOrDefault(tp, Long.MAX_VALUE);
    }

    private static class SimpleCollector<T>
    implements Collector<T> {
        private final List<T> records = new ArrayList<T>();

        private SimpleCollector() {
        }

        public void collect(T record) {
            this.records.add(record);
        }

        public void close() {
        }

        private List<T> getRecords() {
            return this.records;
        }

        private void reset() {
            this.records.clear();
        }
    }

    private static class KafkaPartitionSplitRecords<T>
    implements RecordsWithSplitIds<T> {
        private final Map<String, Collection<T>> recordsBySplits = new HashMap<String, Collection<T>>();
        private final Set<String> finishedSplits = new HashSet<String>();
        private Iterator<Map.Entry<String, Collection<T>>> splitIterator;
        private String currentSplitId;
        private Iterator<T> recordIterator;

        private KafkaPartitionSplitRecords() {
        }

        private Collection<T> recordsForSplit(String splitId) {
            return this.recordsBySplits.computeIfAbsent(splitId, id -> new ArrayList());
        }

        private void addFinishedSplit(String splitId) {
            this.finishedSplits.add(splitId);
        }

        private void prepareForRead() {
            this.splitIterator = this.recordsBySplits.entrySet().iterator();
        }

        @Nullable
        public String nextSplit() {
            if (this.splitIterator.hasNext()) {
                Map.Entry<String, Collection<T>> entry = this.splitIterator.next();
                this.currentSplitId = entry.getKey();
                this.recordIterator = entry.getValue().iterator();
                return this.currentSplitId;
            }
            this.currentSplitId = null;
            this.recordIterator = null;
            return null;
        }

        @Nullable
        public T nextRecordFromSplit() {
            Preconditions.checkNotNull((Object)this.currentSplitId, (String)"Make sure nextSplit() did not return null before iterate over the records split.");
            if (this.recordIterator.hasNext()) {
                return this.recordIterator.next();
            }
            return null;
        }

        public Set<String> finishedSplits() {
            return this.finishedSplits;
        }
    }
}

