/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.partition.consumer.CheckpointableInput;
import org.apache.flink.streaming.runtime.io.CheckpointBarrierBehaviourController;
import org.apache.flink.util.Preconditions;

/**
 * {@link CheckpointBarrierBehaviourController} that blocks a channel as soon as a barrier is
 * reported for it and resumes every blocked channel once the last barrier has been processed or
 * the pending checkpoint has been aborted.
 *
 * <p>A "blocked" flag is kept for every channel of the inputs handed to the constructor. This
 * class performs no synchronization of its own.
 */
@Internal
public class AlignedController implements CheckpointBarrierBehaviourController {
    /** Inputs indexed by gate index; the array passed to the constructor is stored as-is. */
    private final CheckpointableInput[] inputs;

    /** Per-channel flag: {@code true} while this controller has blocked the channel. */
    private final Map<InputChannelInfo, Boolean> blockedChannels;

    /**
     * Creates a controller that initially treats every channel of the given inputs as unblocked.
     *
     * @param inputs the inputs whose channels are tracked; looked up later via
     *     {@link InputChannelInfo#getGateIdx()}
     */
    public AlignedController(CheckpointableInput... inputs) {
        this.inputs = inputs;
        this.blockedChannels = Arrays.stream(inputs)
                .flatMap(gate -> gate.getChannelInfos().stream())
                .collect(Collectors.toMap(Function.identity(), info -> false));
    }

    /**
     * Marks the channel as blocked and blocks consumption on its input.
     *
     * @param channelInfo the channel the barrier arrived on
     * @param barrier the received barrier (not inspected here)
     * @throws IllegalArgumentException if the channel belongs to none of the tracked inputs
     * @throws IllegalStateException if the channel is already marked as blocked
     */
    @Override
    public void barrierReceived(InputChannelInfo channelInfo, CheckpointBarrier barrier) {
        // Look the flag up first: Map.put returns null for an unknown key, and unboxing that null
        // would fail with a bare NullPointerException after the map had already been modified.
        Boolean alreadyBlocked = blockedChannels.get(channelInfo);
        if (alreadyBlocked == null) {
            throw new IllegalArgumentException(
                    "Unknown input channel " + channelInfo + ": it is not tracked by this controller");
        }
        Preconditions.checkState(
                !alreadyBlocked,
                "Stream corrupt: Repeated barrier for same checkpoint on input " + channelInfo);
        blockedChannels.put(channelInfo, true);
        inputs[channelInfo.getGateIdx()].blockConsumption(channelInfo);
    }

    /**
     * Does nothing for the first barrier.
     *
     * @return always {@code false}; the meaning of the flag is defined by the interface
     */
    @Override
    public boolean preProcessFirstBarrier(InputChannelInfo channelInfo, CheckpointBarrier barrier) {
        return false;
    }

    /**
     * Resumes every channel currently marked as blocked and clears all flags.
     *
     * @return always {@code true}; the meaning of the flag is defined by the interface
     * @throws IOException if resuming consumption on an input fails
     */
    @Override
    public boolean postProcessLastBarrier(InputChannelInfo channelInfo, CheckpointBarrier barrier) throws IOException {
        resumeConsumption();
        return true;
    }

    /**
     * Resumes every channel currently marked as blocked and clears all flags.
     *
     * @param cancelledId id of the cancelled checkpoint (not inspected here)
     * @param exception the cancellation cause (not inspected here)
     * @throws IOException if resuming consumption on an input fails
     */
    @Override
    public void abortPendingCheckpoint(long cancelledId, CheckpointException exception) throws IOException {
        resumeConsumption();
    }

    /**
     * Resumes consumption on the given channel only; the tracked flags are left untouched.
     *
     * @throws IOException if resuming consumption on the input fails
     */
    @Override
    public void obsoleteBarrierReceived(InputChannelInfo channelInfo, CheckpointBarrier barrier) throws IOException {
        resumeConsumption(channelInfo);
    }

    /** Resumes all channels flagged as blocked, resetting each flag once its channel is handled. */
    private void resumeConsumption() throws IOException {
        for (Map.Entry<InputChannelInfo, Boolean> blockedChannel : blockedChannels.entrySet()) {
            if (blockedChannel.getValue()) {
                resumeConsumption(blockedChannel.getKey());
            }
            blockedChannel.setValue(false);
        }
    }

    /** Resumes consumption of a single channel on the input selected by its gate index. */
    private void resumeConsumption(InputChannelInfo channelInfo) throws IOException {
        CheckpointableInput input = inputs[channelInfo.getGateIdx()];
        input.resumeConsumption(channelInfo);
    }
}

