package org.apache.flink.streaming.api.operators;

import java.util.concurrent.ScheduledFuture;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.SimpleHistogram;
import org.apache.flink.runtime.metrics.SumAndCount;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.OutputTag;

/**
 * Stream operator that drives a {@link SourceFunction}.
 *
 * <p>{@link #run(Object, StreamStatusMaintainer, Output)} wraps the operator output in a
 * {@link SourceFunction.SourceContext}, hands that context to the user function and returns once
 * the user function returns. Optionally it also
 * <ul>
 *   <li>emits {@link LatencyMarker}s at a fixed rate (when latency tracking is enabled in the
 *       execution config), and</li>
 *   <li>samples emit timings into the {@code taskLatency} / {@code sourceLatency} metrics (when
 *       tracing metrics are enabled in the execution config).</li>
 * </ul>
 *
 * @param <OUT> type of the records produced by the source
 * @param <SRC> type of the wrapped source function
 */
@Internal
public class StreamSource<OUT, SRC extends SourceFunction<OUT>> extends AbstractUdfStreamOperator<OUT, SRC> implements StreamOperator<OUT> {

    private static final long serialVersionUID = 1L;

    /** Context handed to the user function; stays {@code null} until {@code run} creates it. */
    private transient SourceFunction.SourceContext<OUT> ctx;

    /** Set by {@link #markCanceledOrStopped()} and checked by {@code run} once the user function returns. */
    private transient volatile boolean canceledOrStopped;

    /** Whether emits are sampled into the tracing metrics; read from the execution config in {@code run}. */
    private transient boolean enableTracingMetrics;

    /** Every {@code tracingMetricsInterval}-th emit is measured when tracing metrics are enabled. */
    private transient int tracingMetricsInterval;

    /** Receives the duration (in nanoseconds) of each measured downstream {@code collect} call. */
    private transient SumAndCount taskLatency;

    /** Receives the time (in nanoseconds) between the end of one measured emit and the start of the next. */
    private transient Histogram sourceLatency;

    /**
     * Emits a {@link LatencyMarker} on the given output at a fixed rate, using a timer registered
     * with the supplied {@link ProcessingTimeService}.
     *
     * @param <OUT> element type of the output the markers are emitted on
     */
    public static class LatencyMarksEmitter<OUT> {

        /** Handle of the fixed-rate timer; cancelled by {@link #close()}. */
        private final ScheduledFuture<?> latencyMarkTimer;

        /**
         * Registers the fixed-rate timer. The first marker is scheduled with an initial delay of
         * zero.
         *
         * @param processingTimeService service the timer is registered with
         * @param output output the markers are emitted on
         * @param latencyTrackingInterval period passed to {@code scheduleAtFixedRate}
         * @param operatorId operator id stamped into every marker
         * @param subtaskIndex subtask index stamped into every marker
         */
        public LatencyMarksEmitter(
                ProcessingTimeService processingTimeService,
                final Output<StreamRecord<OUT>> output,
                long latencyTrackingInterval,
                final OperatorID operatorId,
                final int subtaskIndex) {
            this.latencyMarkTimer = processingTimeService.scheduleAtFixedRate(
                    new ProcessingTimeCallback() {
                        @Override
                        public void onProcessingTime(long timestamp) throws Exception {
                            try {
                                output.emitLatencyMarker(new LatencyMarker(timestamp, operatorId, subtaskIndex));
                            } catch (Throwable t) {
                                // Deliberately best-effort: a failed marker is only logged and is
                                // never propagated out of the timer callback.
                                AbstractStreamOperator.LOG.warn("Error while emitting latency marker.", t);
                            }
                        }
                    },
                    0L,
                    latencyTrackingInterval);
        }

        /** Cancels the timer, interrupting a callback that is currently running. */
        public void close() {
            latencyMarkTimer.cancel(true);
        }
    }

    /**
     * Creates the operator around the given source function and sets its chaining strategy to
     * {@link ChainingStrategy#HEAD}.
     *
     * @param src the user source function
     */
    public StreamSource(SRC src) {
        super(src);
        this.canceledOrStopped = false;
        this.enableTracingMetrics = false;
        this.chainingStrategy = ChainingStrategy.HEAD;
    }

    /**
     * Runs the source, emitting on this operator's own {@code output}.
     *
     * @param lockingObject forwarded unchanged to the source context factory
     *     (NOTE(review): presumably the checkpoint lock - confirm against the caller)
     * @param streamStatusMaintainer forwarded unchanged to the source context factory
     * @throws Exception anything thrown by the user function or while closing the context
     */
    public void run(Object lockingObject, StreamStatusMaintainer streamStatusMaintainer) throws Exception {
        run(lockingObject, streamStatusMaintainer, this.output);
    }

