package org.apache.flink.runtime.jobmaster;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.runtime.event.ExecutionVertexFailoverEvent;
import org.apache.flink.runtime.event.ExecutionVertexStateChangedEvent;
import org.apache.flink.runtime.event.ResultPartitionConsumableEvent;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionStatusListener;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IOMetrics;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.ExecutionVertexID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmaster.failover.ExecutionGraphOperationLog;
import org.apache.flink.runtime.jobmaster.failover.FailoverOperationLog;
import org.apache.flink.runtime.jobmaster.failover.InputSplitsOperationLog;
import org.apache.flink.runtime.jobmaster.failover.OperationLog;
import org.apache.flink.runtime.jobmaster.failover.OperationLogManager;
import org.apache.flink.runtime.jobmaster.failover.Replayable;
import org.apache.flink.runtime.jobmaster.failover.ResultDescriptor;
import org.apache.flink.runtime.jobmaster.failover.ResultPartitionOperationLog;
import org.apache.flink.runtime.schedule.ExecutionVertexStatus;
import org.apache.flink.runtime.schedule.GraphManagerPlugin;
import org.apache.flink.runtime.schedule.ResultPartitionStatus;
import org.apache.flink.runtime.schedule.SchedulingConfig;
import org.apache.flink.runtime.schedule.VertexScheduler;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/jobmaster/GraphManager.class */
public class GraphManager implements Replayable, ExecutionStatusListener {
    static final Logger LOG = LoggerFactory.getLogger(GraphManager.class);
    private final GraphManagerPlugin graphManagerPlugin;
    private final OperationLogManager operationLogManager;
    private final ExecutionGraph executionGraph;
    private final List<Collection<ExecutionVertexID>> executionVerticesToBeScheduled = new LinkedList();
    private volatile boolean isReconciling;

    /* loaded from: input_file:org/apache/flink/runtime/jobmaster/GraphManager$ExecutionGraphVertexScheduler.class */
    public class ExecutionGraphVertexScheduler implements VertexScheduler {
        public ExecutionGraphVertexScheduler() {
        }

        @Override // org.apache.flink.runtime.schedule.VertexScheduler
        public void scheduleExecutionVertices(Collection<ExecutionVertexID> collection) {
            synchronized (GraphManager.this.executionVerticesToBeScheduled) {
                if (GraphManager.this.isReconciling) {
                    GraphManager.this.executionVerticesToBeScheduled.add(collection);
                } else {
                    GraphManager.this.executionGraph.scheduleVertices(collection);
                }
            }
        }

        @Override // org.apache.flink.runtime.schedule.VertexScheduler
        public ExecutionVertexStatus getExecutionVertexStatus(ExecutionVertexID executionVertexID) {
            Preconditions.checkNotNull(executionVertexID);
            ExecutionJobVertex jobVertex = GraphManager.this.executionGraph.getJobVertex(executionVertexID.getJobVertexID());
            if (jobVertex == null) {
                throw new IllegalArgumentException("Cannot find any vertex with id " + executionVertexID.getJobVertexID());
            }
            return jobVertex.getTaskVertices()[executionVertexID.getSubTaskIndex()].getCurrentStatus();
        }

        @Override // org.apache.flink.runtime.schedule.VertexScheduler
        public ResultPartitionStatus getResultPartitionStatus(IntermediateDataSetID intermediateDataSetID, int i) {
            Preconditions.checkNotNull(intermediateDataSetID);
            IntermediateResult intermediateResult = GraphManager.this.executionGraph.getAllIntermediateResults().get(intermediateDataSetID);
            if (intermediateResult == null) {
                throw new IllegalArgumentException("Cannot find any result with id " + intermediateDataSetID);
            }
            return new ResultPartitionStatus(intermediateDataSetID, i, intermediateResult.getPartitions()[i].isConsumable());
        }

        @Override // org.apache.flink.runtime.schedule.VertexScheduler
        public double getResultConsumablePartitionRatio(IntermediateDataSetID intermediateDataSetID) {
            Preconditions.checkNotNull(intermediateDataSetID);
            IntermediateResult intermediateResult = GraphManager.this.executionGraph.getAllIntermediateResults().get(intermediateDataSetID);
            if (intermediateResult == null) {
                throw new IllegalArgumentException("Cannot find any result with id " + intermediateDataSetID);
            }
            return intermediateResult.getResultConsumablePartitionRatio();
        }
    }

