/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka.internals;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
import org.apache.flink.util.Preconditions;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.errors.WakeupException;

@Internal
public class KafkaPartitionDiscoverer
extends AbstractPartitionDiscoverer {
    private final Properties kafkaProperties;
    private KafkaConsumer<?, ?> kafkaConsumer;

    public KafkaPartitionDiscoverer(KafkaTopicsDescriptor topicsDescriptor, int indexOfThisSubtask, int numParallelSubtasks, Properties kafkaProperties) {
        super(topicsDescriptor, indexOfThisSubtask, numParallelSubtasks);
        this.kafkaProperties = (Properties)Preconditions.checkNotNull((Object)kafkaProperties);
    }

    @Override
    protected void initializeConnections() {
        this.kafkaConsumer = new KafkaConsumer(this.kafkaProperties);
    }

    @Override
    protected List<String> getAllTopics() throws AbstractPartitionDiscoverer.WakeupException {
        try {
            return new ArrayList<String>(this.kafkaConsumer.listTopics().keySet());
        }
        catch (WakeupException e) {
            throw new AbstractPartitionDiscoverer.WakeupException();
        }
    }

    @Override
    protected List<KafkaTopicPartition> getAllPartitionsForTopics(List<String> topics) throws AbstractPartitionDiscoverer.WakeupException, RuntimeException {
        LinkedList<KafkaTopicPartition> partitions = new LinkedList<KafkaTopicPartition>();
        try {
            for (String topic : topics) {
                List kafkaPartitions = this.kafkaConsumer.partitionsFor(topic);
                if (kafkaPartitions == null) {
                    throw new RuntimeException(String.format("Could not fetch partitions for %s. Make sure that the topic exists.", topic));
                }
                for (PartitionInfo partitionInfo : kafkaPartitions) {
                    partitions.add(new KafkaTopicPartition(partitionInfo.topic(), partitionInfo.partition()));
                }
            }
        }
        catch (WakeupException e) {
            throw new AbstractPartitionDiscoverer.WakeupException();
        }
        return partitions;
    }

    @Override
    protected void wakeupConnections() {
        if (this.kafkaConsumer != null) {
            this.kafkaConsumer.wakeup();
        }
    }

    @Override
    protected void closeConnections() throws Exception {
        if (this.kafkaConsumer != null) {
            this.kafkaConsumer.close();
            this.kafkaConsumer = null;
        }
    }
}