    /**
     * Runs the source, emitting on {@code collector}.
     *
     * <p>After the user function returns normally, {@link Watermark#MAX_WATERMARK} is emitted
     * unless the operator has been marked as canceled or stopped. The source context and the
     * latency emitter (if any) are closed in all cases.
     *
     * @param lockingObject forwarded unchanged to the source context factory
     *     (NOTE(review): presumably the checkpoint lock - confirm against the caller)
     * @param streamStatusMaintainer forwarded unchanged to the source context factory
     * @param collector output that records, watermarks and latency markers are emitted on
     * @throws Exception anything thrown by the user function or while closing the context
     */
    public void run(Object lockingObject, StreamStatusMaintainer streamStatusMaintainer, Output<StreamRecord<OUT>> collector) throws Exception {
        final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();

        this.enableTracingMetrics = getRuntimeContext().getExecutionConfig().isTracingMetricsEnabled();
        if (this.enableTracingMetrics) {
            // The metrics are created only once, even if run() is entered again.
            if (this.taskLatency == null) {
                this.taskLatency = new SumAndCount("taskLatency", getRuntimeContext().getMetricGroup());
            }
            if (this.sourceLatency == null) {
                this.sourceLatency = getRuntimeContext().getMetricGroup().histogram("sourceLatency", new SimpleHistogram());
            }
            this.tracingMetricsInterval = getRuntimeContext().getExecutionConfig().getTracingMetricsInterval();
        }

        LatencyMarksEmitter<OUT> latencyEmitter = null;
        if (getExecutionConfig().isLatencyTrackingEnabled()) {
            latencyEmitter = new LatencyMarksEmitter<OUT>(
                    getProcessingTimeService(),
                    collector,
                    getExecutionConfig().getLatencyTrackingInterval(),
                    getOperatorID(),
                    getRuntimeContext().getIndexOfThisSubtask());
        }

        this.ctx = getSourceContext(
                timeCharacteristic,
                getProcessingTimeService(),
                lockingObject,
                streamStatusMaintainer,
                collector,
                getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval());

        try {
            this.userFunction.run(this.ctx);

            // A source that finished on its own (not via cancel/stop) emits the final watermark.
            if (!isCanceledOrStopped()) {
                this.ctx.emitWatermark(Watermark.MAX_WATERMARK);
            }
        } finally {
            // Nested finally: the latency timer must be cancelled even if closing the context throws.
            try {
                this.ctx.close();
            } finally {
                if (latencyEmitter != null) {
                    latencyEmitter.close();
                }
            }
        }
    }

    /**
     * Builds a source context whose output is wrapped with explicitly supplied tracing settings.
     *
     * <p>NOTE(review): the output wrapped here is wrapped a second time by the private overload,
     * using this operator's own tracing fields. If both sets of settings are enabled, emits are
     * measured twice. Kept as found in the compiled class - confirm whether this is intended.
     *
     * @param timeCharacteristic time characteristic forwarded to the context factory
     * @param processingTimeService processing time service forwarded to the context factory
     * @param lockingObject forwarded unchanged to the context factory
     * @param streamStatusMaintainer forwarded unchanged to the context factory
     * @param collector output to wrap
     * @param enableTracingMetrics whether the wrapper samples emit timings
     * @param tracingMetricsInterval every n-th emit is measured by the wrapper
     * @param taskLatency metric receiving the duration of measured downstream collects
     * @param sourceLatency metric receiving the gap between measured emits
     * @param watermarkInterval watermark interval forwarded to the context factory
     * @return the source context
     */
    @VisibleForTesting
    protected SourceFunction.SourceContext<OUT> getSourceContext(TimeCharacteristic timeCharacteristic, ProcessingTimeService processingTimeService, Object lockingObject, StreamStatusMaintainer streamStatusMaintainer, Output<StreamRecord<OUT>> collector, boolean enableTracingMetrics, int tracingMetricsInterval, SumAndCount taskLatency, Histogram sourceLatency, long watermarkInterval) {
        final Output<StreamRecord<OUT>> wrapped =
                getOutputWithTaskLatency(collector, enableTracingMetrics, tracingMetricsInterval, taskLatency, sourceLatency);
        return getSourceContext(timeCharacteristic, processingTimeService, lockingObject, streamStatusMaintainer, wrapped, watermarkInterval);
    }

    /**
     * Builds the source context via {@code StreamSourceContexts.getSourceContext}, wrapping the
     * output with this operator's own tracing settings first.
     */
    private SourceFunction.SourceContext<OUT> getSourceContext(TimeCharacteristic timeCharacteristic, ProcessingTimeService processingTimeService, Object lockingObject, StreamStatusMaintainer streamStatusMaintainer, Output<StreamRecord<OUT>> collector, long watermarkInterval) {
        final Output<StreamRecord<OUT>> wrapped =
                getOutputWithTaskLatency(collector, this.enableTracingMetrics, this.tracingMetricsInterval, this.taskLatency, this.sourceLatency);
        // NOTE(review): the trailing -1L is presumably "idle timeout disabled" - confirm against
        // StreamSourceContexts.
        return StreamSourceContexts.getSourceContext(timeCharacteristic, processingTimeService, lockingObject, streamStatusMaintainer, wrapped, watermarkInterval, -1L);
    }

