/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
import org.apache.flink.streaming.runtime.io.CheckpointedInputGate;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.io.StreamTaskInput;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
import org.apache.flink.util.Preconditions;

/**
 * {@link StreamTaskInput} that pulls {@link BufferOrEvent}s from a {@link CheckpointedInputGate},
 * deserializes buffers into {@link StreamElement}s using one {@link RecordDeserializer} per input
 * channel, and forwards each element either to the supplied
 * {@link PushingAsyncDataInput.DataOutput} or to the {@link StatusWatermarkValve}.
 *
 * @param <T> type of the records emitted to the output
 */
@Internal
public final class StreamTaskNetworkInput<T> implements StreamTaskInput<T> {
    private final CheckpointedInputGate checkpointedInputGate;
    private final DeserializationDelegate<StreamElement> deserializationDelegate;
    /** Deserializer per channel; an entry is removed once its channel has been released. */
    private final Map<InputChannelInfo, RecordDeserializer<DeserializationDelegate<StreamElement>>> recordDeserializers;
    /** Maps each channel to a dense index (0..n-1) in the iteration order of the gate's channel infos. */
    private final Map<InputChannelInfo, Integer> flattenedChannelIndices;
    private final StatusWatermarkValve statusWatermarkValve;
    private final int inputIndex;
    /** Channel of the buffer most recently handed to a deserializer; {@code null} before the first buffer. */
    private InputChannelInfo lastChannel;
    /** Deserializer that still holds an unconsumed buffer, or {@code null} if there is none. */
    private RecordDeserializer<DeserializationDelegate<StreamElement>> currentRecordDeserializer;

    /**
     * Creates an input with one spilling deserializer per channel of the given gate.
     *
     * @param checkpointedInputGate gate to poll buffers and events from
     * @param inputSerializer serializer for the record payload, wrapped in a {@link StreamElementSerializer}
     * @param ioManager provides the spilling directories handed to each deserializer
     * @param statusWatermarkValve valve receiving watermarks and stream statuses; must not be {@code null}
     * @param inputIndex index reported by {@link #getInputIndex()}
     */
    public StreamTaskNetworkInput(CheckpointedInputGate checkpointedInputGate, TypeSerializer<?> inputSerializer, IOManager ioManager, StatusWatermarkValve statusWatermarkValve, int inputIndex) {
        this.checkpointedInputGate = checkpointedInputGate;
        this.deserializationDelegate = new NonReusingDeserializationDelegate<>(new StreamElementSerializer<>(inputSerializer));
        this.recordDeserializers = checkpointedInputGate.getChannelInfos().stream()
                .collect(Collectors.toMap(
                        Function.identity(),
                        unused -> new SpillingAdaptiveSpanningRecordDeserializer<>(ioManager.getSpillingDirectoriesPaths())));
        this.flattenedChannelIndices = flattenChannelIndices(checkpointedInputGate);
        this.statusWatermarkValve = Preconditions.checkNotNull(statusWatermarkValve);
        this.inputIndex = inputIndex;
    }

    /**
     * Test-only constructor that takes pre-built deserializers, looked up by each channel's
     * input channel index.
     *
     * @param checkpointedInputGate gate to poll from; all of its channels must share one gate index
     * @param inputSerializer serializer for the record payload
     * @param statusWatermarkValve valve receiving watermarks and stream statuses (not null-checked here)
     * @param inputIndex index reported by {@link #getInputIndex()}
     * @param recordDeserializers one deserializer per input channel of the gate
     * @throws IllegalArgumentException if the channels span more than one gate index or the array
     *     length differs from the gate's number of input channels
     */
    @VisibleForTesting
    StreamTaskNetworkInput(CheckpointedInputGate checkpointedInputGate, TypeSerializer<?> inputSerializer, StatusWatermarkValve statusWatermarkValve, int inputIndex, RecordDeserializer<DeserializationDelegate<StreamElement>>[] recordDeserializers) {
        // The array is indexed by per-gate channel index, which is only unambiguous for a single gate.
        Preconditions.checkArgument(checkpointedInputGate.getChannelInfos().stream().map(InputChannelInfo::getGateIdx).distinct().count() <= 1L);
        Preconditions.checkArgument(checkpointedInputGate.getNumberOfInputChannels() == recordDeserializers.length);
        this.checkpointedInputGate = checkpointedInputGate;
        this.deserializationDelegate = new NonReusingDeserializationDelegate<>(new StreamElementSerializer<>(inputSerializer));
        this.recordDeserializers = checkpointedInputGate.getChannelInfos().stream()
                .collect(Collectors.toMap(
                        Function.identity(),
                        info -> recordDeserializers[info.getInputChannelIdx()]));
        this.flattenedChannelIndices = flattenChannelIndices(checkpointedInputGate);
        this.statusWatermarkValve = statusWatermarkValve;
        this.inputIndex = inputIndex;
    }

