package org.apache.flink.streaming.runtime.tasks;

import java.util.BitSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.operators.StreamSourceContexts;
import org.apache.flink.streaming.api.operators.StreamSourceV2;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.io.InputProcessorUtil;
import org.apache.flink.streaming.runtime.io.StreamArbitraryInputProcessor;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusSubMaintainer;
import org.apache.flink.util.Preconditions;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/ArbitraryInputStreamTask.class */
public class ArbitraryInputStreamTask<OUT> extends StreamTask<OUT, StreamOperator<OUT>> {
    private StreamArbitraryInputProcessor processor;

    public ArbitraryInputStreamTask(Environment environment) {
        super(environment);
    }

    public ArbitraryInputStreamTask(Environment environment, @Nullable ProcessingTimeService processingTimeService) {
        super(environment, processingTimeService);
    }

    /* JADX WARN: Type inference failed for: r5v12, types: [org.apache.flink.runtime.io.network.partition.consumer.InputGate[], org.apache.flink.runtime.io.network.partition.consumer.InputGate[][]] */
    @Override // org.apache.flink.streaming.runtime.tasks.StreamTask
    public void init() throws Exception {
        StreamTaskConfigSnapshot streamTaskConfig = getStreamTaskConfig();
        ClassLoader userCodeClassLoader = getUserCodeClassLoader();
        this.processor = new StreamArbitraryInputProcessor(getEnvironment().getIOManager(), getCheckpointLock(), this.operatorChain, getEnvironment().getMetricGroup(), getEnvironment().getAllInputGates().length > 0 ? InputProcessorUtil.createCheckpointBarrierHandler(streamTaskConfig.isCheckpointingEnabled(), this, streamTaskConfig.getCheckpointMode(), getEnvironment().getIOManager(), getEnvironment().getTaskManagerInfo().getConfiguration(), (InputGate[][]) new InputGate[]{getEnvironment().getAllInputGates()}) : null);
        BitSet bitSet = new BitSet();
        Iterator<Integer> it = streamTaskConfig.getChainedHeadNodeIds().iterator();
        while (it.hasNext()) {
            int intValue = it.next().intValue();
            StreamOperator headOperator = this.operatorChain.getHeadOperator(intValue);
            if (headOperator instanceof StreamSource) {
                throw new UnsupportedOperationException("Source operator v1 is not supported in ArbitraryInputStreamTask");
            }
            if (headOperator instanceof StreamSourceV2) {
                StreamSourceV2 streamSourceV2 = (StreamSourceV2) headOperator;
                StreamStatusSubMaintainer streamStatusSubMaintainer = new StreamStatusSubMaintainer(this.operatorChain, bitSet, 0);
                this.processor.bindSourceOperator(intValue, streamSourceV2, (OneInputStreamOperator) this.operatorChain.getOperatorProxy(intValue), StreamSourceContexts.getSourceContext(streamSourceV2.getOperatorConfig().getTimeCharacteristic(), getProcessingTimeService(), getCheckpointLock(), streamStatusSubMaintainer, streamSourceV2.getOutput(), streamSourceV2.getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval(), -1L), streamStatusSubMaintainer);
            }
        }
        Map<Integer, StreamConfig> chainedNodeConfigs = streamTaskConfig.getChainedNodeConfigs();
        int i = 0;
        List<StreamEdge> inStreamEdgesOfChain = streamTaskConfig.getInStreamEdgesOfChain();
        for (int i2 = 0; i2 < inStreamEdgesOfChain.size(); i2++) {
            StreamEdge streamEdge = inStreamEdgesOfChain.get(i2);
            InputGate inputGate = getEnvironment().getInputGate(i2);
            Object operatorProxy = this.operatorChain.getOperatorProxy(streamEdge.getTargetId());
            Preconditions.checkNotNull(operatorProxy);
            StreamConfig streamConfig = chainedNodeConfigs.get(Integer.valueOf(streamEdge.getTargetId()));
            Preconditions.checkNotNull(streamConfig);
            StreamStatusSubMaintainer streamStatusSubMaintainer2 = new StreamStatusSubMaintainer(this.operatorChain, bitSet, 0 + i2);
            if (operatorProxy instanceof OneInputStreamOperator) {
                this.processor.bindOneInputOperator(streamEdge, inputGate, i, (OneInputStreamOperator) operatorProxy, streamConfig.getTypeSerializerIn1(userCodeClassLoader), streamStatusSubMaintainer2, getExecutionConfig().isObjectReuseEnabled(), getEnvironment().getTaskManagerInfo().getConfiguration());
            } else {
                if (!(operatorProxy instanceof TwoInputStreamOperator)) {
                    throw new RuntimeException("Unsupported of " + operatorProxy + " yet");
                }
                if (streamEdge.getTypeNumber() == 1) {
                    this.processor.bindFirstOfTwoInputOperator(streamEdge, inputGate, i, (TwoInputStreamOperator) operatorProxy, streamConfig.getTypeSerializerIn1(userCodeClassLoader), streamStatusSubMaintainer2, getExecutionConfig().isObjectReuseEnabled(), getEnvironment().getTaskManagerInfo().getConfiguration());
                } else {
                    this.processor.bindSecondOfTwoInputOperator(streamEdge, inputGate, i, (TwoInputStreamOperator) operatorProxy, streamConfig.getTypeSerializerIn2(userCodeClassLoader), streamStatusSubMaintainer2, getExecutionConfig().isObjectReuseEnabled(), getEnvironment().getTaskManagerInfo().getConfiguration());
                }
            }
            i += inputGate.getNumberOfInputChannels();
        }
    }

    @Override // org.apache.flink.streaming.runtime.tasks.StreamTask
    protected void run() throws Exception {
        this.processor.process();
    }

    @Override // org.apache.flink.streaming.runtime.tasks.StreamTask
    protected void cleanup() throws Exception {
        if (this.processor != null) {
            this.processor.cleanup();
        }
    }

    @Override // org.apache.flink.streaming.runtime.tasks.StreamTask
    protected void cancelTask() {
        if (this.processor != null) {
            this.processor.stop();
        }
    }
}
