package org.apache.flink.streaming.api.graph;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.runtime.io.network.DataExchangeMode;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.FormatUtil;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.MultiInputOutputFormatVertex;
import org.apache.flink.runtime.jobgraph.OperatorDescriptor;
import org.apache.flink.runtime.jobgraph.OperatorEdgeDescriptor;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.operators.DamBehavior;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.WithMasterCheckpointHook;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.api.graph.util.CursorableLinkedList;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.tasks.ArbitraryInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
import org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
import org.apache.flink.streaming.runtime.tasks.StreamTaskConfig;
import org.apache.flink.streaming.runtime.tasks.StreamTaskConfigCache;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.class */
public class StreamingJobGraphGenerator {
    /** Logger bound to this generator class. */
    private static final Logger LOG = LoggerFactory.getLogger(StreamingJobGraphGenerator.class);
    /** Restart delay constant; NOTE(review): no use of it is visible in this part of the file. */
    private static final long DEFAULT_RESTART_DELAY = 0;
    /** Configuration key under which the job-vertex-id to stream-node-ids map is serialized. */
    public static final String JOB_VERTEX_TO_STREAM_NODE_MAP = "jobVertexToStreamNodeMap";
    /** The stream graph that is being translated. */
    private final StreamGraph streamGraph;
    /** The job graph under construction; created in the constructor from the stream graph's job name. */
    private final JobGraph jobGraph;
    /** Hasher producing the primary per-node hashes handed to {@code setChaining}. */
    private final StreamGraphHasher defaultStreamGraphHasher = new StreamGraphHasherV2();
    /** Additional hashers whose per-node hashes are handed to {@code setChaining} as a list. */
    private final List<StreamGraphHasher> legacyStreamGraphHashers = Collections.singletonList(new StreamGraphUserHashHasher());
    /** Stream node id -> job vertex of the chain that contains the node (filled by {@code setChaining}). */
    private final Map<Integer, JobVertex> nodeToJobVertexMap = new HashMap();
    /** Out edges of all created chains, sorted at the end of {@code setChaining}. */
    private final List<StreamEdge> transitiveOutEdges = new ArrayList();
    /** Id of the node a chain was created from -> ids of all nodes in that chain, in chain order. */
    private final Map<Integer, List<Integer>> chainedNodeIdsMap = new HashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator$1.class */
    /**
     * Lookup tables that translate enum ordinals into dense {@code switch} case labels.
     *
     * <p>The {@code synthetic} markers and {@code $SwitchMap$} names suggest this class was emitted
     * by the compiler for {@code switch} statements over {@link DataExchangeMode} and
     * {@link ExecutionMode}; those switches are not visible in this part of the file. An array slot
     * that is never assigned keeps the default value {@code 0}.
     */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$api$common$ExecutionMode;
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$runtime$io$network$DataExchangeMode = new int[DataExchangeMode.values().length];

