/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterDelegate;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.groups.InternalOperatorMetricGroup;
import org.apache.flink.runtime.operators.coordination.AcknowledgeCheckpointEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.InputChannelStateHandle;
import org.apache.flink.runtime.state.ResultSubpartitionStateHandle;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.shaded.guava30.com.google.common.io.Closer;
import org.apache.flink.streaming.api.graph.NonChainedOutput;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
import org.apache.flink.streaming.api.operators.CountingOutput;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.SourceOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
import org.apache.flink.streaming.runtime.io.StreamTaskSourceInput;
import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector;
import org.apache.flink.streaming.runtime.tasks.ChainingOutput;
import org.apache.flink.streaming.runtime.tasks.CopyingBroadcastingOutputCollector;
import org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput;
import org.apache.flink.streaming.runtime.tasks.FinishedOnRestoreInput;
import org.apache.flink.streaming.runtime.tasks.OperatorEventDispatcherImpl;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
import org.apache.flink.streaming.runtime.tasks.StreamTaskFinishedOnRestoreSourceInput;
import org.apache.flink.streaming.runtime.tasks.WatermarkGaugeExposingOutput;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorFactory;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class OperatorChain<OUT, OP extends StreamOperator<OUT>>
implements BoundedMultiInput,
Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(OperatorChain.class);
    protected final RecordWriterOutput<?>[] streamOutputs;
    protected final WatermarkGaugeExposingOutput<StreamRecord<OUT>> mainOperatorOutput;
    @Nullable
    protected final StreamOperatorWrapper<OUT, OP> mainOperatorWrapper;
    @Nullable
    protected final StreamOperatorWrapper<?, ?> firstOperatorWrapper;
    @Nullable
    protected final StreamOperatorWrapper<?, ?> tailOperatorWrapper;
    protected final Map<StreamConfig.SourceInputConfig, ChainedSource> chainedSources;
    protected final int numOperators;
    protected final OperatorEventDispatcherImpl operatorEventDispatcher;
    protected final Closer closer = Closer.create();
    @Nullable
    protected final FinishedOnRestoreInput finishedOnRestoreInput;
    protected boolean isClosed;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    public OperatorChain(StreamTask<OUT, OP> containingTask, RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate) {
        this.operatorEventDispatcher = new OperatorEventDispatcherImpl(containingTask.getEnvironment().getUserCodeClassLoader().asClassLoader(), containingTask.getEnvironment().getOperatorCoordinatorEventGateway());
        ClassLoader userCodeClassloader = containingTask.getUserCodeClassLoader();
        StreamConfig configuration = containingTask.getConfiguration();
        Object operatorFactory = configuration.getStreamOperatorFactory(userCodeClassloader);
        Map<Integer, StreamConfig> chainedConfigs = configuration.getTransitiveChainedTaskConfigsWithSelf(userCodeClassloader);
        List<NonChainedOutput> outputsInOrder = configuration.getVertexNonChainedOutputs(userCodeClassloader);
        HashMap recordWriterOutputs = new HashMap(outputsInOrder.size());
        this.streamOutputs = new RecordWriterOutput[outputsInOrder.size()];
        this.finishedOnRestoreInput = this.isTaskDeployedAsFinished() ? new FinishedOnRestoreInput(this.streamOutputs, configuration.getInputs(userCodeClassloader).length) : null;
        boolean success = false;
        try {
            this.createChainOutputs(outputsInOrder, recordWriterDelegate, chainedConfigs, containingTask, recordWriterOutputs);
            ArrayList allOpWrappers = new ArrayList(chainedConfigs.size());
            this.mainOperatorOutput = this.createOutputCollector(containingTask, configuration, chainedConfigs, userCodeClassloader, recordWriterOutputs, allOpWrappers, containingTask.getMailboxExecutorFactory(), operatorFactory != null);
            if (operatorFactory != null) {
                Tuple2 mainOperatorAndTimeService = StreamOperatorFactoryUtil.createOperator(operatorFactory, containingTask, configuration, this.mainOperatorOutput, this.operatorEventDispatcher);
                StreamOperator mainOperator = (StreamOperator)mainOperatorAndTimeService.f0;
                mainOperator.getMetricGroup().gauge("currentOutputWatermark", this.mainOperatorOutput.getWatermarkGauge());
                this.mainOperatorWrapper = this.createOperatorWrapper(mainOperator, containingTask, configuration, (Optional)mainOperatorAndTimeService.f1, true);
                allOpWrappers.add(this.mainOperatorWrapper);
                this.tailOperatorWrapper = (StreamOperatorWrapper)allOpWrappers.get(0);
            } else {
                Preconditions.checkState((allOpWrappers.size() == 0 ? 1 : 0) != 0);
                this.mainOperatorWrapper = null;
                this.tailOperatorWrapper = null;
            }
            this.chainedSources = this.createChainedSources(containingTask, configuration.getInputs(userCodeClassloader), chainedConfigs, userCodeClassloader, allOpWrappers);
            this.numOperators = allOpWrappers.size();
            this.firstOperatorWrapper = this.linkOperatorWrappers(allOpWrappers);
            return;
        }
        catch (Throwable throwable) {
            if (success) throw throwable;
            for (int i = 0; i < this.streamOutputs.length; ++i) {
                if (this.streamOutputs[i] != null) {
                    this.streamOutputs[i].close();
                }
                this.streamOutputs[i] = null;
            }
            throw throwable;
        }
    }

    @VisibleForTesting
    OperatorChain(List<StreamOperatorWrapper<?, ?>> allOperatorWrappers, RecordWriterOutput<?>[] streamOutputs, WatermarkGaugeExposingOutput<StreamRecord<OUT>> mainOperatorOutput, StreamOperatorWrapper<OUT, OP> mainOperatorWrapper) {
        this.streamOutputs = streamOutputs;
        this.finishedOnRestoreInput = null;
        this.mainOperatorOutput = (WatermarkGaugeExposingOutput)Preconditions.checkNotNull(mainOperatorOutput);
        this.operatorEventDispatcher = null;
        Preconditions.checkState((allOperatorWrappers != null && allOperatorWrappers.size() > 0 ? 1 : 0) != 0);
        this.mainOperatorWrapper = (StreamOperatorWrapper)Preconditions.checkNotNull(mainOperatorWrapper);
        this.tailOperatorWrapper = allOperatorWrappers.get(0);
        this.numOperators = allOperatorWrappers.size();
        this.chainedSources = Collections.emptyMap();
        this.firstOperatorWrapper = this.linkOperatorWrappers(allOperatorWrappers);
    }

    public abstract boolean isTaskDeployedAsFinished();

    public abstract void dispatchOperatorEvent(OperatorID var1, SerializedValue<OperatorEvent> var2) throws FlinkException;

    public abstract void prepareSnapshotPreBarrier(long var1) throws Exception;

    @Override
    public abstract void endInput(int var1) throws Exception;

    public abstract void initializeStateAndOpenOperators(StreamTaskStateInitializer var1) throws Exception;

    public abstract void finishOperators(StreamTaskActionExecutor var1, StopMode var2) throws Exception;

    public abstract void notifyCheckpointComplete(long var1) throws Exception;

    public abstract void notifyCheckpointAborted(long var1) throws Exception;

    public abstract void notifyCheckpointSubsumed(long var1) throws Exception;

    public abstract void snapshotState(Map<OperatorID, OperatorSnapshotFutures> var1, CheckpointMetaData var2, CheckpointOptions var3, Supplier<Boolean> var4, ChannelStateWriter.ChannelStateWriteResult var5, CheckpointStreamFactory var6) throws Exception;

    public OperatorEventDispatcher getOperatorEventDispatcher() {
        return this.operatorEventDispatcher;
    }

    public void broadcastEvent(AbstractEvent event) throws IOException {
        this.broadcastEvent(event, false);
    }

    public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {
        for (RecordWriterOutput<?> streamOutput : this.streamOutputs) {
            streamOutput.broadcastEvent(event, isPriorityEvent);
        }
    }

    public void alignedBarrierTimeout(long checkpointId) throws IOException {
        for (RecordWriterOutput<?> streamOutput : this.streamOutputs) {
            streamOutput.alignedBarrierTimeout(checkpointId);
        }
    }

    public void abortCheckpoint(long checkpointId, CheckpointException cause) {
        for (RecordWriterOutput<?> streamOutput : this.streamOutputs) {
            streamOutput.abortCheckpoint(checkpointId, cause);
        }
    }

    public void closeAllOperators() throws Exception {
        this.isClosed = true;
    }

    public RecordWriterOutput<?>[] getStreamOutputs() {
        return this.streamOutputs;
    }

    @VisibleForTesting
    public Iterable<StreamOperatorWrapper<?, ?>> getAllOperators() {
        return this.getAllOperators(false);
    }

    protected Iterable<StreamOperatorWrapper<?, ?>> getAllOperators(boolean reverse) {
        return reverse ? new StreamOperatorWrapper.ReadIterator(this.tailOperatorWrapper, true) : new StreamOperatorWrapper.ReadIterator(this.mainOperatorWrapper, false);
    }

    public Input getFinishedOnRestoreInputOrDefault(Input defaultInput) {
        return this.finishedOnRestoreInput == null ? defaultInput : this.finishedOnRestoreInput;
    }

    public int getNumberOfOperators() {
        return this.numOperators;
    }

    public WatermarkGaugeExposingOutput<StreamRecord<OUT>> getMainOperatorOutput() {
        return this.mainOperatorOutput;
    }

    public ChainedSource getChainedSource(StreamConfig.SourceInputConfig sourceInput) {
        Preconditions.checkArgument((boolean)this.chainedSources.containsKey(sourceInput), (String)"Chained source with sourcedId = [%s] was not found", (Object[])new Object[]{sourceInput});
        return this.chainedSources.get(sourceInput);
    }

    public List<Output<StreamRecord<?>>> getChainedSourceOutputs() {
        return this.chainedSources.values().stream().map(ChainedSource::getSourceOutput).collect(Collectors.toList());
    }

    public StreamTaskSourceInput<?> getSourceTaskInput(StreamConfig.SourceInputConfig sourceInput) {
        Preconditions.checkArgument((boolean)this.chainedSources.containsKey(sourceInput), (String)"Chained source with sourcedId = [%s] was not found", (Object[])new Object[]{sourceInput});
        return this.chainedSources.get(sourceInput).getSourceTaskInput();
    }

    public List<StreamTaskSourceInput<?>> getSourceTaskInputs() {
        return this.chainedSources.values().stream().map(ChainedSource::getSourceTaskInput).collect(Collectors.toList());
    }

    public void flushOutputs() throws IOException {
        for (RecordWriterOutput<?> streamOutput : this.getStreamOutputs()) {
            streamOutput.flush();
        }
    }

    @Override
    public void close() throws IOException {
        this.closer.close();
    }

    @Nullable
    public OP getMainOperator() {
        return this.mainOperatorWrapper == null ? null : (OP)this.mainOperatorWrapper.getStreamOperator();
    }

    @Nullable
    protected StreamOperator<?> getTailOperator() {
        return this.tailOperatorWrapper == null ? null : (StreamOperator<?>)this.tailOperatorWrapper.getStreamOperator();
    }

    protected void snapshotChannelStates(StreamOperator<?> op, ChannelStateWriter.ChannelStateWriteResult channelStateWriteResult, OperatorSnapshotFutures snapshotInProgress) {
        if (op == this.getMainOperator()) {
            snapshotInProgress.setInputChannelStateFuture((Future<SnapshotResult<StateObjectCollection<InputChannelStateHandle>>>)((Object)((CompletableFuture)channelStateWriteResult.getInputChannelStateHandles().thenApply(StateObjectCollection::new)).thenApply(SnapshotResult::of)));
        }
        if (op == this.getTailOperator()) {
            snapshotInProgress.setResultSubpartitionStateFuture((Future<SnapshotResult<StateObjectCollection<ResultSubpartitionStateHandle>>>)((Object)((CompletableFuture)channelStateWriteResult.getResultSubpartitionStateHandles().thenApply(StateObjectCollection::new)).thenApply(SnapshotResult::of)));
        }
    }

    public boolean isClosed() {
        return this.isClosed;
    }

    private void createChainOutputs(List<NonChainedOutput> outputsInOrder, RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate, Map<Integer, StreamConfig> chainedConfigs, StreamTask<OUT, OP> containingTask, Map<IntermediateDataSetID, RecordWriterOutput<?>> recordWriterOutputs) {
        for (int i = 0; i < outputsInOrder.size(); ++i) {
            NonChainedOutput output = outputsInOrder.get(i);
            RecordWriterOutput<OUT> recordWriterOutput = this.createStreamOutput(recordWriterDelegate.getRecordWriter(i), output, chainedConfigs.get(output.getSourceNodeId()), containingTask.getEnvironment());
            this.streamOutputs[i] = recordWriterOutput;
            recordWriterOutputs.put(output.getDataSetId(), recordWriterOutput);
        }
    }

    private RecordWriterOutput<OUT> createStreamOutput(RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter, NonChainedOutput streamOutput, StreamConfig upStreamConfig, Environment taskEnvironment) {
        OutputTag<?> sideOutputTag = streamOutput.getOutputTag();
        TypeSerializer outSerializer = streamOutput.getOutputTag() != null ? upStreamConfig.getTypeSerializerSideOut(streamOutput.getOutputTag(), taskEnvironment.getUserCodeClassLoader().asClassLoader()) : upStreamConfig.getTypeSerializerOut(taskEnvironment.getUserCodeClassLoader().asClassLoader());
        return (RecordWriterOutput)this.closer.register(new RecordWriterOutput<OUT>(recordWriter, outSerializer, sideOutputTag, streamOutput.supportsUnalignedCheckpoints()));
    }

    private Map<StreamConfig.SourceInputConfig, ChainedSource> createChainedSources(StreamTask<OUT, OP> containingTask, StreamConfig.InputConfig[] configuredInputs, Map<Integer, StreamConfig> chainedConfigs, ClassLoader userCodeClassloader, List<StreamOperatorWrapper<?, ?>> allOpWrappers) {
        if (Arrays.stream(configuredInputs).noneMatch(input -> input instanceof StreamConfig.SourceInputConfig)) {
            return Collections.emptyMap();
        }
        Preconditions.checkState((boolean)(this.mainOperatorWrapper.getStreamOperator() instanceof MultipleInputStreamOperator), (Object)"Creating chained input is only supported with MultipleInputStreamOperator and MultipleInputStreamTask");
        HashMap<StreamConfig.SourceInputConfig, ChainedSource> chainedSourceInputs = new HashMap<StreamConfig.SourceInputConfig, ChainedSource>();
        MultipleInputStreamOperator multipleInputOperator = (MultipleInputStreamOperator)this.mainOperatorWrapper.getStreamOperator();
        List<Input> operatorInputs = multipleInputOperator.getInputs();
        int sourceInputGateIndex = Arrays.stream(containingTask.getEnvironment().getAllInputGates()).mapToInt(IndexedInputGate::getInputGateIndex).max().orElse(-1) + 1;
        for (int inputId = 0; inputId < configuredInputs.length; ++inputId) {
            StreamTaskSourceInput streamTaskSourceInput;
            if (!(configuredInputs[inputId] instanceof StreamConfig.SourceInputConfig)) continue;
            StreamConfig.SourceInputConfig sourceInput = (StreamConfig.SourceInputConfig)configuredInputs[inputId];
            int sourceEdgeId = sourceInput.getInputEdge().getSourceId();
            StreamConfig sourceInputConfig = chainedConfigs.get(sourceEdgeId);
            OutputTag outputTag = sourceInput.getInputEdge().getOutputTag();
            WatermarkGaugeExposingOutput<StreamRecord<?>> chainedSourceOutput = this.createChainedSourceOutput(containingTask, sourceInputConfig, userCodeClassloader, this.getFinishedOnRestoreInputOrDefault(operatorInputs.get(inputId)), multipleInputOperator.getMetricGroup(), outputTag);
            SourceOperator sourceOperator = (SourceOperator)this.createOperator(containingTask, sourceInputConfig, userCodeClassloader, chainedSourceOutput, allOpWrappers, true);
            if (this.isTaskDeployedAsFinished()) {
                ++sourceInputGateIndex;
                streamTaskSourceInput = new StreamTaskFinishedOnRestoreSourceInput(sourceOperator, sourceInputGateIndex, inputId);
            } else {
                ++sourceInputGateIndex;
                streamTaskSourceInput = new StreamTaskSourceInput(sourceOperator, sourceInputGateIndex, inputId);
            }
            chainedSourceInputs.put(sourceInput, new ChainedSource(chainedSourceOutput, streamTaskSourceInput));
        }
        return chainedSourceInputs;
    }

    @Nullable
    private Counter getOperatorRecordsOutCounter(StreamTask<?, ?> containingTask, StreamConfig operatorConfig) {
        ClassLoader userCodeClassloader = containingTask.getUserCodeClassLoader();
        Object operatorFactory = operatorConfig.getStreamOperatorFactory(userCodeClassloader);
        if (operatorFactory instanceof SinkWriterOperatorFactory) {
            return null;
        }
        InternalOperatorMetricGroup operatorMetricGroup = containingTask.getEnvironment().getMetricGroup().getOrAddOperator(operatorConfig.getOperatorID(), operatorConfig.getOperatorName());
        if (operatorConfig.isChainEnd()) {
            operatorMetricGroup.getIOMetricGroup().reuseOutputMetricsForTask();
        }
        return operatorMetricGroup.getIOMetricGroup().getNumRecordsOutCounter();
    }

    private WatermarkGaugeExposingOutput<StreamRecord> createChainedSourceOutput(StreamTask<?, OP> containingTask, StreamConfig sourceInputConfig, ClassLoader userCodeClassloader, Input input, OperatorMetricGroup metricGroup, OutputTag outputTag) {
        ChainingOutput chainedSourceOutput;
        Counter recordsOutCounter = this.getOperatorRecordsOutCounter(containingTask, sourceInputConfig);
        if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
            chainedSourceOutput = new ChainingOutput(input, recordsOutCounter, metricGroup, outputTag);
        } else {
            TypeSerializer inSerializer = sourceInputConfig.getTypeSerializerOut(userCodeClassloader);
            chainedSourceOutput = new CopyingChainingOutput(input, inSerializer, recordsOutCounter, metricGroup, outputTag);
        }
        return (WatermarkGaugeExposingOutput)this.closer.register(chainedSourceOutput);
    }

    private <T> WatermarkGaugeExposingOutput<StreamRecord<T>> createOutputCollector(StreamTask<?, ?> containingTask, StreamConfig operatorConfig, Map<Integer, StreamConfig> chainedConfigs, ClassLoader userCodeClassloader, Map<IntermediateDataSetID, RecordWriterOutput<?>> recordWriterOutputs, List<StreamOperatorWrapper<?, ?>> allOperatorWrappers, MailboxExecutorFactory mailboxExecutorFactory, boolean shouldAddMetric) {
        Counter recordsOutCounter;
        WatermarkGaugeExposingOutput result;
        ArrayList<WatermarkGaugeExposingOutput<StreamRecord<Object>>> allOutputs = new ArrayList<WatermarkGaugeExposingOutput<StreamRecord<Object>>>(4);
        for (NonChainedOutput streamOutput : operatorConfig.getOperatorNonChainedOutputs(userCodeClassloader)) {
            RecordWriterOutput<?> recordWriterOutput = recordWriterOutputs.get(streamOutput.getDataSetId());
            allOutputs.add(recordWriterOutput);
        }
        for (StreamEdge outputEdge : operatorConfig.getChainedOutputs(userCodeClassloader)) {
            int outputId = outputEdge.getTargetId();
            StreamConfig chainedOpConfig = chainedConfigs.get(outputId);
            WatermarkGaugeExposingOutput output = this.createOperatorChain(containingTask, operatorConfig, chainedOpConfig, chainedConfigs, userCodeClassloader, recordWriterOutputs, allOperatorWrappers, outputEdge.getOutputTag(), mailboxExecutorFactory, shouldAddMetric);
            allOutputs.add(output);
            shouldAddMetric = false;
        }
        if (allOutputs.size() == 1) {
            result = (CountingOutput)allOutputs.get(0);
        } else {
            Output[] asArray = new Output[allOutputs.size()];
            for (int i = 0; i < allOutputs.size(); ++i) {
                asArray[i] = (Output)allOutputs.get(i);
            }
            result = containingTask.getExecutionConfig().isObjectReuseEnabled() ? (WatermarkGaugeExposingOutput)this.closer.register(new CopyingBroadcastingOutputCollector(asArray)) : (WatermarkGaugeExposingOutput)this.closer.register(new BroadcastingOutputCollector(asArray));
        }
        if (shouldAddMetric && (recordsOutCounter = this.getOperatorRecordsOutCounter(containingTask, operatorConfig)) != null) {
            result = new CountingOutput(result, recordsOutCounter);
        }
        return result;
    }

    private <IN, OUT> WatermarkGaugeExposingOutput<StreamRecord<IN>> createOperatorChain(StreamTask<OUT, ?> containingTask, StreamConfig prevOperatorConfig, StreamConfig operatorConfig, Map<Integer, StreamConfig> chainedConfigs, ClassLoader userCodeClassloader, Map<IntermediateDataSetID, RecordWriterOutput<?>> recordWriterOutputs, List<StreamOperatorWrapper<?, ?>> allOperatorWrappers, OutputTag<IN> outputTag, MailboxExecutorFactory mailboxExecutorFactory, boolean shouldAddMetricForPrevOperator) {
        WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainedOperatorOutput = this.createOutputCollector(containingTask, operatorConfig, chainedConfigs, userCodeClassloader, recordWriterOutputs, allOperatorWrappers, mailboxExecutorFactory, true);
        OneInputStreamOperator chainedOperator = (OneInputStreamOperator)this.createOperator(containingTask, operatorConfig, userCodeClassloader, chainedOperatorOutput, allOperatorWrappers, false);
        return this.wrapOperatorIntoOutput(chainedOperator, containingTask, prevOperatorConfig, operatorConfig, userCodeClassloader, outputTag, shouldAddMetricForPrevOperator);
    }

    private <OUT, OP extends StreamOperator<OUT>> OP createOperator(StreamTask<OUT, ?> containingTask, StreamConfig operatorConfig, ClassLoader userCodeClassloader, WatermarkGaugeExposingOutput<StreamRecord<OUT>> output, List<StreamOperatorWrapper<?, ?>> allOperatorWrappers, boolean isHead) {
        Tuple2 chainedOperatorAndTimeService = StreamOperatorFactoryUtil.createOperator(operatorConfig.getStreamOperatorFactory(userCodeClassloader), containingTask, operatorConfig, output, this.operatorEventDispatcher);
        StreamOperator chainedOperator = (StreamOperator)chainedOperatorAndTimeService.f0;
        allOperatorWrappers.add(this.createOperatorWrapper(chainedOperator, containingTask, operatorConfig, (Optional)chainedOperatorAndTimeService.f1, isHead));
        chainedOperator.getMetricGroup().gauge("currentOutputWatermark", () -> output.getWatermarkGauge().getValue());
        return (OP)chainedOperator;
    }

    private <IN, OUT> WatermarkGaugeExposingOutput<StreamRecord<IN>> wrapOperatorIntoOutput(OneInputStreamOperator<IN, OUT> operator, StreamTask<OUT, ?> containingTask, StreamConfig prevOperatorConfig, StreamConfig operatorConfig, ClassLoader userCodeClassloader, OutputTag<IN> outputTag, boolean shouldAddMetricForPrevOperator) {
        ChainingOutput<IN> currentOperatorOutput;
        Counter recordsOutCounter = null;
        if (shouldAddMetricForPrevOperator) {
            recordsOutCounter = this.getOperatorRecordsOutCounter(containingTask, prevOperatorConfig);
        }
        if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
            currentOperatorOutput = new ChainingOutput<IN>(operator, recordsOutCounter, operator.getMetricGroup(), outputTag);
        } else {
            TypeSerializer inSerializer = operatorConfig.getTypeSerializerIn1(userCodeClassloader);
            currentOperatorOutput = new CopyingChainingOutput<IN>(operator, inSerializer, recordsOutCounter, operator.getMetricGroup(), outputTag);
        }
        operator.getMetricGroup().gauge("currentInputWatermark", () -> currentOperatorOutput.getWatermarkGauge().getValue());
        return (WatermarkGaugeExposingOutput)this.closer.register(currentOperatorOutput);
    }

    private StreamOperatorWrapper<?, ?> linkOperatorWrappers(List<StreamOperatorWrapper<?, ?>> allOperatorWrappers) {
        StreamOperatorWrapper<?, ?> previous = null;
        for (StreamOperatorWrapper<?, ?> current : allOperatorWrappers) {
            if (previous != null) {
                previous.setPrevious(current);
            }
            current.setNext(previous);
            previous = current;
        }
        return previous;
    }

    private <T, P extends StreamOperator<T>> StreamOperatorWrapper<T, P> createOperatorWrapper(P operator, StreamTask<?, ?> containingTask, StreamConfig operatorConfig, Optional<ProcessingTimeService> processingTimeService, boolean isHead) {
        return new StreamOperatorWrapper(operator, processingTimeService, containingTask.getMailboxExecutorFactory().createExecutor(operatorConfig.getChainIndex()), isHead);
    }

    protected void sendAcknowledgeCheckpointEvent(long checkpointId) {
        if (this.operatorEventDispatcher == null) {
            return;
        }
        this.operatorEventDispatcher.getRegisteredOperators().forEach(x -> this.operatorEventDispatcher.getOperatorEventGateway((OperatorID)x).sendEventToCoordinator((OperatorEvent)new AcknowledgeCheckpointEvent(checkpointId)));
    }

    public static class ChainedSource {
        private final WatermarkGaugeExposingOutput<StreamRecord<?>> chainedSourceOutput;
        private final StreamTaskSourceInput<?> sourceTaskInput;

        public ChainedSource(WatermarkGaugeExposingOutput<StreamRecord<?>> chainedSourceOutput, StreamTaskSourceInput<?> sourceTaskInput) {
            this.chainedSourceOutput = chainedSourceOutput;
            this.sourceTaskInput = sourceTaskInput;
        }

        public WatermarkGaugeExposingOutput<StreamRecord<?>> getSourceOutput() {
            return this.chainedSourceOutput;
        }

        public StreamTaskSourceInput<?> getSourceTaskInput() {
            return this.sourceTaskInput;
        }
    }
}

