/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.util.Random;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OutputWithChainingCheck;
import org.apache.flink.streaming.runtime.tasks.WatermarkGaugeExposingOutput;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.XORShiftRandom;

/**
 * Output that fans incoming elements out to an array of wrapped outputs.
 *
 * <p>Records, watermarks and watermark statuses are forwarded to every entry of {@link #outputs}.
 * Latency markers are forwarded to exactly one entry (picked at random when there is more than
 * one). The most recently emitted watermark timestamp is exposed through
 * {@link #getWatermarkGauge()}.
 *
 * @param <T> type of the values carried by the forwarded {@link StreamRecord}s
 */
class BroadcastingOutputCollector<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {
    /** Outputs that receive every forwarded element; stored as passed in, without copying. */
    protected final OutputWithChainingCheck<StreamRecord<T>>[] outputs;

    /** Source of randomness used to choose the single output that receives a latency marker. */
    private final Random random = new XORShiftRandom();

    /** Holds the timestamp of the last watermark passed to {@link #emitWatermark(Watermark)}. */
    private final WatermarkGauge watermarkGauge = new WatermarkGauge();

    /** Counter incremented at most once per collected record, see {@link #collect(StreamRecord)}. */
    protected final Counter numRecordsOutForTask;

    /**
     * Creates a collector that broadcasts to the given outputs.
     *
     * @param outputs outputs to forward to; the array reference is kept as is
     * @param numRecordsOutForTask counter to increment for records reported as emitted
     */
    public BroadcastingOutputCollector(
            OutputWithChainingCheck<StreamRecord<T>>[] outputs, Counter numRecordsOutForTask) {
        this.outputs = outputs;
        this.numRecordsOutForTask = numRecordsOutForTask;
    }

    /**
     * Records the watermark's timestamp in the gauge and then forwards the watermark to every
     * output, in array order.
     *
     * @param mark watermark to broadcast
     */
    @Override
    public void emitWatermark(Watermark mark) {
        this.watermarkGauge.setCurrentWatermark(mark.getTimestamp());
        for (OutputWithChainingCheck<StreamRecord<T>> output : this.outputs) {
            output.emitWatermark(mark);
        }
    }

    /**
     * Forwards the watermark status to every output, in array order.
     *
     * @param watermarkStatus status to broadcast
     */
    @Override
    public void emitWatermarkStatus(WatermarkStatus watermarkStatus) {
        for (OutputWithChainingCheck<StreamRecord<T>> output : this.outputs) {
            output.emitWatermarkStatus(watermarkStatus);
        }
    }

    /**
     * Forwards the latency marker to a single output rather than to all of them.
     *
     * <p>Does nothing when there are no outputs. With exactly one output that output is used
     * directly; otherwise one output is selected at random.
     *
     * @param latencyMarker marker to forward
     */
    @Override
    public void emitLatencyMarker(LatencyMarker latencyMarker) {
        final int outputCount = this.outputs.length;
        if (outputCount == 0) {
            return;
        }
        // Only consult the random generator when there is an actual choice to make.
        final int target = outputCount == 1 ? 0 : this.random.nextInt(outputCount);
        this.outputs[target].emitLatencyMarker(latencyMarker);
    }

    /**
     * Returns the gauge tracking the timestamp of the last watermark emitted through this output.
     *
     * @return the watermark gauge owned by this collector
     */
    @Override
    public Gauge<Long> getWatermarkGauge() {
        return this.watermarkGauge;
    }

    /**
     * Hands the record to every output and increments {@link #numRecordsOutForTask} once if at
     * least one output's {@code collectAndCheckIfChained} returned {@code true}.
     *
     * <p>NOTE(review): {@code true} presumably means the record actually left the task (was not
     * just passed to a chained operator) - confirm against {@code OutputWithChainingCheck}.
     *
     * @param record record to broadcast
     */
    @Override
    public void collect(StreamRecord<T> record) {
        boolean emitted = false;
        for (OutputWithChainingCheck<StreamRecord<T>> output : this.outputs) {
            // Non-short-circuit OR: every output must be invoked even after one reported true.
            emitted |= output.collectAndCheckIfChained(record);
        }
        if (emitted) {
            this.numRecordsOutForTask.inc();
        }
    }

    /**
     * Hands the tagged record to every output and increments {@link #numRecordsOutForTask} once
     * if at least one output's {@code collectAndCheckIfChained} returned {@code true}.
     *
     * @param outputTag tag identifying the side output the record belongs to
     * @param record record to broadcast
     * @param <X> type of the value carried by the tagged record
     */
    @Override
    public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
        boolean emitted = false;
        for (OutputWithChainingCheck<StreamRecord<T>> output : this.outputs) {
            // Non-short-circuit OR: every output must be invoked even after one reported true.
            emitted |= output.collectAndCheckIfChained(outputTag, record);
        }
        if (emitted) {
            this.numRecordsOutForTask.inc();
        }
    }

    /** Closes every output, in array order. */
    @Override
    public void close() {
        for (OutputWithChainingCheck<StreamRecord<T>> output : this.outputs) {
            output.close();
        }
    }
}