    public GraphManager(GraphManagerPlugin graphManagerPlugin, JobMasterGateway jobMasterGateway, OperationLogManager operationLogManager, ExecutionGraph executionGraph) {
        this.graphManagerPlugin = (GraphManagerPlugin) Preconditions.checkNotNull(graphManagerPlugin);
        this.operationLogManager = (OperationLogManager) Preconditions.checkNotNull(operationLogManager);
        this.executionGraph = (ExecutionGraph) Preconditions.checkNotNull(executionGraph);
    }

    public void open(JobGraph jobGraph, SchedulingConfig schedulingConfig) {
        this.graphManagerPlugin.open(new ExecutionGraphVertexScheduler(), jobGraph, schedulingConfig);
    }

    public void dispose() {
        this.graphManagerPlugin.close();
    }

    public void reset() {
        this.graphManagerPlugin.reset();
        this.operationLogManager.stop();
        this.operationLogManager.clear();
        this.operationLogManager.start();
    }

    public boolean isReplaying() {
        return this.operationLogManager.isReplaying();
    }

    public boolean isReconciling() {
        return this.isReconciling;
    }

    public void enterReconcile() {
        this.isReconciling = true;
    }

    public void leaveReconcile() {
        synchronized (this.executionVerticesToBeScheduled) {
            Iterator<Collection<ExecutionVertexID>> it = this.executionVerticesToBeScheduled.iterator();
            while (it.hasNext()) {
                this.executionGraph.scheduleVertices(it.next());
            }
            this.isReconciling = false;
        }
    }

    public boolean allowLazyDeployment() {
        return this.graphManagerPlugin.allowLazyDeployment();
    }

    public void startScheduling() {
        LOG.info("Start scheduling execution graph with graph manager plugin: {}", this.graphManagerPlugin.getClass().getName());
        this.graphManagerPlugin.onSchedulingStarted();
    }

    public void notifyExecutionVertexFailover(List<ExecutionVertexID> list) {
        if (!this.operationLogManager.isReplaying()) {
            this.operationLogManager.writeOpLog(new FailoverOperationLog(list));
        }
        this.graphManagerPlugin.onExecutionVertexFailover(new ExecutionVertexFailoverEvent(list));
    }

    public void notifyResultPartitionConsumable(ExecutionVertexID executionVertexID, IntermediateDataSetID intermediateDataSetID, int i, TaskManagerLocation taskManagerLocation) {
        if (!this.operationLogManager.isReplaying() && this.executionGraph.getAllIntermediateResults().get(intermediateDataSetID).getResultType().isPipelined()) {
            this.operationLogManager.writeOpLog(new ResultPartitionOperationLog(executionVertexID, intermediateDataSetID, taskManagerLocation));
        }
        this.graphManagerPlugin.onResultPartitionConsumable(new ResultPartitionConsumableEvent(intermediateDataSetID, i));
    }

    public void notifyInputSplitsCreated(JobVertexID jobVertexID, Map<OperatorID, InputSplit[]> map) {
        if (this.operationLogManager.isReplaying()) {
            return;
        }
        this.operationLogManager.writeOpLog(new InputSplitsOperationLog(jobVertexID, map));
    }

    public boolean reconcileExecutionVertex(JobVertexID jobVertexID, int i, ExecutionState executionState, ExecutionAttemptID executionAttemptID, int i2, long j, ResultPartitionID[] resultPartitionIDArr, boolean[] zArr, Map<OperatorID, List<InputSplit>> map, LogicalSlot logicalSlot) {
        ExecutionJobVertex jobVertex = this.executionGraph.getJobVertex(jobVertexID);
        if (jobVertex == null || jobVertex.getParallelism() <= i) {
            LOG.info("Can not find the execution vertex {}_{}", jobVertexID, Integer.valueOf(i));
            return false;
        }
        try {
            return jobVertex.getTaskVertices()[i].reconcileExecution(executionState, executionAttemptID, i2, j, resultPartitionIDArr, zArr, map, logicalSlot);
        } catch (Throwable th) {
            LOG.info("Fail to reconcile vertex {}_{}.", new Object[]{jobVertexID, Integer.valueOf(i), th});
            return false;
        }
    }

