package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import java.util.BitSet;
import java.util.Collection;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.api.serialization.SerializerManagerUtility;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.metrics.SumAndCount;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
import org.apache.flink.runtime.plugable.ReusingDeserializationDelegate;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.operators.TwoInputSelection;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.class */
public class StreamTwoInputProcessor<IN1, IN2> {
    private static final Logger LOG = LoggerFactory.getLogger(StreamTwoInputProcessor.class);
    private final RecordDeserializer<DeserializationDelegate<StreamElement>>[] recordDeserializerOfChannels;
    private RecordDeserializer<DeserializationDelegate<StreamElement>>[] currentRecordDeserializerOfInputs;
    private final DeserializationDelegate<StreamElement>[] deserializationDelegateOfInputs;
    private final InputGate[] inputs;
    private final SelectedReadingBarrierHandler barrierHandler;
    private final Object lock;
    private StreamStatus firstStatus;
    private StreamStatus secondStatus;
    private final StatusWatermarkValve[] statusWatermarkValveOfInputs;
    private final int[] numChannelsOfInputs;
    private final BitSet[] eopChannelsOfInputs;
    private int[] currentChannelOfInputs;
    private final StreamStatusMaintainer streamStatusMaintainer;
    private final TwoInputStreamOperator<IN1, IN2, ?> streamOperator;
    private TwoInputSelection inputSelection;
    private boolean[] isEndInputs;
    private final WatermarkGauge[] watermarkGaugeOfInputs;
    private Counter numRecordsIn;
    private boolean isFinished;
    private boolean enableTracingMetrics;
    private int tracingMetricsInterval;
    private SumAndCount taskLatency;
    private SumAndCount waitInput;
    private IN1 reusedObject1;
    private IN2 reusedObject2;
    private int lastReadingInputIndex = 0;
    private long lastProcessedTime = -1;

    /* loaded from: input_file:org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor$ForwardingValveOutputHandler1.class */
    private class ForwardingValveOutputHandler1 implements StatusWatermarkValve.ValveOutputHandler {
        private final TwoInputStreamOperator<IN1, IN2, ?> operator;
        private final Object lock;

        private ForwardingValveOutputHandler1(TwoInputStreamOperator<IN1, IN2, ?> twoInputStreamOperator, Object obj) {
            this.operator = (TwoInputStreamOperator) Preconditions.checkNotNull(twoInputStreamOperator);
            this.lock = Preconditions.checkNotNull(obj);
        }

        @Override // org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.ValveOutputHandler
        public void handleWatermark(Watermark watermark) {
            try {
                synchronized (this.lock) {
                    StreamTwoInputProcessor.this.watermarkGaugeOfInputs[0].setCurrentWatermark(watermark.getTimestamp());
                    this.operator.processWatermark1(watermark);
                }
            } catch (Exception e) {
                throw new RuntimeException("Exception occurred while processing valve output watermark: ", e);
            }
        }

