package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import java.util.BitSet;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.api.serialization.SerializerManagerUtility;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.metrics.SumAndCount;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
import org.apache.flink.runtime.plugable.ReusingDeserializationDelegate;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Input processor for a task that drives a single {@link OneInputStreamOperator}.
 *
 * <p>Buffers and events are pulled from a {@link SelectedReadingBarrierHandler}. Each buffer is
 * handed to the {@link RecordDeserializer} of the channel it arrived on, and every fully
 * deserialized {@link StreamElement} is dispatched by type: records and latency markers are passed
 * to the operator, while watermarks and stream statuses are routed through a
 * {@link StatusWatermarkValve} whose output is forwarded to the operator and the
 * {@link StreamStatusMaintainer}.
 *
 * <p>Every call into the operator or the stream status maintainer is made while holding the lock
 * object supplied to the constructor.
 *
 * @param <IN> type of the values carried by the input records
 */
@Internal
public class StreamInputProcessor<IN> {
    private static final Logger LOG = LoggerFactory.getLogger(StreamInputProcessor.class);

    /** Sentinel for {@link #lastProcessedTime}: no traced record has been processed yet. */
    private static final long NOT_YET_PROCESSED = -1L;

    /** One deserializer per input channel, indexed by channel index. */
    private final RecordDeserializer<DeserializationDelegate<StreamElement>>[] recordDeserializers;

    /** Deserializer that still holds an unconsumed buffer, or {@code null} if a new buffer is needed. */
    private RecordDeserializer<DeserializationDelegate<StreamElement>> currentRecordDeserializer;

    private final DeserializationDelegate<StreamElement> deserializationDelegate;
    private final SelectedReadingBarrierHandler barrierHandler;
    private final Object lock;
    private final StatusWatermarkValve statusWatermarkValve;
    private final int numInputChannels;

    /** Bit {@code i} is set once an {@link EndOfPartitionEvent} was received on channel {@code i}. */
    private final BitSet channelsWithEndOfPartitionEvents;

    private final StreamStatusMaintainer streamStatusMaintainer;
    private final OneInputStreamOperator<IN, ?> streamOperator;
    private final WatermarkGauge watermarkGauge;

    /** Lazily initialised on the first call to {@link #processInput()}. */
    private Counter numRecordsIn;

    private final boolean enableTracingMetrics;
    private final int tracingMetricsInterval;

    /** Lazily initialised on the first call to {@link #processInput()} when tracing is enabled. */
    private SumAndCount taskLatency;

    /** Lazily initialised on the first call to {@link #processInput()} when tracing is enabled. */
    private SumAndCount waitInput;

    private boolean isFinished;

    /** Value object handed back to the deserialization delegate before the next element is read. */
    private IN reusedObject;

    /** Index of the channel the current buffer came from; {@code -1} before the first buffer. */
    private int currentChannel = -1;

    /** {@link System#nanoTime()} taken right after the last traced record was processed. */
    private long lastProcessedTime = NOT_YET_PROCESSED;

    /**
     * Forwards the output of the {@link StatusWatermarkValve} to the operator (watermarks) and to
     * the {@link StreamStatusMaintainer} (stream statuses), each under the shared lock.
     */
    private class ForwardingValveOutputHandler implements StatusWatermarkValve.ValveOutputHandler {
        private final OneInputStreamOperator<IN, ?> operator;
        private final Object lock;

        private ForwardingValveOutputHandler(OneInputStreamOperator<IN, ?> oneInputStreamOperator, Object obj) {
            this.operator = Preconditions.checkNotNull(oneInputStreamOperator);
            this.lock = Preconditions.checkNotNull(obj);
        }

        /**
         * Records the watermark timestamp in the gauge and passes the watermark to the operator.
         *
         * @throws RuntimeException wrapping any exception raised while doing so
         */
        @Override
        public void handleWatermark(Watermark watermark) {
            try {
                synchronized (this.lock) {
                    StreamInputProcessor.this.watermarkGauge.setCurrentWatermark(watermark.getTimestamp());
                    this.operator.processWatermark(watermark);
                }
            } catch (Exception e) {
                throw new RuntimeException("Exception occurred while processing valve output watermark: ", e);
            }
        }

        /**
         * Toggles the stream status on the {@link StreamStatusMaintainer}.
         *
         * @throws RuntimeException wrapping any exception raised while doing so
         */
        @Override
        public void handleStreamStatus(StreamStatus streamStatus) {
            try {
                synchronized (this.lock) {
                    StreamInputProcessor.this.streamStatusMaintainer.toggleStreamStatus(streamStatus);
                }
            } catch (Exception e) {
                throw new RuntimeException("Exception occurred while processing valve output stream status: ", e);
            }
        }
    }