    /**
     * Wraps {@code delegate} so that, when tracing is enabled, every
     * {@code tracingMetricsInterval}-th record emit is timed with {@link System#nanoTime()}.
     *
     * <p>Watermarks, latency markers and {@code close()} are passed straight through. With tracing
     * disabled, record emits are passed straight through as well and the metrics are never
     * touched. With tracing enabled, {@code tracingMetricsInterval} must be non-zero, otherwise
     * the modulo in {@code collect} throws {@link ArithmeticException}.
     *
     * @param delegate the output that actually receives all elements
     * @param enableTracingMetrics whether emits are sampled at all
     * @param tracingMetricsInterval sampling period, counted in emitted records
     * @param taskLatency updated with the duration of each measured downstream collect
     * @param sourceLatency updated with the time since the previous measured emit finished
     * @return the wrapping output
     */
    private Output<StreamRecord<OUT>> getOutputWithTaskLatency(final Output<StreamRecord<OUT>> delegate, final boolean enableTracingMetrics, final int tracingMetricsInterval, final SumAndCount taskLatency, final Histogram sourceLatency) {
        return new Output<StreamRecord<OUT>>() {

            /** {@code System.nanoTime()} taken right after the last measured emit; 0 until the first one. */
            private long lastEmitTime = 0;

            /** Number of record emits seen while tracing is enabled; shared by both collect variants. */
            private long emitCounter = 0;

            @Override
            public void emitWatermark(Watermark watermark) {
                delegate.emitWatermark(watermark);
            }

            @Override
            public void emitLatencyMarker(LatencyMarker latencyMarker) {
                delegate.emitLatencyMarker(latencyMarker);
            }

            /**
             * Emits a record on the main output, measuring it if it falls on the sampling period.
             * The counter is read before it is incremented, so the very first emit is measured.
             */
            @Override
            public void collect(StreamRecord<OUT> record) {
                if (enableTracingMetrics && this.emitCounter++ % (long) tracingMetricsInterval == 0) {
                    collectWithMetrics(record);
                } else {
                    delegate.collect(record);
                }
            }

            /** Forwards the record and updates both metrics around the downstream call. */
            private void collectWithMetrics(StreamRecord<OUT> record) {
                final long emitStart = System.nanoTime();
                if (this.lastEmitTime > 0) {
                    // Time elapsed since the previous measured emit returned.
                    sourceLatency.update(emitStart - this.lastEmitTime);
                }
                delegate.collect(record);
                this.lastEmitTime = System.nanoTime();
                // Time spent inside the downstream collect call.
                taskLatency.update(this.lastEmitTime - emitStart);
            }

            /**
             * Emits a record on a side output, measuring it if it falls on the sampling period.
             * Shares the emit counter with the main-output variant.
             */
            @Override
            public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
                if (enableTracingMetrics && this.emitCounter++ % (long) tracingMetricsInterval == 0) {
                    collectWithMetrics(outputTag, record);
                } else {
                    delegate.collect(outputTag, record);
                }
            }

            /** Side-output counterpart of {@link #collectWithMetrics(StreamRecord)}. */
            private <X> void collectWithMetrics(OutputTag<X> outputTag, StreamRecord<X> record) {
                final long emitStart = System.nanoTime();
                if (this.lastEmitTime > 0) {
                    sourceLatency.update(emitStart - this.lastEmitTime);
                }
                delegate.collect(outputTag, record);
                this.lastEmitTime = System.nanoTime();
                taskLatency.update(this.lastEmitTime - emitStart);
            }

            @Override
            public void close() {
                delegate.close();
            }
        };
    }

    /**
     * Cancels the source: marks the operator as canceled, then cancels the user function, then
     * closes the source context if one has been created.
     */
    public void cancel() {
        // The flag is set before the user function is cancelled, so that run() sees it when the
        // user function returns and skips the final watermark.
        markCanceledOrStopped();
        this.userFunction.cancel();

        // ctx is still null if run() never got as far as creating it.
        if (this.ctx != null) {
            this.ctx.close();
        }
    }

    /** Marks this source as canceled or stopped, which suppresses the final watermark in {@code run}. */
    public void markCanceledOrStopped() {
        this.canceledOrStopped = true;
    }

    /**
     * Reports whether {@link #markCanceledOrStopped()} has been called.
     *
     * @return {@code true} once the source has been marked canceled or stopped
     */
    public boolean isCanceledOrStopped() {
        return this.canceledOrStopped;
    }
}