        @Override // org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.ValveOutputHandler
        public void handleStreamStatus(StreamStatus streamStatus) {
            try {
                synchronized (this.lock) {
                    StreamTwoInputProcessor.this.firstStatus = streamStatus;
                    if (!streamStatus.equals(StreamTwoInputProcessor.this.streamStatusMaintainer.getStreamStatus())) {
                        if (streamStatus.isActive()) {
                            StreamTwoInputProcessor.this.streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
                        } else if (StreamTwoInputProcessor.this.secondStatus.isIdle()) {
                            StreamTwoInputProcessor.this.streamStatusMaintainer.toggleStreamStatus(StreamStatus.IDLE);
                        }
                    }
                }
            } catch (Exception e) {
                throw new RuntimeException("Exception occurred while processing valve output stream status: ", e);
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor$ForwardingValveOutputHandler2.class */
    private class ForwardingValveOutputHandler2 implements StatusWatermarkValve.ValveOutputHandler {
        private final TwoInputStreamOperator<IN1, IN2, ?> operator;
        private final Object lock;

        private ForwardingValveOutputHandler2(TwoInputStreamOperator<IN1, IN2, ?> twoInputStreamOperator, Object obj) {
            this.operator = (TwoInputStreamOperator) Preconditions.checkNotNull(twoInputStreamOperator);
            this.lock = Preconditions.checkNotNull(obj);
        }

        @Override // org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.ValveOutputHandler
        public void handleWatermark(Watermark watermark) {
            try {
                synchronized (this.lock) {
                    StreamTwoInputProcessor.this.watermarkGaugeOfInputs[1].setCurrentWatermark(watermark.getTimestamp());
                    this.operator.processWatermark2(watermark);
                }
            } catch (Exception e) {
                throw new RuntimeException("Exception occurred while processing valve output watermark: ", e);
            }
        }

        @Override // org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.ValveOutputHandler
        public void handleStreamStatus(StreamStatus streamStatus) {
            try {
                synchronized (this.lock) {
                    StreamTwoInputProcessor.this.secondStatus = streamStatus;
                    if (!streamStatus.equals(StreamTwoInputProcessor.this.streamStatusMaintainer.getStreamStatus())) {
                        if (streamStatus.isActive()) {
                            StreamTwoInputProcessor.this.streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
                        } else if (StreamTwoInputProcessor.this.firstStatus.isIdle()) {
                            StreamTwoInputProcessor.this.streamStatusMaintainer.toggleStreamStatus(StreamStatus.IDLE);
                        }
                    }
                }
            } catch (Exception e) {
                throw new RuntimeException("Exception occurred while processing valve output stream status: ", e);
            }
        }
    }

    public StreamTwoInputProcessor(Collection<InputGate> collection, Collection<InputGate> collection2, TypeSerializer<IN1> typeSerializer, TypeSerializer<IN2> typeSerializer2, boolean z, TwoInputStreamTask<IN1, IN2, ?> twoInputStreamTask, CheckpointingMode checkpointingMode, Object obj, IOManager iOManager, Configuration configuration, StreamStatusMaintainer streamStatusMaintainer, TwoInputStreamOperator<IN1, IN2, ?> twoInputStreamOperator, TaskIOMetricGroup taskIOMetricGroup, WatermarkGauge watermarkGauge, WatermarkGauge watermarkGauge2, boolean z2, boolean z3, int i) throws IOException {
        this.barrierHandler = InputProcessorUtil.createCheckpointBarrierHandler(z, twoInputStreamTask, checkpointingMode, iOManager, configuration, (Collection<InputGate>[]) new Collection[]{collection, collection2});
        Preconditions.checkState(this.barrierHandler.getSubInputGateCount() == 2);
        int subInputGateCount = this.barrierHandler.getSubInputGateCount();
        this.inputs = new InputGate[subInputGateCount];
        for (int i2 = 0; i2 < subInputGateCount; i2++) {
            this.inputs[i2] = this.barrierHandler.getSubInputGate(i2);
        }
        this.lock = Preconditions.checkNotNull(obj);
        StreamElementSerializer streamElementSerializer = new StreamElementSerializer(typeSerializer);
        StreamElementSerializer streamElementSerializer2 = new StreamElementSerializer(typeSerializer2);
        DeserializationDelegate<StreamElement>[] deserializationDelegateArr = new DeserializationDelegate[2];
        deserializationDelegateArr[0] = z2 ? new ReusingDeserializationDelegate(streamElementSerializer) : new NonReusingDeserializationDelegate(streamElementSerializer);
        deserializationDelegateArr[1] = z2 ? new ReusingDeserializationDelegate(streamElementSerializer2) : new NonReusingDeserializationDelegate(streamElementSerializer2);
        this.deserializationDelegateOfInputs = deserializationDelegateArr;
        this.reusedObject1 = z2 ? (IN1) typeSerializer.createInstance() : null;
        this.reusedObject2 = z2 ? (IN2) typeSerializer2.createInstance() : null;
        this.barrierHandler.getNumberOfInputChannels();
        this.recordDeserializerOfChannels = new SerializerManagerUtility(configuration).createRecordDeserializers(this.barrierHandler.getAllInputChannels(), iOManager.getSpillingDirectoriesPaths());
        this.numChannelsOfInputs = new int[subInputGateCount];
        for (int i3 = 0; i3 < subInputGateCount; i3++) {
            this.numChannelsOfInputs[i3] = this.inputs[i3].getNumberOfInputChannels();
        }
        this.eopChannelsOfInputs = new BitSet[subInputGateCount];
        for (int i4 = 0; i4 < subInputGateCount; i4++) {
            this.eopChannelsOfInputs[i4] = new BitSet(this.numChannelsOfInputs[i4]);
        }
        this.firstStatus = StreamStatus.ACTIVE;
        this.secondStatus = StreamStatus.ACTIVE;
        this.streamStatusMaintainer = (StreamStatusMaintainer) Preconditions.checkNotNull(streamStatusMaintainer);
        this.streamOperator = (TwoInputStreamOperator) Preconditions.checkNotNull(twoInputStreamOperator);
        this.statusWatermarkValveOfInputs = new StatusWatermarkValve[]{new StatusWatermarkValve(this.numChannelsOfInputs[0], new ForwardingValveOutputHandler1(twoInputStreamOperator, obj)), new StatusWatermarkValve(this.numChannelsOfInputs[1], new ForwardingValveOutputHandler2(twoInputStreamOperator, obj))};
        this.watermarkGaugeOfInputs = new WatermarkGauge[]{watermarkGauge, watermarkGauge2};
        SelectedReadingBarrierHandler selectedReadingBarrierHandler = this.barrierHandler;
        selectedReadingBarrierHandler.getClass();
        taskIOMetricGroup.gauge("checkpointAlignmentTime", selectedReadingBarrierHandler::getAlignmentDurationNanos);
        this.currentRecordDeserializerOfInputs = new RecordDeserializer[subInputGateCount];
        this.currentChannelOfInputs = new int[]{-1, -1};
        this.inputSelection = twoInputStreamOperator.firstInputSelection();
        this.isEndInputs = new boolean[]{false, false};
        this.enableTracingMetrics = z3;
        this.tracingMetricsInterval = i;
    }

    public boolean processInput() throws Exception {
        int inputIndexFromInputSelection;
        BufferOrEvent nextNonBlocked;
        if (this.isFinished) {
            return false;
        }
        if (this.numRecordsIn == null) {
            try {
                this.numRecordsIn = this.streamOperator.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter();
            } catch (Exception e) {
                LOG.warn("An exception occurred during the metrics setup.", e);
                this.numRecordsIn = new SimpleCounter();
            }
        }
        if (this.enableTracingMetrics) {
            if (this.taskLatency == null) {
                this.taskLatency = new SumAndCount("taskLatency", this.streamOperator.getMetricGroup());
            }
            if (this.waitInput == null) {
                this.waitInput = new SumAndCount("waitInput", this.streamOperator.getMetricGroup());
            }
        }
        while (true) {
            if (this.inputSelection == TwoInputSelection.ANY) {
                int i = (this.currentRecordDeserializerOfInputs[0] == null ? (char) 0 : (char) 1) | (this.currentRecordDeserializerOfInputs[1] == null ? (char) 0 : (char) 2);
                if (i == 3) {
                    inputIndexFromInputSelection = this.lastReadingInputIndex;
                } else if (i > 0) {
                    inputIndexFromInputSelection = i == 1 ? 0 : 1;
                } else {
                    inputIndexFromInputSelection = 0;
                }
            } else {
                inputIndexFromInputSelection = getInputIndexFromInputSelection(this.inputSelection);
            }
            if (this.currentRecordDeserializerOfInputs[inputIndexFromInputSelection] != null) {
                this.lastReadingInputIndex = inputIndexFromInputSelection;
                if (inputIndexFromInputSelection == 0 && this.reusedObject1 != null) {
                    this.deserializationDelegateOfInputs[inputIndexFromInputSelection].setInstance(new StreamRecord(this.reusedObject1));
                } else if (inputIndexFromInputSelection == 1 && this.reusedObject2 != null) {
                    this.deserializationDelegateOfInputs[inputIndexFromInputSelection].setInstance(new StreamRecord(this.reusedObject2));
                }
                RecordDeserializer.DeserializationResult nextRecord = this.currentRecordDeserializerOfInputs[inputIndexFromInputSelection].getNextRecord(this.deserializationDelegateOfInputs[inputIndexFromInputSelection]);
                if (nextRecord.isBufferConsumed()) {
                    this.currentRecordDeserializerOfInputs[inputIndexFromInputSelection].getCurrentBuffer().recycleBuffer();
                    this.currentRecordDeserializerOfInputs[inputIndexFromInputSelection] = null;
                }
                if (nextRecord.isFullRecord()) {
                    StreamElement streamElement = (StreamElement) this.deserializationDelegateOfInputs[inputIndexFromInputSelection].getInstance();
                    if (streamElement.isWatermark()) {
                        this.statusWatermarkValveOfInputs[inputIndexFromInputSelection].inputWatermark(streamElement.asWatermark(), this.currentChannelOfInputs[inputIndexFromInputSelection] - (inputIndexFromInputSelection == 0 ? 0 : this.numChannelsOfInputs[0]));
                    } else if (streamElement.isStreamStatus()) {
                        this.statusWatermarkValveOfInputs[inputIndexFromInputSelection].inputStreamStatus(streamElement.asStreamStatus(), this.currentChannelOfInputs[inputIndexFromInputSelection] - (inputIndexFromInputSelection == 0 ? 0 : this.numChannelsOfInputs[0]));
                    } else {
                        if (!streamElement.isLatencyMarker()) {
                            if (inputIndexFromInputSelection == 0) {
                                this.reusedObject1 = (IN1) ((StreamRecord) streamElement).getValue();
                                StreamRecord<?> asRecord = streamElement.asRecord();
                                if (!this.enableTracingMetrics || this.numRecordsIn.getCount() % this.tracingMetricsInterval != 0) {
                                    synchronized (this.lock) {
                                        this.numRecordsIn.inc();
                                        this.streamOperator.setKeyContextElement1(asRecord);
                                        this.inputSelection = this.streamOperator.processElement1(asRecord);
                                    }
                                    return true;
                                }
                                long nanoTime = System.nanoTime();
                                this.waitInput.update(nanoTime - this.lastProcessedTime);
                                synchronized (this.lock) {
                                    this.numRecordsIn.inc();
                                    this.streamOperator.setKeyContextElement1(asRecord);
                                    this.inputSelection = this.streamOperator.processElement1(asRecord);
                                    this.lastProcessedTime = System.nanoTime();
                                    this.taskLatency.update(this.lastProcessedTime - nanoTime);
                                }
                                return true;
                            }
                            this.reusedObject2 = (IN2) ((StreamRecord) streamElement).getValue();
                            StreamRecord<?> asRecord2 = streamElement.asRecord();
                            if (!this.enableTracingMetrics || this.numRecordsIn.getCount() % this.tracingMetricsInterval != 0) {
                                synchronized (this.lock) {
                                    this.numRecordsIn.inc();
                                    this.streamOperator.setKeyContextElement2(asRecord2);
                                    this.inputSelection = this.streamOperator.processElement2(asRecord2);
                                }
                                return true;
                            }
                            long nanoTime2 = System.nanoTime();
                            this.waitInput.update(nanoTime2 - this.lastProcessedTime);
                            synchronized (this.lock) {
                                this.numRecordsIn.inc();
                                this.streamOperator.setKeyContextElement2(asRecord2);
                                this.inputSelection = this.streamOperator.processElement2(asRecord2);
                                this.lastProcessedTime = System.nanoTime();
                                this.taskLatency.update(this.lastProcessedTime - nanoTime2);
                            }
                            return true;
                        }
                        synchronized (this.lock) {
                            if (inputIndexFromInputSelection == 0) {
                                this.streamOperator.processLatencyMarker1(streamElement.asLatencyMarker());
                            } else {
                                this.streamOperator.processLatencyMarker2(streamElement.asLatencyMarker());
                            }
                        }
                    }
                }
            }
            if (this.inputSelection == TwoInputSelection.ANY) {
                nextNonBlocked = this.barrierHandler.getNextNonBlocked();
            } else {
                if (this.isEndInputs[inputIndexFromInputSelection]) {
                    throw new IOException("Unexpected reading selection: " + this.inputSelection + ", because the input has finished.");
                }
                nextNonBlocked = this.barrierHandler.getNextNonBlocked(this.inputs[inputIndexFromInputSelection]);
            }
            if (nextNonBlocked == null) {
                this.isFinished = true;
                if (this.barrierHandler.isEmpty()) {
                    return false;
                }
                throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
            }
            if (nextNonBlocked.isBuffer()) {
                int channelIndex = nextNonBlocked.getChannelIndex();
                int inputIndexFromChannel = getInputIndexFromChannel(channelIndex);
                this.currentRecordDeserializerOfInputs[inputIndexFromChannel] = this.recordDeserializerOfChannels[channelIndex];
                this.currentRecordDeserializerOfInputs[inputIndexFromChannel].setNextBuffer(nextNonBlocked.getBuffer());
                this.currentChannelOfInputs[inputIndexFromChannel] = channelIndex;
                this.lastReadingInputIndex = inputIndexFromChannel;
            } else {
                AbstractEvent event = nextNonBlocked.getEvent();
                if (event.getClass() != EndOfPartitionEvent.class) {
                    throw new IOException("Unexpected event: " + event);
                }
                int channelIndex2 = nextNonBlocked.getChannelIndex();
                int inputIndexFromChannel2 = getInputIndexFromChannel(channelIndex2);
                this.eopChannelsOfInputs[inputIndexFromChannel2].set(channelIndex2);
                if (this.eopChannelsOfInputs[inputIndexFromChannel2].cardinality() == this.numChannelsOfInputs[inputIndexFromChannel2]) {
                    synchronized (this.lock) {
                        if (inputIndexFromChannel2 == 0) {
                            this.streamOperator.endInput1();
                        } else {
                            this.streamOperator.endInput2();
                        }
                        this.isEndInputs[inputIndexFromChannel2] = true;
                        this.inputSelection = TwoInputSelection.ANY;
                    }
                } else {
                    continue;
                }
            }
        }
    }

    private int getInputIndexFromChannel(int i) {
        return i < this.numChannelsOfInputs[0] ? 0 : 1;
    }

    private int getInputIndexFromInputSelection(TwoInputSelection twoInputSelection) {
        if (twoInputSelection == TwoInputSelection.FIRST) {
            return 0;
        }
        return twoInputSelection == TwoInputSelection.SECOND ? 1 : -1;
    }

    public void cleanup() throws IOException {
        for (RecordDeserializer<DeserializationDelegate<StreamElement>> recordDeserializer : this.recordDeserializerOfChannels) {
            Buffer currentBuffer = recordDeserializer.getCurrentBuffer();
            if (currentBuffer != null && !currentBuffer.isRecycled()) {
                currentBuffer.recycleBuffer();
            }
            recordDeserializer.clear();
        }
        this.barrierHandler.cleanup();
    }
}
