package org.apache.flink.streaming.runtime.tasks;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput;
import org.apache.flink.streaming.api.collector.selector.DirectedOutput;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.operators.StreamSourceV2;
import org.apache.flink.streaming.api.operators.TwoInputSelection;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
import org.apache.flink.streaming.runtime.io.StreamRecordWriter;
import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider;
import org.apache.flink.streaming.runtime.tasks.InputSelector;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.XORShiftRandom;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/OperatorChain.class */
public class OperatorChain implements StreamStatusMaintainer, InputSelector {
    /** Logger for this class; the nested output classes also log through it. */
    private static final Logger LOG = LoggerFactory.getLogger(OperatorChain.class);
    private final AbstractInvokable containingTask;
    // Operator proxies keyed by Integer (presumably the operator's node id - TODO confirm against the constructor).
    private final Map<Integer, AbstractStreamOperatorProxy<?>> allOperators;
    private final Deque<StreamOperator<?>> allOperatorsTopologySorted;
    private final RecordWriterOutput<?>[] streamOutputs;
    private final StreamTaskConfigSnapshot streamTaskConfig;
    // The three maps below use the diamond operator instead of a raw HashMap so that the
    // declared type arguments are checked by the compiler and no unchecked warning is raised.
    private final Map<Integer, WatermarkGaugeExposingOutput<StreamRecord<?>>> chainEntryPoints = new HashMap<>();
    private final Map<Integer, StreamOperator> headOperators = new HashMap<>();
    private final Map<Integer, AbstractStreamOperatorProxy<?>> sourceHeadOperators = new HashMap<>();
    /** Current stream status; starts out as {@link StreamStatus#ACTIVE}. */
    private StreamStatus streamStatus = StreamStatus.ACTIVE;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/OperatorChain$AbstractStreamOperatorProxy.class */
    public static abstract class AbstractStreamOperatorProxy<OUT> implements StreamOperator<OUT> {
        /** Successor proxies, each paired with the edge that is handed to that successor's callbacks. */
        protected final List<Tuple2<AbstractStreamOperatorProxy<?>, StreamEdge>> successors;

        /** The wrapped operator; every {@link StreamOperator} call on this proxy is forwarded to it. */
        private final StreamOperator<OUT> operator;

        AbstractStreamOperatorProxy(StreamOperator<OUT> streamOperator, List<Tuple2<AbstractStreamOperatorProxy<?>, StreamEdge>> list) {
            this.successors = list;
            this.operator = streamOperator;
        }

        /** Returns the operator wrapped by this proxy. */
        public StreamOperator<OUT> getOperator() {
            return this.operator;
        }

        /**
         * Wraps the given operator in the proxy type matching its kind.
         *
         * <p>The kinds are tested in a fixed order: one-input, two-input, {@link StreamSource},
         * then {@link StreamSourceV2}.
         *
         * @param streamOperator the operator to wrap
         * @param list the successor list passed on to the created proxy
         * @return the newly created proxy
         * @throws RuntimeException if the operator is none of the supported kinds
         */
        public static AbstractStreamOperatorProxy proxy(StreamOperator streamOperator, List<Tuple2<AbstractStreamOperatorProxy<?>, StreamEdge>> list) {
            final AbstractStreamOperatorProxy wrapped;
            if (streamOperator instanceof OneInputStreamOperator) {
                wrapped = new OneInputStreamOperatorProxy((OneInputStreamOperator) streamOperator, list);
            } else if (streamOperator instanceof TwoInputStreamOperator) {
                wrapped = new TwoInputStreamOperatorProxy((TwoInputStreamOperator) streamOperator, list);
            } else if (streamOperator instanceof StreamSource) {
                wrapped = new SourceStreamOperatorProxy((StreamSource) streamOperator, list);
            } else if (streamOperator instanceof StreamSourceV2) {
                wrapped = new SourceV2StreamOperatorProxy(streamOperator, list);
            } else {
                throw new RuntimeException("Unknown input stream operator " + streamOperator);
            }
            return wrapped;
        }

        /** Registers an additional input edge on this proxy. */
        public abstract void addInputEdge(StreamEdge streamEdge);

        /** Signals that the input arriving over the given edge has ended. */
        public abstract void endInput(StreamEdge streamEdge) throws Exception;

        /**
         * Answers whether this proxy is selected, which here means that every successor reports
         * itself as selected for its own edge.
         *
         * <p>When both an edge and a cache map are supplied, a previously stored answer for that
         * edge is returned directly, and a freshly computed answer is stored under that edge.
         *
         * @param streamEdge the edge the question is asked for, may be {@code null}
         * @param map cache of already computed answers per edge, may be {@code null}
         * @return {@code true} if all successors are selected (or there are none)
         */
        public boolean isSelected(@Nullable StreamEdge streamEdge, @Nullable Map<StreamEdge, Boolean> map) {
            final boolean cacheable = streamEdge != null && map != null;
            if (cacheable) {
                Boolean cached = map.get(streamEdge);
                if (cached != null) {
                    return cached.booleanValue();
                }
            }

            // Stop at the first successor that is not selected; later successors are not asked.
            boolean selected = true;
            for (Tuple2<AbstractStreamOperatorProxy<?>, StreamEdge> successor : this.successors) {
                AbstractStreamOperatorProxy<?> next = successor.f0;
                StreamEdge edge = successor.f1;
                if (!next.isSelected(edge, map)) {
                    selected = false;
                    break;
                }
            }

            if (cacheable) {
                map.put(streamEdge, selected);
            }
            return selected;
        }

        /** Calls {@link #endInput(StreamEdge)} on every successor, passing the edge paired with it. */
        public void endSuccessorsInput() throws Exception {
            for (Tuple2<AbstractStreamOperatorProxy<?>, StreamEdge> successor : this.successors) {
                AbstractStreamOperatorProxy<?> next = successor.f0;
                next.endInput(successor.f1);
            }
        }

        // ------------------------------------------------------------------
        //  Plain delegation to the wrapped operator
        // ------------------------------------------------------------------

        @Override
        public void setup(StreamTask<?, ?> streamTask, StreamConfig streamConfig, Output<StreamRecord<OUT>> output) {
            this.operator.setup(streamTask, streamConfig, output);
        }

        @Override
        public void initializeState() throws Exception {
            this.operator.initializeState();
        }

        @Override
        public void open() throws Exception {
            this.operator.open();
        }

        @Override
        public void close() throws Exception {
            this.operator.close();
        }

        @Override
        public void dispose() throws Exception {
            this.operator.dispose();
        }

        @Override
        public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
            this.operator.prepareSnapshotPreBarrier(checkpointId);
        }

