/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.sort.SortingDataInput;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.AbstractDataOutput;
import org.apache.flink.streaming.runtime.io.CheckpointedInputGate;
import org.apache.flink.streaming.runtime.io.InputProcessorUtil;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.io.StreamOneInputProcessor;
import org.apache.flink.streaming.runtime.io.StreamTaskInput;
import org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput;
import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.TimerService;
import org.apache.flink.util.Preconditions;

@Internal
public class OneInputStreamTask<IN, OUT>
extends StreamTask<OUT, OneInputStreamOperator<IN, OUT>> {
    private final WatermarkGauge inputWatermarkGauge = new WatermarkGauge();

    public OneInputStreamTask(Environment env) throws Exception {
        super(env);
    }

    @VisibleForTesting
    public OneInputStreamTask(Environment env, @Nullable TimerService timeProvider) throws Exception {
        super(env, timeProvider);
    }

    @Override
    public void init() throws Exception {
        StreamConfig configuration = this.getConfiguration();
        int numberOfInputs = configuration.getNumberOfNetworkInputs();
        if (numberOfInputs > 0) {
            CheckpointedInputGate inputGate = this.createCheckpointedInputGate();
            Counter numRecordsIn = this.setupNumRecordsInCounter(this.mainOperator);
            PushingAsyncDataInput.DataOutput<IN> output = this.createDataOutput(numRecordsIn);
            StreamTaskInput<IN> input = this.createTaskInput(inputGate);
            if (configuration.shouldSortInputs()) {
                Preconditions.checkState((!configuration.isCheckpointingEnabled() ? 1 : 0) != 0, (Object)"Checkpointing is not allowed with sorted inputs.");
                input = this.wrapWithSorted(input);
            }
            this.getEnvironment().getMetricGroup().getIOMetricGroup().reuseRecordsInputCounter(numRecordsIn);
            this.inputProcessor = new StreamOneInputProcessor<IN>(input, output, this.operatorChain);
        }
        ((OneInputStreamOperator)this.mainOperator).getMetricGroup().gauge("currentInputWatermark", (Gauge)this.inputWatermarkGauge);
        this.getEnvironment().getMetricGroup().gauge("currentInputWatermark", this.inputWatermarkGauge::getValue);
    }

    private StreamTaskInput<IN> wrapWithSorted(StreamTaskInput<IN> input) {
        ClassLoader userCodeClassLoader = this.getUserCodeClassLoader();
        return new SortingDataInput(input, this.configuration.getTypeSerializerIn(input.getInputIndex(), userCodeClassLoader), this.configuration.getStateKeySerializer(userCodeClassLoader), this.configuration.getStatePartitioner(input.getInputIndex(), userCodeClassLoader), this.getEnvironment().getMemoryManager(), this.getEnvironment().getIOManager(), this.getExecutionConfig().isObjectReuseEnabled(), this.configuration.getManagedMemoryFractionOperatorUseCaseOfSlot(ManagedMemoryUseCase.BATCH_OP, this.getTaskConfiguration(), userCodeClassLoader), this.getJobConfiguration(), this);
    }

    private CheckpointedInputGate createCheckpointedInputGate() {
        IndexedInputGate[] inputGates = this.getEnvironment().getAllInputGates();
        return InputProcessorUtil.createCheckpointedInputGate(this, this.configuration, this.getCheckpointCoordinator(), inputGates, this.getEnvironment().getMetricGroup().getIOMetricGroup(), this.getTaskNameWithSubtaskAndId(), this.mainMailboxExecutor);
    }

    private PushingAsyncDataInput.DataOutput<IN> createDataOutput(Counter numRecordsIn) {
        return new StreamTaskNetworkOutput((OneInputStreamOperator)this.mainOperator, this.getStreamStatusMaintainer(), this.inputWatermarkGauge, numRecordsIn);
    }

    private StreamTaskInput<IN> createTaskInput(CheckpointedInputGate inputGate) {
        int numberOfInputChannels = inputGate.getNumberOfInputChannels();
        StatusWatermarkValve statusWatermarkValve = new StatusWatermarkValve(numberOfInputChannels);
        TypeSerializer inSerializer = this.configuration.getTypeSerializerIn1(this.getUserCodeClassLoader());
        return new StreamTaskNetworkInput(inputGate, inSerializer, this.getEnvironment().getIOManager(), statusWatermarkValve, 0);
    }

    private static class StreamTaskNetworkOutput<IN>
    extends AbstractDataOutput<IN> {
        private final OneInputStreamOperator<IN, ?> operator;
        private final WatermarkGauge watermarkGauge;
        private final Counter numRecordsIn;

        private StreamTaskNetworkOutput(OneInputStreamOperator<IN, ?> operator, StreamStatusMaintainer streamStatusMaintainer, WatermarkGauge watermarkGauge, Counter numRecordsIn) {
            super(streamStatusMaintainer);
            this.operator = (OneInputStreamOperator)Preconditions.checkNotNull(operator);
            this.watermarkGauge = (WatermarkGauge)Preconditions.checkNotNull((Object)watermarkGauge);
            this.numRecordsIn = (Counter)Preconditions.checkNotNull((Object)numRecordsIn);
        }

        @Override
        public void emitRecord(StreamRecord<IN> record) throws Exception {
            this.numRecordsIn.inc();
            this.operator.setKeyContextElement1(record);
            this.operator.processElement(record);
        }

        @Override
        public void emitWatermark(Watermark watermark) throws Exception {
            this.watermarkGauge.setCurrentWatermark(watermark.getTimestamp());
            this.operator.processWatermark(watermark);
        }

        @Override
        public void emitLatencyMarker(LatencyMarker latencyMarker) throws Exception {
            this.operator.processLatencyMarker(latencyMarker);
        }
    }
}