    /**
     * Creates the processor and registers the {@code checkpointAlignmentTime} gauge.
     *
     * @param inputGateArr input gates to read from; passed to the barrier handler as a single group
     * @param typeSerializer serializer for the record values
     * @param z forwarded unchanged to {@code InputProcessorUtil.createCheckpointBarrierHandler}
     * @param streamTask forwarded to the barrier handler factory
     * @param checkpointingMode forwarded to the barrier handler factory
     * @param obj lock object guarding all calls into the operator; must not be {@code null}
     * @param iOManager supplies the spilling directories for the record deserializers
     * @param configuration forwarded to the barrier handler factory and the deserializer factory
     * @param streamStatusMaintainer receives stream status changes; must not be {@code null}
     * @param oneInputStreamOperator operator that consumes the input; must not be {@code null}
     * @param taskIOMetricGroup metric group on which the alignment gauge is registered
     * @param watermarkGauge gauge updated with the timestamp of every forwarded watermark
     * @param z2 if {@code true}, a reusing deserialization delegate is used and an initial reuse
     *     instance is created from {@code typeSerializer}
     * @param z3 whether the {@code taskLatency} / {@code waitInput} tracing metrics are collected
     * @param i a record is traced when the input-record count is a multiple of this interval
     * @throws IOException declared by the original signature
     */
    public StreamInputProcessor(InputGate[] inputGateArr, TypeSerializer<IN> typeSerializer, boolean z, StreamTask<?, ?> streamTask, CheckpointingMode checkpointingMode, Object obj, IOManager iOManager, Configuration configuration, StreamStatusMaintainer streamStatusMaintainer, OneInputStreamOperator<IN, ?> oneInputStreamOperator, TaskIOMetricGroup taskIOMetricGroup, WatermarkGauge watermarkGauge, boolean z2, boolean z3, int i) throws IOException {
        // The gates are wrapped into a one-element InputGate[][]; the decompiled form
        // "new InputGate[]{inputGateArr}" was ill-typed.
        this.barrierHandler = InputProcessorUtil.createCheckpointBarrierHandler(z, streamTask, checkpointingMode, iOManager, configuration, new InputGate[][] {inputGateArr});
        this.lock = Preconditions.checkNotNull(obj);

        StreamElementSerializer<IN> streamElementSerializer = new StreamElementSerializer<IN>(typeSerializer);
        if (z2) {
            this.deserializationDelegate = new ReusingDeserializationDelegate<StreamElement>(streamElementSerializer);
            this.reusedObject = typeSerializer.createInstance();
        } else {
            this.deserializationDelegate = new NonReusingDeserializationDelegate<StreamElement>(streamElementSerializer);
            this.reusedObject = null;
        }

        this.numInputChannels = this.barrierHandler.getNumberOfInputChannels();
        this.recordDeserializers = new SerializerManagerUtility(configuration).createRecordDeserializers(this.barrierHandler.getAllInputChannels(), iOManager.getSpillingDirectoriesPaths());
        this.channelsWithEndOfPartitionEvents = new BitSet(this.numInputChannels);

        this.streamStatusMaintainer = Preconditions.checkNotNull(streamStatusMaintainer);
        this.streamOperator = Preconditions.checkNotNull(oneInputStreamOperator);
        this.statusWatermarkValve = new StatusWatermarkValve(this.numInputChannels, new ForwardingValveOutputHandler(oneInputStreamOperator, obj));
        this.watermarkGauge = watermarkGauge;

        taskIOMetricGroup.gauge("checkpointAlignmentTime", this.barrierHandler::getAlignmentDurationNanos);

        this.enableTracingMetrics = z3;
        this.tracingMetricsInterval = i;
    }