        @Override
        public OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions, CheckpointStreamFactory checkpointStreamFactory) throws Exception {
            return this.operator.snapshotState(checkpointId, timestamp, checkpointOptions, checkpointStreamFactory);
        }

        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            this.operator.notifyCheckpointComplete(checkpointId);
        }

        @Override
        public void setCurrentKey(Object obj) {
            this.operator.setCurrentKey(obj);
        }

        @Override
        public Object getCurrentKey() {
            return this.operator.getCurrentKey();
        }

        @Override
        public void setKeyContextElement1(StreamRecord<?> streamRecord) throws Exception {
            this.operator.setKeyContextElement1(streamRecord);
        }

        @Override
        public void setKeyContextElement2(StreamRecord<?> streamRecord) throws Exception {
            this.operator.setKeyContextElement2(streamRecord);
        }

        @Override
        public ChainingStrategy getChainingStrategy() {
            return this.operator.getChainingStrategy();
        }

        @Override
        public void setChainingStrategy(ChainingStrategy chainingStrategy) {
            this.operator.setChainingStrategy(chainingStrategy);
        }

        @Override
        public MetricGroup getMetricGroup() {
            return this.operator.getMetricGroup();
        }

        @Override
        public OperatorID getOperatorID() {
            return this.operator.getOperatorID();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/OperatorChain$BroadcastingOutputCollector.class */
    public static class BroadcastingOutputCollector<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {
        /** The outputs every record and watermark is fanned out to. */
        protected final Output<StreamRecord<T>>[] outputs;

        /** Queried on each watermark to decide whether it may be forwarded. */
        private final StreamStatusProvider streamStatusProvider;

        /** Remembers the timestamp of the last watermark passed to {@link #emitWatermark}. */
        private final WatermarkGauge watermarkGauge = new WatermarkGauge();

        /** Picks the single output that receives a latency marker when there are several. */
        private final Random random = new XORShiftRandom();

        public BroadcastingOutputCollector(Output<StreamRecord<T>>[] outputArr, StreamStatusProvider streamStatusProvider) {
            this.outputs = outputArr;
            this.streamStatusProvider = streamStatusProvider;
        }

        /**
         * Records the watermark's timestamp in the gauge and, if the stream status is active,
         * forwards the watermark to every output.
         */
        @Override
        public void emitWatermark(Watermark watermark) {
            this.watermarkGauge.setCurrentWatermark(watermark.getTimestamp());
            if (!this.streamStatusProvider.getStreamStatus().isActive()) {
                return;
            }
            for (int i = 0; i < this.outputs.length; i++) {
                this.outputs[i].emitWatermark(watermark);
            }
        }

        /**
         * Sends the latency marker to exactly one output: the only one if there is a single
         * output, otherwise a randomly chosen one. Does nothing when there are no outputs.
         */
        @Override
        public void emitLatencyMarker(LatencyMarker latencyMarker) {
            final int count = this.outputs.length;
            if (count <= 0) {
                return;
            }
            // The random generator is only consulted when there is an actual choice to make.
            final int target = (count == 1) ? 0 : this.random.nextInt(count);
            this.outputs[target].emitLatencyMarker(latencyMarker);
        }

        @Override
        public Gauge<Long> getWatermarkGauge() {
            return this.watermarkGauge;
        }

        /** Hands the same record instance to every output, in array order. */
        @Override
        public void collect(StreamRecord<T> streamRecord) {
            for (int i = 0; i < this.outputs.length; i++) {
                this.outputs[i].collect(streamRecord);
            }
        }

        /** Hands the same tagged record instance to every output, in array order. */
        @Override
        public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> streamRecord) {
            for (int i = 0; i < this.outputs.length; i++) {
                this.outputs[i].collect(outputTag, streamRecord);
            }
        }

        /** Closes every output, in array order. */
        public void close() {
            for (int i = 0; i < this.outputs.length; i++) {
                this.outputs[i].close();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/OperatorChain$ChainingWithFirstInputOfTwoInputStreamOperatorOutput.class */
    /**
     * Output that pushes elements directly into the FIRST input of a chained
     * {@link TwoInputStreamOperator}.
     *
     * <p>If the edge carries an {@link OutputTag}, only records collected with an equal tag are
     * forwarded; otherwise only untagged records are forwarded.
     */
    public static class ChainingWithFirstInputOfTwoInputStreamOperatorOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {
        protected final TwoInputStreamOperator<T, ?, ?> operator;
        /** Incremented once for every record pushed to the operator. */
        protected final Counter numRecordsIn;
        protected final WatermarkGauge watermarkGauge = new WatermarkGauge();
        protected final StreamStatusProvider streamStatusProvider;
        /** Tag taken from the edge; {@code null} means this output accepts untagged records. */
        protected final OutputTag<T> outputTag;

        /**
         * @param twoInputStreamOperator the operator whose first input is fed by this output
         * @param streamStatusProvider queried before a watermark is forwarded
         * @param streamEdge edge whose output tag (possibly {@code null}) this output is bound to
         */
        public ChainingWithFirstInputOfTwoInputStreamOperatorOutput(TwoInputStreamOperator<T, ?, ?> twoInputStreamOperator, StreamStatusProvider streamStatusProvider, StreamEdge streamEdge) {
            Counter simpleCounter;
            this.operator = twoInputStreamOperator;
            try {
                simpleCounter = twoInputStreamOperator.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter();
            } catch (Exception e) {
                // Metrics are best effort: fall back to a stand-alone counter rather than fail the setup.
                OperatorChain.LOG.warn("An exception occurred during the metrics setup.", e);
                simpleCounter = new SimpleCounter();
            }
            this.numRecordsIn = simpleCounter;
            this.streamStatusProvider = streamStatusProvider;
            this.outputTag = streamEdge.getOutputTag();
        }

        /** Forwards an untagged record, unless this output is bound to an output tag. */
        public void collect(StreamRecord<T> streamRecord) {
            if (this.outputTag != null) {
                return;
            }
            pushToOperator(streamRecord);
        }

        /** Forwards a tagged record only if this output is bound to an equal output tag. */
        @Override // org.apache.flink.streaming.api.operators.Output
        public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> streamRecord) {
            if (this.outputTag == null || !this.outputTag.equals(outputTag)) {
                return;
            }
            pushToOperator(streamRecord);
        }

        /**
         * Counts the record and hands it to the operator's first input.
         *
         * @throws ExceptionInChainedOperatorException wrapping any exception raised on the way
         */
        protected <X> void pushToOperator(StreamRecord<X> streamRecord) {
            try {
                // Both collect(...) variants only get here after the tag check, so the record is
                // expected to carry the operator's first input type. The cast is unchecked because
                // of erasure, and processElement1 only accepts StreamRecord<T>.
                @SuppressWarnings("unchecked")
                StreamRecord<T> castRecord = (StreamRecord<T>) streamRecord;
                this.numRecordsIn.inc();
                this.operator.setKeyContextElement1(castRecord);
                this.operator.processElement1(castRecord);
            } catch (Exception e) {
                throw new ExceptionInChainedOperatorException(e);
            }
        }

        /**
         * Records the watermark's timestamp in the gauge and, if the stream status is active,
         * passes the watermark to the operator's first input.
         */
        @Override // org.apache.flink.streaming.api.operators.Output
        public void emitWatermark(Watermark watermark) {
            try {
                this.watermarkGauge.setCurrentWatermark(watermark.getTimestamp());
                if (this.streamStatusProvider.getStreamStatus().isActive()) {
                    this.operator.processWatermark1(watermark);
                }
            } catch (Exception e) {
                throw new ExceptionInChainedOperatorException(e);
            }
        }

        /** Passes the latency marker to the operator's first input. */
        @Override // org.apache.flink.streaming.api.operators.Output
        public void emitLatencyMarker(LatencyMarker latencyMarker) {
            try {
                // This output feeds input 1, so the marker must go to processLatencyMarker1,
                // in line with processElement1 / processWatermark1 above.
                this.operator.processLatencyMarker1(latencyMarker);
            } catch (Exception e) {
                throw new ExceptionInChainedOperatorException(e);
            }
        }

        /** Closes the chained operator, wrapping any failure. */
        public void close() {
            try {
                this.operator.close();
            } catch (Exception e) {
                throw new ExceptionInChainedOperatorException(e);
            }
        }

        @Override // org.apache.flink.streaming.runtime.tasks.OperatorChain.WatermarkGaugeExposingOutput
        public Gauge<Long> getWatermarkGauge() {
            return this.watermarkGauge;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/OperatorChain$ChainingWithOneInputStreamOperatorOutput.class */
    /**
     * Output that pushes elements directly into a chained {@link OneInputStreamOperator}.
     *
     * <p>If the edge carries an {@link OutputTag}, only records collected with an equal tag are
     * forwarded; otherwise only untagged records are forwarded.
     */
    public static class ChainingWithOneInputStreamOperatorOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {
        protected final OneInputStreamOperator<T, ?> operator;
        /** Incremented once for every record pushed to the operator. */
        protected final Counter numRecordsIn;
        protected final WatermarkGauge watermarkGauge = new WatermarkGauge();
        protected final StreamStatusProvider streamStatusProvider;
        /** Tag taken from the edge; {@code null} means this output accepts untagged records. */
        protected final OutputTag<T> outputTag;

        /**
         * @param oneInputStreamOperator the operator fed by this output
         * @param streamStatusProvider queried before a watermark is forwarded
         * @param streamEdge edge whose output tag (possibly {@code null}) this output is bound to
         */
        public ChainingWithOneInputStreamOperatorOutput(OneInputStreamOperator<T, ?> oneInputStreamOperator, StreamStatusProvider streamStatusProvider, StreamEdge streamEdge) {
            Counter simpleCounter;
            this.operator = oneInputStreamOperator;
            try {
                simpleCounter = oneInputStreamOperator.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter();
            } catch (Exception e) {
                // Metrics are best effort: fall back to a stand-alone counter rather than fail the setup.
                OperatorChain.LOG.warn("An exception occurred during the metrics setup.", e);
                simpleCounter = new SimpleCounter();
            }
            this.numRecordsIn = simpleCounter;
            this.streamStatusProvider = streamStatusProvider;
            this.outputTag = streamEdge.getOutputTag();
        }

        /** Forwards an untagged record, unless this output is bound to an output tag. */
        public void collect(StreamRecord<T> streamRecord) {
            if (this.outputTag != null) {
                return;
            }
            pushToOperator(streamRecord);
        }

        /** Forwards a tagged record only if this output is bound to an equal output tag. */
        @Override // org.apache.flink.streaming.api.operators.Output
        public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> streamRecord) {
            if (this.outputTag == null || !this.outputTag.equals(outputTag)) {
                return;
            }
            pushToOperator(streamRecord);
        }

        /**
         * Counts the record and hands it to the operator.
         *
         * @throws ExceptionInChainedOperatorException wrapping any exception raised on the way
         */
        protected <X> void pushToOperator(StreamRecord<X> streamRecord) {
            try {
                // Both collect(...) variants only get here after the tag check, so the record is
                // expected to carry the operator's input type. The cast is unchecked because of
                // erasure, and processElement only accepts StreamRecord<T>.
                @SuppressWarnings("unchecked")
                StreamRecord<T> castRecord = (StreamRecord<T>) streamRecord;
                this.numRecordsIn.inc();
                this.operator.setKeyContextElement1(castRecord);
                this.operator.processElement(castRecord);
            } catch (Exception e) {
                throw new ExceptionInChainedOperatorException(e);
            }
        }

        /**
         * Records the watermark's timestamp in the gauge and, if the stream status is active,
         * passes the watermark to the operator.
         */
        @Override // org.apache.flink.streaming.api.operators.Output
        public void emitWatermark(Watermark watermark) {
            try {
                this.watermarkGauge.setCurrentWatermark(watermark.getTimestamp());
                if (this.streamStatusProvider.getStreamStatus().isActive()) {
                    this.operator.processWatermark(watermark);
                }
            } catch (Exception e) {
                throw new ExceptionInChainedOperatorException(e);
            }
        }

        /** Passes the latency marker to the operator. */
        @Override // org.apache.flink.streaming.api.operators.Output
        public void emitLatencyMarker(LatencyMarker latencyMarker) {
            try {
                this.operator.processLatencyMarker(latencyMarker);
            } catch (Exception e) {
                throw new ExceptionInChainedOperatorException(e);
            }
        }

        /** Closes the chained operator, wrapping any failure. */
        public void close() {
            try {
                this.operator.close();
            } catch (Exception e) {
                throw new ExceptionInChainedOperatorException(e);
            }
        }

        @Override // org.apache.flink.streaming.runtime.tasks.OperatorChain.WatermarkGaugeExposingOutput
        public Gauge<Long> getWatermarkGauge() {
            return this.watermarkGauge;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/OperatorChain$ChainingWithSecondInputOfTwoInputStreamOperatorOutput.class */
    /**
     * Output that pushes elements directly into the SECOND input of a chained
     * {@link TwoInputStreamOperator}.
     *
     * <p>If the edge carries an {@link OutputTag}, only records collected with an equal tag are
     * forwarded; otherwise only untagged records are forwarded.
     */
    public static class ChainingWithSecondInputOfTwoInputStreamOperatorOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {
        protected final TwoInputStreamOperator<?, T, ?> operator;
        /** Incremented once for every record pushed to the operator. */
        protected final Counter numRecordsIn;
        protected final WatermarkGauge watermarkGauge = new WatermarkGauge();
        protected final StreamStatusProvider streamStatusProvider;
        /** Tag taken from the edge; {@code null} means this output accepts untagged records. */
        protected final OutputTag<T> outputTag;

        /**
         * @param twoInputStreamOperator the operator whose second input is fed by this output
         * @param streamStatusProvider queried before a watermark is forwarded
         * @param streamEdge edge whose output tag (possibly {@code null}) this output is bound to
         */
        public ChainingWithSecondInputOfTwoInputStreamOperatorOutput(TwoInputStreamOperator<?, T, ?> twoInputStreamOperator, StreamStatusProvider streamStatusProvider, StreamEdge streamEdge) {
            Counter simpleCounter;
            this.operator = twoInputStreamOperator;
            try {
                simpleCounter = twoInputStreamOperator.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter();
            } catch (Exception e) {
                // Metrics are best effort: fall back to a stand-alone counter rather than fail the setup.
                OperatorChain.LOG.warn("An exception occurred during the metrics setup.", e);
                simpleCounter = new SimpleCounter();
            }
            this.numRecordsIn = simpleCounter;
            this.streamStatusProvider = streamStatusProvider;
            this.outputTag = streamEdge.getOutputTag();
        }

        /** Forwards an untagged record, unless this output is bound to an output tag. */
        public void collect(StreamRecord<T> streamRecord) {
            if (this.outputTag != null) {
                return;
            }
            pushToOperator(streamRecord);
        }

        /** Forwards a tagged record only if this output is bound to an equal output tag. */
        @Override // org.apache.flink.streaming.api.operators.Output
        public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> streamRecord) {
            if (this.outputTag == null || !this.outputTag.equals(outputTag)) {
                return;
            }
            pushToOperator(streamRecord);
        }

        /**
         * Counts the record and hands it to the operator's second input.
         *
         * @throws ExceptionInChainedOperatorException wrapping any exception raised on the way
         */
        protected <X> void pushToOperator(StreamRecord<X> streamRecord) {
            try {
                // Both collect(...) variants only get here after the tag check, so the record is
                // expected to carry the operator's second input type. The cast is unchecked
                // because of erasure, and processElement2 only accepts StreamRecord<T>.
                @SuppressWarnings("unchecked")
                StreamRecord<T> castRecord = (StreamRecord<T>) streamRecord;
                this.numRecordsIn.inc();
                this.operator.setKeyContextElement2(castRecord);
                this.operator.processElement2(castRecord);
            } catch (Exception e) {
                throw new ExceptionInChainedOperatorException(e);
            }
        }

        /**
         * Records the watermark's timestamp in the gauge and, if the stream status is active,
         * passes the watermark to the operator's second input.
         */
        @Override // org.apache.flink.streaming.api.operators.Output
        public void emitWatermark(Watermark watermark) {
            try {
                this.watermarkGauge.setCurrentWatermark(watermark.getTimestamp());
                if (this.streamStatusProvider.getStreamStatus().isActive()) {
                    this.operator.processWatermark2(watermark);
                }
            } catch (Exception e) {
                throw new ExceptionInChainedOperatorException(e);
            }
        }

        /** Passes the latency marker to the operator's second input. */
        @Override // org.apache.flink.streaming.api.operators.Output
        public void emitLatencyMarker(LatencyMarker latencyMarker) {
            try {
                this.operator.processLatencyMarker2(latencyMarker);
            } catch (Exception e) {
                throw new ExceptionInChainedOperatorException(e);
            }
        }

        /** Closes the chained operator, wrapping any failure. */
        public void close() {
            try {
                this.operator.close();
            } catch (Exception e) {
                throw new ExceptionInChainedOperatorException(e);
            }
        }

        @Override // org.apache.flink.streaming.runtime.tasks.OperatorChain.WatermarkGaugeExposingOutput
        public Gauge<Long> getWatermarkGauge() {
            return this.watermarkGauge;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/OperatorChain$CopyingBroadcastingOutputCollector.class */
    /**
     * Broadcasting collector that gives every output except the last one its own
     * {@link StreamRecord} wrapper around the same value; the last output receives the
     * original record instance.
     */
    public static final class CopyingBroadcastingOutputCollector<T> extends BroadcastingOutputCollector<T> {
        public CopyingBroadcastingOutputCollector(Output<StreamRecord<T>>[] outputArr, StreamStatusProvider streamStatusProvider) {
            super(outputArr, streamStatusProvider);
        }

        @Override
        public void collect(StreamRecord<T> streamRecord) {
            final int lastIndex = this.outputs.length - 1;
            if (lastIndex < 0) {
                // No outputs at all: nothing to emit.
                return;
            }
            for (int i = 0; i < lastIndex; i++) {
                Output<StreamRecord<T>> target = this.outputs[i];
                target.collect(streamRecord.copy(streamRecord.getValue()));
            }
            // The last output gets the record itself, so one copy is saved.
            this.outputs[lastIndex].collect(streamRecord);
        }

        @Override
        public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> streamRecord) {
            final int lastIndex = this.outputs.length - 1;
            if (lastIndex < 0) {
                // No outputs at all: nothing to emit.
                return;
            }
            for (int i = 0; i < lastIndex; i++) {
                Output<StreamRecord<T>> target = this.outputs[i];
                target.collect(outputTag, streamRecord.copy(streamRecord.getValue()));
            }
            // The last output gets the record itself, so one copy is saved.
            this.outputs[lastIndex].collect(outputTag, streamRecord);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/OperatorChain$CopyingChainingWithFirstInputOfTwoInputStreamOperatorOutput.class */
    /**
     * Variant of {@link ChainingWithFirstInputOfTwoInputStreamOperatorOutput} that copies each
     * record's value with a {@link TypeSerializer} before handing it to the operator's first input.
     */
    public static final class CopyingChainingWithFirstInputOfTwoInputStreamOperatorOutput<T> extends ChainingWithFirstInputOfTwoInputStreamOperatorOutput<T> {
        /** Serializer used to copy the value of every record; never {@code null}. */
        private final TypeSerializer<T> serializer;

        /**
         * @param twoInputStreamOperator the operator whose first input is fed by this output
         * @param typeSerializer serializer used to copy record values, must not be {@code null}
         * @param streamEdge edge whose output tag (possibly {@code null}) this output is bound to
         * @param streamStatusProvider queried before a watermark is forwarded
         */
        public CopyingChainingWithFirstInputOfTwoInputStreamOperatorOutput(TwoInputStreamOperator<T, ?, ?> twoInputStreamOperator, TypeSerializer<T> typeSerializer, StreamEdge streamEdge, StreamStatusProvider streamStatusProvider) {
            super(twoInputStreamOperator, streamStatusProvider, streamEdge);
            // checkNotNull is generic and returns its argument, so no raw cast is needed.
            this.serializer = Preconditions.checkNotNull(typeSerializer);
        }

        /**
         * Counts the record, copies its value and hands the copy to the operator's first input.
         *
         * @throws ExceptionInChainedOperatorException wrapping any exception raised on the way; a
         *     {@link ClassCastException} on a tagged output is replaced by one with an explanatory
         *     message that keeps the original exception as its cause
         */
        @Override // org.apache.flink.streaming.runtime.tasks.OperatorChain.ChainingWithFirstInputOfTwoInputStreamOperatorOutput
        protected final <X> void pushToOperator(StreamRecord<X> streamRecord) {
            try {
                // Unchecked because of erasure; a type mismatch surfaces later as the
                // ClassCastException handled below.
                @SuppressWarnings("unchecked")
                StreamRecord<T> castRecord = (StreamRecord<T>) streamRecord;
                this.numRecordsIn.inc();
                StreamRecord<T> copy = castRecord.copy(this.serializer.copy(castRecord.getValue()));
                this.operator.setKeyContextElement1(copy);
                this.operator.processElement1(copy);
            } catch (ClassCastException e) {
                if (this.outputTag == null) {
                    throw new ExceptionInChainedOperatorException(e);
                }
                // Enrich the message, but keep the original exception (and its stack trace) as the cause.
                ClassCastException enriched = new ClassCastException(String.format("%s. Failed to push OutputTag with id '%s' to operator. This can occur when multiple OutputTags with different types but identical names are being used.", e.getMessage(), this.outputTag.getId()));
                enriched.initCause(e);
                throw new ExceptionInChainedOperatorException(enriched);
            } catch (Exception e2) {
                throw new ExceptionInChainedOperatorException(e2);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/OperatorChain$CopyingChainingWithOneInputStreamOperatorOutput.class */
    /**
     * Variant of {@link ChainingWithOneInputStreamOperatorOutput} that copies each record's value
     * with a {@link TypeSerializer} before handing it to the operator.
     */
    public static final class CopyingChainingWithOneInputStreamOperatorOutput<T> extends ChainingWithOneInputStreamOperatorOutput<T> {
        /** Serializer used to copy the value of every record; never {@code null}. */
        private final TypeSerializer<T> serializer;

        /**
         * @param oneInputStreamOperator the operator fed by this output
         * @param typeSerializer serializer used to copy record values, must not be {@code null}
         * @param streamEdge edge whose output tag (possibly {@code null}) this output is bound to
         * @param streamStatusProvider queried before a watermark is forwarded
         */
        public CopyingChainingWithOneInputStreamOperatorOutput(OneInputStreamOperator<T, ?> oneInputStreamOperator, TypeSerializer<T> typeSerializer, StreamEdge streamEdge, StreamStatusProvider streamStatusProvider) {
            super(oneInputStreamOperator, streamStatusProvider, streamEdge);
            // checkNotNull is generic and returns its argument, so no raw cast is needed.
            this.serializer = Preconditions.checkNotNull(typeSerializer);
        }

        /**
         * Counts the record, copies its value and hands the copy to the operator.
         *
         * @throws ExceptionInChainedOperatorException wrapping any exception raised on the way; a
         *     {@link ClassCastException} on a tagged output is replaced by one with an explanatory
         *     message that keeps the original exception as its cause
         */
        @Override // org.apache.flink.streaming.runtime.tasks.OperatorChain.ChainingWithOneInputStreamOperatorOutput
        protected final <X> void pushToOperator(StreamRecord<X> streamRecord) {
            try {
                // Unchecked because of erasure; a type mismatch surfaces later as the
                // ClassCastException handled below.
                @SuppressWarnings("unchecked")
                StreamRecord<T> castRecord = (StreamRecord<T>) streamRecord;
                this.numRecordsIn.inc();
                StreamRecord<T> copy = castRecord.copy(this.serializer.copy(castRecord.getValue()));
                this.operator.setKeyContextElement1(copy);
                this.operator.processElement(copy);
            } catch (ClassCastException e) {
                if (this.outputTag == null) {
                    throw new ExceptionInChainedOperatorException(e);
                }
                // Enrich the message, but keep the original exception (and its stack trace) as the cause.
                ClassCastException enriched = new ClassCastException(String.format("%s. Failed to push OutputTag with id '%s' to operator. This can occur when multiple OutputTags with different types but identical names are being used.", e.getMessage(), this.outputTag.getId()));
                enriched.initCause(e);
                throw new ExceptionInChainedOperatorException(enriched);
            } catch (Exception e2) {
                throw new ExceptionInChainedOperatorException(e2);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/OperatorChain$CopyingChainingWithSecondInputOfTwoInputStreamOperatorOutput.class */
    /**
     * Variant of {@link ChainingWithSecondInputOfTwoInputStreamOperatorOutput} that copies each
     * record's value with a {@link TypeSerializer} before handing it to the operator's second input.
     */
    public static final class CopyingChainingWithSecondInputOfTwoInputStreamOperatorOutput<T> extends ChainingWithSecondInputOfTwoInputStreamOperatorOutput<T> {
        /** Serializer used to copy the value of every record; never {@code null}. */
        private final TypeSerializer<T> serializer;

        /**
         * @param twoInputStreamOperator the operator whose second input is fed by this output
         * @param typeSerializer serializer used to copy record values, must not be {@code null}
         * @param streamEdge edge whose output tag (possibly {@code null}) this output is bound to
         * @param streamStatusProvider queried before a watermark is forwarded
         */
        public CopyingChainingWithSecondInputOfTwoInputStreamOperatorOutput(TwoInputStreamOperator<?, T, ?> twoInputStreamOperator, TypeSerializer<T> typeSerializer, StreamEdge streamEdge, StreamStatusProvider streamStatusProvider) {
            super(twoInputStreamOperator, streamStatusProvider, streamEdge);
            // checkNotNull is generic and returns its argument, so no raw cast is needed.
            this.serializer = Preconditions.checkNotNull(typeSerializer);
        }

        /**
         * Counts the record, copies its value and hands the copy to the operator's second input.
         *
         * @throws ExceptionInChainedOperatorException wrapping any exception raised on the way; a
         *     {@link ClassCastException} on a tagged output is replaced by one with an explanatory
         *     message that keeps the original exception as its cause
         */
        @Override // org.apache.flink.streaming.runtime.tasks.OperatorChain.ChainingWithSecondInputOfTwoInputStreamOperatorOutput
        protected final <X> void pushToOperator(StreamRecord<X> streamRecord) {
            try {
                // Unchecked because of erasure; a type mismatch surfaces later as the
                // ClassCastException handled below.
                @SuppressWarnings("unchecked")
                StreamRecord<T> castRecord = (StreamRecord<T>) streamRecord;
                this.numRecordsIn.inc();
                StreamRecord<T> copy = castRecord.copy(this.serializer.copy(castRecord.getValue()));
                this.operator.setKeyContextElement2(copy);
                this.operator.processElement2(copy);
            } catch (ClassCastException e) {
                if (this.outputTag == null) {
                    throw new ExceptionInChainedOperatorException(e);
                }
                // Enrich the message, but keep the original exception (and its stack trace) as the cause.
                ClassCastException enriched = new ClassCastException(String.format("%s. Failed to push OutputTag with id '%s' to operator. This can occur when multiple OutputTags with different types but identical names are being used.", e.getMessage(), this.outputTag.getId()));
                enriched.initCause(e);
                throw new ExceptionInChainedOperatorException(enriched);
            } catch (Exception e2) {
                throw new ExceptionInChainedOperatorException(e2);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/OperatorChain$OneInputStreamOperatorProxy.class */
    public static class OneInputStreamOperatorProxy<IN, OUT> extends AbstractStreamOperatorProxy<OUT> implements OneInputStreamOperator<IN, OUT> {
        /** The wrapped operator that all element, watermark and marker calls are forwarded to. */
        private final OneInputStreamOperator<IN, OUT> operator;

        /** Number of registered input edges for which {@link #endInput()} has not been called yet. */
        private volatile int unfinishedInputEdges;

        /**
         * Wraps the given one-input operator.
         *
         * @param delegate the operator to forward to
         * @param successors list handed to the superclass (presumably the chained successors
         *     notified by {@code endSuccessorsInput()} — confirm against the base class)
         */
        OneInputStreamOperatorProxy(OneInputStreamOperator<IN, OUT> delegate, List<Tuple2<AbstractStreamOperatorProxy<?>, StreamEdge>> successors) {
            super(delegate, successors);
            this.operator = delegate;
            this.unfinishedInputEdges = 0;
        }

        /** Registers one more input edge that has to end before the operator's input ends. */
        @Override // org.apache.flink.streaming.runtime.tasks.OperatorChain.AbstractStreamOperatorProxy
        public void addInputEdge(StreamEdge edge) {
            this.unfinishedInputEdges++;
        }

        /** Ends one input edge; the edge itself is not inspected because there is a single input. */
        @Override // org.apache.flink.streaming.runtime.tasks.OperatorChain.AbstractStreamOperatorProxy
        public void endInput(StreamEdge edge) throws Exception {
            endInput();
        }

        /** Forwards the record to the wrapped operator. */
        @Override // org.apache.flink.streaming.api.operators.OneInputStreamOperator
        public void processElement(StreamRecord<IN> element) throws Exception {
            this.operator.processElement(element);
        }

        /** Forwards the watermark to the wrapped operator. */
        @Override // org.apache.flink.streaming.api.operators.OneInputStreamOperator
        public void processWatermark(Watermark mark) throws Exception {
            this.operator.processWatermark(mark);
        }

        /** Forwards the latency marker to the wrapped operator. */
        @Override // org.apache.flink.streaming.api.operators.OneInputStreamOperator
        public void processLatencyMarker(LatencyMarker marker) throws Exception {
            this.operator.processLatencyMarker(marker);
        }

        /**
         * Counts down the unfinished input edges. When the last edge ends, the wrapped operator's
         * input is ended and the end of input is passed on to the successors.
         *
         * @throws RuntimeException if called more often than edges were registered
         */
        @Override // org.apache.flink.streaming.api.operators.OneInputStreamOperator
        public void endInput() throws Exception {
            final int remaining = this.unfinishedInputEdges - 1;
            this.unfinishedInputEdges = remaining;
            if (remaining == 0) {
                this.operator.endInput();
                endSuccessorsInput();
                return;
            }
            if (this.unfinishedInputEdges < 0) {
                throw new RuntimeException("This input side is finished already, unexpected endInput invoked");
            }
        }

        /** Forwards the pre-barrier snapshot preparation to the wrapped operator. */
        @Override // org.apache.flink.streaming.runtime.tasks.OperatorChain.AbstractStreamOperatorProxy, org.apache.flink.streaming.api.operators.StreamOperator
        public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
            this.operator.prepareSnapshotPreBarrier(checkpointId);
        }

        /** Returns whatever the wrapped operator reports for {@code requireState()}. */
        @Override // org.apache.flink.streaming.api.operators.StreamOperator
        public boolean requireState() {
            return this.operator.requireState();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/OperatorChain$SourceStreamOperatorProxy.class */
    public static class SourceStreamOperatorProxy<OUT> extends AbstractStreamOperatorProxy<OUT> implements OneInputStreamOperator<OUT, OUT> {
        /** The wrapped source operator; incoming elements are emitted straight to its output. */
        private final StreamSource<OUT, ?> operator;

        /**
         * Wraps the given source operator.
         *
         * @param source the source operator to wrap
         * @param successors list handed to the superclass
         */
        SourceStreamOperatorProxy(StreamSource<OUT, ?> source, List<Tuple2<AbstractStreamOperatorProxy<?>, StreamEdge>> successors) {
            super(source, successors);
            this.operator = source;
        }

        /** Always fails: a source operator is not expected to have input edges. */
        @Override // org.apache.flink.streaming.runtime.tasks.OperatorChain.AbstractStreamOperatorProxy
        public void addInputEdge(StreamEdge edge) {
            throw new UnsupportedOperationException("There should not be a input edge in source operator");
        }

        /** Ends the input regardless of the edge passed in. */
        @Override // org.apache.flink.streaming.runtime.tasks.OperatorChain.AbstractStreamOperatorProxy
        public void endInput(StreamEdge edge) throws Exception {
            endInput();
        }

        /** Emits the record unchanged through the source operator's output. */
        @Override // org.apache.flink.streaming.api.operators.OneInputStreamOperator
        public void processElement(StreamRecord<OUT> element) throws Exception {
            this.operator.getOutput().collect(element);
        }

        /** Emits the watermark through the source operator's output. */
        @Override // org.apache.flink.streaming.api.operators.OneInputStreamOperator
        public void processWatermark(Watermark mark) throws Exception {
            this.operator.getOutput().emitWatermark(mark);
        }

        /** Emits the latency marker through the source operator's output. */
        @Override // org.apache.flink.streaming.api.operators.OneInputStreamOperator
        public void processLatencyMarker(LatencyMarker marker) throws Exception {
            this.operator.getOutput().emitLatencyMarker(marker);
        }

        /** Passes the end of input on to the successors; the source itself is not notified here. */
        @Override // org.apache.flink.streaming.api.operators.OneInputStreamOperator
        public void endInput() throws Exception {
            endSuccessorsInput();
        }

        /** Returns whatever the wrapped source operator reports for {@code requireState()}. */
        @Override // org.apache.flink.streaming.api.operators.StreamOperator
        public boolean requireState() {
            return this.operator.requireState();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/OperatorChain$SourceV2StreamOperatorProxy.class */
    public static class SourceV2StreamOperatorProxy<OUT> extends AbstractStreamOperatorProxy<OUT> implements OneInputStreamOperator<OUT, OUT> {
        /**
         * Wraps the given operator.
         *
         * @param sourceOperator the operator to wrap
         * @param successors list handed to the superclass
         */
        SourceV2StreamOperatorProxy(StreamOperator<OUT> sourceOperator, List<Tuple2<AbstractStreamOperatorProxy<?>, StreamEdge>> successors) {
            super(sourceOperator, successors);
        }

        /** Always fails: a source operator is not expected to have input edges. */
        @Override // org.apache.flink.streaming.runtime.tasks.OperatorChain.AbstractStreamOperatorProxy
        public void addInputEdge(StreamEdge edge) {
            throw new UnsupportedOperationException("There should not be a input edge in source operator");
        }

        /** Ends the input regardless of the edge passed in. */
        @Override // org.apache.flink.streaming.runtime.tasks.OperatorChain.AbstractStreamOperatorProxy
        public void endInput(StreamEdge edge) throws Exception {
            endInput();
        }

        /** Not supported by this proxy; always throws. */
        @Override // org.apache.flink.streaming.api.operators.OneInputStreamOperator
        public void processElement(StreamRecord<OUT> element) throws Exception {
            throw new UnsupportedOperationException("Should not come to here");
        }

        /** Not supported by this proxy; always throws. */
        @Override // org.apache.flink.streaming.api.operators.OneInputStreamOperator
        public void processWatermark(Watermark mark) throws Exception {
            throw new UnsupportedOperationException("Should not come to here");
        }

        /** Not supported by this proxy; always throws. */
        @Override // org.apache.flink.streaming.api.operators.OneInputStreamOperator
        public void processLatencyMarker(LatencyMarker marker) throws Exception {
            throw new UnsupportedOperationException("Should not come to here");
        }

        /** Passes the end of input on to the successors. */
        @Override // org.apache.flink.streaming.api.operators.OneInputStreamOperator
        public void endInput() throws Exception {
            endSuccessorsInput();
        }

        /** Returns whatever the wrapped operator reports for {@code requireState()}. */
        @Override // org.apache.flink.streaming.api.operators.StreamOperator
        public boolean requireState() {
            return getOperator().requireState();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/OperatorChain$TwoInputStreamOperatorProxy.class */
    public static class TwoInputStreamOperatorProxy<IN1, IN2, OUT> extends AbstractStreamOperatorProxy<OUT> implements TwoInputStreamOperator<IN1, IN2, OUT> {
        /** The wrapped two-input operator. */
        private final TwoInputStreamOperator<IN1, IN2, OUT> operator;

        /** Registered edges of the first input that have not ended yet. */
        private volatile int unfinishedInputEdges1;

        /** Registered edges of the second input that have not ended yet. */
        private volatile int unfinishedInputEdges2;

        /** Most recent input selection reported by the operator or forced by an ended input. */
        private TwoInputSelection lastSelection;

        /** Optional listener notified whenever {@link #lastSelection} changes. */
        private InputSelector.SelectionChangedListener listener;

        /**
         * Wraps the given two-input operator.
         *
         * @param delegate the operator to forward to
         * @param successors list handed to the superclass
         */
        TwoInputStreamOperatorProxy(TwoInputStreamOperator<IN1, IN2, OUT> delegate, List<Tuple2<AbstractStreamOperatorProxy<?>, StreamEdge>> successors) {
            super(delegate, successors);
            this.operator = delegate;
            this.lastSelection = null;
            this.unfinishedInputEdges1 = 0;
            this.unfinishedInputEdges2 = 0;
        }

        /** Asks the wrapped operator for its first selection and remembers it. */
        @Override // org.apache.flink.streaming.api.operators.TwoInputStreamOperator
        public TwoInputSelection firstInputSelection() {
            this.lastSelection = this.operator.firstInputSelection();
            return this.lastSelection;
        }

        /** Processes a first-input record and records the selection the operator returns. */
        @Override // org.apache.flink.streaming.api.operators.TwoInputStreamOperator
        public TwoInputSelection processElement1(StreamRecord<IN1> element) throws Exception {
            switchSelection(this.operator.processElement1(element));
            return this.lastSelection;
        }

        /** Processes a second-input record and records the selection the operator returns. */
        @Override // org.apache.flink.streaming.api.operators.TwoInputStreamOperator
        public TwoInputSelection processElement2(StreamRecord<IN2> element) throws Exception {
            switchSelection(this.operator.processElement2(element));
            return this.lastSelection;
        }

        /** Forwards a first-input watermark to the wrapped operator. */
        @Override // org.apache.flink.streaming.api.operators.TwoInputStreamOperator
        public void processWatermark1(Watermark mark) throws Exception {
            this.operator.processWatermark1(mark);
        }

        /** Forwards a second-input watermark to the wrapped operator. */
        @Override // org.apache.flink.streaming.api.operators.TwoInputStreamOperator
        public void processWatermark2(Watermark mark) throws Exception {
            this.operator.processWatermark2(mark);
        }

        /** Forwards a first-input latency marker to the wrapped operator. */
        @Override // org.apache.flink.streaming.api.operators.TwoInputStreamOperator
        public void processLatencyMarker1(LatencyMarker marker) throws Exception {
            this.operator.processLatencyMarker1(marker);
        }

        /** Forwards a second-input latency marker to the wrapped operator. */
        @Override // org.apache.flink.streaming.api.operators.TwoInputStreamOperator
        public void processLatencyMarker2(LatencyMarker marker) throws Exception {
            this.operator.processLatencyMarker2(marker);
        }

        /**
         * Counts down the unfinished edges of the first input. When the last one ends, the wrapped
         * operator is told and the selection is switched to the second input. Successors are ended
         * once both sides have no unfinished edges.
         *
         * @throws RuntimeException if called more often than first-input edges were registered
         */
        @Override // org.apache.flink.streaming.api.operators.TwoInputStreamOperator
        public void endInput1() throws Exception {
            final int remaining = this.unfinishedInputEdges1 - 1;
            this.unfinishedInputEdges1 = remaining;
            if (remaining == 0) {
                this.operator.endInput1();
                switchSelection(TwoInputSelection.SECOND);
            } else if (this.unfinishedInputEdges1 < 0) {
                throw new RuntimeException("This input side is finished already, unexpected endInput invoked");
            }
            endSuccessorsWhenBothSidesFinished();
        }

        /**
         * Counts down the unfinished edges of the second input. When the last one ends, the wrapped
         * operator is told and the selection is switched to the first input. Successors are ended
         * once both sides have no unfinished edges.
         *
         * @throws RuntimeException if called more often than second-input edges were registered
         */
        @Override // org.apache.flink.streaming.api.operators.TwoInputStreamOperator
        public void endInput2() throws Exception {
            final int remaining = this.unfinishedInputEdges2 - 1;
            this.unfinishedInputEdges2 = remaining;
            if (remaining == 0) {
                this.operator.endInput2();
                switchSelection(TwoInputSelection.FIRST);
            } else if (this.unfinishedInputEdges2 < 0) {
                throw new RuntimeException("This input side is finished already, unexpected endInput invoked");
            }
            endSuccessorsWhenBothSidesFinished();
        }

        /**
         * Registers an input edge on the side given by its type number (1 or 2).
         *
         * @throws RuntimeException for any other type number
         */
        @Override // org.apache.flink.streaming.runtime.tasks.OperatorChain.AbstractStreamOperatorProxy
        public void addInputEdge(StreamEdge streamEdge) {
            final int typeNumber = streamEdge.getTypeNumber();
            switch (typeNumber) {
                case 1:
                    this.unfinishedInputEdges1++;
                    break;
                case 2:
                    this.unfinishedInputEdges2++;
                    break;
                default:
                    throw new RuntimeException("Unknown stream edge type number " + streamEdge.getTypeNumber());
            }
        }

        /**
         * Ends the input side given by the edge's type number (1 or 2).
         *
         * @throws RuntimeException for any other type number
         */
        @Override // org.apache.flink.streaming.runtime.tasks.OperatorChain.AbstractStreamOperatorProxy
        public void endInput(StreamEdge streamEdge) throws Exception {
            final int typeNumber = streamEdge.getTypeNumber();
            switch (typeNumber) {
                case 1:
                    endInput1();
                    break;
                case 2:
                    endInput2();
                    break;
                default:
                    throw new RuntimeException("Unknown stream edge type number " + streamEdge.getTypeNumber());
            }
        }

        /**
         * Decides whether the given input edge is currently selected.
         *
         * <p>An edge on the side opposite to the current selection is never selected. Otherwise a
         * previously cached answer is returned if present, else the superclass decides. Whenever a
         * cache map is supplied, the computed answer is written into it.
         *
         * @param streamEdge the edge to test; must not be {@code null}
         * @param map optional per-call cache of answers, may be {@code null}
         */
        @Override // org.apache.flink.streaming.runtime.tasks.OperatorChain.AbstractStreamOperatorProxy
        public boolean isSelected(StreamEdge streamEdge, Map<StreamEdge, Boolean> map) {
            Preconditions.checkNotNull(streamEdge);
            if (this.lastSelection == null) {
                this.lastSelection = firstInputSelection();
            }

            final int typeNumber = streamEdge.getTypeNumber();
            final boolean excludedBySelection =
                    (typeNumber == 1 && this.lastSelection == TwoInputSelection.SECOND)
                            || (typeNumber == 2 && this.lastSelection == TwoInputSelection.FIRST);
            if (excludedBySelection) {
                if (map != null) {
                    map.put(streamEdge, false);
                }
                return false;
            }

            if (map != null) {
                Boolean cached = map.get(streamEdge);
                if (cached != null) {
                    return cached.booleanValue();
                }
            }

            final boolean selected = super.isSelected(streamEdge, map);
            if (map != null) {
                map.put(streamEdge, selected);
            }
            return selected;
        }

        /** Stores the listener to notify on selection changes, replacing any previous one. */
        public void registerSelectionChangedListener(InputSelector.SelectionChangedListener selectionChangedListener) {
            this.listener = selectionChangedListener;
        }

        /** Forwards the pre-barrier snapshot preparation to the wrapped operator. */
        @Override // org.apache.flink.streaming.runtime.tasks.OperatorChain.AbstractStreamOperatorProxy, org.apache.flink.streaming.api.operators.StreamOperator
        public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
            this.operator.prepareSnapshotPreBarrier(checkpointId);
        }

        /** Returns whatever the wrapped operator reports for {@code requireState()}. */
        @Override // org.apache.flink.streaming.api.operators.StreamOperator
        public boolean requireState() {
            return this.operator.requireState();
        }

        /** Records a new selection and notifies the listener, but only if the selection changed. */
        private void switchSelection(TwoInputSelection selection) throws Exception {
            if (selection == this.lastSelection) {
                return;
            }
            this.lastSelection = selection;
            if (this.listener != null) {
                this.listener.notifySelectionChanged();
            }
        }

        /** Passes the end of input on to the successors once neither side has unfinished edges. */
        private void endSuccessorsWhenBothSidesFinished() throws Exception {
            if (this.unfinishedInputEdges1 == 0 && this.unfinishedInputEdges2 == 0) {
                endSuccessorsInput();
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/OperatorChain$WatermarkGaugeExposingOutput.class */
    /**
     * An {@link Output} that additionally exposes a gauge for watermark reporting.
     *
     * @param <T> the element type of the output
     */
    public interface WatermarkGaugeExposingOutput<T> extends Output<T> {
        /** Returns the gauge that callers in this class register as a watermark metric. */
        Gauge<Long> getWatermarkGauge();
    }

    /**
     * Builds the operator chain for the given task.
     *
     * <p>The constructor first creates one {@link RecordWriterOutput} per outgoing edge of the
     * chain, then creates the head operators (which recursively creates every operator chained
     * behind them), registers the chain's incoming edges on their target proxies and finally
     * computes the topologically sorted operator list.
     *
     * <p>From the moment the first stream output exists, any failure closes all outputs created so
     * far before the failure is propagated, so that no output is left open.
     *
     * @param streamTask the task that contains this chain
     * @param list one record writer per outgoing edge of the chain, in the order of
     *     {@code getOutStreamEdgesOfChain()}
     */
    public OperatorChain(StreamTask streamTask, List<StreamRecordWriter<StreamRecord<?>>> list) {
        this.containingTask = streamTask;
        ClassLoader userCodeClassLoader = streamTask.getUserCodeClassLoader();
        this.streamTaskConfig = streamTask.getStreamTaskConfig();
        Map<Integer, StreamConfig> chainedNodeConfigs = this.streamTaskConfig.getChainedNodeConfigs();
        List<StreamEdge> outStreamEdgesOfChain = this.streamTaskConfig.getOutStreamEdgesOfChain();
        Map<StreamEdge, RecordWriterOutput<?>> streamOutputMap = new HashMap<>(outStreamEdgesOfChain.size());
        this.streamOutputs = new RecordWriterOutput[outStreamEdgesOfChain.size()];

        // From here on the outputs must be closed again if anything fails.
        boolean success = false;
        try {
            for (int i = 0; i < outStreamEdgesOfChain.size(); i++) {
                StreamEdge streamEdge = outStreamEdgesOfChain.get(i);
                RecordWriterOutput<?> streamOutput = createStreamOutput(list.get(i), streamEdge, chainedNodeConfigs.get(Integer.valueOf(streamEdge.getSourceId())), streamTask.getEnvironment(), i);
                this.streamOutputs[i] = streamOutput;
                streamOutputMap.put(streamEdge, streamOutput);
            }

            Map<Integer, AbstractStreamOperatorProxy<?>> allProxies = new HashMap<>(chainedNodeConfigs.size());
            List<Integer> chainedHeadNodeIds = this.streamTaskConfig.getChainedHeadNodeIds();
            Preconditions.checkNotNull(chainedHeadNodeIds);
            if (!chainedNodeConfigs.isEmpty()) {
                Iterator<Integer> it = chainedHeadNodeIds.iterator();
                while (it.hasNext()) {
                    int headNodeId = it.next().intValue();
                    if (allProxies.containsKey(Integer.valueOf(headNodeId))) {
                        // The head was already created while building another head's chain.
                        this.headOperators.put(Integer.valueOf(headNodeId), allProxies.get(Integer.valueOf(headNodeId)).getOperator());
                    } else {
                        Tuple2<WatermarkGaugeExposingOutput, List<Tuple2<AbstractStreamOperatorProxy<?>, StreamEdge>>> createOutputCollector = createOutputCollector(streamTask, chainedNodeConfigs.get(Integer.valueOf(headNodeId)), chainedNodeConfigs, userCodeClassLoader, streamOutputMap, allProxies);
                        WatermarkGaugeExposingOutput<StreamRecord<?>> watermarkGaugeExposingOutput = (WatermarkGaugeExposingOutput) createOutputCollector.f0;
                        List list2 = (List) createOutputCollector.f1;
                        StreamOperator streamOperator = chainedNodeConfigs.get(Integer.valueOf(headNodeId)).getStreamOperator(userCodeClassLoader);
                        if (streamOperator != null) {
                            AbstractStreamOperatorProxy<?> proxy = AbstractStreamOperatorProxy.proxy(streamOperator, list2);
                            proxy.setup(streamTask, chainedNodeConfigs.get(Integer.valueOf(headNodeId)), watermarkGaugeExposingOutput);
                            proxy.getMetricGroup().gauge("currentOutputWatermark", watermarkGaugeExposingOutput.getWatermarkGauge());
                            this.headOperators.put(Integer.valueOf(headNodeId), streamOperator);
                            if ((streamOperator instanceof StreamSource) || (streamOperator instanceof StreamSourceV2)) {
                                this.sourceHeadOperators.put(Integer.valueOf(headNodeId), proxy);
                            }
                            allProxies.put(Integer.valueOf(headNodeId), proxy);
                            this.chainEntryPoints.put(Integer.valueOf(headNodeId), watermarkGaugeExposingOutput);
                        }
                    }
                }
            }

            // Register the chain's external input edges on the proxies they feed.
            if (!allProxies.isEmpty()) {
                for (StreamEdge inEdge : this.streamTaskConfig.getInStreamEdgesOfChain()) {
                    allProxies.get(Integer.valueOf(inEdge.getTargetId())).addInputEdge(inEdge);
                }
            }

            HashMap operatorsById = new HashMap();
            for (Map.Entry<Integer, AbstractStreamOperatorProxy<?>> entry : allProxies.entrySet()) {
                operatorsById.put(entry.getKey(), entry.getValue().getOperator());
            }
            this.allOperatorsTopologySorted = getTopologySortedOperators(chainedHeadNodeIds, userCodeClassLoader, operatorsById, chainedNodeConfigs);
            this.allOperators = allProxies;
            success = true;
        } finally {
            if (!success) {
                for (RecordWriterOutput<?> output : this.streamOutputs) {
                    if (output != null) {
                        output.close();
                    }
                }
            }
        }
    }

    /** Returns the stream status most recently stored by {@code toggleStreamStatus}. */
    @Override // org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider
    public StreamStatus getStreamStatus() {
        return this.streamStatus;
    }

    /**
     * Switches the chain to the given stream status and emits it on every stream output.
     * Nothing happens when the given status equals the current one.
     *
     * @param streamStatus the new status; must not be {@code null}
     */
    @Override // org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer
    public void toggleStreamStatus(StreamStatus streamStatus) {
        if (!streamStatus.equals(this.streamStatus)) {
            this.streamStatus = streamStatus;
            final RecordWriterOutput<?>[] outputs = this.streamOutputs;
            for (int idx = 0; idx < outputs.length; idx++) {
                outputs[idx].emitStreamStatus(streamStatus);
            }
        }
    }

    /**
     * Broadcasts a checkpoint barrier on every stream output of the chain.
     *
     * @param j first argument of the {@link CheckpointBarrier} constructor (the checkpoint id in
     *     Flink's API)
     * @param j2 second argument of the {@link CheckpointBarrier} constructor (the checkpoint
     *     timestamp in Flink's API)
     * @param checkpointOptions options carried by the barrier
     * @throws IOException if broadcasting fails, or if the thread is interrupted while
     *     broadcasting; in the latter case the {@link InterruptedException} is attached as cause
     */
    public void broadcastCheckpointBarrier(long j, long j2, CheckpointOptions checkpointOptions) throws IOException {
        try {
            CheckpointBarrier checkpointBarrier = new CheckpointBarrier(j, j2, checkpointOptions);
            for (RecordWriterOutput<?> recordWriterOutput : this.streamOutputs) {
                recordWriterOutput.broadcastEvent(checkpointBarrier);
            }
        } catch (InterruptedException e) {
            // Keep the interruption as the cause so the original stack trace is not lost.
            throw new IOException("Interrupted while broadcasting checkpoint barrier", e);
        }
    }

    /**
     * Broadcasts a checkpoint cancellation marker on every stream output of the chain.
     *
     * @param j argument of the {@link CancelCheckpointMarker} constructor (the id of the
     *     checkpoint to cancel in Flink's API)
     * @throws IOException if broadcasting fails, or if the thread is interrupted while
     *     broadcasting; in the latter case the {@link InterruptedException} is attached as cause
     */
    public void broadcastCheckpointCancelMarker(long j) throws IOException {
        try {
            CancelCheckpointMarker cancelCheckpointMarker = new CancelCheckpointMarker(j);
            for (RecordWriterOutput<?> recordWriterOutput : this.streamOutputs) {
                recordWriterOutput.broadcastEvent(cancelCheckpointMarker);
            }
        } catch (InterruptedException e) {
            // Keep the interruption as the cause so the original stack trace is not lost.
            throw new IOException("Interrupted while broadcasting checkpoint cancellation", e);
        }
    }

    /**
     * Calls {@code prepareSnapshotPreBarrier} on every operator of the chain, in the order of
     * {@link #getAllOperatorsTopologySorted()}.
     *
     * @param j value passed unchanged to each operator
     * @throws Exception whatever an operator throws; later operators are then not called
     */
    public void prepareSnapshotPreBarrier(long j) throws Exception {
        for (StreamOperator<?> chainedOperator : getAllOperatorsTopologySorted()) {
            chainedOperator.prepareSnapshotPreBarrier(j);
        }
    }

    /** Returns the internal array of stream outputs itself, not a copy. */
    public RecordWriterOutput<?>[] getStreamOutputs() {
        return this.streamOutputs;
    }

    /** Returns the deque of chain operators computed by {@code getTopologySortedOperators} at construction. */
    public Deque<StreamOperator<?>> getAllOperatorsTopologySorted() {
        return this.allOperatorsTopologySorted;
    }

    /** Returns a newly allocated array holding the outputs registered as chain entry points. */
    public Output<StreamRecord<?>>[] getChainEntryPoints() {
        Output[] entryPoints = (Output[]) this.chainEntryPoints.values().toArray(new Output[0]);
        return entryPoints;
    }

    /**
     * Flushes every stream output returned by {@link #getStreamOutputs()}.
     *
     * @throws IOException if flushing an output fails; remaining outputs are then not flushed
     */
    public void flushOutputs() throws IOException {
        final RecordWriterOutput<?>[] outputs = getStreamOutputs();
        for (int idx = 0; idx < outputs.length; idx++) {
            outputs[idx].flush();
        }
    }

    /** Closes every stream output of the chain. */
    public void releaseOutputs() {
        final RecordWriterOutput<?>[] outputs = this.streamOutputs;
        for (int idx = 0; idx < outputs.length; idx++) {
            outputs[idx].close();
        }
    }

    /** Returns a newly allocated array holding all registered head operators. */
    public StreamOperator[] getHeadOperators() {
        StreamOperator[] heads = (StreamOperator[]) this.headOperators.values().toArray(new StreamOperator[0]);
        return heads;
    }

    /**
     * Looks up the head operator registered under the given node id.
     *
     * @param i the node id used as key in the head-operator map
     * @return the map entry for that id ({@code null} when absent, assuming standard
     *     {@code Map.get} semantics for the field — its declaration is not visible here)
     */
    public StreamOperator getHeadOperator(int i) {
        return this.headOperators.get(Integer.valueOf(i));
    }

    /**
     * Looks up the operator proxy registered under the given node id.
     *
     * @param i the node id used as key in the map of all operator proxies
     * @return the map entry for that id ({@code null} when absent, assuming standard
     *     {@code Map.get} semantics for the field — its declaration is not visible here)
     */
    public AbstractStreamOperatorProxy getOperatorProxy(int i) {
        return this.allOperators.get(Integer.valueOf(i));
    }

    /** Returns the number of operator proxies in the chain, or 0 when the proxy map is not set. */
    public int getChainLength() {
        return this.allOperators == null ? 0 : this.allOperators.size();
    }

    /**
     * Registers the listener on every two-input operator proxy of the chain; other proxies are
     * skipped.
     *
     * @param selectionChangedListener the listener to hand to each two-input proxy
     */
    @Override // org.apache.flink.streaming.runtime.tasks.InputSelector
    public void registerSelectionChangedListener(InputSelector.SelectionChangedListener selectionChangedListener) {
        for (AbstractStreamOperatorProxy<?> proxy : this.allOperators.values()) {
            if (!(proxy instanceof TwoInputStreamOperatorProxy)) {
                continue;
            }
            TwoInputStreamOperatorProxy twoInputProxy = (TwoInputStreamOperatorProxy) proxy;
            twoInputProxy.registerSelectionChangedListener(selectionChangedListener);
        }
    }

    /**
     * Collects the inputs that are currently selected for reading.
     *
     * <p>Every incoming edge of the chain is tested on the proxy it targets, and every source head
     * operator is tested with a {@code null} edge. A per-call cache of selection answers is only
     * allocated when the chain holds more than 16 operators; otherwise {@code null} is passed.
     *
     * @return edge selections followed by source selections, in iteration order
     */
    @Override // org.apache.flink.streaming.runtime.tasks.InputSelector
    public List<InputSelector.InputSelection> getNextSelectedInputs() {
        Map<StreamEdge, Boolean> selectionCache = null;
        if (this.allOperators.size() > 16) {
            selectionCache = new HashMap<>(this.allOperators.size());
        }

        ArrayList selectedInputs = new ArrayList();
        for (StreamEdge inEdge : this.streamTaskConfig.getInStreamEdgesOfChain()) {
            AbstractStreamOperatorProxy<?> target = this.allOperators.get(Integer.valueOf(inEdge.getTargetId()));
            if (target.isSelected(inEdge, selectionCache)) {
                selectedInputs.add(InputSelector.EdgeInputSelection.create(inEdge));
            }
        }
        for (Map.Entry<Integer, AbstractStreamOperatorProxy<?>> sourceHead : this.sourceHeadOperators.entrySet()) {
            if (sourceHead.getValue().isSelected(null, selectionCache)) {
                selectedInputs.add(InputSelector.SourceInputSelection.create(sourceHead.getKey().intValue()));
            }
        }
        return selectedInputs;
    }

    /**
     * Builds the output that an operator with the given config writes to, creating every operator
     * chained behind it on the way.
     *
     * <p>Non-chained outputs are looked up in {@code map2}; each chained output is created through
     * {@code createChainedOperator}. The result is a directed output when output selectors are
     * configured, the single output itself when there is exactly one, and a broadcasting collector
     * otherwise. In both multi-output cases the copying variant is chosen when object reuse is
     * enabled (the opposite of the choice made in {@code createChainedOperator}).
     *
     * @param streamTask the containing task
     * @param streamConfig config of the operator whose output is built
     * @param map configs of all chained nodes, by node id
     * @param classLoader user-code class loader
     * @param map2 already created stream outputs, by outgoing edge
     * @param map3 operator proxies created so far, by node id; filled while recursing
     * @return the output together with the (proxy, edge) pairs of the directly chained successors
     */
    private <T> Tuple2<WatermarkGaugeExposingOutput, List<Tuple2<AbstractStreamOperatorProxy<?>, StreamEdge>>> createOutputCollector(StreamTask<?, ?> streamTask, StreamConfig streamConfig, Map<Integer, StreamConfig> map, ClassLoader classLoader, Map<StreamEdge, RecordWriterOutput<?>> map2, Map<Integer, AbstractStreamOperatorProxy<?>> map3) {
        ArrayList allOutputs = new ArrayList(4);
        ArrayList chainedSuccessors = new ArrayList(4);

        for (StreamEdge networkEdge : streamConfig.getNonChainedOutputs(classLoader)) {
            allOutputs.add(new Tuple2(map2.get(networkEdge), networkEdge));
        }
        for (StreamEdge chainedEdge : streamConfig.getChainedOutputs(classLoader)) {
            int targetId = chainedEdge.getTargetId();
            allOutputs.add(new Tuple2(createChainedOperator(streamTask, map.get(Integer.valueOf(targetId)), map, classLoader, map2, map3, chainedEdge), chainedEdge));
            // The target proxy is registered in map3 by createChainedOperator above.
            chainedSuccessors.add(Tuple2.of(map3.get(Integer.valueOf(targetId)), chainedEdge));
        }

        List<OutputSelector<T>> outputSelectors = streamConfig.getOutputSelectors(classLoader);
        if (outputSelectors != null && !outputSelectors.isEmpty()) {
            if (streamTask.getExecutionConfig().isObjectReuseEnabled()) {
                return Tuple2.of(new CopyingDirectedOutput(outputSelectors, allOutputs), chainedSuccessors);
            }
            return Tuple2.of(new DirectedOutput(outputSelectors, allOutputs), chainedSuccessors);
        }

        if (allOutputs.size() == 1) {
            return Tuple2.of(((Tuple2) allOutputs.get(0)).f0, chainedSuccessors);
        }

        Output[] outputArray = new Output[allOutputs.size()];
        for (int idx = 0; idx < allOutputs.size(); idx++) {
            outputArray[idx] = (Output) ((Tuple2) allOutputs.get(idx)).f0;
        }
        if (streamTask.getExecutionConfig().isObjectReuseEnabled()) {
            return Tuple2.of(new CopyingBroadcastingOutputCollector(outputArray, this), chainedSuccessors);
        }
        return Tuple2.of(new BroadcastingOutputCollector(outputArray, this), chainedSuccessors);
    }

    /**
     * Returns the output through which records travel along the given chained edge into its
     * target operator, creating the target's proxy first if it does not exist yet.
     *
     * <p>The edge is always registered on the target proxy as an input edge. The returned output
     * is a copying variant when object reuse is disabled and a non-copying one otherwise; for
     * two-input targets the edge's type number (1 or 2) selects the first- or second-input variant.
     *
     * @param streamTask the containing task
     * @param streamConfig config of the edge's target operator
     * @param map configs of all chained nodes, by node id
     * @param classLoader user-code class loader
     * @param map2 already created stream outputs, by outgoing edge
     * @param map3 operator proxies created so far, by node id; a newly created proxy is added
     * @param streamEdge the chained edge leading into the target operator
     * @throws RuntimeException if the proxy is neither a one-input nor a two-input operator, or if
     *     the edge of a two-input target has a type number other than 1 or 2
     */
    private <IN, OUT> WatermarkGaugeExposingOutput<StreamRecord<IN>> createChainedOperator(StreamTask<?, ?> streamTask, StreamConfig streamConfig, Map<Integer, StreamConfig> map, ClassLoader classLoader, Map<StreamEdge, RecordWriterOutput<?>> map2, Map<Integer, AbstractStreamOperatorProxy<?>> map3, StreamEdge streamEdge) {
        // Despite its decompiler-chosen name, this local holds whichever output variant is picked below.
        WatermarkGaugeExposingOutput copyingChainingWithSecondInputOfTwoInputStreamOperatorOutput;
        // Reuse the proxy if the target was already reached through another edge.
        AbstractStreamOperatorProxy<?> abstractStreamOperatorProxy = map3.get(Integer.valueOf(streamEdge.getTargetId()));
        AbstractStreamOperatorProxy<?> abstractStreamOperatorProxy2 = abstractStreamOperatorProxy;
        if (abstractStreamOperatorProxy == null) {
            // First visit: build the target's own output (recursing down the chain), then the proxy.
            Tuple2<WatermarkGaugeExposingOutput, List<Tuple2<AbstractStreamOperatorProxy<?>, StreamEdge>>> createOutputCollector = createOutputCollector(streamTask, streamConfig, map, classLoader, map2, map3);
            WatermarkGaugeExposingOutput watermarkGaugeExposingOutput = (WatermarkGaugeExposingOutput) createOutputCollector.f0;
            AbstractStreamOperatorProxy<?> proxy = AbstractStreamOperatorProxy.proxy(streamConfig.getStreamOperator(classLoader), (List) createOutputCollector.f1);
            proxy.setup(streamTask, streamConfig, watermarkGaugeExposingOutput);
            map3.put(Integer.valueOf(streamEdge.getTargetId()), proxy);
            MetricGroup metricGroup = proxy.getMetricGroup();
            Gauge<Long> watermarkGauge = watermarkGaugeExposingOutput.getWatermarkGauge();
            // NOTE(review): this bare getClass() call appears to be the null check the compiler
            // emits for the bound method reference on the next line.
            watermarkGauge.getClass();
            metricGroup.gauge("currentOutputWatermark", watermarkGauge::getValue);
            abstractStreamOperatorProxy2 = proxy;
        }
        abstractStreamOperatorProxy2.addInputEdge(streamEdge);
        if (!streamTask.getExecutionConfig().isObjectReuseEnabled()) {
            // Without object reuse, records are copied with the serializer of the input the edge feeds.
            TypeSerializer typeSerializerIn2 = streamEdge.getTypeNumber() == 2 ? streamConfig.getTypeSerializerIn2(classLoader) : streamConfig.getTypeSerializerIn1(classLoader);
            if (abstractStreamOperatorProxy2 instanceof OneInputStreamOperator) {
                // One-input targets receive the unwrapped operator ...
                copyingChainingWithSecondInputOfTwoInputStreamOperatorOutput = new CopyingChainingWithOneInputStreamOperatorOutput((OneInputStreamOperator) abstractStreamOperatorProxy2.getOperator(), typeSerializerIn2, streamEdge, this);
            } else {
                if (!(abstractStreamOperatorProxy2 instanceof TwoInputStreamOperator)) {
                    throw new RuntimeException("Unexpected operator type " + abstractStreamOperatorProxy2.getOperator());
                }
                // ... whereas two-input targets receive the proxy itself.
                if (streamEdge.getTypeNumber() == 1) {
                    copyingChainingWithSecondInputOfTwoInputStreamOperatorOutput = new CopyingChainingWithFirstInputOfTwoInputStreamOperatorOutput((TwoInputStreamOperator) abstractStreamOperatorProxy2, typeSerializerIn2, streamEdge, this);
                } else {
                    if (streamEdge.getTypeNumber() != 2) {
                        throw new RuntimeException("Unexpected type number of edge " + streamEdge);
                    }
                    copyingChainingWithSecondInputOfTwoInputStreamOperatorOutput = new CopyingChainingWithSecondInputOfTwoInputStreamOperatorOutput((TwoInputStreamOperator) abstractStreamOperatorProxy2, typeSerializerIn2, streamEdge, this);
                }
            }
        } else if (abstractStreamOperatorProxy2 instanceof OneInputStreamOperator) {
            // Object reuse enabled: same dispatch as above, with the non-copying output variants.
            copyingChainingWithSecondInputOfTwoInputStreamOperatorOutput = new ChainingWithOneInputStreamOperatorOutput((OneInputStreamOperator) abstractStreamOperatorProxy2.getOperator(), this, streamEdge);
        } else {
            if (!(abstractStreamOperatorProxy2 instanceof TwoInputStreamOperator)) {
                throw new RuntimeException("Unexpected operator type " + abstractStreamOperatorProxy2.getOperator());
            }
            if (streamEdge.getTypeNumber() == 1) {
                copyingChainingWithSecondInputOfTwoInputStreamOperatorOutput = new ChainingWithFirstInputOfTwoInputStreamOperatorOutput((TwoInputStreamOperator) abstractStreamOperatorProxy2, this, streamEdge);
            } else {
                if (streamEdge.getTypeNumber() != 2) {
                    throw new RuntimeException("Unexpected type number of edge " + streamEdge);
                }
                copyingChainingWithSecondInputOfTwoInputStreamOperatorOutput = new ChainingWithSecondInputOfTwoInputStreamOperatorOutput((TwoInputStreamOperator) abstractStreamOperatorProxy2, this, streamEdge);
            }
        }
        // Expose the watermark of the newly created input-side output on the target's metric group.
        MetricGroup metricGroup2 = abstractStreamOperatorProxy2.getMetricGroup();
        Gauge<Long> watermarkGauge2 = copyingChainingWithSecondInputOfTwoInputStreamOperatorOutput.getWatermarkGauge();
        watermarkGauge2.getClass();
        metricGroup2.gauge("currentInputWatermark", watermarkGauge2::getValue);
        return copyingChainingWithSecondInputOfTwoInputStreamOperatorOutput;
    }

    /**
     * Creates the record-writer output for one outgoing edge of the chain.
     *
     * <p>The writer at index {@code i} of the environment is given a stream-element serializer
     * (built from the side-output serializer when the edge carries an output tag, otherwise from
     * the regular output serializer) and the containing task as parent.
     *
     * @param streamRecordWriter the record writer the new output wraps
     * @param streamEdge the outgoing edge
     * @param streamConfig config of the edge's source operator
     * @param environment the task environment that owns the writers
     * @param i index of the writer in the environment
     */
    private RecordWriterOutput<?> createStreamOutput(StreamRecordWriter<StreamRecord<?>> streamRecordWriter, StreamEdge streamEdge, StreamConfig streamConfig, Environment environment, int i) {
        OutputTag sideOutputTag = streamEdge.getOutputTag();
        environment.getWriter(i).setTypeSerializer(
                new StreamElementSerializer(
                        streamEdge.getOutputTag() != null
                                ? streamConfig.getTypeSerializerSideOut(streamEdge.getOutputTag(), environment.getUserClassLoader())
                                : streamConfig.getTypeSerializerOut(environment.getUserClassLoader())));
        environment.getWriter(i).setParentTask(this.containingTask);
        return new RecordWriterOutput<>(streamRecordWriter, sideOutputTag, this);
    }

    /**
     * Orders the chain's operators so that every operator comes after all operators that feed it
     * through chained edges.
     *
     * <p>The in-degree of every node is counted over the chained outputs of all configs. The head
     * nodes without chained inputs seed a work queue; a node is appended to the result when it is
     * polled, and each of its chained targets is enqueued once its in-degree drops to zero.
     *
     * @param list ids of the chain's head nodes
     * @param classLoader user-code class loader used to read the chained outputs
     * @param map operators by node id; when {@code null} or empty an empty deque is returned
     * @param map2 configs of all chained nodes, by node id
     * @return the operators in topological order
     */
    static Deque<StreamOperator<?>> getTopologySortedOperators(List<Integer> list, ClassLoader classLoader, Map<Integer, ? extends StreamOperator<?>> map, Map<Integer, StreamConfig> map2) {
        Deque<StreamOperator<?>> sorted = new ArrayDeque<>();
        if (map == null || map.isEmpty()) {
            return sorted;
        }

        // Count how many chained edges lead into each node.
        Map<Integer, Integer> inDegrees = new HashMap<>();
        for (StreamConfig config : map2.values()) {
            for (StreamEdge chainedEdge : config.getChainedOutputs(classLoader)) {
                inDegrees.merge(chainedEdge.getTargetId(), 1, Integer::sum);
            }
        }

        // Start from the heads that no chained edge leads into.
        Deque<Integer> ready = new ArrayDeque<>();
        for (int headId : list) {
            if (inDegrees.getOrDefault(headId, 0) == 0) {
                ready.add(headId);
            }
        }
        Preconditions.checkState(!ready.isEmpty());

        while (!ready.isEmpty()) {
            int nodeId = ready.poll();
            sorted.add(map.get(nodeId));
            for (StreamEdge chainedEdge : map2.get(nodeId).getChainedOutputs(classLoader)) {
                int targetId = chainedEdge.getTargetId();
                int remaining = inDegrees.get(targetId) - 1;
                inDegrees.put(targetId, remaining);
                if (remaining == 0) {
                    ready.add(targetId);
                }
            }
        }
        return sorted;
    }
}
