/*
 * Decompiled with CFR 0.152.
 */
package reactor.kafka.receiver.internals;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverRecord;
import reactor.kafka.receiver.internals.AckMode;
import reactor.kafka.receiver.internals.CommittableBatch;
import reactor.kafka.receiver.internals.ConsumerFactory;
import reactor.kafka.receiver.internals.ConsumerHandler;
import reactor.kafka.receiver.observation.KafkaReceiverObservation;
import reactor.kafka.receiver.observation.KafkaRecordReceiverContext;
import reactor.kafka.sender.TransactionManager;

public class DefaultKafkaReceiver<K, V>
implements KafkaReceiver<K, V> {
    private static final Logger log = LoggerFactory.getLogger(DefaultKafkaReceiver.class);
    private final ConsumerFactory consumerFactory;
    private final ReceiverOptions<K, V> receiverOptions;
    private final String receiverId;
    Predicate<Throwable> isRetriableException = t -> RetriableCommitFailedException.class.isInstance(t) || RebalanceInProgressException.class.isInstance(t);
    final AtomicReference<ConsumerHandler<K, V>> consumerHandlerRef = new AtomicReference();

    public DefaultKafkaReceiver(ConsumerFactory consumerFactory, ReceiverOptions<K, V> receiverOptions) {
        this.consumerFactory = consumerFactory;
        this.receiverOptions = receiverOptions;
        this.receiverId = Optional.ofNullable(receiverOptions.clientId()).filter(clientId -> !clientId.isEmpty()).orElse("reactor-kafka-receiver-" + System.identityHashCode(this));
    }

    @Override
    public Flux<ReceiverRecord<K, V>> receive(Integer prefetch) {
        return this.withHandler(AckMode.MANUAL_ACK, (scheduler, handler) -> {
            int prefetchCalculated = this.preparePublishOnQueueSize(prefetch);
            return handler.receive().publishOn(scheduler, prefetchCalculated).flatMapIterable(it -> it, prefetchCalculated).doOnNext(this::observerRecord).map(record -> new ReceiverRecord(record, handler.toCommittableOffset(record)));
        });
    }

    @Override
    public Flux<Flux<ReceiverRecord<K, V>>> receiveBatch(Integer prefetch) {
        return this.withHandler(AckMode.MANUAL_ACK, (scheduler, handler) -> {
            int prefetchCalculated = this.preparePublishOnQueueSize(prefetch);
            return handler.receive().filter(it -> !it.isEmpty()).publishOn(scheduler, prefetchCalculated).map(records -> Flux.fromIterable((Iterable)records).map(record -> new ReceiverRecord(record, handler.toCommittableOffset(record))));
        });
    }

    @Override
    public Flux<Flux<ConsumerRecord<K, V>>> receiveAutoAck(Integer prefetch) {
        return this.withHandler(AckMode.AUTO_ACK, (scheduler, handler) -> handler.receive().filter(it -> !it.isEmpty()).publishOn(scheduler, this.preparePublishOnQueueSize(prefetch)).map(consumerRecords -> Flux.fromIterable((Iterable)consumerRecords).doOnNext(this::observerRecord).doAfterTerminate(() -> {
            for (ConsumerRecord r : consumerRecords) {
                handler.acknowledge(r);
            }
        })));
    }

    @Override
    public Flux<ConsumerRecord<K, V>> receiveAtmostOnce(Integer prefetch) {
        return this.withHandler(AckMode.ATMOST_ONCE, (scheduler, handler) -> handler.receive().concatMap(records -> Flux.fromIterable((Iterable)records).doOnNext(this::observerRecord).concatMap(r -> handler.commit(r).thenReturn(r)).publishOn(scheduler, 1), this.preparePublishOnQueueSize(prefetch)));
    }

    @Override
    public Flux<Flux<ConsumerRecord<K, V>>> receiveExactlyOnce(TransactionManager transactionManager, Integer prefetch) {
        return this.withHandler(AckMode.EXACTLY_ONCE, (scheduler, handler) -> {
            Flux resultFlux = handler.receive().filter(it -> !it.isEmpty()).map(consumerRecords -> {
                CommittableBatch offsetBatch = new CommittableBatch();
                for (ConsumerRecord r : consumerRecords) {
                    offsetBatch.updateOffset(new TopicPartition(r.topic(), r.partition()), r.offset());
                }
                return transactionManager.begin().thenMany((Publisher)Flux.defer(() -> {
                    handler.awaitingTransaction.getAndSet(true);
                    return Flux.fromIterable((Iterable)consumerRecords);
                })).concatWith(transactionManager.sendOffsets(offsetBatch.getAndClearOffsets().offsets(), handler.consumer.groupMetadata())).doOnNext(this::observerRecord).doAfterTerminate(() -> handler.awaitingTransaction.set(false));
            });
            return resultFlux.publishOn(transactionManager.scheduler(), this.preparePublishOnQueueSize(prefetch));
        });
    }

    private <R extends ConsumerRecord<K, V>> void observerRecord(R record) {
        KafkaReceiverObservation.RECEIVER_OBSERVATION.observation(this.receiverOptions.observationConvention(), KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention.INSTANCE, () -> new KafkaRecordReceiverContext(record, this.receiverId, this.receiverOptions.bootstrapServers()), this.receiverOptions.observationRegistry()).observe(() -> log.trace("[{}] received: {}", (Object)this.receiverId, (Object)record));
    }

    @Override
    public <T> Mono<T> doOnConsumer(Function<Consumer<K, V>, ? extends T> function) {
        ConsumerHandler<K, V> consumerHandler = this.consumerHandlerRef.get();
        if (consumerHandler == null) {
            return Mono.error((Throwable)new IllegalStateException("You must call one of receive*() methods before using doOnConsumer"));
        }
        return consumerHandler.doOnConsumer(function);
    }

    private <T> Flux<T> withHandler(AckMode ackMode, BiFunction<Scheduler, ConsumerHandler<K, V>, Flux<T>> function) {
        return Flux.usingWhen((Publisher)Mono.fromCallable(() -> {
            ConsumerHandler<K, V> consumerHandler = new ConsumerHandler<K, V>(this.receiverOptions, this.consumerFactory.createConsumer(this.receiverOptions), e -> this.isRetriableException.test((Throwable)e), ackMode);
            this.consumerHandlerRef.set(consumerHandler);
            return consumerHandler;
        }), handler -> Flux.using(() -> Schedulers.single((Scheduler)this.receiverOptions.schedulerSupplier().get()), scheduler -> (Flux)function.apply((Scheduler)scheduler, (ConsumerHandler)handler), Scheduler::dispose), handler -> handler.close().doFinally(__ -> this.consumerHandlerRef.compareAndSet((ConsumerHandler<K, ConsumerHandler>)handler, (ConsumerHandler<K, ConsumerHandler>)null)));
    }

    private int preparePublishOnQueueSize(Integer prefetch) {
        return prefetch != null ? prefetch : 1;
    }
}