    /**
     * Reads input until exactly one record has been passed to the operator or the input is
     * exhausted. Watermarks, stream statuses and latency markers encountered on the way are
     * handled without returning.
     *
     * @return {@code true} if a record was processed, {@code false} once the input is finished
     * @throws IOException if an event other than {@link EndOfPartitionEvent} is received
     * @throws IllegalStateException if the barrier handler reports end of input while not empty
     * @throws Exception anything thrown by the operator, the deserializer or the barrier handler
     */
    public boolean processInput() throws Exception {
        if (this.isFinished) {
            return false;
        }
        initMetricsIfNeeded();

        while (true) {
            if (this.currentRecordDeserializer != null) {
                if (this.reusedObject != null) {
                    this.deserializationDelegate.setInstance(new StreamRecord<IN>(this.reusedObject));
                }
                final RecordDeserializer.DeserializationResult result = this.currentRecordDeserializer.getNextRecord(this.deserializationDelegate);
                if (result.isBufferConsumed()) {
                    this.currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
                    this.currentRecordDeserializer = null;
                }
                if (result.isFullRecord()) {
                    final StreamElement element = this.deserializationDelegate.getInstance();
                    if (element.isRecord()) {
                        final StreamRecord<IN> record = element.asRecord();
                        // Remember the value so it can be offered for reuse on the next read.
                        this.reusedObject = record.getValue();
                        processRecord(record);
                        return true;
                    }
                    dispatchNonRecordElement(element);
                    // Keep draining the current deserializer before asking for new input.
                    // Without this, a buffer that is not yet consumed would be overwritten or
                    // abandoned by the next setNextBuffer() call below.
                    continue;
                }
            }

            final BufferOrEvent next = this.barrierHandler.getNextNonBlocked();
            if (next == null) {
                this.isFinished = true;
                if (!this.barrierHandler.isEmpty()) {
                    throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
                }
                return false;
            }

            if (next.isBuffer()) {
                this.currentChannel = next.getChannelIndex();
                this.currentRecordDeserializer = this.recordDeserializers[this.currentChannel];
                this.currentRecordDeserializer.setNextBuffer(next.getBuffer());
            } else {
                final AbstractEvent event = next.getEvent();
                if (event.getClass() != EndOfPartitionEvent.class) {
                    throw new IOException("Unexpected event: " + event);
                }
                this.channelsWithEndOfPartitionEvents.set(next.getChannelIndex());
                // Once every channel has delivered its end-of-partition event, tell the operator.
                if (this.channelsWithEndOfPartitionEvents.cardinality() == this.numInputChannels) {
                    synchronized (this.lock) {
                        this.streamOperator.endInput();
                    }
                }
            }
        }
    }

    /** Lazily creates the record counter and, if tracing is enabled, the tracing metrics. */
    private void initMetricsIfNeeded() {
        if (this.numRecordsIn == null) {
            try {
                this.numRecordsIn = this.streamOperator.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter();
            } catch (Exception e) {
                LOG.warn("An exception occurred during the metrics setup.", e);
                // Fall back to a stand-alone counter so record processing can proceed.
                this.numRecordsIn = new SimpleCounter();
            }
        }
        if (this.enableTracingMetrics) {
            if (this.taskLatency == null) {
                this.taskLatency = new SumAndCount("taskLatency", this.streamOperator.getMetricGroup());
            }
            if (this.waitInput == null) {
                this.waitInput = new SumAndCount("waitInput", this.streamOperator.getMetricGroup());
            }
        }
    }

    /**
     * Passes one record to the operator under the lock, updating the tracing metrics when the
     * record is selected for tracing.
     */
    private void processRecord(StreamRecord<IN> record) throws Exception {
        // The sampling decision uses the counter value from before this record is counted.
        final boolean traced = this.enableTracingMetrics
                && this.numRecordsIn.getCount() % this.tracingMetricsInterval == 0;

        if (!traced) {
            synchronized (this.lock) {
                this.numRecordsIn.inc();
                this.streamOperator.setKeyContextElement1(record);
                this.streamOperator.processElement(record);
            }
            return;
        }

        final long start = System.nanoTime();
        // Skip the very first sample: there is no previous timestamp to measure against, and
        // subtracting the sentinel would record an arbitrary clock value as wait time.
        if (this.lastProcessedTime != NOT_YET_PROCESSED) {
            this.waitInput.update(start - this.lastProcessedTime);
        }
        synchronized (this.lock) {
            this.numRecordsIn.inc();
            this.streamOperator.setKeyContextElement1(record);
            this.streamOperator.processElement(record);
            this.lastProcessedTime = System.nanoTime();
            this.taskLatency.update(this.lastProcessedTime - start);
        }
    }

    /**
     * Handles a watermark, stream status or latency marker read from the current channel.
     *
     * @throws RuntimeException if the element is none of these types
     */
    private void dispatchNonRecordElement(StreamElement element) throws Exception {
        if (element.isWatermark()) {
            this.statusWatermarkValve.inputWatermark(element.asWatermark(), this.currentChannel);
        } else if (element.isStreamStatus()) {
            this.statusWatermarkValve.inputStreamStatus(element.asStreamStatus(), this.currentChannel);
        } else if (element.isLatencyMarker()) {
            synchronized (this.lock) {
                this.streamOperator.processLatencyMarker(element.asLatencyMarker());
            }
        } else {
            throw new RuntimeException("Unexpected stream element type " + element);
        }
    }

    /**
     * Recycles any buffer still held by a record deserializer, clears every deserializer and then
     * cleans up the barrier handler.
     *
     * @throws IOException if the barrier handler cleanup fails
     */
    public void cleanup() throws IOException {
        for (RecordDeserializer<DeserializationDelegate<StreamElement>> recordDeserializer : this.recordDeserializers) {
            Buffer currentBuffer = recordDeserializer.getCurrentBuffer();
            if (currentBuffer != null && !currentBuffer.isRecycled()) {
                currentBuffer.recycleBuffer();
            }
            recordDeserializer.clear();
        }
        this.barrierHandler.cleanup();
    }
}
