/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.StreamsProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static helpers used by the Streams runtime for deriving client ids, collecting client
 * metrics, and fetching committed / end offsets through the consumer and admin clients.
 */
public class ClientUtils {
    private static final Logger LOG = LoggerFactory.getLogger(ClientUtils.class);

    /** Marker that {@link #extractThreadId(String)} searches for in a thread name. */
    private static final String STREAM_THREAD_MARKER = "StreamThread-";

    /**
     * Builds the client id of the shared admin client.
     *
     * @param clientId the base client id
     * @return {@code clientId} with the suffix {@code "-admin"}
     */
    public static String getSharedAdminClientId(String clientId) {
        return clientId + "-admin";
    }

    /**
     * Builds the client id of a thread's main consumer.
     *
     * @param threadClientId the client id of the owning thread
     * @return {@code threadClientId} with the suffix {@code "-consumer"}
     */
    public static String getConsumerClientId(String threadClientId) {
        return threadClientId + "-consumer";
    }

    /**
     * Builds the client id of a thread's restore consumer.
     *
     * @param threadClientId the client id of the owning thread
     * @return {@code threadClientId} with the suffix {@code "-restore-consumer"}
     */
    public static String getRestoreConsumerClientId(String threadClientId) {
        return threadClientId + "-restore-consumer";
    }

    /**
     * Builds the client id of a thread-level producer.
     *
     * @param threadClientId the client id of the owning thread
     * @return {@code threadClientId} with the suffix {@code "-producer"}
     */
    public static String getThreadProducerClientId(String threadClientId) {
        return threadClientId + "-producer";
    }

    /**
     * Builds the client id of a task-level producer.
     *
     * @param threadClientId the client id of the owning thread
     * @param taskId         the task the producer belongs to; its string form is embedded in the id
     * @return {@code threadClientId + "-" + taskId + "-producer"}
     */
    public static String getTaskProducerClientId(String threadClientId, TaskId taskId) {
        return threadClientId + "-" + taskId + "-producer";
    }

    /**
     * Merges the metrics of the main consumer and the restore consumer into one map.
     * Main-consumer metrics are inserted first; a restore-consumer metric registered under
     * the same {@link MetricName} replaces the main-consumer entry.
     *
     * @param mainConsumer    the main consumer
     * @param restoreConsumer the restore consumer
     * @return a new, mutable, insertion-ordered map holding both sets of metrics
     */
    public static Map<MetricName, Metric> consumerMetrics(Consumer<byte[], byte[]> mainConsumer, Consumer<byte[], byte[]> restoreConsumer) {
        final Map<MetricName, ? extends Metric> consumerMetrics = mainConsumer.metrics();
        final Map<MetricName, ? extends Metric> restoreConsumerMetrics = restoreConsumer.metrics();
        final Map<MetricName, Metric> result = new LinkedHashMap<>();
        result.putAll(consumerMetrics);
        result.putAll(restoreConsumerMetrics);
        return result;
    }

    /**
     * Copies the admin client's metrics into a new map.
     *
     * @param adminClient the admin client
     * @return a new, mutable, insertion-ordered copy of {@code adminClient.metrics()}
     */
    public static Map<MetricName, Metric> adminClientMetrics(Admin adminClient) {
        final Map<MetricName, ? extends Metric> adminClientMetrics = adminClient.metrics();
        return new LinkedHashMap<>(adminClientMetrics);
    }

    /**
     * Merges the metrics of all given producers into one map. Producers whose
     * {@code metrics()} call returns {@code null} are skipped.
     *
     * @param producers the producers to collect metrics from
     * @return a new, mutable, insertion-ordered map; later producers overwrite entries
     *         registered under the same {@link MetricName} by earlier ones
     */
    public static Map<MetricName, Metric> producerMetrics(Collection<StreamsProducer> producers) {
        final Map<MetricName, Metric> result = new LinkedHashMap<>();
        for (final StreamsProducer producer : producers) {
            final Map<MetricName, ? extends Metric> producerMetrics = producer.metrics();
            if (producerMetrics != null) {
                result.putAll(producerMetrics);
            }
        }
        return result;
    }