    /** Assigns each channel of the gate a consecutive index, in the order the gate lists them. */
    private static Map<InputChannelInfo, Integer> flattenChannelIndices(CheckpointedInputGate gate) {
        Map<InputChannelInfo, Integer> indices = new HashMap<>();
        for (InputChannelInfo channelInfo : gate.getChannelInfos()) {
            indices.put(channelInfo, indices.size());
        }
        return indices;
    }

    /**
     * Emits at most one stream element to {@code output}, or handles at most one event.
     *
     * @param output receiver of records, latency markers, and (via the valve) watermarks/statuses
     * @return {@link InputStatus#MORE_AVAILABLE} after an element was emitted or an event was
     *     handled; {@link InputStatus#END_OF_INPUT} if the gate returned nothing and is finished;
     *     {@link InputStatus#NOTHING_AVAILABLE} if the gate returned nothing and is not finished
     * @throws IOException if the current deserializer fails to produce the next record (the
     *     original exception is kept as the cause)
     * @throws Exception if emitting to the output or the valve fails
     */
    @Override
    public InputStatus emitNext(PushingAsyncDataInput.DataOutput<T> output) throws Exception {
        while (true) {
            // First drain whatever the current deserializer can still produce.
            if (this.currentRecordDeserializer != null) {
                RecordDeserializer.DeserializationResult result;
                try {
                    result = this.currentRecordDeserializer.getNextRecord(this.deserializationDelegate);
                } catch (IOException e) {
                    throw new IOException(String.format("Can't get next record for channel %s", this.lastChannel), e);
                }
                if (result.isBufferConsumed()) {
                    this.currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
                    this.currentRecordDeserializer = null;
                }
                if (result.isFullRecord()) {
                    this.processElement(this.deserializationDelegate.getInstance(), output);
                    return InputStatus.MORE_AVAILABLE;
                }
            }

            Optional<BufferOrEvent> bufferOrEvent = this.checkpointedInputGate.pollNext();
            if (!bufferOrEvent.isPresent()) {
                if (this.checkpointedInputGate.isFinished()) {
                    Preconditions.checkState(this.checkpointedInputGate.getAvailableFuture().isDone(), "Finished BarrierHandler should be available");
                    return InputStatus.END_OF_INPUT;
                }
                return InputStatus.NOTHING_AVAILABLE;
            }

            BufferOrEvent next = bufferOrEvent.get();
            if (next.isBuffer()) {
                // Hand the buffer to its channel's deserializer and loop to read from it.
                this.processBuffer(next);
            } else {
                // Return to the caller after every event instead of continuing to poll.
                this.processEvent(next);
                return InputStatus.MORE_AVAILABLE;
            }
        }
    }