        static {
            // Each assignment is guarded separately so that a constant missing at run time
            // (NoSuchFieldError) only leaves its own slot unmapped instead of failing class init.
            try {
                $SwitchMap$org$apache$flink$runtime$io$network$DataExchangeMode[DataExchangeMode.AUTO.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$flink$runtime$io$network$DataExchangeMode[DataExchangeMode.PIPELINED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$flink$runtime$io$network$DataExchangeMode[DataExchangeMode.BATCH.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$flink$runtime$io$network$DataExchangeMode[DataExchangeMode.PIPELINE_WITH_BATCH_FALLBACK.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            // Second table: ExecutionMode ordinal -> case label (PIPELINED = 1, BATCH = 2).
            $SwitchMap$org$apache$flink$api$common$ExecutionMode = new int[ExecutionMode.values().length];
            try {
                $SwitchMap$org$apache$flink$api$common$ExecutionMode[ExecutionMode.PIPELINED.ordinal()] = 1;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$flink$api$common$ExecutionMode[ExecutionMode.BATCH.ordinal()] = 2;
            } catch (NoSuchFieldError e6) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator$ChainingStreamNode.class */
    /**
     * Per-node bookkeeping used while deciding which stream nodes are chained together.
     *
     * <p>An instance records the node's position in the traversal orders (topological order, layer,
     * depth-first number), the read priorities on its input and output sides, and the set of
     * upstream nodes it may be chained to.
     */
    public static class ChainingStreamNode {
        private final Integer nodeId;
        private final int inEdgeCnt;
        private final int topologicalOrder;
        private final int layerNumber;
        private int depthFirstNumber;
        /** Read priority -> ids of the downstream nodes registered with that priority. */
        private Map<StreamNode.ReadPriority, Set<Integer>> downPriorityMap = new HashMap<>();
        /** Upstream node id -> priority with which this node reads from that upstream node. */
        private Map<Integer, StreamNode.ReadPriority> readPriorityMap = new HashMap<>();
        /** Read priority -> number of {@link #setReadPriority} calls made with that priority. */
        private Map<StreamNode.ReadPriority, Integer> priorityInEdgeNumMap = new HashMap<>();
        private Integer coarsenedId;
        /** Ids of the upstream nodes this node can be chained to; {@code null} until the first one is added. */
        private Set<Integer> chainableToSet;
        /** Tri-state flag: {@code null} means "not decided yet". */
        private Boolean allowMultiHeadChaining;

        /**
         * @param nodeId id of the stream node this object describes
         * @param inEdgeCnt number of input edges of the stream node
         * @param topologicalOrder sequence number assigned by the topological sort
         * @param layerNumber number of the topological-sort round in which the node was emitted
         */
        ChainingStreamNode(Integer nodeId, int inEdgeCnt, int topologicalOrder, int layerNumber) {
            this.nodeId = nodeId;
            this.inEdgeCnt = inEdgeCnt;
            this.topologicalOrder = topologicalOrder;
            this.layerNumber = layerNumber;
        }

        int getNodeId() {
            return this.nodeId.intValue();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public int getTopologicalOrder() {
            return this.topologicalOrder;
        }

        int getLayerNumber() {
            return this.layerNumber;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public int getDepthFirstNumber() {
            return this.depthFirstNumber;
        }

        void setDepthFirstNumber(int depthFirstNumber) {
            this.depthFirstNumber = depthFirstNumber;
        }

        /**
         * Returns the priority this node passes on to its inputs.
         *
         * @return {@code DYNAMIC} if the downstream priorities conflict, the single registered
         *     priority if there is exactly one, or {@code null} if none has been registered
         */
        StreamNode.ReadPriority getTransitivePriority() {
            if (isDownPriorityConflicting()) {
                return StreamNode.ReadPriority.DYNAMIC;
            }
            if (this.downPriorityMap.isEmpty()) {
                return null;
            }
            if (this.downPriorityMap.size() == 1) {
                return this.downPriorityMap.keySet().iterator().next();
            }
            // More than one key is already reported as a conflict above, so this is a pure safety net.
            throw new IllegalStateException("This is an internal error.");
        }

        /**
         * @return {@code true} if downstream nodes were registered under more than one priority, or
         *     more than one downstream node was registered as {@code DYNAMIC}
         */
        boolean isDownPriorityConflicting() {
            return this.downPriorityMap.size() > 1
                    || this.downPriorityMap.getOrDefault(StreamNode.ReadPriority.DYNAMIC, Collections.<Integer>emptySet()).size() > 1;
        }

        /** @return the downstream node ids registered with the given priority, or {@code null} if there are none */
        Set<Integer> getDownPriorityNodes(StreamNode.ReadPriority readPriority) {
            return this.downPriorityMap.get(readPriority);
        }

        /** Registers a downstream node under the given (non-null) priority. */
        void setDownPriority(Integer downstreamNodeId, StreamNode.ReadPriority readPriority) {
            Preconditions.checkState(readPriority != null);
            this.downPriorityMap.computeIfAbsent(readPriority, key -> new HashSet<>()).add(downstreamNodeId);
        }

        /** @return the priority recorded for the given upstream node, or {@code null} if none was recorded */
        StreamNode.ReadPriority getReadPriority(Integer upstreamNodeId) {
            return this.readPriorityMap.get(upstreamNodeId);
        }

        /**
         * @return {@code true} if inputs were recorded with more than one priority, or more than one
         *     input was recorded as {@code DYNAMIC}
         */
        boolean isReadPriorityConflicting() {
            return this.priorityInEdgeNumMap.size() > 1
                    || this.priorityInEdgeNumMap.getOrDefault(StreamNode.ReadPriority.DYNAMIC, 0) > 1;
        }

        /** Records the (non-null) priority for an upstream node and bumps that priority's counter. */
        void setReadPriority(Integer upstreamNodeId, StreamNode.ReadPriority readPriority) {
            Preconditions.checkState(readPriority != null);
            this.readPriorityMap.put(upstreamNodeId, readPriority);
            this.priorityInEdgeNumMap.merge(readPriority, 1, Integer::sum);
        }

        Integer getCoarsenedId() {
            return this.coarsenedId;
        }

        void setCoarsenedId(Integer coarsenedId) {
            this.coarsenedId = coarsenedId;
        }

        /**
         * @return {@code true} if the node has no inputs, or has more inputs than upstream nodes it
         *     can be chained to
         */
        boolean isChainHeadNode() {
            if (this.inEdgeCnt == 0) {
                return true;
            }
            final int chainableCnt = this.chainableToSet == null ? 0 : this.chainableToSet.size();
            return this.inEdgeCnt > chainableCnt;
        }

        /** @return {@code true} if this node is marked as chainable to the given upstream node */
        boolean isChainTo(Integer upstreamNodeId) {
            return this.chainableToSet != null && this.chainableToSet.contains(upstreamNodeId);
        }

        /**
         * Sets the multi-head chaining flag. Once set, the flag may only be re-set to an equal value.
         *
         * @throws IllegalStateException if a different value has been set before
         */
        void setAllowMultiHeadChaining(Boolean allow) {
            // Compare by value: '==' on boxed Booleans is an identity check and would reject an
            // equal value that is not one of the canonical Boolean.TRUE / Boolean.FALSE instances.
            Preconditions.checkState(
                    this.allowMultiHeadChaining == null || this.allowMultiHeadChaining.equals(allow),
                    "The flag allowMultiHeadChaining can not be changed (nodeId: %s).",
                    this.nodeId);
            this.allowMultiHeadChaining = allow;
        }

        /**
         * Decides whether this node can be chained to the given upstream node across {@code edge}.
         *
         * <p>If it can, the upstream node is remembered and the upstream's multi-head flag is
         * inherited; otherwise this node's multi-head flag is set to {@code TRUE}.
         *
         * @param upstreamNode bookkeeping object of the edge's source; its multi-head flag must
         *     already be set
         * @param edge the edge from the upstream node to this node
         * @param sourceNode stream node at the source of {@code edge}
         * @param targetNode stream node at the target of {@code edge}
         * @param isMultiHeadChainMode whether multi-head chaining is switched on for the graph
         * @param isChainEagerlyEnabled whether a non-forward edge into a parallelism-1 target may be chained
         * @param executionMode execution mode of the job (passed through; not consulted by the checks)
         */
        void chainTo(ChainingStreamNode upstreamNode, StreamEdge edge, StreamNode sourceNode, StreamNode targetNode, boolean isMultiHeadChainMode, boolean isChainEagerlyEnabled, ExecutionMode executionMode) {
            final boolean chainable;
            if (upstreamNode.allowMultiHeadChaining && isMultiHeadChainMode) {
                chainable = isChainableOnMultiHeadMode(edge, sourceNode, targetNode, isChainEagerlyEnabled, executionMode);
            } else {
                chainable = isChainable(edge, sourceNode, targetNode, isChainEagerlyEnabled, executionMode);
            }

            if (chainable) {
                addChainableToNode(upstreamNode.nodeId);
                setAllowMultiHeadChaining(upstreamNode.allowMultiHeadChaining);
            } else {
                setAllowMultiHeadChaining(Boolean.TRUE);
            }
        }

        private void addChainableToNode(Integer upstreamNodeId) {
            if (this.chainableToSet == null) {
                this.chainableToSet = new HashSet<>();
            }
            this.chainableToSet.add(upstreamNodeId);
        }

        /** Removes the given upstream node from the chainable set; a no-op if nothing was ever added. */
        void removeChainableToNode(Integer upstreamNodeId) {
            if (this.chainableToSet != null) {
                this.chainableToSet.remove(upstreamNodeId);
            }
        }

        /** Single-head rule: the target must have exactly one input and pass the multi-head checks. */
        private boolean isChainable(StreamEdge edge, StreamNode sourceNode, StreamNode targetNode, boolean isChainEagerlyEnabled, ExecutionMode executionMode) {
            return targetNode.getInEdges().size() == 1
                    && isChainableOnMultiHeadMode(edge, sourceNode, targetNode, isChainEagerlyEnabled, executionMode);
        }

        /**
         * Checks the edge-local chaining conditions: both operators present, same slot sharing
         * group, compatible chaining strategies, a forward partitioner (or a parallelism-1 target
         * with eager chaining), equal parallelism and a non-BATCH data exchange mode.
         */
        private boolean isChainableOnMultiHeadMode(StreamEdge edge, StreamNode sourceNode, StreamNode targetNode, boolean isChainEagerlyEnabled, ExecutionMode executionMode) {
            final StreamOperator<?> targetOperator = targetNode.getOperator();
            final StreamOperator<?> sourceOperator = sourceNode.getOperator();
            if (targetOperator == null || sourceOperator == null) {
                return false;
            }
            if (!targetNode.isSameSlotSharingGroup(sourceNode)) {
                return false;
            }
            if (targetOperator.getChainingStrategy() != ChainingStrategy.ALWAYS) {
                return false;
            }
            if (sourceOperator.getChainingStrategy() != ChainingStrategy.HEAD
                    && sourceOperator.getChainingStrategy() != ChainingStrategy.ALWAYS) {
                return false;
            }
            final boolean forwardLike = (edge.getPartitioner() instanceof ForwardPartitioner)
                    || (targetNode.getParallelism() == 1 && isChainEagerlyEnabled);
            return forwardLike
                    && targetNode.getParallelism() == sourceNode.getParallelism()
                    && edge.getDataExchangeMode() != DataExchangeMode.BATCH;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator$CoarsenedNode.class */
    /**
     * A node of the coarsened graph: a group of original stream nodes together with the ids of the
     * neighbouring coarsened nodes.
     *
     * <p>For every predecessor the node stores a flag that is initialised from
     * {@link ChainingStreamNode#isChainTo} and AND-combined whenever nodes are merged.
     */
    public static class CoarsenedNode {
        private final Integer id;
        private final Set<Integer> originalNodes = new HashSet<>();
        private final Map<Integer, Boolean> predecessorsNodes = new HashMap<>();
        private final Set<Integer> sucessorNodes = new HashSet<>();

        /** Creates a coarsened node that initially contains only the given stream node. */
        public CoarsenedNode(StreamNode streamNode, ChainingStreamNode chainingStreamNode) {
            this.id = Integer.valueOf(streamNode.getId());
            this.originalNodes.add(Integer.valueOf(streamNode.getId()));

            for (StreamEdge inEdge : streamNode.getInEdges()) {
                final Integer sourceId = Integer.valueOf(inEdge.getSourceId());
                final boolean chainedToSource = chainingStreamNode.isChainTo(sourceId);
                this.predecessorsNodes.put(sourceId, Boolean.valueOf(chainedToSource));
            }
            for (StreamEdge outEdge : streamNode.getOutEdges()) {
                this.sucessorNodes.add(Integer.valueOf(outEdge.getTargetId()));
            }
        }

        public Integer getId() {
            return this.id;
        }

        public Set<Integer> getOriginalNodes() {
            return this.originalNodes;
        }

        public Map<Integer, Boolean> getPredecessorsNodes() {
            return this.predecessorsNodes;
        }

        public Set<Integer> getSucessorNodes() {
            return this.sucessorNodes;
        }

        /**
         * Absorbs {@code coarsenedNode} into this node and rewires its neighbours to point here.
         *
         * @param coarsenedNode the node to absorb
         * @param map all coarsened nodes by id; must contain every neighbour of the absorbed node
         */
        public void merge(CoarsenedNode coarsenedNode, Map<Integer, CoarsenedNode> map) {
            final Integer absorbedId = coarsenedNode.id;
            this.originalNodes.addAll(coarsenedNode.originalNodes);

            // Take over the absorbed node's predecessors; a flag stays true only if both sides agree.
            for (Map.Entry<Integer, Boolean> predecessor : coarsenedNode.predecessorsNodes.entrySet()) {
                final Integer predecessorId = predecessor.getKey();
                final boolean combined = predecessor.getValue().booleanValue()
                        && this.predecessorsNodes.getOrDefault(predecessorId, Boolean.TRUE).booleanValue();
                this.predecessorsNodes.put(predecessorId, Boolean.valueOf(combined));

                final Set<Integer> successorsOfPredecessor = map.get(predecessorId).sucessorNodes;
                successorsOfPredecessor.remove(absorbedId);
                successorsOfPredecessor.add(this.id);
            }

            // Take over the absorbed node's successors and replace it in their predecessor maps.
            for (Integer successorId : coarsenedNode.sucessorNodes) {
                this.sucessorNodes.add(successorId);

                final Map<Integer, Boolean> predecessorsOfSuccessor = map.get(successorId).predecessorsNodes;
                final boolean combined = predecessorsOfSuccessor.get(absorbedId).booleanValue()
                        && predecessorsOfSuccessor.getOrDefault(this.id, Boolean.TRUE).booleanValue();
                predecessorsOfSuccessor.put(this.id, Boolean.valueOf(combined));
                predecessorsOfSuccessor.remove(absorbedId);
            }

            // Edges between the two merged nodes would now be self-references; drop them.
            this.predecessorsNodes.remove(this.id);
            this.sucessorNodes.remove(this.id);
            this.predecessorsNodes.remove(absorbedId);
            this.sucessorNodes.remove(absorbedId);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator$CreatingChainIntermediateStorager.class */
    /**
     * Scratch state shared by the steps that build one chain.
     *
     * <p>A single instance is reused for every chain; {@link #resetForNewChain()} empties the
     * per-chain state between chains. {@code allBuiltNodes} and {@code classLoader} are not touched
     * by the reset.
     */
    public static class CreatingChainIntermediateStorager {
        // State that survives a reset.
        final Set<Integer> allBuiltNodes = new HashSet<>();
        // Must be declared before vertexConfigCache, whose initializer reads it.
        final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();

        // Per-chain state, cleared by resetForNewChain().
        ResourceSpec chainedMinResources;
        ResourceSpec chainedPreferredResources;
        JobVertex createdVertex;
        final Map<Integer, StreamConfig> chainedConfigMap = new HashMap<>();
        final List<Integer> chainedHeadNodeIdsInOrder = new ArrayList<>();
        final List<StreamEdge> chainInEdgesInOrder = new ArrayList<>();
        final List<StreamEdge> chainOutEdgesInOrder = new ArrayList<>();
        final Map<OperatorID, InputFormat> chainInputFormatMap = new HashMap<>();
        final Map<OperatorID, OutputFormat> chainOutputFormatMap = new HashMap<>();
        final Map<Integer, String> chainedNameMap = new HashMap<>();
        final StreamTaskConfigCache vertexConfigCache = new StreamTaskConfigCache(classLoader);
        final List<Integer> chainedNodeIdsInOrder = new ArrayList<>();

        CreatingChainIntermediateStorager() {
        }

        /** Clears all per-chain collections and nulls the per-chain references. */
        void resetForNewChain() {
            chainedMinResources = null;
            chainedPreferredResources = null;
            createdVertex = null;

            chainedConfigMap.clear();
            chainedNameMap.clear();
            chainInputFormatMap.clear();
            chainOutputFormatMap.clear();

            chainedHeadNodeIdsInOrder.clear();
            chainedNodeIdsInOrder.clear();
            chainInEdgesInOrder.clear();
            chainOutEdgesInOrder.clear();

            vertexConfigCache.clear();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator$SequenceGenerator.class */
    /** Hands out consecutive integers starting at zero. */
    public static class SequenceGenerator {
        /** The value the next call to {@link #get()} will return. */
        private int next;

        SequenceGenerator() {
        }

        /** Returns the current value and advances the sequence by one. */
        public int get() {
            return next++;
        }

        /**
         * Returns the value the next {@link #get()} call will hand out, i.e. the number of values
         * handed out so far, without advancing the sequence.
         */
        public int last() {
            return next;
        }
    }

    /**
     * Translates the given stream graph into a job graph.
     *
     * @param streamGraph the stream graph to translate
     * @return the job graph built by a fresh generator instance
     */
    public static JobGraph createJobGraph(StreamGraph streamGraph) {
        final StreamingJobGraphGenerator generator = new StreamingJobGraphGenerator(streamGraph);
        return generator.createJobGraph();
    }

    /**
     * Creates a generator for one stream graph; the job graph is created empty and named after
     * the stream graph's job name.
     *
     * @param streamGraph the stream graph to translate
     */
    private StreamingJobGraphGenerator(StreamGraph streamGraph) {
        this.streamGraph = streamGraph;
        this.jobGraph = new JobGraph(this.streamGraph.getJobName());
    }

    /**
     * Runs the translation steps in order and returns the finished job graph.
     *
     * <p>Steps: copy the custom configuration, compute the primary and the additional node hashes,
     * build the chains, connect the edges, set slot sharing, configure checkpointing, write the
     * scheduler configuration, register the cached files as user artifacts and finally attach the
     * execution config.
     *
     * @return the job graph owned by this generator
     * @throws IllegalConfigurationException if the execution config cannot be serialized; the
     *     underlying {@link IOException} is attached as the cause
     */
    private JobGraph createJobGraph() {
        this.jobGraph.addCustomConfiguration(this.streamGraph.getCustomConfiguration());

        final Map<Integer, byte[]> hashes =
                this.defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(this.streamGraph);
        final List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(this.legacyStreamGraphHashers.size());
        for (StreamGraphHasher hasher : this.legacyStreamGraphHashers) {
            legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(this.streamGraph));
        }

        setChaining(hashes, legacyHashes);
        connectEdges();
        setSlotSharing();
        configureCheckpointing();
        setSchedulerConfiguration();

        for (Tuple2<String, DistributedCache.DistributedCacheEntry> cachedFile : this.streamGraph.getCachedFiles()) {
            this.jobGraph.addUserArtifact(cachedFile.f0, cachedFile.f1);
        }

        try {
            this.jobGraph.setExecutionConfig(this.streamGraph.getExecutionConfig());
        } catch (IOException e) {
            // Keep the IOException as the cause so the offending type shows up in the stack trace.
            throw new IllegalConfigurationException(
                    "Could not serialize the ExecutionConfig. This indicates that non-serializable types (like custom serializers) were registered",
                    e);
        }
        return this.jobGraph;
    }

    /**
     * Fills the job graph's scheduling configuration: first the serialized job-vertex to
     * stream-node map, then every entry of the stream graph's custom configuration.
     */
    private void setSchedulerConfiguration() {
        final Configuration schedulingConfig = this.jobGraph.getSchedulingConfiguration();
        // Write the vertex map before the custom entries; the custom configuration is added last.
        setVertexToStreamNodesMap(schedulingConfig);
        schedulingConfig.addAll(this.streamGraph.getCustomConfiguration());
    }

    /**
     * Serializes a map from job vertex id to the ids of the stream nodes chained into that vertex
     * and stores it in {@code configuration} under {@link #JOB_VERTEX_TO_STREAM_NODE_MAP}.
     *
     * @param configuration the configuration that receives the serialized map
     * @throws FlinkRuntimeException if the map cannot be serialized
     */
    private void setVertexToStreamNodesMap(Configuration configuration) {
        final Map<JobVertexID, List<Integer>> nodeIdsByVertexId = new HashMap<>();
        for (Map.Entry<Integer, List<Integer>> chain : this.chainedNodeIdsMap.entrySet()) {
            final JobVertexID vertexId = this.nodeToJobVertexMap.get(chain.getKey()).getID();
            final List<Integer> chainedNodeIds = chain.getValue();
            if (chainedNodeIds == null) {
                nodeIdsByVertexId.put(vertexId, Collections.<Integer>emptyList());
            } else {
                nodeIdsByVertexId.put(vertexId, chainedNodeIds);
            }
        }

        try {
            InstantiationUtil.writeObjectToConfig(nodeIdsByVertexId, configuration, JOB_VERTEX_TO_STREAM_NODE_MAP);
        } catch (IOException e) {
            throw new FlinkRuntimeException("Could not serialize job vertex to stream node map", e);
        }
    }

    /**
     * Groups the stream nodes into chains and registers one job vertex per created chain.
     *
     * <p>The source ids are ordered (nodes without an operator or with a {@link StreamSource}
     * first, then by id), the graph is sorted topologically and numbered depth-first, chain
     * boundaries are computed if chaining is enabled, and finally a chain is attempted from every
     * node in topological order.
     *
     * @param map primary hash per stream node id, passed through to {@code createChain}
     * @param list additional hashes per stream node id, passed through to {@code createChain}
     */
    private void setChaining(Map<Integer, byte[]> map, List<Map<Integer, byte[]>> list) {
        final Comparator<Integer> sourceOperatorsFirst = Comparator.comparing(sourceId -> {
            final StreamOperator<?> operator = this.streamGraph.getStreamNode(sourceId).getOperator();
            return Integer.valueOf((operator == null || (operator instanceof StreamSource)) ? 0 : 1);
        });
        final List<Integer> orderedSourceIds = this.streamGraph.getSourceIDs().stream()
                .sorted(sourceOperatorsFirst.thenComparingInt(Integer::intValue))
                .collect(Collectors.toList());

        final List<ChainingStreamNode> nodesInTopologicalOrder = sortTopologicalNodes(this.streamGraph, orderedSourceIds);
        final Map<Integer, ChainingStreamNode> nodesById = nodesInTopologicalOrder.stream()
                .collect(Collectors.toMap(ChainingStreamNode::getNodeId, node -> node));

        // Number all nodes depth-first; one shared sequence and visited set across all sources.
        final SequenceGenerator depthFirstSequence = new SequenceGenerator();
        final Set<Integer> visitedNodeIds = new HashSet<>();
        for (Integer sourceId : orderedSourceIds) {
            setDepthFirstNumber(sourceId, nodesById, this.streamGraph, depthFirstSequence, visitedNodeIds);
        }

        // With chaining disabled no chaining information is handed to createChain at all.
        final Map<Integer, ChainingStreamNode> chainingNodes;
        if (this.streamGraph.isChainingEnabled()) {
            splitChain(nodesInTopologicalOrder, nodesById);
            chainingNodes = nodesById;
        } else {
            chainingNodes = null;
        }

        final CreatingChainIntermediateStorager storager = new CreatingChainIntermediateStorager();
        for (ChainingStreamNode node : nodesInTopologicalOrder) {
            final Integer startNodeId = Integer.valueOf(node.getNodeId());
            final boolean chainCreated =
                    createChain(startNodeId, startNodeId, new SequenceGenerator(), chainingNodes, map, list, storager);
            if (chainCreated) {
                for (Integer chainedNodeId : storager.chainedNodeIdsInOrder) {
                    this.nodeToJobVertexMap.put(chainedNodeId, storager.createdVertex);
                }
                this.transitiveOutEdges.addAll(storager.chainOutEdgesInOrder);
                this.chainedNodeIdsMap.put(startNodeId, new ArrayList<>(storager.chainedNodeIdsInOrder));
                this.jobGraph.addVertex(storager.createdVertex);
            }
            // The storager is reused, so it is emptied after every attempt, successful or not.
            storager.resetForNewChain();
        }

        // Order the chain out edges by the target's depth-first number, then by the edge's
        // position among the target's input edges.
        this.transitiveOutEdges.sort(
                Comparator.comparingInt((StreamEdge edge) -> nodesById.get(Integer.valueOf(edge.getTargetId())).getDepthFirstNumber())
                        .thenComparingInt(edge -> this.streamGraph.getStreamNode(Integer.valueOf(edge.getTargetId())).getInEdges().indexOf(edge)));
    }

    /**
     * Sorts the nodes reachable from the given start nodes topologically, processing the graph in
     * rounds ("layers").
     *
     * <p>For every pending node the number of inputs that still have to be satisfied is kept in a
     * two-slot array; the slot for the current round and the slot for the next round alternate
     * with the parity of the round number. A node is emitted in the round in which its current
     * counter is {@code 0}; emitting a node decrements the next-round counter of each of its
     * targets. Nodes whose counter is still positive are carried over to the next round.
     *
     * @param streamGraph the graph to sort
     * @param list ids of the nodes the traversal starts from; they are treated as having no
     *     unsatisfied inputs
     * @return one {@link ChainingStreamNode} per emitted node, in emission order, carrying the
     *     node's input edge count, its sequence number and the round number it was emitted in
     * @throws RuntimeException if a pending node has a negative counter, or if fewer nodes were
     *     emitted than the graph contains (reported as a cyclic graph)
     */
    static List<ChainingStreamNode> sortTopologicalNodes(StreamGraph streamGraph, List<Integer> list) {
        boolean z;
        ArrayList arrayList = new ArrayList();
        // Work queue of node ids; the first `size` entries of each round belong to that round.
        ArrayDeque arrayDeque = new ArrayDeque();
        // node id -> Integer[2] of remaining-input counters (current / next round, by parity).
        HashMap hashMap = new HashMap();
        for (Integer num : list) {
            arrayDeque.add(num);
            // Slot 0 is the current slot of round 0: start nodes are ready immediately.
            hashMap.put(num, new Integer[]{0, -1});
        }
        // Round number; incremented at the start of every round, so the first round is 0.
        int i = -1;
        SequenceGenerator sequenceGenerator = new SequenceGenerator();
        // Ids already enqueued for the next round; prevents enqueuing a node twice per round.
        HashSet hashSet = new HashSet();
        do {
            int size = arrayDeque.size();
            if (size <= 0) {
                break;
            }
            i++;
            // i2: counter slot read in this round; i3: counter slot written for the next round.
            int i2 = i % 2;
            int i3 = (i + 1) % 2;
            hashSet.clear();
            // Set to true as soon as this round emits at least one node.
            z = false;
            for (int i4 = 0; i4 < size; i4++) {
                Integer num2 = (Integer) arrayDeque.pollFirst();
                Integer num3 = ((Integer[]) hashMap.get(num2))[i2];
                if (num3.intValue() == 0) {
                    // All inputs satisfied: emit the node with the current round as its layer.
                    StreamNode streamNode = streamGraph.getStreamNode(num2);
                    arrayList.add(new ChainingStreamNode(num2, streamNode.getInEdges().size(), sequenceGenerator.get(), i));
                    Iterator<StreamEdge> it = streamNode.getOutEdges().iterator();
                    while (it.hasNext()) {
                        Integer valueOf = Integer.valueOf(it.next().getTargetId());
                        if (hashSet.contains(valueOf)) {
                            // Target already scheduled for the next round: one more input satisfied.
                            Integer[] numArr = (Integer[]) hashMap.get(valueOf);
                            Integer num4 = numArr[i3];
                            numArr[i3] = Integer.valueOf(numArr[i3].intValue() - 1);
                        } else {
                            arrayDeque.addLast(valueOf);
                            Integer[] numArr2 = (Integer[]) hashMap.get(valueOf);
                            if (numArr2 == null) {
                                // First time the target is seen: all inputs minus this one remain.
                                Integer[] numArr3 = new Integer[2];
                                numArr3[0] = -1;
                                numArr3[1] = -1;
                                numArr3[i3] = Integer.valueOf(streamGraph.getStreamNode(valueOf).getInEdges().size() - 1);
                                hashMap.put(valueOf, numArr3);
                            } else {
                                // Target is still pending from an earlier round: continue from its
                                // current counter, minus this input.
                                numArr2[i3] = Integer.valueOf(numArr2[i2].intValue() - 1);
                            }
                            hashSet.add(valueOf);
                        }
                    }
                    hashMap.remove(num2);
                    z = true;
                } else {
                    if (num3.intValue() <= 0) {
                        throw new RuntimeException("remainingInputNum for the node (id: " + num2 + ") should be greater than 0");
                    }
                    // Still waiting for inputs: carry the node over unchanged, unless an emitted
                    // upstream node has already scheduled it for the next round.
                    if (!hashSet.contains(num2)) {
                        arrayDeque.addLast(num2);
                        Integer[] numArr4 = (Integer[]) hashMap.get(num2);
                        numArr4[i3] = numArr4[i2];
                        hashSet.add(num2);
                    }
                }
            }
            // A round that emitted nothing cannot make progress any more; stop.
        } while (z);
        if (arrayList.size() < streamGraph.getStreamNodes().size()) {
            throw new RuntimeException("The stream graph is cyclic.");
        }
        return arrayList;
    }

    /**
     * Assigns depth-first numbers in post-order: a node receives its number only after all nodes
     * reachable through its output edges have been numbered.
     *
     * @param num id of the node to start from
     * @param map bookkeeping objects by node id; receives the numbers
     * @param streamGraph the graph that is traversed
     * @param sequenceGenerator source of the numbers, shared across calls
     * @param set ids of the nodes that have been visited already; updated by this method
     */
    static void setDepthFirstNumber(Integer num, Map<Integer, ChainingStreamNode> map, StreamGraph streamGraph, SequenceGenerator sequenceGenerator, Set<Integer> set) {
        final boolean firstVisit = set.add(num);
        if (!firstVisit) {
            return;
        }
        for (StreamEdge outEdge : streamGraph.getStreamNode(num).getOutEdges()) {
            final Integer targetId = Integer.valueOf(outEdge.getTargetId());
            setDepthFirstNumber(targetId, map, streamGraph, sequenceGenerator, set);
        }
        final int depthFirstNumber = sequenceGenerator.get();
        map.get(num).setDepthFirstNumber(depthFirstNumber);
    }

    /**
     * Computes the chain boundaries: the initial split always runs, and the two break-off passes
     * run only when the stream graph is in multi-head chain mode.
     *
     * @param list all nodes in topological order
     * @param map the same nodes by node id
     */
    private void splitChain(List<ChainingStreamNode> list, Map<Integer, ChainingStreamNode> map) {
        splitUpInitialChains(map, list, this.streamGraph);

        if (!this.streamGraph.isMultiHeadChainMode()) {
            return;
        }
        breakOffChainForNoDeadlock(map, list, this.streamGraph);
        breakOffChainForAcyclicJobGraph(map, list, this.streamGraph);
    }

    /**
     * Walks the nodes in the given order and lets every downstream node decide whether it can be
     * chained to its upstream node.
     *
     * <p>Nodes without input edges get their multi-head flag here: {@code FALSE} if they have no
     * operator or a {@link StreamSource}, {@code TRUE} otherwise.
     *
     * @param map bookkeeping objects by node id
     * @param list the nodes to process, in processing order
     * @param streamGraph the graph the nodes belong to
     */
    static void splitUpInitialChains(Map<Integer, ChainingStreamNode> map, List<ChainingStreamNode> list, StreamGraph streamGraph) {
        for (ChainingStreamNode upstream : list) {
            final StreamNode upstreamNode = streamGraph.getStreamNode(Integer.valueOf(upstream.getNodeId()));

            if (upstreamNode.getInEdges().isEmpty()) {
                final StreamOperator<?> operator = upstreamNode.getOperator();
                final boolean sourceLike = operator == null || (operator instanceof StreamSource);
                upstream.setAllowMultiHeadChaining(Boolean.valueOf(!sourceLike));
            }

            for (StreamEdge outEdge : upstreamNode.getOutEdges()) {
                final ChainingStreamNode downstream = map.get(Integer.valueOf(outEdge.getTargetId()));
                final StreamNode edgeSource = streamGraph.getStreamNode(Integer.valueOf(outEdge.getSourceId()));
                final StreamNode edgeTarget = streamGraph.getStreamNode(Integer.valueOf(outEdge.getTargetId()));
                downstream.chainTo(
                        upstream,
                        outEdge,
                        edgeSource,
                        edgeTarget,
                        streamGraph.isMultiHeadChainMode(),
                        streamGraph.isChainEagerlyEnabled(),
                        streamGraph.getExecutionConfig().getExecutionMode());
            }
        }
    }

    /**
     * Infers the read priorities and then, for every node whose input priorities conflict,
     * un-chains the inputs that are not read with {@code HIGHER} priority whenever
     * {@code needToBreakOffChain} asks for it.
     *
     * @param map bookkeeping objects by node id
     * @param list the nodes to inspect
     * @param streamGraph the graph the nodes belong to
     */
    static void breakOffChainForNoDeadlock(Map<Integer, ChainingStreamNode> map, List<ChainingStreamNode> list, StreamGraph streamGraph) {
        inferReadPriority(map, list, streamGraph);

        for (ChainingStreamNode node : list) {
            final StreamNode streamNode = streamGraph.getStreamNode(Integer.valueOf(node.getNodeId()));
            if (!node.isReadPriorityConflicting()) {
                continue;
            }
            for (StreamEdge inEdge : streamNode.getInEdges()) {
                final Integer sourceId = Integer.valueOf(inEdge.getSourceId());
                final boolean readWithHigherPriority =
                        StreamNode.ReadPriority.HIGHER.equals(node.getReadPriority(sourceId));
                if (readWithHigherPriority) {
                    continue;
                }
                if (needToBreakOffChain(inEdge, map, streamGraph)) {
                    node.removeChainableToNode(sourceId);
                }
            }
        }
    }

    /**
     * Assigns a read priority to every input edge of every node.
     *
     * <p>First pass (nodes in reverse list order): a non-BATCH edge gets the target's hint for the
     * edge, or the target's transitive priority if there is no hint; a hint that disagrees with the
     * transitive priority becomes {@code DYNAMIC}. BATCH edges and edges for which no priority
     * could be determined are deferred.
     *
     * <p>Second pass (deferred edges, in the order they were collected): the edge gets the
     * source's transitive priority, or {@code HIGHER} if the source has none.
     *
     * @param map bookkeeping objects by node id; receives the priorities
     * @param list the nodes to process; traversed from the last element to the first
     * @param streamGraph the graph the nodes belong to
     */
    static void inferReadPriority(Map<Integer, ChainingStreamNode> map, List<ChainingStreamNode> list, StreamGraph streamGraph) {
        final List<StreamEdge> deferredEdges = new ArrayList<>();

        for (int index = list.size() - 1; index >= 0; index--) {
            final ChainingStreamNode target = list.get(index);
            final Integer targetId = Integer.valueOf(target.getNodeId());
            final StreamNode targetNode = streamGraph.getStreamNode(targetId);

            for (StreamEdge inEdge : targetNode.getInEdges()) {
                if (inEdge.getDataExchangeMode() == DataExchangeMode.BATCH) {
                    deferredEdges.add(inEdge);
                    continue;
                }

                final Integer sourceId = Integer.valueOf(inEdge.getSourceId());
                final ChainingStreamNode source = map.get(sourceId);

                final StreamNode.ReadPriority hint = targetNode.getReadPriorityHint(inEdge);
                final StreamNode.ReadPriority inherited = target.getTransitivePriority();
                final StreamNode.ReadPriority effective;
                if (hint == null) {
                    effective = inherited;
                } else if (inherited != null && !hint.equals(inherited)) {
                    effective = StreamNode.ReadPriority.DYNAMIC;
                } else {
                    effective = hint;
                }

                if (effective == null) {
                    deferredEdges.add(inEdge);
                    continue;
                }
                target.setReadPriority(sourceId, effective);
                source.setDownPriority(targetId, effective);
            }
        }

        for (StreamEdge deferredEdge : deferredEdges) {
            final Integer sourceId = Integer.valueOf(deferredEdge.getSourceId());
            final Integer targetId = Integer.valueOf(deferredEdge.getTargetId());
            final ChainingStreamNode source = map.get(sourceId);
            final ChainingStreamNode target = map.get(targetId);

            StreamNode.ReadPriority priority = source.getTransitivePriority();
            if (priority == null) {
                priority = StreamNode.ReadPriority.HIGHER;
            }
            target.setReadPriority(sourceId, priority);
            source.setDownPriority(targetId, priority);
        }
    }

    /**
     * Walks upstream from {@code streamEdge}, breadth-first over input edges, and reports whether the walk
     * reached a node whose down priority is conflicting after having passed an edge with
     * {@code DamBehavior.FULL_DAM} that was not flagged as broken.
     *
     * <p>Each queue entry pairs an edge with a "broken" flag; the flag is {@code true} when the edge's target
     * is not chained to the edge's source, or when the flag was inherited from the edge it was expanded from.
     *
     * @param streamEdge  edge from which the upstream walk starts
     * @param map         chaining nodes keyed by node id
     * @param streamGraph graph used to look up the input edges of visited nodes
     * @return {@code true} only if both conditions described above were met during the walk
     */
    private static boolean needToBreakOffChain(StreamEdge streamEdge, Map<Integer, ChainingStreamNode> map, StreamGraph streamGraph) {
        // z: the starting edge is "broken", i.e. its target is not chained to its source.
        boolean z = !map.get(Integer.valueOf(streamEdge.getTargetId())).isChainTo(Integer.valueOf(streamEdge.getSourceId()));
        // z2: set once every entry in the queue carries the broken flag.
        boolean z2 = false;
        // z3: a node with conflicting down priority was reached after a full dam was seen.
        boolean z3 = false;
        // z4: an unbroken edge with FULL_DAM behavior has been seen.
        boolean z4 = false;
        ArrayDeque arrayDeque = new ArrayDeque();
        arrayDeque.addLast(new ImmutablePair(streamEdge, Boolean.valueOf(z)));
        // i: number of queued entries whose broken flag is true.
        int i = z ? 1 : 0;
        while (true) {
            if (arrayDeque.size() <= 0) {
                break;
            }
            if (!z2 && i == arrayDeque.size()) {
                z2 = true;
            }
            Pair pair = (Pair) arrayDeque.pollFirst();
            StreamEdge streamEdge2 = (StreamEdge) pair.getLeft();
            Integer valueOf = Integer.valueOf(streamEdge2.getSourceId());
            boolean booleanValue = ((Boolean) pair.getRight()).booleanValue();
            if (booleanValue) {
                i--;
            }
            // Everything left to explore is behind a broken edge and no full dam was found: give up.
            if (z2 && !z4) {
                break;
            }
            if (!z4 && !booleanValue && DamBehavior.FULL_DAM.equals(streamEdge2.getDamBehavior())) {
                z4 = true;
            }
            if (z4 && map.get(valueOf).isDownPriorityConflicting()) {
                z3 = true;
                break;
            }
            for (StreamEdge streamEdge3 : streamGraph.getStreamNode(valueOf).getInEdges()) {
                // NOTE(review): booleanValue is reassigned here and not reset per edge, so once one input
                // edge is found broken every later sibling edge is queued as broken too — confirm this
                // carry-over is intended rather than a per-edge flag.
                if (!booleanValue) {
                    booleanValue = !map.get(Integer.valueOf(streamEdge3.getTargetId())).isChainTo(Integer.valueOf(streamEdge3.getSourceId()));
                }
                if (booleanValue) {
                    i++;
                }
                arrayDeque.addLast(new ImmutablePair(streamEdge3, Boolean.valueOf(booleanValue)));
            }
        }
        // z3 can only become true while z4 is true, so this is equivalent to returning z3.
        return z4 && z3;
    }

    /**
     * Groups the chaining nodes into coarsened nodes by repeatedly merging a node into one of its
     * predecessors, then removes every "chainable to" relation that crosses two different coarsened nodes
     * and records the resulting coarsened id on each chaining node.
     *
     * <p>Judging by the method name, the purpose is to keep the resulting job graph acyclic; the actual
     * merge admissibility check lives in {@code tryToMergeCoarsenedNode}.
     *
     * @param map         chaining nodes keyed by node id; nothing is done when it is empty
     * @param list        chaining nodes in the order in which they are placed into the working list
     * @param streamGraph graph used to look up stream nodes and their input edges
     */
    static void breakOffChainForAcyclicJobGraph(Map<Integer, ChainingStreamNode> map, List<ChainingStreamNode> list, StreamGraph streamGraph) {
        CoarsenedNode coarsenedNode;
        CursorableLinkedList.Cursor<CoarsenedNode> tryToMergeCoarsenedNode;
        if (map.size() == 0) {
            return;
        }
        // One coarsened node per chaining node, keyed by the same node id.
        Map<Integer, CoarsenedNode> map2 = (Map) map.entrySet().stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry -> {
            Integer num = (Integer) entry.getKey();
            return new CoarsenedNode(streamGraph.getStreamNode(num), (ChainingStreamNode) map.get(num));
        }));
        // Working list of coarsened nodes, in the order given by 'list'.
        CursorableLinkedList cursorableLinkedList = new CursorableLinkedList();
        Iterator<ChainingStreamNode> it = list.iterator();
        while (it.hasNext()) {
            cursorableLinkedList.add(map2.get(Integer.valueOf(it.next().getNodeId())));
        }
        // Coarsened node id -> cursor positioned at that node in the working list.
        HashMap hashMap = new HashMap();
        CursorableLinkedList.Cursor cursor = cursorableLinkedList.cursor();
        // NOTE(review): only 'map' is checked for emptiness above; this assumes 'list' is non-empty too — confirm.
        CoarsenedNode coarsenedNode2 = (CoarsenedNode) cursor.next();
        while (true) {
            CoarsenedNode coarsenedNode3 = coarsenedNode2;
            Integer id = coarsenedNode3.getId();
            hashMap.put(id, cursorableLinkedList.cursor(cursor));
            // Fetch the following node before merging, since a successful merge removes the current one from the list.
            CoarsenedNode coarsenedNode4 = cursor.hasNext() ? (CoarsenedNode) cursor.next() : null;
            for (Integer num : coarsenedNode3.getPredecessorsNodes().keySet()) {
                // Only predecessors flagged true are merge candidates; a non-null cursor means the merge is allowed.
                if (coarsenedNode3.getPredecessorsNodes().get(num).booleanValue() && (tryToMergeCoarsenedNode = tryToMergeCoarsenedNode(cursorableLinkedList, coarsenedNode3, (coarsenedNode = map2.get(num)), hashMap)) != null) {
                    coarsenedNode.merge(coarsenedNode3, map2);
                    hashMap.put(num, tryToMergeCoarsenedNode);
                    map2.remove(id);
                    hashMap.remove(id);
                    // Continue with the merged node; note the key set being iterated was obtained before the merge.
                    coarsenedNode3 = coarsenedNode;
                    id = num;
                }
            }
            if (coarsenedNode4 == null) {
                break;
            } else {
                coarsenedNode2 = coarsenedNode4;
            }
        }
        // Original node id -> id of the coarsened node that now contains it.
        HashMap hashMap2 = new HashMap();
        CursorableLinkedList.Cursor cursor2 = cursorableLinkedList.cursor();
        while (cursor2.hasNext()) {
            CoarsenedNode coarsenedNode5 = (CoarsenedNode) cursor2.next();
            Integer id2 = coarsenedNode5.getId();
            Iterator<Integer> it2 = coarsenedNode5.getOriginalNodes().iterator();
            while (it2.hasNext()) {
                hashMap2.put(it2.next(), id2);
            }
        }
        for (ChainingStreamNode chainingStreamNode : list) {
            Integer valueOf = Integer.valueOf(chainingStreamNode.getNodeId());
            Integer num2 = (Integer) hashMap2.get(valueOf);
            Iterator<StreamEdge> it3 = streamGraph.getStreamNode(valueOf).getInEdges().iterator();
            while (it3.hasNext()) {
                Integer valueOf2 = Integer.valueOf(it3.next().getSourceId());
                // Drop chaining to a source that ended up in a different coarsened node.
                if (chainingStreamNode.isChainTo(valueOf2) && !((Integer) hashMap2.get(valueOf2)).equals(num2)) {
                    chainingStreamNode.removeChainableToNode(valueOf2);
                }
            }
            chainingStreamNode.setCoarsenedId(num2);
        }
    }

    /**
     * Checks whether {@code coarsenedNode} may be merged into {@code coarsenedNode2} and, if so, adjusts the
     * working list accordingly.
     *
     * <p>Starting at the list position of {@code coarsenedNode2}, the method scans forward to
     * {@code coarsenedNode}, remembering the offset of the last visited node that is a predecessor of
     * {@code coarsenedNode}. It then scans forward again from the same start, counting steps until the first
     * successor of {@code coarsenedNode2}. The merge is rejected when that step count does not exceed the
     * remembered predecessor offset.
     *
     * @param cursorableLinkedList working list of coarsened nodes
     * @param coarsenedNode        node that would be merged away
     * @param coarsenedNode2       node that would absorb {@code coarsenedNode}
     * @param map                  cursors into the working list, keyed by coarsened node id
     * @return the cursor registered for {@code coarsenedNode2} when the merge is allowed, otherwise {@code null}
     * @throws IllegalStateException if {@code coarsenedNode} is not found after {@code coarsenedNode2}, or is
     *                               reached in the second scan before any successor of {@code coarsenedNode2}
     */
    private static CursorableLinkedList.Cursor<CoarsenedNode> tryToMergeCoarsenedNode(CursorableLinkedList<CoarsenedNode> cursorableLinkedList, CoarsenedNode coarsenedNode, CoarsenedNode coarsenedNode2, Map<Integer, CursorableLinkedList.Cursor<CoarsenedNode>> map) {
        Integer id = coarsenedNode.getId();
        Integer id2 = coarsenedNode2.getId();
        // Work on a copy of the stored cursor so the registered one is not advanced by the scan.
        CursorableLinkedList.Cursor<CoarsenedNode> cursor = cursorableLinkedList.cursor(map.get(id2));
        CoarsenedNode value = cursor.getValue();
        Map<Integer, Boolean> predecessorsNodes = coarsenedNode.getPredecessorsNodes();
        // i: offset (from coarsenedNode2) of the last predecessor of coarsenedNode seen so far.
        int i = 0;
        // i2: offset of the node currently being inspected.
        int i2 = 0;
        while (true) {
            Integer id3 = value.getId();
            if (predecessorsNodes.containsKey(id3)) {
                i = i2;
            }
            if (id3.equals(id)) {
                CursorableLinkedList.Cursor<CoarsenedNode> cursor2 = cursorableLinkedList.cursor(map.get(id2));
                // i3: number of steps from coarsenedNode2 to its first successor in the list.
                int i3 = 0;
                Set<Integer> sucessorNodes = coarsenedNode2.getSucessorNodes();
                while (cursor2.hasNext()) {
                    i3++;
                    Integer id4 = cursor2.next().getId();
                    if (sucessorNodes.contains(id4)) {
                        break;
                    }
                    if (id4.equals(id)) {
                        throw new IllegalStateException("An internal error is occurred");
                    }
                }
                // A predecessor of coarsenedNode sits at or beyond the first successor: merging is refused.
                if (i3 <= i) {
                    return null;
                }
                CursorableLinkedList.Cursor<CoarsenedNode> cursor3 = map.get(id2);
                if (i3 > 1) {
                    // Presumably relocates coarsenedNode2 next to the position reached by cursor2 — confirm against CursorableLinkedList.
                    cursor3.moveNodeTo(cursor2);
                }
                // Takes coarsenedNode out of the working list through its registered cursor.
                map.get(id).remove();
                return cursor3;
            }
            if (!cursor.hasNext()) {
                throw new IllegalStateException("An internal error is occurred.");
            }
            i2++;
            value = cursor.next();
        }
    }

    /**
     * Recursively builds the operator chain that contains node {@code num2}, collecting per-node
     * configuration in {@code creatingChainIntermediateStorager}. When the call for the starting node
     * ({@code num2} equal to {@code num}) finishes, the collected edges and node ids are sorted and the job
     * vertex for the whole chain is created and configured.
     *
     * @param num                               id of the node at which chain creation was started
     * @param num2                              id of the node handled by this call
     * @param sequenceGenerator                 supplies the chain index for each visited node
     * @param map                               chaining nodes keyed by node id, or {@code null}, in which case
     *                                          no edge is treated as chained
     * @param map2                              node hashes keyed by node id
     * @param list                              additional hash maps passed on to {@code createJobVertex}
     * @param creatingChainIntermediateStorager accumulator for everything gathered while building the chain
     * @return {@code false} if {@code num2} had already been built, {@code true} otherwise
     */
    private boolean createChain(Integer num, Integer num2, SequenceGenerator sequenceGenerator, @Nullable Map<Integer, ChainingStreamNode> map, Map<Integer, byte[]> map2, List<Map<Integer, byte[]>> list, CreatingChainIntermediateStorager creatingChainIntermediateStorager) {
        // Each node is built at most once per chain.
        if (creatingChainIntermediateStorager.allBuiltNodes.contains(num2)) {
            return false;
        }
        creatingChainIntermediateStorager.allBuiltNodes.add(num2);
        StreamNode streamNode = this.streamGraph.getStreamNode(num2);
        int i = sequenceGenerator.get();
        byte[] bArr = map2.get(num2);
        // Without chaining information every node is treated as a chain head.
        boolean z = map == null || map.get(num2).isChainHeadNode();
        // arrayList: chained output edges; arrayList2: non-chained output edges.
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        for (StreamEdge streamEdge : streamNode.getOutEdges()) {
            Integer valueOf = Integer.valueOf(streamEdge.getTargetId());
            ChainingStreamNode chainingStreamNode = map == null ? null : map.get(valueOf);
            if (map == null || !chainingStreamNode.isChainTo(num2)) {
                arrayList2.add(streamEdge);
            } else {
                arrayList.add(streamEdge);
                // Descend into the chained downstream node first so its chained name is available below.
                createChain(num, valueOf, sequenceGenerator, map, map2, list, creatingChainIntermediateStorager);
            }
        }
        creatingChainIntermediateStorager.chainedNameMap.put(num2, makeChainedName(streamNode.getOperatorName(), arrayList, creatingChainIntermediateStorager.chainedNameMap));
        if (map != null) {
            // Also pull in upstream nodes this node is chained to.
            Iterator<StreamEdge> it = streamNode.getInEdges().iterator();
            while (it.hasNext()) {
                Integer valueOf2 = Integer.valueOf(it.next().getSourceId());
                if (map.get(num2).isChainTo(valueOf2)) {
                    createChain(num, valueOf2, sequenceGenerator, map, map2, list, creatingChainIntermediateStorager);
                }
            }
        }
        StreamConfig streamConfig = new StreamConfig(new Configuration());
        OperatorID operatorID = new OperatorID(bArr);
        if (z) {
            streamConfig.setChainStart();
        }
        streamConfig.setChainIndex(i);
        streamConfig.setOperatorName(streamNode.getOperatorName());
        streamConfig.setOperatorID(operatorID);
        // A node without chained outputs ends the chain.
        if (arrayList.isEmpty()) {
            streamConfig.setChainEnd();
        }
        // arrayList3: input edges that are not chained, i.e. inputs arriving from outside the chain.
        ArrayList arrayList3 = new ArrayList();
        for (StreamEdge streamEdge2 : streamNode.getInEdges()) {
            if (map == null) {
                arrayList3.add(streamEdge2);
            } else if (!map.get(num2).isChainTo(Integer.valueOf(streamEdge2.getSourceId()))) {
                arrayList3.add(streamEdge2);
            }
        }
        setupNodeConfig(num2, arrayList3, arrayList, arrayList2, this.streamGraph, streamConfig);
        creatingChainIntermediateStorager.chainedConfigMap.put(num2, streamConfig);
        creatingChainIntermediateStorager.chainedNodeIdsInOrder.add(num2);
        if (z) {
            creatingChainIntermediateStorager.chainedHeadNodeIdsInOrder.add(num2);
        }
        creatingChainIntermediateStorager.chainInEdgesInOrder.addAll(arrayList3);
        creatingChainIntermediateStorager.chainOutEdgesInOrder.addAll(arrayList2);
        if (streamNode.getOutputFormat() != null) {
            creatingChainIntermediateStorager.chainOutputFormatMap.put(operatorID, streamNode.getOutputFormat());
        }
        if (streamNode.getInputFormat() != null) {
            creatingChainIntermediateStorager.chainInputFormatMap.put(operatorID, streamNode.getInputFormat());
        }
        // Accumulate the resource requirements of all nodes in the chain.
        ResourceSpec minResources = streamNode.getMinResources();
        creatingChainIntermediateStorager.chainedMinResources = creatingChainIntermediateStorager.chainedMinResources == null ? minResources : creatingChainIntermediateStorager.chainedMinResources.merge(minResources);
        ResourceSpec preferredResources = streamNode.getPreferredResources();
        creatingChainIntermediateStorager.chainedPreferredResources = creatingChainIntermediateStorager.chainedPreferredResources == null ? preferredResources : creatingChainIntermediateStorager.chainedPreferredResources.merge(preferredResources);
        // Only the call for the starting node finalizes the chain.
        if (!num2.equals(num)) {
            return true;
        }
        if (map != null) {
            // Both edge lists are ordered by the target node's depth-first number, then by the edge's
            // position among the target's input edges.
            creatingChainIntermediateStorager.chainInEdgesInOrder.sort(Comparator.comparingInt(streamEdge3 -> {
                return ((ChainingStreamNode) map.get(Integer.valueOf(streamEdge3.getTargetId()))).getDepthFirstNumber();
            }).thenComparingInt(streamEdge4 -> {
                return this.streamGraph.getStreamNode(Integer.valueOf(streamEdge4.getTargetId())).getInEdges().indexOf(streamEdge4);
            }));
            creatingChainIntermediateStorager.chainOutEdgesInOrder.sort(Comparator.comparingInt(streamEdge5 -> {
                return ((ChainingStreamNode) map.get(Integer.valueOf(streamEdge5.getTargetId()))).getDepthFirstNumber();
            }).thenComparingInt(streamEdge6 -> {
                return this.streamGraph.getStreamNode(Integer.valueOf(streamEdge6.getTargetId())).getInEdges().indexOf(streamEdge6);
            }));
            // Chained nodes are ordered by depth-first number, head nodes by topological order.
            creatingChainIntermediateStorager.chainedNodeIdsInOrder.sort(Comparator.comparingInt(num3 -> {
                return ((ChainingStreamNode) map.get(num3)).getDepthFirstNumber();
            }));
            creatingChainIntermediateStorager.chainedHeadNodeIdsInOrder.sort(Comparator.comparingInt(num4 -> {
                return ((ChainingStreamNode) map.get(num4)).getTopologicalOrder();
            }));
        }
        creatingChainIntermediateStorager.createdVertex = createJobVertex(num, map2, list, creatingChainIntermediateStorager);
        setupVertexConfig(streamConfig, creatingChainIntermediateStorager, creatingChainIntermediateStorager.createdVertex.getConfiguration());
        return true;
    }

    /**
     * Creates the {@link JobVertex} that represents the chain collected in
     * {@code creatingChainIntermediateStorager}.
     *
     * <p>The vertex id is derived from the hash of node {@code num}; alternative vertex and operator ids are
     * derived from the hash maps in {@code list}. An operator descriptor is registered for every chained
     * node, and the invokable class, resources, parallelism and max parallelism are set on the vertex.
     *
     * @param num                               id of the node whose hash, invokable class and parallelism are used
     * @param map                               node hashes keyed by node id
     * @param list                              additional hash maps used for the alternative ids
     * @param creatingChainIntermediateStorager state collected while building the chain
     * @return the newly created job vertex
     * @throws IllegalStateException if {@code map} holds no hash for {@code num}
     */
    private JobVertex createJobVertex(Integer num, Map<Integer, byte[]> map, List<Map<Integer, byte[]>> list, CreatingChainIntermediateStorager creatingChainIntermediateStorager) {
        byte[] vertexHash = map.get(num);
        if (vertexHash == null) {
            throw new IllegalStateException("Cannot find node hash (nodeId: " + num + ") . Did you generate them before calling this method?");
        }
        JobVertexID vertexId = new JobVertexID(vertexHash);

        List<JobVertexID> alternativeVertexIds = new ArrayList<>(list.size());
        for (Map<Integer, byte[]> alternativeHashes : list) {
            byte[] alternativeHash = alternativeHashes.get(num);
            if (alternativeHash != null) {
                alternativeVertexIds.add(new JobVertexID(alternativeHash));
            }
        }

        // One (operator id, alternative operator id) pair is emitted per chained node and alternative hash map.
        List<OperatorID> operatorIds = new ArrayList<>();
        List<OperatorID> alternativeOperatorIds = new ArrayList<>();
        for (Integer chainedNodeId : creatingChainIntermediateStorager.chainedNodeIdsInOrder) {
            byte[] operatorHash = map.get(chainedNodeId);
            for (Map<Integer, byte[]> alternativeHashes : list) {
                operatorIds.add(new OperatorID(operatorHash));
                byte[] alternativeOperatorHash = alternativeHashes.get(chainedNodeId);
                alternativeOperatorIds.add(alternativeOperatorHash == null ? null : new OperatorID(alternativeOperatorHash));
            }
        }

        String vertexName = makeJobVertexName(creatingChainIntermediateStorager.chainedHeadNodeIdsInOrder, creatingChainIntermediateStorager.chainedNameMap);
        StreamNode startNode = this.streamGraph.getStreamNode(num);

        JobVertex vertex;
        boolean noInputFormats = creatingChainIntermediateStorager.chainInputFormatMap.size() == 0;
        boolean noOutputFormats = creatingChainIntermediateStorager.chainOutputFormatMap.size() == 0;
        if (noInputFormats && noOutputFormats) {
            vertex = new JobVertex(vertexName, vertexId, alternativeVertexIds, operatorIds, alternativeOperatorIds);
        } else {
            // Chains that carry input/output formats need the format-aware vertex type.
            vertex = new MultiInputOutputFormatVertex(vertexName, vertexId, alternativeVertexIds, operatorIds, alternativeOperatorIds);
            FormatUtil.MultiFormatStub.setStubFormats(
                    new TaskConfig(vertex.getConfiguration()),
                    noInputFormats ? null : creatingChainIntermediateStorager.chainInputFormatMap,
                    noOutputFormats ? null : creatingChainIntermediateStorager.chainOutputFormatMap);
        }

        // Becomes true when some chained node receives more than one input from inside the chain.
        boolean hasMultipleChainedInputs = false;
        for (Integer chainedNodeId : creatingChainIntermediateStorager.chainedNodeIdsInOrder) {
            byte[] operatorHash = map.get(chainedNodeId);
            StreamNode chainedNode = this.streamGraph.getStreamNode(chainedNodeId);
            OperatorID operatorId = new OperatorID(operatorHash);
            OperatorDescriptor descriptor = new OperatorDescriptor(chainedNode.getOperatorName(), operatorId);

            int chainedInputs = 0;
            for (StreamEdge inEdge : chainedNode.getInEdges()) {
                Integer sourceId = inEdge.getSourceId();
                OperatorID sourceOperatorId = new OperatorID(map.get(sourceId));
                String partitionerName = inEdge.getPartitioner() == null ? "null" : inEdge.getPartitioner().toString();
                descriptor.addInput(new OperatorEdgeDescriptor(sourceOperatorId, operatorId, inEdge.getTypeNumber(), partitionerName));
                if (creatingChainIntermediateStorager.chainedConfigMap.containsKey(Integer.valueOf(inEdge.getSourceId()))) {
                    chainedInputs++;
                }
            }
            vertex.addOperatorDescriptor(descriptor);
            if (chainedInputs > 1) {
                hasMultipleChainedInputs = true;
            }
        }

        vertex.setResources(creatingChainIntermediateStorager.chainedMinResources, creatingChainIntermediateStorager.chainedPreferredResources);

        boolean severalHeads = creatingChainIntermediateStorager.chainedHeadNodeIdsInOrder.size() > 1;
        if (severalHeads || hasMultipleChainedInputs) {
            vertex.setInvokableClass(ArbitraryInputStreamTask.class);
        } else {
            vertex.setInvokableClass(startNode.getJobVertexClass());
        }

        // A non-positive parallelism on the node leaves the vertex's own value untouched.
        int parallelism = startNode.getParallelism();
        if (parallelism > 0) {
            vertex.setParallelism(parallelism);
        } else {
            parallelism = vertex.getParallelism();
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Parallelism set: {} for {}", Integer.valueOf(parallelism), num);
        }
        vertex.setMaxParallelism(startNode.getMaxParallelism());
        return vertex;
    }

    /**
     * Copies the task-level settings of a finished chain into the storager's vertex config cache and
     * serializes that cache into the given vertex configuration.
     *
     * @param streamConfig                      stream config from which the time characteristic, checkpoint
     *                                          settings and state backend are read
     * @param creatingChainIntermediateStorager holder of the chain's collected configs, head node ids and edges
     * @param configuration                     configuration of the job vertex that receives the serialized settings
     */
    private void setupVertexConfig(StreamConfig streamConfig, CreatingChainIntermediateStorager creatingChainIntermediateStorager, Configuration configuration) {
        final StreamTaskConfigCache cache = creatingChainIntermediateStorager.vertexConfigCache;

        // Settings taken over from the stream config.
        cache.setTimeCharacteristic(streamConfig.getTimeCharacteristic());
        cache.setCheckpointingEnabled(streamConfig.isCheckpointingEnabled());
        cache.setCheckpointMode(streamConfig.getCheckpointMode());
        cache.setStateBackend(streamConfig.getStateBackend(creatingChainIntermediateStorager.classLoader));

        // Chain layout gathered while the chain was built.
        cache.setChainedNodeConfigs(creatingChainIntermediateStorager.chainedConfigMap);
        cache.setChainedHeadNodeIds(creatingChainIntermediateStorager.chainedHeadNodeIdsInOrder);
        cache.setInStreamEdgesOfChain(creatingChainIntermediateStorager.chainInEdgesInOrder);
        cache.setOutStreamEdgesOfChain(creatingChainIntermediateStorager.chainOutEdgesInOrder);

        StreamTaskConfig target = new StreamTaskConfig(configuration);
        cache.serializeTo(target);
    }

    /**
     * Fills the {@link StreamConfig} of a single node with its serializers, operator, edges, checkpoint
     * settings, state partitioners and, for iteration head/tail nodes, the iteration parameters.
     *
     * @param num          id of the node being configured
     * @param list         input edges of the node that count as its inputs (only the size is used)
     * @param list2        chained output edges
     * @param list3        non-chained output edges
     * @param streamGraph  graph providing the node, execution config, checkpoint config and state backend
     * @param streamConfig config object to populate
     */
    private static void setupNodeConfig(Integer num, List<StreamEdge> list, List<StreamEdge> list2, List<StreamEdge> list3, StreamGraph streamGraph, StreamConfig streamConfig) {
        final StreamNode node = streamGraph.getStreamNode(num);

        streamConfig.setVertexID(num);
        streamConfig.setBufferTimeout(node.getBufferTimeout().longValue());
        streamConfig.setTypeSerializerIn1(node.getTypeSerializerIn1());
        streamConfig.setTypeSerializerIn2(node.getTypeSerializerIn2());
        streamConfig.setTypeSerializerOut(node.getTypeSerializerOut());

        // Side-output serializers: chained outputs first, then non-chained outputs.
        for (StreamEdge chainedOut : list2) {
            if (chainedOut.getOutputTag() == null) {
                continue;
            }
            streamConfig.setTypeSerializerSideOut(chainedOut.getOutputTag(), chainedOut.getOutputTag().getTypeInfo().createSerializer(streamGraph.getExecutionConfig()));
        }
        for (StreamEdge nonChainedOut : list3) {
            if (nonChainedOut.getOutputTag() == null) {
                continue;
            }
            streamConfig.setTypeSerializerSideOut(nonChainedOut.getOutputTag(), nonChainedOut.getOutputTag().getTypeInfo().createSerializer(streamGraph.getExecutionConfig()));
        }

        streamConfig.setStreamOperator(node.getOperator());
        streamConfig.setOutputSelectors(node.getOutputSelectors());
        streamConfig.setNumberOfInputs(list.size());
        streamConfig.setNumberOfOutputs(list3.size());
        streamConfig.setNonChainedOutputs(list3);
        streamConfig.setChainedOutputs(list2);
        streamConfig.setTimeCharacteristic(streamGraph.getTimeCharacteristic());

        final CheckpointConfig checkpointing = streamGraph.getCheckpointConfig();
        streamConfig.setStateBackend(streamGraph.getStateBackend());
        streamConfig.setCheckpointingEnabled(checkpointing.isCheckpointingEnabled());
        // With checkpointing switched off the mode is pinned to AT_LEAST_ONCE.
        streamConfig.setCheckpointMode(
                checkpointing.isCheckpointingEnabled()
                        ? checkpointing.getCheckpointingMode()
                        : CheckpointingMode.AT_LEAST_ONCE);

        streamConfig.setStatePartitioner(0, node.getStatePartitioner1());
        streamConfig.setStatePartitioner(1, node.getStatePartitioner2());
        streamConfig.setStateKeySerializer(node.getStateKeySerializer());

        final Class<? extends AbstractInvokable> invokable = node.getJobVertexClass();
        final boolean iterationEndpoint = invokable.equals(StreamIterationHead.class) || invokable.equals(StreamIterationTail.class);
        if (iterationEndpoint) {
            streamConfig.setIterationId(streamGraph.getBrokerID(num));
            streamConfig.setIterationWaitTime(streamGraph.getLoopTimeout(num));
        }

        // Only a non-empty custom configuration is copied over.
        final Configuration custom = node.getCustomConfiguration();
        if (custom.keySet().size() > 0) {
            streamConfig.setCustomConfiguration(custom);
        }
    }

    /**
     * Builds the display name of an operator together with the operators chained behind it.
     *
     * <p>No chained output yields the plain name, one chained output yields {@code "name -> child"}, and
     * several yield {@code "name -> (child1, child2, ...)"}. Child names are looked up by target node id.
     *
     * @param str  name of the operator itself
     * @param list chained output edges of the operator
     * @param map  already computed chained names keyed by node id
     * @return the chained name
     */
    private static String makeChainedName(String str, List<StreamEdge> list, Map<Integer, String> map) {
        switch (list.size()) {
            case 0:
                return str;
            case 1:
                StreamEdge onlyEdge = list.get(0);
                return str + " -> " + map.get(Integer.valueOf(onlyEdge.getTargetId()));
            default:
                List<String> childNames = new ArrayList<>();
                for (StreamEdge chainedEdge : list) {
                    childNames.add(map.get(Integer.valueOf(chainedEdge.getTargetId())));
                }
                return str + " -> (" + StringUtils.join(childNames, ", ") + ")";
        }
    }

    /**
     * Builds the job vertex name from the chained names of its head nodes.
     *
     * <p>The names are joined with {@code ", "}; when there is more than one head node the result is wrapped
     * in square brackets. A head node without an entry in {@code map} contributes the text {@code "null"}.
     *
     * @param list ids of the chain's head nodes, in output order
     * @param map  chained names keyed by node id
     * @return the job vertex name (empty for an empty {@code list})
     */
    private static String makeJobVertexName(List<Integer> list, Map<Integer, String> map) {
        final boolean bracketed = list.size() > 1;
        StringBuilder name = new StringBuilder();
        if (bracketed) {
            name.append("[");
        }
        String separator = "";
        for (Integer headNodeId : list) {
            name.append(separator).append(map.get(headNodeId));
            separator = ", ";
        }
        if (bracketed) {
            name.append("]");
        }
        return name.toString();
    }

    /**
     * Maps an edge's data exchange mode (and, for one of the modes, the job's execution mode) to the
     * {@link ResultPartitionType} used when connecting job vertices.
     *
     * <p>NOTE(review): the switches go through compiler-generated {@code $SwitchMap$} tables, so the enum
     * constant behind each numeric case label is not visible here — confirm against
     * {@code DataExchangeMode} and {@code ExecutionMode} before changing any case.
     *
     * @param dataExchangeMode data exchange mode of the edge
     * @param executionMode    execution mode of the job; consulted only for the first data exchange case
     * @return {@code PIPELINED} or {@code BLOCKING}
     * @throws UnsupportedOperationException for the unsupported data exchange case and for any mode not
     *                                       covered by the switch tables
     */
    private static ResultPartitionType getEdgeResultPartitionType(DataExchangeMode dataExchangeMode, ExecutionMode executionMode) {
        switch (AnonymousClass1.$SwitchMap$org$apache$flink$runtime$io$network$DataExchangeMode[dataExchangeMode.ordinal()]) {
            case 1:
                // This data exchange mode defers the decision to the execution mode.
                switch (AnonymousClass1.$SwitchMap$org$apache$flink$api$common$ExecutionMode[executionMode.ordinal()]) {
                    case 1:
                        return ResultPartitionType.PIPELINED;
                    case 2:
                        return ResultPartitionType.BLOCKING;
                    default:
                        throw new UnsupportedOperationException("Unknown execution mode " + executionMode + ".");
                }
            case 2:
                return ResultPartitionType.PIPELINED;
            case 3:
                return ResultPartitionType.BLOCKING;
            case 4:
                // A known mode that is explicitly rejected.
                throw new UnsupportedOperationException("Data exchange mode " + dataExchangeMode + " is not supported.");
            default:
                throw new UnsupportedOperationException("Unknown data exchange mode " + dataExchangeMode + ".");
        }
    }

    /**
     * Connects the job vertices for every edge in {@code transitiveOutEdges}.
     *
     * <p>Edges with a forward or rescale partitioner are connected point-wise, all others all-to-all. The
     * result partition type is derived from the edge's data exchange mode and the job's execution mode, and
     * the partitioner's string form is stored as the ship strategy name of the created connection.
     *
     * @throws RuntimeException if an edge's source and target map to the same job vertex
     */
    private void connectEdges() {
        for (StreamEdge edge : this.transitiveOutEdges) {
            Integer sourceId = edge.getSourceId();
            Integer targetId = edge.getTargetId();
            JobVertex upstreamVertex = this.nodeToJobVertexMap.get(sourceId);
            JobVertex downstreamVertex = this.nodeToJobVertexMap.get(targetId);

            // An edge that starts and ends in the same vertex would make the job graph cyclic.
            if (upstreamVertex.getID().equals(downstreamVertex.getID())) {
                throw new RuntimeException("The job graph is cyclic.");
            }

            StreamPartitioner<?> partitioner = edge.getPartitioner();
            IntermediateDataSetID dataSetId = new IntermediateDataSetID(edge.getEdgeID());
            ExecutionMode executionMode = this.streamGraph.getExecutionConfig().getExecutionMode();

            DistributionPattern pattern;
            if (partitioner instanceof ForwardPartitioner || partitioner instanceof RescalePartitioner) {
                pattern = DistributionPattern.POINTWISE;
            } else {
                pattern = DistributionPattern.ALL_TO_ALL;
            }
            ResultPartitionType partitionType = getEdgeResultPartitionType(edge.getDataExchangeMode(), executionMode);

            downstreamVertex
                    .connectDataSetAsInput(upstreamVertex, dataSetId, pattern, partitionType)
                    .setShipStrategyName(partitioner.toString());

            if (LOG.isDebugEnabled()) {
                LOG.debug("CONNECTED: {} - {} -> {}", new Object[]{partitioner.getClass().getSimpleName(), Integer.valueOf(edge.getSourceId()), Integer.valueOf(edge.getTargetId())});
            }
        }
    }

    /**
     * Assigns slot sharing groups and co-location groups to the job vertices.
     *
     * <p>Every vertex whose stream node names a slot sharing group gets the {@code SlotSharingGroup} instance
     * shared by all vertices using that name. Each iteration source/sink pair of the stream graph is put into
     * a co-location group of its own.
     */
    private void setSlotSharing() {
        Map<String, SlotSharingGroup> groupsByName = new HashMap<>();
        for (Integer headNodeId : this.chainedNodeIdsMap.keySet()) {
            JobVertex vertex = this.nodeToJobVertexMap.get(headNodeId);
            String groupName = this.streamGraph.getStreamNode(headNodeId).getSlotSharingGroup();
            if (groupName == null) {
                // Nodes without a group name are left without a slot sharing group.
                continue;
            }
            SlotSharingGroup group = groupsByName.computeIfAbsent(groupName, unused -> new SlotSharingGroup());
            vertex.setSlotSharingGroup(group);
        }

        for (Tuple2<StreamNode, StreamNode> iterationPair : this.streamGraph.getIterationSourceSinkPairs()) {
            CoLocationGroup coLocation = new CoLocationGroup();
            Integer firstId = iterationPair.f0.getId();
            Integer secondId = iterationPair.f1.getId();
            JobVertex firstVertex = this.nodeToJobVertexMap.get(firstId);
            JobVertex secondVertex = this.nodeToJobVertexMap.get(secondId);

            coLocation.addVertex(firstVertex);
            coLocation.addVertex(secondVertex);
            firstVertex.updateCoLocationGroup(coLocation);
            secondVertex.updateCoLocationGroup(coLocation);
        }
    }

    /**
     * Derives the job-wide checkpointing settings from the stream graph and installs them on the job graph.
     *
     * <p>Steps performed:
     * <ul>
     *   <li>With a positive checkpoint interval, the "fail task on checkpoint error" flag is copied to the
     *       execution config and, if no restart strategy is set, a fixed-delay strategy with
     *       {@code Integer.MAX_VALUE} attempts and zero delay is installed. Otherwise the interval becomes
     *       {@code Long.MAX_VALUE}.</li>
     *   <li>The ids of all chain vertices are collected; input vertices additionally go into the first
     *       (trigger) list.</li>
     *   <li>The retention policy and the exactly-once flag are derived from the checkpoint config.</li>
     *   <li>Master hooks are gathered from user functions implementing {@code WithMasterCheckpointHook} and
     *       serialized together with the state backend.</li>
     * </ul>
     *
     * @throws IllegalStateException if externalized checkpoints are enabled without a cleanup mode, or the
     *                               checkpointing mode is neither exactly-once nor at-least-once
     * @throws FlinkRuntimeException if the master hooks or the state backend cannot be serialized
     */
    private void configureCheckpointing() {
        CheckpointConfig checkpointConfig = this.streamGraph.getCheckpointConfig();

        long interval = checkpointConfig.getCheckpointInterval();
        if (interval > 0) {
            ExecutionConfig executionConfig = this.streamGraph.getExecutionConfig();
            executionConfig.setFailTaskOnCheckpointError(checkpointConfig.isFailOnCheckpointingErrors());
            if (executionConfig.getRestartStrategy() == null) {
                // Checkpointing without an explicit restart strategy defaults to restarting indefinitely.
                executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L));
            }
        } else {
            // A non-positive interval is replaced by the largest possible one.
            interval = Long.MAX_VALUE;
        }

        int vertexCount = this.chainedNodeIdsMap.size();
        List<JobVertexID> triggerVertices = new ArrayList<>();
        List<JobVertexID> ackVertices = new ArrayList<>(vertexCount);
        List<JobVertexID> commitVertices = new ArrayList<>(vertexCount);
        for (Integer headNodeId : this.chainedNodeIdsMap.keySet()) {
            JobVertex vertex = this.nodeToJobVertexMap.get(headNodeId);
            if (vertex.isInputVertex()) {
                triggerVertices.add(vertex.getID());
            }
            commitVertices.add(vertex.getID());
            ackVertices.add(vertex.getID());
        }

        CheckpointRetentionPolicy retentionPolicy;
        if (checkpointConfig.isExternalizedCheckpointsEnabled()) {
            CheckpointConfig.ExternalizedCheckpointCleanup cleanup = checkpointConfig.getExternalizedCheckpointCleanup();
            if (cleanup == null) {
                throw new IllegalStateException("Externalized checkpoints enabled, but no cleanup mode configured.");
            }
            retentionPolicy = cleanup.deleteOnCancellation()
                    ? CheckpointRetentionPolicy.RETAIN_ON_FAILURE
                    : CheckpointRetentionPolicy.RETAIN_ON_CANCELLATION;
        } else {
            retentionPolicy = CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION;
        }

        CheckpointingMode mode = checkpointConfig.getCheckpointingMode();
        boolean exactlyOnce;
        if (mode == CheckpointingMode.EXACTLY_ONCE) {
            exactlyOnce = true;
        } else if (mode == CheckpointingMode.AT_LEAST_ONCE) {
            exactlyOnce = false;
        } else {
            throw new IllegalStateException("Unexpected checkpointing mode. Did not expect there to be another checkpointing mode besides exactly-once or at-least-once.");
        }

        List<MasterTriggerRestoreHook.Factory> hooks = new ArrayList<>();
        for (StreamNode node : this.streamGraph.getStreamNodes()) {
            StreamOperator<?> operator = node.getOperator();
            if (operator instanceof AbstractUdfStreamOperator) {
                // Hold the user function as a plain Object: narrowing it to WithMasterCheckpointHook before
                // the instanceof test would reject every user function that does not implement the hook.
                Object userFunction = ((AbstractUdfStreamOperator) operator).getUserFunction();
                if (userFunction instanceof WithMasterCheckpointHook) {
                    hooks.add(new FunctionMasterCheckpointHookFactory((WithMasterCheckpointHook) userFunction));
                }
            }
        }

        SerializedValue<MasterTriggerRestoreHook.Factory[]> serializedHooks;
        if (hooks.isEmpty()) {
            serializedHooks = null;
        } else {
            try {
                serializedHooks = new SerializedValue<>(hooks.toArray(new MasterTriggerRestoreHook.Factory[hooks.size()]));
            } catch (IOException e) {
                throw new FlinkRuntimeException("Trigger/restore hook is not serializable", e);
            }
        }

        // Left as a raw type on purpose: the state backend's static type is not named in this method.
        SerializedValue serializedStateBackend;
        if (this.streamGraph.getStateBackend() == null) {
            serializedStateBackend = null;
        } else {
            try {
                serializedStateBackend = new SerializedValue(this.streamGraph.getStateBackend());
            } catch (IOException e) {
                throw new FlinkRuntimeException("State backend is not serializable", e);
            }
        }

        CheckpointCoordinatorConfiguration coordinatorConfiguration = new CheckpointCoordinatorConfiguration(
                interval,
                checkpointConfig.getCheckpointTimeout(),
                checkpointConfig.getMinPauseBetweenCheckpoints(),
                checkpointConfig.getMaxConcurrentCheckpoints(),
                retentionPolicy,
                exactlyOnce);
        this.jobGraph.setSnapshotSettings(new JobCheckpointingSettings(triggerVertices, ackVertices, commitVertices, coordinatorConfiguration, serializedStateBackend, serializedHooks));
    }
}