    /**
     * Fetches the committed offsets of the given partitions through the consumer.
     *
     * @param partitions the partitions to look up; when empty, the consumer is not called
     * @param consumer   the consumer used to issue the request
     * @return a map from partition to committed offset; a partition for which the consumer
     *         reports a {@code null} entry is mapped to {@code 0}
     * @throws TimeoutException if the request timed out (logged and rethrown unchanged)
     * @throws StreamsException if the request failed with any other {@link KafkaException}
     */
    public static Map<TopicPartition, Long> fetchCommittedOffsets(Set<TopicPartition> partitions, Consumer<byte[], byte[]> consumer) {
        if (partitions.isEmpty()) {
            return Collections.emptyMap();
        }
        try {
            return consumer.committed(partitions).entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() == null ? 0L : e.getValue().offset()));
        } catch (TimeoutException timeoutException) {
            // Must be caught before KafkaException so that timeouts are rethrown as-is.
            LOG.warn("The committed offsets request timed out, try increasing the consumer client's default.api.timeout.ms", timeoutException);
            throw timeoutException;
        } catch (KafkaException fatal) {
            LOG.warn("The committed offsets request failed.", fatal);
            throw new StreamsException(String.format("Failed to retrieve committed offsets for %s", partitions), fatal);
        }
    }

    /**
     * Issues a {@code listOffsets} request asking for the latest offset of every given
     * partition and returns the future of the combined result without waiting for it.
     *
     * @param partitions  the partitions to look up
     * @param adminClient the admin client used to issue the request
     * @return the future returned by {@code ListOffsetsResult.all()}
     */
    public static KafkaFuture<Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo>> fetchEndOffsetsFuture(Collection<TopicPartition> partitions, Admin adminClient) {
        final Map<TopicPartition, OffsetSpec> latestSpecs = partitions.stream()
            .collect(Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest()));
        return adminClient.listOffsets(latestSpecs).all();
    }

    /**
     * Blocks until the given end-offsets future completes and returns its result.
     *
     * @param endOffsetsFuture the future to wait on
     * @return the completed result of the future
     * @throws StreamsException if waiting is interrupted, the future completes exceptionally,
     *                          or a runtime exception is raised; the cause is preserved. When
     *                          the wait was interrupted, the calling thread's interrupt flag
     *                          is set again before the exception is thrown.
     */
    public static Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> getEndOffsets(KafkaFuture<Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo>> endOffsetsFuture) {
        try {
            return endOffsetsFuture.get();
        } catch (InterruptedException | RuntimeException | ExecutionException e) {
            if (e instanceof InterruptedException) {
                // Catching InterruptedException clears the flag; restore it so code further
                // up the stack can still observe the interruption.
                Thread.currentThread().interrupt();
            }
            LOG.warn("The listOffsets request failed.", e);
            throw new StreamsException("Unable to obtain end offsets from kafka", e);
        }
    }

    /**
     * Fetches the end offsets of the given partitions, blocking until they are available.
     *
     * @param partitions  the partitions to look up; when empty, the admin client is not called
     * @param adminClient the admin client used to issue the request
     * @return the end-offset info per partition, or an empty map for empty input
     * @throws StreamsException if the offsets could not be obtained
     */
    public static Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> fetchEndOffsets(Collection<TopicPartition> partitions, Admin adminClient) {
        if (partitions.isEmpty()) {
            return Collections.emptyMap();
        }
        return getEndOffsets(fetchEndOffsetsFuture(partitions, adminClient));
    }

    /**
     * Returns the part of a thread name starting at the first occurrence of
     * {@code "StreamThread-"}.
     *
     * @param fullThreadName the full thread name
     * @return the suffix beginning with {@code "StreamThread-"}, or {@code fullThreadName}
     *         unchanged when it does not contain that marker
     */
    public static String extractThreadId(String fullThreadName) {
        final int index = fullThreadName.indexOf(STREAM_THREAD_MARKER);
        if (index == -1) {
            // Without this guard substring(-1) would throw StringIndexOutOfBoundsException.
            return fullThreadName;
        }
        return fullThreadName.substring(index);
    }

    /**
     * A {@link ConsumerConfig} that passes {@code false} as the second argument of the
     * super constructor.
     * NOTE(review): that argument is presumably the "log the config values" flag, which
     * would make this variant silent on construction; confirm against ConsumerConfig.
     */
    public static final class QuietConsumerConfig
    extends ConsumerConfig {
        public QuietConsumerConfig(Map<String, Object> props) {
            super(props, false);
        }
    }

    /**
     * A {@link StreamsConfig} that passes {@code false} as the second argument of the
     * super constructor.
     * NOTE(review): that argument is presumably the "log the config values" flag, which
     * would make this variant silent on construction; confirm against StreamsConfig.
     */
    public static final class QuietStreamsConfig
    extends StreamsConfig {
        public QuietStreamsConfig(Map<?, ?> props) {
            super(props, false);
        }
    }
}