    @Override // org.apache.flink.runtime.jobmaster.failover.Replayable
    public void replayOpLog(OperationLog operationLog) {
        Preconditions.checkArgument(this.isReconciling && JobStatus.CREATED.equals(this.executionGraph.getState()), "Job is in " + this.executionGraph.getState() + " while replaying log.");
        if (operationLog instanceof ExecutionGraphOperationLog) {
            ExecutionGraphOperationLog executionGraphOperationLog = (ExecutionGraphOperationLog) operationLog;
            this.executionGraph.getJobVertex(executionGraphOperationLog.getExecutionVertexID().getJobVertexID()).getTaskVertices()[executionGraphOperationLog.getExecutionVertexID().getSubTaskIndex()].recoverStatus(executionGraphOperationLog.getExecutionState(), executionGraphOperationLog.getConsumedInputs(), executionGraphOperationLog.getResultDescriptor());
            return;
        }
        if (operationLog instanceof ResultPartitionOperationLog) {
            ResultPartitionOperationLog resultPartitionOperationLog = (ResultPartitionOperationLog) operationLog;
            this.executionGraph.getJobVertex(resultPartitionOperationLog.getExecutionVertexID().getJobVertexID()).getTaskVertices()[resultPartitionOperationLog.getExecutionVertexID().getSubTaskIndex()].recoverResultPartitionStatus(resultPartitionOperationLog.getResultID(), resultPartitionOperationLog.getLocation());
            return;
        }
        if (!(operationLog instanceof FailoverOperationLog)) {
            if (!(operationLog instanceof InputSplitsOperationLog)) {
                throw new FlinkRuntimeException("Unsupported operation log " + operationLog);
            }
            InputSplitsOperationLog inputSplitsOperationLog = (InputSplitsOperationLog) operationLog;
            try {
                this.executionGraph.getAllVertices().get(inputSplitsOperationLog.getJobVertexID()).setUpInputSplits(inputSplitsOperationLog.getInputSplitsMap());
                return;
            } catch (Exception e) {
                throw new FlinkRuntimeException("Fail to set up input splits of vertex", e);
            }
        }
        List<ExecutionVertexID> executionVertexIDs = ((FailoverOperationLog) operationLog).getExecutionVertexIDs();
        ArrayList arrayList = new ArrayList(executionVertexIDs.size());
        for (ExecutionVertexID executionVertexID : executionVertexIDs) {
            arrayList.add(this.executionGraph.getJobVertex(executionVertexID.getJobVertexID()).getTaskVertices()[executionVertexID.getSubTaskIndex()]);
        }
        try {
            Iterator<Collection<ExecutionVertexID>> it = this.executionVerticesToBeScheduled.iterator();
            while (it.hasNext()) {
                it.next().removeAll(arrayList);
            }
            this.executionGraph.resetExecutionVerticesAndNotify(this.executionGraph.getGlobalModVersion(), arrayList);
        } catch (Exception e2) {
            throw new FlinkRuntimeException("Fail to reset execution vertex", e2);
        }
    }

    @Override // org.apache.flink.runtime.executiongraph.ExecutionStatusListener
    public void executionStatusChanged(JobID jobID, JobVertexID jobVertexID, String str, int i, int i2, ExecutionAttemptID executionAttemptID, ExecutionState executionState, long j, String str2) {
        ExecutionVertexID executionVertexID = new ExecutionVertexID(jobVertexID, i2);
        Map<OperatorID, List<InputSplit>> map = null;
        ResultDescriptor resultDescriptor = null;
        Map<String, Accumulator<?, ?>> map2 = null;
        IOMetrics iOMetrics = null;
        switch (executionState) {
            case FINISHED:
                ExecutionVertex executionVertex = this.executionGraph.getJobVertex(jobVertexID).getTaskVertices()[i2];
                map = executionVertex.getAssignedInputSplits();
                ResultPartitionID[] resultPartitionIDArr = new ResultPartitionID[executionVertex.getProducedPartitions().size()];
                int i3 = 0;
                Iterator<IntermediateResultPartitionID> it = executionVertex.getProducedPartitions().keySet().iterator();
                while (it.hasNext()) {
                    int i4 = i3;
                    i3++;
                    resultPartitionIDArr[i4] = new ResultPartitionID(it.next(), executionVertex.getCurrentExecutionAttempt().getAttemptId());
                }
                resultDescriptor = new ResultDescriptor(executionVertex.getCurrentAssignedResourceLocation(), resultPartitionIDArr);
                map2 = executionVertex.getCurrentExecutionAttempt().getUserAccumulators();
                iOMetrics = executionVertex.getCurrentExecutionAttempt().getIOMetrics();
                break;
            case RUNNING:
            case DEPLOYING:
                break;
            default:
                return;
        }
        if (!this.operationLogManager.isReplaying()) {
            this.operationLogManager.writeOpLog(new ExecutionGraphOperationLog(executionVertexID, executionState, map, resultDescriptor, map2, iOMetrics));
        }
        this.graphManagerPlugin.onExecutionVertexStateChanged(new ExecutionVertexStateChangedEvent(executionVertexID, executionState));
    }
}
