package org.apache.flink.streaming.runtime.io;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.metrics.MinWatermarkGauge;
import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusSubMaintainer;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/streaming/runtime/io/OneInputProcessor.class */
class OneInputProcessor implements InputProcessor, StatusWatermarkValve.ValveOutputHandler {
    private Counter numRecordsIn;
    private final OneInputStreamOperator operator;
    private final WatermarkGauge watermarkGauge = new WatermarkGauge();
    private final StatusWatermarkValve statusWatermarkValve;
    private final Object checkpointLock;
    private final TaskMetricGroup taskMetricGroup;
    private final StreamStatusSubMaintainer streamStatusSubMaintainer;

    public OneInputProcessor(StreamStatusSubMaintainer streamStatusSubMaintainer, OneInputStreamOperator oneInputStreamOperator, Object obj, TaskMetricGroup taskMetricGroup, MinWatermarkGauge minWatermarkGauge, int i) {
        this.streamStatusSubMaintainer = streamStatusSubMaintainer;
        this.checkpointLock = Preconditions.checkNotNull(obj);
        this.taskMetricGroup = (TaskMetricGroup) Preconditions.checkNotNull(taskMetricGroup);
        this.statusWatermarkValve = new StatusWatermarkValve(i, this);
        this.operator = (OneInputStreamOperator) Preconditions.checkNotNull(oneInputStreamOperator);
        oneInputStreamOperator.getMetricGroup().gauge("currentInputWatermark", this.watermarkGauge);
        minWatermarkGauge.addWatermarkGauge(this.watermarkGauge);
        this.numRecordsIn = oneInputStreamOperator.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter();
    }

    @Override // org.apache.flink.streaming.runtime.io.InputProcessor
    public void processRecord(StreamRecord streamRecord, int i) throws Exception {
        synchronized (this.checkpointLock) {
            this.numRecordsIn.inc();
            this.operator.setKeyContextElement1(streamRecord);
            this.operator.processElement(streamRecord);
        }
    }

    @Override // org.apache.flink.streaming.runtime.io.InputProcessor
    public void processLatencyMarker(LatencyMarker latencyMarker, int i) throws Exception {
        synchronized (this.checkpointLock) {
            this.operator.processLatencyMarker(latencyMarker);
        }
    }

    @Override // org.apache.flink.streaming.runtime.io.InputProcessor
    public void processWatermark(Watermark watermark, int i) throws Exception {
        this.statusWatermarkValve.inputWatermark(watermark, i);
    }

    @Override // org.apache.flink.streaming.runtime.io.InputProcessor
    public void processStreamStatus(StreamStatus streamStatus, int i) throws Exception {
        this.statusWatermarkValve.inputStreamStatus(streamStatus, i);
    }

    @Override // org.apache.flink.streaming.runtime.io.InputProcessor
    public void endInput() throws Exception {
        this.operator.endInput();
    }

    @Override // org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.ValveOutputHandler
    public void handleWatermark(Watermark watermark) {
        try {
            this.watermarkGauge.setCurrentWatermark(watermark.getTimestamp());
            this.operator.processWatermark(watermark);
        } catch (Exception e) {
            throw new RuntimeException("Exception occurred while processing valve output watermark: ", e);
        }
    }

    @Override // org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.ValveOutputHandler
    public void handleStreamStatus(StreamStatus streamStatus) {
        this.streamStatusSubMaintainer.updateStreamStatus(streamStatus);
    }

    @Override // org.apache.flink.streaming.runtime.io.InputProcessor
    public void release() {
        this.streamStatusSubMaintainer.release();
    }

    @VisibleForTesting
    WatermarkGauge getWatermarkGauge() {
        return this.watermarkGauge;
    }
}