    /**
     * Dispatches one deserialized element by kind: records and latency markers go straight to
     * the output, watermarks and stream statuses go through the valve together with the
     * flattened index of the channel the last buffer came from.
     *
     * @throws UnsupportedOperationException if the element is none of the four known kinds
     */
    private void processElement(StreamElement recordOrMark, PushingAsyncDataInput.DataOutput<T> output) throws Exception {
        if (recordOrMark.isRecord()) {
            output.emitRecord(recordOrMark.asRecord());
        } else if (recordOrMark.isWatermark()) {
            this.statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), this.flattenedChannelIndices.get(this.lastChannel), output);
        } else if (recordOrMark.isLatencyMarker()) {
            output.emitLatencyMarker(recordOrMark.asLatencyMarker());
        } else if (recordOrMark.isStreamStatus()) {
            this.statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), this.flattenedChannelIndices.get(this.lastChannel), output);
        } else {
            throw new UnsupportedOperationException("Unknown type of StreamElement");
        }
    }

    /**
     * Handles an event from the gate. Only {@link EndOfPartitionEvent} has an effect here: the
     * deserializer of the originating channel is released. All other events are ignored.
     */
    private void processEvent(BufferOrEvent bufferOrEvent) {
        AbstractEvent event = bufferOrEvent.getEvent();
        if (event.getClass() == EndOfPartitionEvent.class) {
            this.releaseDeserializer(bufferOrEvent.getChannelInfo());
        }
    }

    /**
     * Makes the deserializer of the buffer's channel current and feeds it the buffer.
     *
     * @throws IllegalStateException if the buffer carries no channel info or the channel's
     *     deserializer has already been released
     */
    private void processBuffer(BufferOrEvent bufferOrEvent) throws IOException {
        this.lastChannel = bufferOrEvent.getChannelInfo();
        Preconditions.checkState(this.lastChannel != null);
        this.currentRecordDeserializer = this.recordDeserializers.get(this.lastChannel);
        Preconditions.checkState(this.currentRecordDeserializer != null, "currentRecordDeserializer has already been released");
        this.currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
    }

    @Override
    public int getInputIndex() {
        return this.inputIndex;
    }

    /**
     * Returns the inherited {@code AVAILABLE} future while a deserializer still holds an
     * unconsumed buffer; otherwise delegates to the gate's availability future.
     */
    @Override
    public CompletableFuture<?> getAvailableFuture() {
        if (this.currentRecordDeserializer != null) {
            return AVAILABLE;
        }
        return this.checkpointedInputGate.getAvailableFuture();
    }

    /**
     * Passes the unconsumed buffer of every not-yet-released deserializer to the channel state
     * writer for the given checkpoint.
     *
     * @return the gate's future for "all barriers received" of {@code checkpointId}
     * @throws IOException if obtaining a deserializer's unconsumed buffer fails
     */
    @Override
    public CompletableFuture<Void> prepareSnapshot(ChannelStateWriter channelStateWriter, long checkpointId) throws IOException {
        for (Map.Entry<InputChannelInfo, RecordDeserializer<DeserializationDelegate<StreamElement>>> e : this.recordDeserializers.entrySet()) {
            // NOTE(review): -2 is presumably a "sequence number unknown" sentinel declared on
            // ChannelStateWriter (inlined by the compiler) - confirm and use the named constant.
            channelStateWriter.addInputData(checkpointId, e.getKey(), -2, e.getValue().getUnconsumedBuffer());
        }
        return this.checkpointedInputGate.getAllBarriersReceivedFuture(checkpointId);
    }

    /** Releases every remaining deserializer and then closes the underlying gate. */
    @Override
    public void close() throws IOException {
        // Iterate over a copy of the keys: releaseDeserializer removes entries from the map.
        for (InputChannelInfo channelInfo : new HashSet<>(this.recordDeserializers.keySet())) {
            this.releaseDeserializer(channelInfo);
        }
        this.checkpointedInputGate.close();
    }

    /**
     * Recycles the channel's current buffer (if any and not yet recycled), clears its
     * deserializer, and removes it from the map. Does nothing if the channel has no deserializer.
     */
    private void releaseDeserializer(InputChannelInfo channelInfo) {
        RecordDeserializer<DeserializationDelegate<StreamElement>> deserializer = this.recordDeserializers.get(channelInfo);
        if (deserializer != null) {
            Buffer buffer = deserializer.getCurrentBuffer();
            if (buffer != null && !buffer.isRecycled()) {
                buffer.recycleBuffer();
            }
            deserializer.clear();
            this.recordDeserializers.remove(channelInfo);
        }
    }
}

