package org.apache.flink.runtime.taskmanager;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.FileSystemSafetyNet;
import org.apache.flink.core.fs.SafetyNetCloseableRegistry;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
import org.apache.flink.runtime.io.network.partition.BlockingShuffleType;
import org.apache.flink.runtime.io.network.partition.InternalResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionMetrics;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.InputGateMetrics;
import org.apache.flink.runtime.io.network.partition.consumer.PartitionRequestManager;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.partition.external.ExternalResultPartition;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.jobgraph.tasks.StoppableTask;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.metrics.scope.ScopeFormat;
import org.apache.flink.runtime.preaggregatedaccumulators.AccumulatorAggregationManager;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.util.FatalExitExceptionHandler;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.WrappingRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/taskmanager/Task.class */
public class Task implements Runnable, TaskActions, CheckpointListener {
    /** Class-wide logger. */
    private static final Logger LOG = LoggerFactory.getLogger(Task.class);
    /** Thread group in which the constructor creates each task's executing thread. */
    private static final ThreadGroup TASK_THREADS_GROUP = new ThreadGroup("Flink Task Threads");
    /** Atomic updater for the volatile {@code executionState} field. */
    private static final AtomicReferenceFieldUpdater<Task, ExecutionState> STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(Task.class, ExecutionState.class, "executionState");

    // ---- Identity and static configuration, taken from JobInformation / TaskInformation in the constructor ----
    private final JobID jobId;
    private final String jobName;
    private final JobVertexID vertexId;
    private final ExecutionAttemptID executionId;
    private final AllocationID allocationId;
    private final TaskInfo taskInfo;
    /** Cached result of {@code taskInfo.getTaskNameWithSubtasks()}; also used as the executing thread's name. */
    private final String taskNameWithSubtask;
    private final Configuration jobConfiguration;
    private final Configuration taskConfiguration;
    /** Configuration obtained from {@code TaskManagerRuntimeInfo.getConfiguration()} in the constructor. */
    private final Configuration taskManagerConfiguration;
    private final Collection<PermanentBlobKey> requiredJarFiles;
    private final Collection<URL> requiredClasspaths;
    /** Class name handed to {@code loadAndInstantiateInvokable} in {@code run()}. */
    private final String nameOfInvokableClass;
    private final TaskManagerRuntimeInfo taskManagerConfig;

    // ---- Services supplied by the caller of the constructor ----
    private final MemoryManager memoryManager;
    private final IOManager ioManager;
    private final BroadcastVariableManager broadcastVariableManager;
    private final TaskStateManager taskStateManager;
    /** Serialized execution config; {@code run()} deserializes it with the user-code class loader. */
    private final SerializedValue<ExecutionConfig> serializedExecutionConfig;

    // ---- Network endpoints of this task ----
    /** One slot per result partition deployment descriptor passed to the constructor. */
    private final ResultPartition[] producedPartitions;
    private final List<InternalResultPartition> internalPartitions;
    private final List<ExternalResultPartition> externalPartitions;
    /** One slot per input gate deployment descriptor passed to the constructor. */
    private final SingleInputGate[] inputGates;
    /** Lookup used by {@code getInputGateById}. */
    private final Map<IntermediateDataSetID, SingleInputGate> inputGatesById;

    private final TaskManagerActions taskManagerActions;
    private final InputSplitProvider inputSplitProvider;
    private final CheckpointResponder checkpointResponder;
    /** Listener list; created as a {@code CopyOnWriteArrayList} in the constructor. */
    private final List<TaskExecutionStateListener> taskExecutionStateListeners;
    private final BlobCacheService blobService;
    private final LibraryCacheManager libraryCache;
    private final FileCache fileCache;
    private final NetworkEnvironment network;
    /** Created in the constructor from the job ID, vertex ID, subtask index and execution ID. */
    private final AccumulatorRegistry accumulatorRegistry;
    /** Thread that runs this task; created (not started) in the constructor with this task as its Runnable. */
    private final Thread executingThread;
    private final TaskMetricGroup metrics;
    private final PartitionProducerStateChecker partitionProducerStateChecker;
    private final Executor executor;
    private final ExecutorService executorService;
    /** Timestamp value handed to the constructor; exposed through {@code getCreateTimestamp()}. */
    private final long createTimestamp;
    /** Read from {@code TaskManagerOptions.CHECK_PARTITION_PRODUCER_STATE} in the constructor. */
    private final boolean checkPartitionProducerState;
    /** Starts out {@code false}; presumably flipped when the invokable is cancelled (not visible in this chunk). */
    private final AtomicBoolean invokableHasBeenCanceled;

    // ---- Mutable runtime state ----
    /** The instantiated task code; assigned in {@code run()} once the invokable has been loaded. */
    private volatile AbstractInvokable invokable;
    /** Current execution state; starts as CREATED and is the target of {@code STATE_UPDATER}. */
    private volatile ExecutionState executionState = ExecutionState.CREATED;
    /** Failure cause exposed through {@code getFailureCause()}; {@code null} until something assigns it. */
    private volatile Throwable failureCause;
    /** Shut down during cleanup in {@code run()} when it is non-null and not already shut down. */
    private volatile ExecutorService asyncCallDispatcher;
    /** Initialised from the task manager configuration; {@code run()} overrides it when the job's ExecutionConfig value is >= 0. */
    private long taskCancellationInterval;
    /** Initialised from the task manager configuration; {@code run()} overrides it when the job's ExecutionConfig value is >= 0. */
    private long taskCancellationTimeout;
    /** Assigned in {@code run()} from {@code createUserCodeClassloader()}. */
    private ClassLoader userCodeClassLoader;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/taskmanager/Task$TaskCanceler.class */
    public static class TaskCanceler implements Runnable {
        private final Logger logger;
        private final AbstractInvokable invokable;
        private final Thread executer;
        private final String taskName;
        private final List<InternalResultPartition> internalPartitions;
        private final SingleInputGate[] inputGates;

        public TaskCanceler(Logger logger, AbstractInvokable abstractInvokable, Thread thread, String str, List<InternalResultPartition> list, SingleInputGate[] singleInputGateArr) {
            this.logger = logger;
            this.invokable = abstractInvokable;
            this.executer = thread;
            this.taskName = str;
            this.internalPartitions = list;
            this.inputGates = singleInputGateArr;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                try {
                    this.invokable.cancel();
                } catch (Throwable th) {
                    ExceptionUtils.rethrowIfFatalError(th);
                    this.logger.error("Error while canceling the task {}.", this.taskName, th);
                }
                Iterator<InternalResultPartition> it = this.internalPartitions.iterator();
                while (it.hasNext()) {
                    try {
                        it.next().destroyBufferPool();
                    } catch (Throwable th2) {
                        ExceptionUtils.rethrowIfFatalError(th2);
                        Task.LOG.error("Failed to release result partition buffer pool for task {}.", this.taskName, th2);
                    }
                }
                for (SingleInputGate singleInputGate : this.inputGates) {
                    try {
                        singleInputGate.releaseAllResources();
                    } catch (Throwable th3) {
                        ExceptionUtils.rethrowIfFatalError(th3);
                        Task.LOG.error("Failed to release input gate for task {}.", this.taskName, th3);
                    }
                }
                this.executer.interrupt();
            } catch (Throwable th4) {
                ExceptionUtils.rethrowIfFatalError(th4);
                this.logger.error("Error in the task canceler for task {}.", this.taskName, th4);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/taskmanager/Task$TaskCancelerWatchDog.class */
    /**
     * Watchdog for task cancellation: waits up to a timeout for the executing thread to
     * terminate and, if the thread is still alive afterwards, reports a fatal error to the
     * task manager.
     */
    public static class TaskCancelerWatchDog implements Runnable {
        private final Logger log;
        private final Thread executerThread;
        private final TaskManagerActions taskManager;
        private final long timeoutMillis;

        /**
         * @param thread the thread executing the task that is being watched
         * @param taskManagerActions receiver of the fatal-error notification
         * @param j timeout in milliseconds; must be greater than zero
         * @param logger logger used for the timeout message and for unexpected errors
         */
        TaskCancelerWatchDog(Thread thread, TaskManagerActions taskManagerActions, long j, Logger logger) {
            Preconditions.checkArgument(j > 0);
            this.log = logger;
            this.executerThread = thread;
            this.taskManager = taskManagerActions;
            this.timeoutMillis = j;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                // Deadline on the monotonic clock, so wall-clock adjustments do not affect the wait.
                final long deadlineNanos = System.nanoTime() + (this.timeoutMillis * 1000000);
                while (this.executerThread.isAlive()) {
                    final long remainingMillis = (deadlineNanos - System.nanoTime()) / 1000000;
                    if (remainingMillis <= 0) {
                        break;
                    }
                    try {
                        this.executerThread.join(remainingMillis);
                    } catch (InterruptedException ignored) {
                        // Deliberately ignored: the loop re-checks liveness and the remaining time.
                    }
                }
                if (this.executerThread.isAlive()) {
                    // Fixed: the message used to contain a stray literal " + " before "seconds".
                    String message = "Task did not exit gracefully within " + (this.timeoutMillis / 1000) + " seconds.";
                    this.log.error(message);
                    this.taskManager.notifyFatalError(message, null);
                }
            } catch (Throwable th) {
                ExceptionUtils.rethrowIfFatalError(th);
                this.log.error("Error in Task Cancellation Watch Dog", th);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/taskmanager/Task$TaskInterrupter.class */
    /**
     * Runnable that waits one interval for the executing thread to terminate and then, for as
     * long as the thread stays alive, logs the thread's current stack trace, interrupts it and
     * waits another interval.
     */
    public static final class TaskInterrupter implements Runnable {
        private final Logger log;
        private final Thread executerThread;
        private final String taskName;
        private final long interruptIntervalMillis;

        /**
         * @param logger logger used for the warnings and for unexpected errors
         * @param thread the thread executing the task
         * @param str task name used in log messages
         * @param j wait time between two interrupts, in milliseconds
         */
        TaskInterrupter(Logger logger, Thread thread, String str, long j) {
            this.log = logger;
            this.executerThread = thread;
            this.taskName = str;
            this.interruptIntervalMillis = j;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                // Initial grace period before the first interrupt is sent.
                executerThread.join(interruptIntervalMillis);

                while (executerThread.isAlive()) {
                    StackTraceElement[] frames = executerThread.getStackTrace();
                    StringBuilder stackDump = new StringBuilder();
                    for (StackTraceElement frame : frames) {
                        stackDump.append(frame).append('\n');
                    }

                    Object[] logArguments = {taskName, Long.valueOf(interruptIntervalMillis / 1000), stackDump};
                    log.warn("Task '{}' did not react to cancelling signal for {} seconds, but is stuck in method:\n {}", logArguments);

                    executerThread.interrupt();
                    try {
                        executerThread.join(interruptIntervalMillis);
                    } catch (InterruptedException e) {
                        // Ignored on purpose: the loop condition decides whether to continue.
                    }
                }
            } catch (Throwable unexpected) {
                ExceptionUtils.rethrowIfFatalError(unexpected);
                log.error("Error in the task canceler for task {}.", taskName, unexpected);
            }
        }
    }

    /**
     * Creates the task: validates the arguments, copies job/task information and services into
     * fields, creates the result partitions and input gates described by the deployment
     * descriptors, and creates (but does not start) the executing thread.
     *
     * @param jobInformation job-level information (ID, name, configuration, jars, classpaths, execution config)
     * @param taskInformation task-level information (name, vertex ID, parallelism, configuration, invokable class)
     * @param executionAttemptID ID of this execution attempt; must not be null
     * @param allocationID allocation ID; must not be null
     * @param i subtask index; must be {@code >= 0}
     * @param i2 attempt number; must be {@code >= 0}
     * @param collection deployment descriptors of the result partitions to create
     * @param collection2 deployment descriptors of the input gates to create
     * @param i3 target slot number; must be {@code >= 0} (only validated here)
     * @param j creation timestamp stored for {@code getCreateTimestamp()}
     * @param taskManagerRuntimeInfo source of the task manager configuration, from which the
     *     cancellation interval/timeout and the partition-producer-state check flag are read
     * @param taskMetricGroup metric group of this task
     */
    public Task(JobInformation jobInformation, TaskInformation taskInformation, ExecutionAttemptID executionAttemptID, AllocationID allocationID, int i, int i2, Collection<ResultPartitionDeploymentDescriptor> collection, Collection<InputGateDeploymentDescriptor> collection2, int i3, long j, MemoryManager memoryManager, IOManager iOManager, NetworkEnvironment networkEnvironment, BroadcastVariableManager broadcastVariableManager, AccumulatorAggregationManager accumulatorAggregationManager, TaskStateManager taskStateManager, TaskManagerActions taskManagerActions, InputSplitProvider inputSplitProvider, CheckpointResponder checkpointResponder, BlobCacheService blobCacheService, LibraryCacheManager libraryCacheManager, FileCache fileCache, TaskManagerRuntimeInfo taskManagerRuntimeInfo, @Nonnull TaskMetricGroup taskMetricGroup, ResultPartitionConsumableNotifier resultPartitionConsumableNotifier, PartitionProducerStateChecker partitionProducerStateChecker, Executor executor, ExecutorService executorService) {
        // --- argument validation (zero is accepted although the messages say "positive") ---
        Preconditions.checkNotNull(jobInformation);
        Preconditions.checkNotNull(taskInformation);
        Preconditions.checkArgument(i >= 0, "The subtask index must be positive.");
        Preconditions.checkArgument(i2 >= 0, "The attempt number must be positive.");
        Preconditions.checkArgument(i3 >= 0, "The target slot number must be positive.");

        // --- identity and configuration ---
        this.taskInfo = new TaskInfo(
                taskInformation.getTaskName(),
                taskInformation.getMaxNumberOfSubtaks(),
                i,
                taskInformation.getNumberOfSubtasks(),
                i2,
                String.valueOf(allocationID));
        this.jobId = jobInformation.getJobId();
        this.jobName = jobInformation.getJobName();
        this.vertexId = taskInformation.getJobVertexId();
        this.executionId = Preconditions.checkNotNull(executionAttemptID);
        this.allocationId = Preconditions.checkNotNull(allocationID);
        this.taskNameWithSubtask = this.taskInfo.getTaskNameWithSubtasks();
        this.jobConfiguration = jobInformation.getJobConfiguration();
        this.taskConfiguration = taskInformation.getTaskConfiguration();
        this.requiredJarFiles = jobInformation.getRequiredJarFileBlobKeys();
        this.requiredClasspaths = jobInformation.getRequiredClasspathURLs();
        this.nameOfInvokableClass = taskInformation.getInvokableClassName();
        this.serializedExecutionConfig = jobInformation.getSerializedExecutionConfig();

        // --- settings read from the task manager configuration ---
        this.taskManagerConfiguration = taskManagerRuntimeInfo.getConfiguration();
        this.taskCancellationInterval = this.taskManagerConfiguration.getLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL);
        this.taskCancellationTimeout = this.taskManagerConfiguration.getLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT);
        this.checkPartitionProducerState = this.taskManagerConfiguration.getBoolean(TaskManagerOptions.CHECK_PARTITION_PRODUCER_STATE);
        this.createTimestamp = j;

        // --- services ---
        this.memoryManager = Preconditions.checkNotNull(memoryManager);
        this.ioManager = Preconditions.checkNotNull(iOManager);
        this.broadcastVariableManager = Preconditions.checkNotNull(broadcastVariableManager);
        this.taskStateManager = Preconditions.checkNotNull(taskStateManager);
        this.accumulatorRegistry = new AccumulatorRegistry(this.jobId, this.vertexId, i, this.executionId, accumulatorAggregationManager);
        this.inputSplitProvider = Preconditions.checkNotNull(inputSplitProvider);
        this.checkpointResponder = Preconditions.checkNotNull(checkpointResponder);
        this.taskManagerActions = Preconditions.checkNotNull(taskManagerActions);
        this.blobService = Preconditions.checkNotNull(blobCacheService);
        this.libraryCache = Preconditions.checkNotNull(libraryCacheManager);
        this.fileCache = Preconditions.checkNotNull(fileCache);
        this.network = Preconditions.checkNotNull(networkEnvironment);
        this.taskManagerConfig = Preconditions.checkNotNull(taskManagerRuntimeInfo);
        this.taskExecutionStateListeners = new CopyOnWriteArrayList<>();
        this.metrics = taskMetricGroup;
        this.partitionProducerStateChecker = Preconditions.checkNotNull(partitionProducerStateChecker);
        this.executor = Preconditions.checkNotNull(executor);
        this.executorService = Preconditions.checkNotNull(executorService);

        // --- network endpoints; the name combines the task name and the execution ID ---
        final String taskNameWithExecutionId = this.taskNameWithSubtask + " (" + this.executionId + ')';

        this.internalPartitions = new ArrayList<>();
        this.externalPartitions = new ArrayList<>();
        this.producedPartitions = new ResultPartition[collection.size()];
        if (this.producedPartitions.length > 0) {
            createAllResultPartitions(taskNameWithExecutionId, collection, resultPartitionConsumableNotifier);
        }

        this.inputGates = new SingleInputGate[collection2.size()];
        this.inputGatesById = new HashMap<>();
        if (this.inputGates.length > 0) {
            createAllInputGates(taskNameWithExecutionId, collection2);
        }

        this.invokableHasBeenCanceled = new AtomicBoolean(false);

        // --- executing thread: created here, started later via startTaskThread() ---
        this.executingThread = new Thread(TASK_THREADS_GROUP, this, this.taskNameWithSubtask);
        this.executingThread.setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE);
    }

    /** Returns the job ID taken from the {@code JobInformation} at construction. */
    public JobID getJobID() {
        return jobId;
    }

    /** Returns the job name taken from the {@code JobInformation} at construction. */
    public String getJobName() {
        return jobName;
    }

    /** Returns the job vertex ID taken from the {@code TaskInformation} at construction. */
    public JobVertexID getJobVertexId() {
        return vertexId;
    }

    /** Returns the execution attempt ID this task was constructed with (never null). */
    public ExecutionAttemptID getExecutionId() {
        return executionId;
    }

    /** Returns the allocation ID this task was constructed with (never null). */
    public AllocationID getAllocationId() {
        return allocationId;
    }

    /** Returns the {@code TaskInfo} assembled in the constructor. */
    public TaskInfo getTaskInfo() {
        return taskInfo;
    }

    /** Returns the job configuration taken from the {@code JobInformation} at construction. */
    public Configuration getJobConfiguration() {
        return jobConfiguration;
    }

    /** Returns the task configuration taken from the {@code TaskInformation} at construction. */
    public Configuration getTaskConfiguration() {
        return taskConfiguration;
    }

    /** Returns the task's input gate array itself (no defensive copy is made). */
    public SingleInputGate[] getAllInputGates() {
        return inputGates;
    }

    /** Returns the task's produced-partition array itself (no defensive copy is made). */
    public ResultPartition[] getProducedPartitions() {
        return producedPartitions;
    }

    /** Returns the live list of internal result partitions (not a copy). */
    public List<InternalResultPartition> getInternalPartitions() {
        return internalPartitions;
    }

    /** Returns the live list of external result partitions (not a copy). */
    public List<ExternalResultPartition> getExternalPartitions() {
        return externalPartitions;
    }

    /**
     * Looks up the input gate registered for the given intermediate data set.
     *
     * @param intermediateDataSetID key into the gate map
     * @return the mapped gate, or {@code null} when the map holds no entry for the ID
     */
    public SingleInputGate getInputGateById(IntermediateDataSetID intermediateDataSetID) {
        final SingleInputGate gate = inputGatesById.get(intermediateDataSetID);
        return gate;
    }

    /** Returns the accumulator registry created in the constructor. */
    public AccumulatorRegistry getAccumulatorRegistry() {
        return accumulatorRegistry;
    }

    /** Returns the task metric group passed to the constructor. */
    public TaskMetricGroup getMetricGroup() {
        return metrics;
    }

    /** Returns the thread created in the constructor to run this task. */
    public Thread getExecutingThread() {
        return executingThread;
    }

    /**
     * Returns the current cancellation interval. The value starts out as the task manager's
     * {@code TASK_CANCELLATION_INTERVAL} setting and is replaced in {@code run()} when the
     * job's execution config carries a value {@code >= 0}.
     */
    @VisibleForTesting
    long getTaskCancellationInterval() {
        return taskCancellationInterval;
    }

    /**
     * Returns the current cancellation timeout. The value starts out as the task manager's
     * {@code TASK_CANCELLATION_TIMEOUT} setting and is replaced in {@code run()} when the
     * job's execution config carries a value {@code >= 0}.
     */
    @VisibleForTesting
    long getTaskCancellationTimeout() {
        return taskCancellationTimeout;
    }

    /** Returns the creation timestamp value that was handed to the constructor. */
    public long getCreateTimestamp() {
        return createTimestamp;
    }

    /** Returns the input split provider passed to the constructor (never null). */
    public InputSplitProvider getInputSplitProvider() {
        return inputSplitProvider;
    }

    /** Returns the current value of the volatile execution state; initially CREATED. */
    public ExecutionState getExecutionState() {
        return executionState;
    }

    /**
     * Tells whether the task is currently in one of the states CANCELING, CANCELED or FAILED.
     *
     * @return {@code true} if the execution state matches any of the three states
     */
    public boolean isCanceledOrFailed() {
        // Each comparison re-reads the volatile field, in the order CANCELING, CANCELED, FAILED.
        if (executionState == ExecutionState.CANCELING) {
            return true;
        }
        if (executionState == ExecutionState.CANCELED) {
            return true;
        }
        return executionState == ExecutionState.FAILED;
    }

    /** Returns the stored failure cause; {@code null} as long as nothing has been recorded. */
    public Throwable getFailureCause() {
        return failureCause;
    }

    /**
     * Starts the executing thread. The thread was created in the constructor with this task
     * as its {@code Runnable}, so this task's {@code run()} executes on it.
     */
    public void startTaskThread() {
        executingThread.start();
    }

    @Override // java.lang.Runnable
    public void run() {
        while (true) {
            ExecutionState executionState = this.executionState;
            if (executionState == ExecutionState.CREATED) {
                if (transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING)) {
                    HashMap hashMap = new HashMap();
                    try {
                        try {
                            LOG.info("Creating FileSystem stream leak safety net for task {}", this);
                            FileSystemSafetyNet.initializeSafetyNetForThread();
                            this.blobService.getPermanentBlobService().registerJob(this.jobId);
                            LOG.info("Loading JAR files for task {}.", this);
                            this.userCodeClassLoader = createUserCodeClassloader();
                            ExecutionConfig executionConfig = (ExecutionConfig) this.serializedExecutionConfig.deserializeValue(this.userCodeClassLoader);
                            if (executionConfig.getTaskCancellationInterval() >= 0) {
                                this.taskCancellationInterval = executionConfig.getTaskCancellationInterval();
                            }
                            if (executionConfig.getTaskCancellationTimeout() >= 0) {
                                this.taskCancellationTimeout = executionConfig.getTaskCancellationTimeout();
                            }
                            if (isCanceledOrFailed()) {
                                throw new CancelTaskException();
                            }
                            LOG.info("Registering task at network: {}.", this);
                            this.network.registerTask(this);
                            this.metrics.getIOMetricGroup().initializeBufferMetrics(this);
                            if (this.taskManagerConfig.getConfiguration().getBoolean(TaskManagerOptions.NETWORK_DETAILED_METRICS)) {
                                MetricGroup addGroup = this.metrics.getIOMetricGroup().addGroup("Network");
                                MetricGroup addGroup2 = addGroup.addGroup("Output");
                                MetricGroup addGroup3 = addGroup.addGroup("Input");
                                for (int i = 0; i < this.internalPartitions.size(); i++) {
                                    ResultPartitionMetrics.registerQueueLengthMetrics(addGroup2.addGroup(i), this.internalPartitions.get(i));
                                }
                                for (int i2 = 0; i2 < this.inputGates.length; i2++) {
                                    InputGateMetrics.registerQueueLengthMetrics(addGroup3.addGroup(i2), this.inputGates[i2]);
                                }
                            }
                            try {
                                for (Map.Entry entry : DistributedCache.readFileInfoFromConfig(this.jobConfiguration)) {
                                    LOG.info("Obtaining local cache file for '{}'.", entry.getKey());
                                    hashMap.put(entry.getKey(), this.fileCache.createTmpFile((String) entry.getKey(), (DistributedCache.DistributedCacheEntry) entry.getValue(), this.jobId, this.executionId));
                                }
                                if (isCanceledOrFailed()) {
                                    throw new CancelTaskException();
                                }
                                AbstractInvokable loadAndInstantiateInvokable = loadAndInstantiateInvokable(this.userCodeClassLoader, this.nameOfInvokableClass, new RuntimeEnvironment(this.jobId, this.vertexId, this.executionId, executionConfig, this.taskInfo, this.jobConfiguration, this.taskConfiguration, this.userCodeClassLoader, this.memoryManager, this.ioManager, this.broadcastVariableManager, this.taskStateManager, this.accumulatorRegistry, this.network.createKvStateTaskRegistry(this.jobId, getJobVertexId()), this.inputSplitProvider, hashMap, this.producedPartitions, this.inputGates, this.network.getTaskEventDispatcher(), this.checkpointResponder, this.taskManagerConfig, this.metrics, this));
                                this.invokable = loadAndInstantiateInvokable;
                                if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
                                    throw new CancelTaskException();
                                }
                                notifyObservers(ExecutionState.RUNNING, null);
                                this.taskManagerActions.updateTaskExecutionState(new TaskExecutionState(this.jobId, this.executionId, ExecutionState.RUNNING));
                                this.executingThread.setContextClassLoader(this.userCodeClassLoader);
                                loadAndInstantiateInvokable.invoke();
                                if (isCanceledOrFailed()) {
                                    throw new CancelTaskException();
                                }
                                for (ResultPartition resultPartition : this.producedPartitions) {
                                    if (resultPartition != null) {
                                        resultPartition.finish();
                                    }
                                }
                                if (!transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED)) {
                                    throw new CancelTaskException();
                                }
                                notifyObservers(ExecutionState.FINISHED, null);
                                try {
                                    LOG.info("Freeing task resources for {} ({}).", this.taskNameWithSubtask, this.executionId);
                                    ExecutorService executorService = this.asyncCallDispatcher;
                                    if (executorService != null && !executorService.isShutdown()) {
                                        executorService.shutdownNow();
                                    }
                                    this.network.unregisterTask(this);
                                    if (loadAndInstantiateInvokable != null) {
                                        this.memoryManager.releaseAll(loadAndInstantiateInvokable);
                                    }
                                    this.libraryCache.unregisterTask(this.jobId, this.executionId);
                                    this.fileCache.releaseJob(this.jobId, this.executionId);
                                    this.blobService.getPermanentBlobService().releaseJob(this.jobId);
                                    LOG.info("Ensuring all FileSystem streams are closed for task {}", this);
                                    FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
                                    notifyFinalState();
                                } catch (Throwable th) {
                                    String format = String.format("FATAL - exception in resource cleanup of task %s (%s).", this.taskNameWithSubtask, this.executionId);
                                    LOG.error(format, th);
                                    notifyFatalError(format, th);
                                }
                                try {
                                    this.metrics.close();
                                    return;
                                } catch (Throwable th2) {
                                    LOG.error("Error during metrics de-registration of task {} ({}).", new Object[]{this.taskNameWithSubtask, this.executionId, th2});
                                    return;
                                }
                            } catch (Exception e) {
                                throw new Exception(String.format("Exception while adding files to distributed cache of task %s (%s).", this.taskNameWithSubtask, this.executionId), e);
                            }
                        } catch (Throwable th3) {
                            try {
                                LOG.info("Freeing task resources for {} ({}).", this.taskNameWithSubtask, this.executionId);
                                ExecutorService executorService2 = this.asyncCallDispatcher;
                                if (executorService2 != null && !executorService2.isShutdown()) {
                                    executorService2.shutdownNow();
                                }
                                this.network.unregisterTask(this);
                                if (0 != 0) {
                                    this.memoryManager.releaseAll(null);
                                }
                                this.libraryCache.unregisterTask(this.jobId, this.executionId);
                                this.fileCache.releaseJob(this.jobId, this.executionId);
                                this.blobService.getPermanentBlobService().releaseJob(this.jobId);
                                LOG.info("Ensuring all FileSystem streams are closed for task {}", this);
                                FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
                                notifyFinalState();
                            } catch (Throwable th4) {
                                String format2 = String.format("FATAL - exception in resource cleanup of task %s (%s).", this.taskNameWithSubtask, this.executionId);
                                LOG.error(format2, th4);
                                notifyFatalError(format2, th4);
                            }
                            try {
                                this.metrics.close();
                            } catch (Throwable th5) {
                                LOG.error("Error during metrics de-registration of task {} ({}).", new Object[]{this.taskNameWithSubtask, this.executionId, th5});
                            }
                            throw th3;
                        }
                    } catch (Throwable th6) {
                        th = th6;
                        if (th instanceof WrappingRuntimeException) {
                            th = ((WrappingRuntimeException) th).unwrap();
                        }
                        try {
                            if (ExceptionUtils.isJvmFatalError(th) || ((th instanceof OutOfMemoryError) && this.taskManagerConfig.shouldExitJvmOnOutOfMemoryError())) {
                                try {
                                    LOG.error("Encountered fatal error {} - terminating the JVM", th.getClass().getName(), th);
                                    Runtime.getRuntime().halt(-1);
                                } catch (Throwable th7) {
                                    Runtime.getRuntime().halt(-1);
                                    throw th7;
                                }
                            }
                            while (true) {
                                ExecutionState executionState2 = this.executionState;
                                if (executionState2 != ExecutionState.RUNNING && executionState2 != ExecutionState.DEPLOYING) {
                                    if (executionState2 == ExecutionState.CANCELING) {
                                        if (transitionState(executionState2, ExecutionState.CANCELED)) {
                                            notifyObservers(ExecutionState.CANCELED, null);
                                            break;
                                        }
                                    } else if (executionState2 != ExecutionState.FAILED) {
                                        if (transitionState(executionState2, ExecutionState.FAILED, th)) {
                                            LOG.error("Unexpected state in task {} ({}) during an exception: {}.", new Object[]{this.taskNameWithSubtask, this.executionId, executionState2});
                                            break;
                                        }
                                    } else {
                                        break;
                                    }
                                } else if (!(th instanceof CancelTaskException)) {
                                    if (transitionState(executionState2, ExecutionState.FAILED, th)) {
                                        String format3 = String.format("Execution of %s (%s) failed.", this.taskNameWithSubtask, this.executionId);
                                        this.failureCause = th;
                                        cancelInvokable();
                                        notifyObservers(ExecutionState.FAILED, new Exception(format3, th));
                                        break;
                                    }
                                } else {
                                    if (transitionState(executionState2, ExecutionState.CANCELED)) {
                                        cancelInvokable();
                                        notifyObservers(ExecutionState.CANCELED, null);
                                        break;
                                    }
                                }
                            }
                        } catch (Throwable th8) {
                            String format4 = String.format("FATAL - exception in exception handler of task %s (%s).", this.taskNameWithSubtask, this.executionId);
                            LOG.error(format4, th8);
                            notifyFatalError(format4, th8);
                        }
                        try {
                            LOG.info("Freeing task resources for {} ({}).", this.taskNameWithSubtask, this.executionId);
                            ExecutorService executorService3 = this.asyncCallDispatcher;
                            if (executorService3 != null && !executorService3.isShutdown()) {
                                executorService3.shutdownNow();
                            }
                            this.network.unregisterTask(this);
                            if (0 != 0) {
                                this.memoryManager.releaseAll(null);
                            }
                            this.libraryCache.unregisterTask(this.jobId, this.executionId);
                            this.fileCache.releaseJob(this.jobId, this.executionId);
                            this.blobService.getPermanentBlobService().releaseJob(this.jobId);
                            LOG.info("Ensuring all FileSystem streams are closed for task {}", this);
                            FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
                            notifyFinalState();
                        } catch (Throwable th9) {
                            String format5 = String.format("FATAL - exception in resource cleanup of task %s (%s).", this.taskNameWithSubtask, this.executionId);
                            LOG.error(format5, th9);
                            notifyFatalError(format5, th9);
                        }
                        try {
                            this.metrics.close();
                            return;
                        } catch (Throwable th10) {
                            LOG.error("Error during metrics de-registration of task {} ({}).", new Object[]{this.taskNameWithSubtask, this.executionId, th10});
                            return;
                        }
                    }
                }
            } else {
                if (executionState == ExecutionState.FAILED) {
                    notifyFinalState();
                    if (this.metrics != null) {
                        this.metrics.close();
                        return;
                    }
                    return;
                }
                if (executionState != ExecutionState.CANCELING) {
                    if (this.metrics != null) {
                        this.metrics.close();
                    }
                    throw new IllegalStateException("Invalid state for beginning of operation of task " + this + '.');
                }
                if (transitionState(ExecutionState.CANCELING, ExecutionState.CANCELED)) {
                    notifyFinalState();
                    if (this.metrics != null) {
                        this.metrics.close();
                        return;
                    }
                    return;
                }
            }
        }
    }

    /**
     * Creates one result partition per deployment descriptor and stores it in
     * {@code producedPartitions} at the descriptor's iteration position.
     *
     * <p>A partition whose type is blocking is created as an {@code ExternalResultPartition} when the
     * configured blocking shuffle type is {@code YARN}; every other partition is created as an
     * {@code InternalResultPartition}. The created partition is additionally added to
     * {@code externalPartitions} or {@code internalPartitions} respectively.
     *
     * @param str name handed through to the partition constructors (presumably the owning task's name)
     * @param collection deployment descriptors of the partitions to create
     * @param resultPartitionConsumableNotifier notifier handed to internal partitions
     */
    private void createAllResultPartitions(String str, Collection<ResultPartitionDeploymentDescriptor> collection, ResultPartitionConsumableNotifier resultPartitionConsumableNotifier) {
        final BlockingShuffleType shuffleType = BlockingShuffleType.getBlockingShuffleTypeFromConfiguration(this.taskManagerConfiguration, LOG);
        int index = 0;
        for (ResultPartitionDeploymentDescriptor descriptor : collection) {
            final ResultPartitionID partitionId = new ResultPartitionID(descriptor.getPartitionId(), this.executionId);
            final ResultPartitionType partitionType = descriptor.getPartitionType();
            if (partitionType.isBlocking() && shuffleType == BlockingShuffleType.YARN) {
                final ExternalResultPartition externalPartition = new ExternalResultPartition(this.taskManagerConfig.getConfiguration(), str, this.jobId, partitionId, partitionType, descriptor.getNumberOfSubpartitions(), descriptor.getMaxParallelism(), this.memoryManager, this.ioManager);
                this.producedPartitions[index] = externalPartition;
                this.externalPartitions.add(externalPartition);
                // Parameterized logging: the message is only assembled when INFO is enabled.
                LOG.info("Create external result partition {} {}", str, partitionId);
            } else {
                final InternalResultPartition internalPartition = new InternalResultPartition(str, this, this.jobId, partitionId, partitionType, descriptor.getNumberOfSubpartitions(), descriptor.getMaxParallelism(), this.network.getResultPartitionManager(), resultPartitionConsumableNotifier, this.ioManager, descriptor.sendScheduleOrUpdateConsumersMessage());
                this.producedPartitions[index] = internalPartition;
                this.internalPartitions.add(internalPartition);
                LOG.info("Create internal result partition {} {}", str, partitionId);
            }
            index++;
        }
    }

    /**
     * Creates one {@code SingleInputGate} per deployment descriptor, storing each gate in
     * {@code inputGates} (by iteration position) and in {@code inputGatesById} (keyed by the gate's
     * consumed result ID).
     *
     * <p>All gates share one {@code PartitionRequestManager}. Its first constructor argument is
     * {@code Integer.MAX_VALUE} when the configured maximum of concurrent requests is not positive,
     * otherwise the larger of that configured value and the number of input gates.
     *
     * @param str name handed through to {@code SingleInputGate.create}
     * @param collection deployment descriptors of the input gates to create
     */
    private void createAllInputGates(String str, Collection<InputGateDeploymentDescriptor> collection) {
        final int configuredLimit = taskManagerConfiguration.getInteger(TaskManagerOptions.TASK_EXTERNAL_SHUFFLE_MAX_CONCURRENT_REQUESTS);
        final int maxConcurrentRequests;
        if (configuredLimit > 0) {
            maxConcurrentRequests = Math.max(inputGates.length, configuredLimit);
        } else {
            maxConcurrentRequests = Integer.MAX_VALUE;
        }
        final BlockingShuffleType shuffleType = BlockingShuffleType.getBlockingShuffleTypeFromConfiguration(taskManagerConfiguration, LOG);
        final PartitionRequestManager requestManager = new PartitionRequestManager(maxConcurrentRequests, inputGates.length);

        int gateIndex = 0;
        for (InputGateDeploymentDescriptor descriptor : collection) {
            final SingleInputGate gate = SingleInputGate.create(str, jobId, executionId, descriptor, network, this, metrics.getIOMetricGroup(), requestManager, shuffleType, executorService);
            inputGates[gateIndex] = gate;
            inputGatesById.put(gate.getConsumedResultId(), gate);
            gateIndex++;
        }
    }

    /**
     * Registers this task with the library cache and returns the class loader the cache holds for
     * this task's job.
     *
     * @return the class loader obtained from the library cache, never {@code null}
     * @throws Exception if the library cache returns no class loader for the job (or if the
     *     registration itself fails)
     */
    private ClassLoader createUserCodeClassloader() throws Exception {
        final long startMillis = System.currentTimeMillis();
        libraryCache.registerTask(jobId, executionId, requiredJarFiles, requiredClasspaths);
        final long elapsedMillis = System.currentTimeMillis() - startMillis;
        LOG.debug("Getting user code class loader for task {} at library cache manager took {} milliseconds", executionId, elapsedMillis);

        final ClassLoader userCodeLoader = libraryCache.getClassLoader(jobId);
        if (userCodeLoader != null) {
            return userCodeLoader;
        }
        throw new Exception("No user code classloader available.");
    }

    /** Forwards this task's execution ID to {@code taskManagerActions.notifyFinalState}. */
    private void notifyFinalState() {
        taskManagerActions.notifyFinalState(executionId);
    }

    /**
     * Forwards a fatal error to {@code taskManagerActions.notifyFatalError}.
     *
     * @param str description of the error
     * @param th the throwable that accompanies the error
     */
    private void notifyFatalError(String str, Throwable th) {
        taskManagerActions.notifyFatalError(str, th);
    }

    /**
     * Attempts the state transition without an associated cause.
     *
     * @param executionState the state the task is expected to be in
     * @param executionState2 the state to switch to
     * @return {@code true} if the transition took place, {@code false} otherwise
     */
    private boolean transitionState(ExecutionState executionState, ExecutionState executionState2) {
        final Throwable noCause = null;
        return transitionState(executionState, executionState2, noCause);
    }

    /**
     * Atomically switches the task state from the expected to the new value via
     * {@code STATE_UPDATER.compareAndSet} and logs a successful switch at INFO level.
     *
     * @param executionState the state the task is expected to be in
     * @param executionState2 the state to switch to
     * @param th optional cause; when non-null it is appended to the log arguments
     * @return {@code true} if the compare-and-set succeeded, {@code false} otherwise
     */
    private boolean transitionState(ExecutionState executionState, ExecutionState executionState2, Throwable th) {
        if (STATE_UPDATER.compareAndSet(this, executionState, executionState2)) {
            // The cause, if any, travels as a trailing extra argument beyond the four placeholders.
            final Object[] logArgs = (th == null)
                    ? new Object[]{taskNameWithSubtask, executionId, executionState, executionState2}
                    : new Object[]{taskNameWithSubtask, executionId, executionState, executionState2, th};
            LOG.info("{} ({}) switched from {} to {}.", logArgs);
            return true;
        }
        return false;
    }

    /**
     * Asks the invokable to stop by scheduling a call to {@code StoppableTask.stop()} through
     * {@code executeAsyncCallRunnable}.
     *
     * <p>A {@link RuntimeException} thrown by {@code stop()} is logged and reported through
     * {@code taskManagerActions.failTask}.
     *
     * @throws IllegalStateException if no invokable has been set yet
     * @throws UnsupportedOperationException if the invokable is not a {@code StoppableTask}
     */
    public void stopExecution() {
        if (this.invokable == null) {
            throw new IllegalStateException(String.format("Cannot stop task %s (%s) because it is not yet running.", taskNameWithSubtask, executionId));
        }
        LOG.info("Attempting to stop task {} ({}).", taskNameWithSubtask, executionId);

        if (this.invokable instanceof StoppableTask) {
            final Runnable stopCall = () -> {
                try {
                    ((StoppableTask) this.invokable).stop();
                } catch (RuntimeException e) {
                    LOG.error("Stopping task {} ({}) failed.", taskNameWithSubtask, executionId, e);
                    taskManagerActions.failTask(executionId, e);
                }
            };
            executeAsyncCallRunnable(stopCall, String.format("Stopping source task %s (%s).", taskNameWithSubtask, executionId));
            return;
        }
        throw new UnsupportedOperationException(String.format("Stopping not supported by task %s (%s).", taskNameWithSubtask, executionId));
    }

    /**
     * Requests cancellation of this task by delegating to {@code cancelOrFailAndCancelInvokable}
     * with target state {@code CANCELING} and no cause.
     */
    public void cancelExecution() {
        LOG.info("Attempting to cancel task {} ({}).", taskNameWithSubtask, executionId);
        final Throwable noCause = null;
        cancelOrFailAndCancelInvokable(ExecutionState.CANCELING, noCause);
    }

    /**
     * Requests failure of this task by delegating to {@code cancelOrFailAndCancelInvokable} with
     * target state {@code FAILED}.
     *
     * @param th the cause handed on with the failure request
     */
    @Override // org.apache.flink.runtime.taskmanager.TaskActions
    public void failExternally(Throwable th) {
        LOG.info("Attempting to fail task externally {} ({}).", taskNameWithSubtask, executionId);
        final ExecutionState targetState = ExecutionState.FAILED;
        cancelOrFailAndCancelInvokable(targetState, th);
    }

    /* JADX WARN: Code restructure failed: missing block: B:41:0x0013, code lost:
    
        org.apache.flink.runtime.taskmanager.Task.LOG.info("Task {} is already in state {}", r10.taskNameWithSubtask, r0);
     */
    /* JADX WARN: Code restructure failed: missing block: B:42:0x0023, code lost:
    
        return;
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    // NOTE(review): decompiler stub. The real implementation could not be reconstructed, so this
    // body unconditionally throws UnsupportedOperationException. In this file it is invoked by
    // cancelExecution() with (CANCELING, null) and by failExternally() with (FAILED, cause);
    // both therefore cannot work until the original method body is restored.
    private void cancelOrFailAndCancelInvokable(org.apache.flink.runtime.execution.ExecutionState r11, java.lang.Throwable r12) {
        /*
            Method dump skipped, instructions count: 514
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.flink.runtime.taskmanager.Task.cancelOrFailAndCancelInvokable(org.apache.flink.runtime.execution.ExecutionState, java.lang.Throwable):void");
    }

    /**
     * Adds a listener to {@code taskExecutionStateListeners}; registered listeners are the ones
     * iterated by {@code notifyObservers}.
     *
     * @param taskExecutionStateListener the listener to add
     */
    public void registerExecutionListener(TaskExecutionStateListener taskExecutionStateListener) {
        taskExecutionStateListeners.add(taskExecutionStateListener);
    }

    /**
     * Builds a single {@code TaskExecutionState} for this task and hands it to every registered
     * execution state listener, in iteration order.
     *
     * @param executionState the state to report
     * @param th the throwable to attach to the report, may be {@code null}
     */
    private void notifyObservers(ExecutionState executionState, Throwable th) {
        final TaskExecutionState stateUpdate = new TaskExecutionState(jobId, executionId, executionState, th);
        for (TaskExecutionStateListener listener : taskExecutionStateListeners) {
            listener.notifyTaskExecutionStateChanged(stateUpdate);
        }
    }

    /**
     * Reacts to a request to check the state of a partition's producer.
     *
     * <p>When {@code checkPartitionProducerState} is disabled, the partition request is re-triggered
     * on {@code executor} as if the producer were {@code RUNNING}. Otherwise the producer state is
     * requested from {@code partitionProducerStateChecker} and handled on {@code executor}:
     * <ul>
     *   <li>a returned state is passed to {@code onPartitionStateUpdate};</li>
     *   <li>a {@link TimeoutException} is treated like a {@code RUNNING} producer;</li>
     *   <li>a {@code PartitionProducerDisposedException} cancels this task;</li>
     *   <li>any other failure fails this task.</li>
     * </ul>
     * An {@link IOException} or {@link InterruptedException} from the update fails this task.
     *
     * @param jobID ID of the job the partition belongs to
     * @param intermediateDataSetID ID of the consumed intermediate data set
     * @param resultPartitionID ID of the partition whose producer is checked
     */
    @Override // org.apache.flink.runtime.taskmanager.TaskActions
    public void triggerPartitionProducerStateCheck(JobID jobID, IntermediateDataSetID intermediateDataSetID, ResultPartitionID resultPartitionID) {
        if (!checkPartitionProducerState) {
            executor.execute(() -> {
                try {
                    LOG.debug("Re-trigger partition request of {} directly without checking partition producer state.", resultPartitionID);
                    onPartitionStateUpdate(intermediateDataSetID, resultPartitionID, ExecutionState.RUNNING);
                } catch (IOException | InterruptedException e) {
                    failExternally(e);
                }
            });
            return;
        }

        partitionProducerStateChecker.requestPartitionProducerState(jobID, intermediateDataSetID, resultPartitionID).whenCompleteAsync((producerState, failure) -> {
            try {
                if (producerState != null) {
                    onPartitionStateUpdate(intermediateDataSetID, resultPartitionID, producerState);
                    return;
                }
                if (failure instanceof TimeoutException) {
                    onPartitionStateUpdate(intermediateDataSetID, resultPartitionID, ExecutionState.RUNNING);
                    return;
                }
                if (failure instanceof PartitionProducerDisposedException) {
                    LOG.info(String.format("Producer %s of partition %s disposed. Cancelling execution.", resultPartitionID.getProducerId(), resultPartitionID.getPartitionId()), failure);
                    cancelExecution();
                    return;
                }
                failExternally(failure);
            } catch (IOException | InterruptedException e) {
                failExternally(e);
            }
        }, executor);
    }

    /**
     * Triggers a checkpoint on the invokable through {@code executeAsyncCallRunnable}.
     *
     * <p>If the task is not {@code RUNNING} or has no invokable, the checkpoint is declined
     * immediately. Otherwise the asynchronous call installs the calling thread's safety-net
     * closeable registry on the executing thread, triggers the checkpoint, and declines it if the
     * invokable reports that it was not triggered. An error during the trigger fails the task when
     * it is still {@code RUNNING} and is only logged at debug level otherwise.
     *
     * @param j the checkpoint ID (used for declines and in error messages)
     * @param j2 second {@code CheckpointMetaData} constructor argument (presumably the checkpoint
     *     timestamp — confirm against {@code CheckpointMetaData})
     * @param checkpointOptions options handed to the invokable together with the meta data
     */
    public void triggerCheckpointBarrier(final long j, long j2, final CheckpointOptions checkpointOptions) {
        final AbstractInvokable abstractInvokable = this.invokable;
        final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(j, j2);
        if (this.executionState != ExecutionState.RUNNING || abstractInvokable == null) {
            LOG.debug("Declining checkpoint request for non-running task {} ({}).", this.taskNameWithSubtask, this.executionId);
            this.checkpointResponder.declineCheckpoint(this.jobId, this.executionId, j, new CheckpointDeclineTaskNotReadyException(this.taskNameWithSubtask));
            return;
        }

        final String taskName = this.taskNameWithSubtask;
        // Captured on the calling thread so the async thread can reuse the same registry.
        final SafetyNetCloseableRegistry safetyNetRegistry = FileSystemSafetyNet.getSafetyNetCloseableRegistryForThread();
        final Runnable checkpointTrigger = () -> {
            LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
            FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetRegistry);
            try {
                if (!abstractInvokable.triggerCheckpoint(checkpointMetaData, checkpointOptions)) {
                    this.checkpointResponder.declineCheckpoint(getJobID(), getExecutionId(), j, new CheckpointDeclineTaskNotReadyException(taskName));
                }
            } catch (Throwable th) {
                if (getExecutionState() == ExecutionState.RUNNING) {
                    failExternally(new Exception("Error while triggering checkpoint " + j + " for " + this.taskNameWithSubtask, th));
                } else {
                    LOG.debug("Encountered error while triggering checkpoint {} for {} ({}) while being not in state running.", new Object[]{Long.valueOf(j), this.taskNameWithSubtask, this.executionId, th});
                }
            } finally {
                // Single reset point: always detach the registry from the executing thread,
                // whether the trigger succeeded, failed, or the error handler itself threw.
                FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread((SafetyNetCloseableRegistry) null);
            }
        };
        executeAsyncCallRunnable(checkpointTrigger, String.format("Checkpoint Trigger for %s (%s).", this.taskNameWithSubtask, this.executionId));
    }

    /**
     * Notifies the invokable and the task state manager that a checkpoint completed, through
     * {@code executeAsyncCallRunnable}.
     *
     * <p>The notification is ignored when the task is not {@code RUNNING} or has no invokable. An
     * error during the notification fails the task when it is still {@code RUNNING}; otherwise it
     * is logged at debug level rather than being dropped without a trace.
     *
     * @param j ID of the completed checkpoint
     */
    @Override // org.apache.flink.runtime.state.CheckpointListener
    public void notifyCheckpointComplete(final long j) {
        final AbstractInvokable abstractInvokable = this.invokable;
        if (this.executionState != ExecutionState.RUNNING || abstractInvokable == null) {
            LOG.debug("Ignoring checkpoint commit notification for non-running task {}.", this.taskNameWithSubtask);
            return;
        }

        final Runnable confirmation = () -> {
            try {
                abstractInvokable.notifyCheckpointComplete(j);
                this.taskStateManager.notifyCheckpointComplete(j);
            } catch (Throwable th) {
                if (getExecutionState() == ExecutionState.RUNNING) {
                    failExternally(new RuntimeException("Error while confirming checkpoint", th));
                } else {
                    // The task is already leaving the running state, so the error is not escalated,
                    // but it is still recorded for diagnosis.
                    LOG.debug("Encountered error while confirming checkpoint {} for {} ({}) while being not in state running.", new Object[]{Long.valueOf(j), this.taskNameWithSubtask, this.executionId, th});
                }
            }
        };
        executeAsyncCallRunnable(confirmation, "Checkpoint Confirmation for " + this.taskNameWithSubtask);
    }

    /**
     * Handles the reported state of a partition's producer.
     *
     * <p>Ignored unless this task is {@code RUNNING}. For a producer that is {@code SCHEDULED},
     * {@code DEPLOYING}, {@code RUNNING} or {@code FINISHED} the partition request is re-triggered
     * on the matching input gate. For a producer that is {@code CANCELING}, {@code CANCELED} or
     * {@code FAILED} this task is cancelled. An unknown input gate or any other producer state
     * fails this task.
     *
     * @param intermediateDataSetID ID used to look up the input gate
     * @param resultPartitionID ID of the partition whose producer state is reported
     * @param executionState the reported producer state
     * @throws IOException declared for the partition re-trigger
     * @throws InterruptedException declared for the partition re-trigger
     */
    @VisibleForTesting
    void onPartitionStateUpdate(IntermediateDataSetID intermediateDataSetID, ResultPartitionID resultPartitionID, ExecutionState executionState) throws IOException, InterruptedException {
        // "this.executionState" is the task's own state; the parameter is the producer's state.
        if (this.executionState != ExecutionState.RUNNING) {
            LOG.debug("Task {} ignored a partition producer state notification, because it's not running.", taskNameWithSubtask);
            return;
        }

        final SingleInputGate gate = inputGatesById.get(intermediateDataSetID);
        if (gate == null) {
            failExternally(new IllegalStateException("Received partition producer state for unknown input gate " + intermediateDataSetID + ScopeFormat.SCOPE_SEPARATOR));
            return;
        }

        final boolean producerUsable = executionState == ExecutionState.SCHEDULED
                || executionState == ExecutionState.DEPLOYING
                || executionState == ExecutionState.RUNNING
                || executionState == ExecutionState.FINISHED;
        if (producerUsable) {
            gate.retriggerPartitionRequest(resultPartitionID.getPartitionId());
            return;
        }

        final boolean producerGone = executionState == ExecutionState.CANCELING
                || executionState == ExecutionState.CANCELED
                || executionState == ExecutionState.FAILED;
        if (producerGone) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Cancelling task {} after the producer of partition {} with attempt ID {} has entered state {}.", new Object[]{taskNameWithSubtask, resultPartitionID.getPartitionId(), resultPartitionID.getProducerId(), executionState});
            }
            cancelExecution();
        } else {
            failExternally(new IllegalStateException(String.format("Producer with attempt ID %s of partition %s in unexpected state %s.", resultPartitionID.getProducerId(), resultPartitionID.getPartitionId(), executionState)));
        }
    }

    /**
     * Submits a runnable to this task's async-call executor, creating that executor on first use.
     *
     * <p>The call is silently dropped when the task is not in state {@code RUNNING}.
     *
     * @param runnable the call to execute asynchronously
     * @param str description of the call, used only for debug logging
     * @throws RuntimeException if the executor rejects the call although the task is still
     *     {@code RUNNING}
     */
    private void executeAsyncCallRunnable(Runnable runnable, String str) {
        // The monitor on "this" serializes concurrent callers of this method, so the lazy
        // creation of the executor below happens at most once at a time.
        synchronized (this) {
            if (this.executionState != ExecutionState.RUNNING) {
                return;
            }
            // Work on a local reference for the remainder of the method.
            ExecutorService executorService = this.asyncCallDispatcher;
            if (executorService == null) {
                // First use: create a single-thread executor whose thread factory receives the
                // user code class loader.
                Preconditions.checkState(this.userCodeClassLoader != null, "userCodeClassLoader must not be null");
                executorService = Executors.newSingleThreadExecutor(new DispatcherThreadFactory(TASK_THREADS_GROUP, "Async calls on " + this.taskNameWithSubtask, this.userCodeClassLoader));
                this.asyncCallDispatcher = executorService;
                // Re-check the state after publishing the executor and undo the creation if the
                // task is no longer RUNNING. NOTE(review): state transitions go through
                // STATE_UPDATER.compareAndSet rather than this monitor, so the state can
                // presumably change while the lock is held — confirm.
                if (this.executionState != ExecutionState.RUNNING) {
                    executorService.shutdown();
                    this.asyncCallDispatcher = null;
                    return;
                }
            }
            LOG.debug("Invoking async call {} on task {}", str, this.taskNameWithSubtask);
            try {
                executorService.submit(runnable);
            } catch (RejectedExecutionException e) {
                // A rejection is tolerated when the task has left RUNNING in the meantime;
                // it is only reported when the task is still RUNNING.
                if (this.executionState == ExecutionState.RUNNING) {
                    throw new RuntimeException("Async call was rejected, even though the task is running.", e);
                }
            }
        }
    }

    /**
     * Calls {@code cancel()} on the invokable at most once.
     *
     * <p>Does nothing when no invokable is set or when {@code invokableHasBeenCanceled} was already
     * flipped to {@code true} by an earlier call. Anything thrown by {@code cancel()} is logged and
     * not propagated.
     */
    private void cancelInvokable() {
        // Read the field once so the null check and the cancel() call see the same reference.
        final AbstractInvokable currentInvokable = this.invokable;
        if (currentInvokable == null || !this.invokableHasBeenCanceled.compareAndSet(false, true)) {
            return;
        }
        try {
            currentInvokable.cancel();
        } catch (Throwable th) {
            LOG.error("Error while canceling task {}.", this.taskNameWithSubtask, th);
        }
    }

    /**
     * Returns {@code "<taskNameWithSubtask> (<executionId>) [<executionState>]"}.
     *
     * @return a description of this task including its current execution state
     */
    @Override
    public String toString() {
        return String.format("%s (%s) [%s]", this.taskNameWithSubtask, this.executionId, this.executionState);
    }

    /**
     * Loads the invokable class by name and instantiates it through its constructor that takes a
     * single {@code Environment}.
     *
     * <p>Each failure stage is reported separately so the error message names what actually went
     * wrong, instead of wrapping every failure as a class loading problem.
     *
     * @param classLoader class loader used to resolve and initialize the class
     * @param str fully qualified name of the invokable class
     * @param environment environment handed to the invokable's constructor
     * @return the newly created invokable
     * @throws Exception if the class cannot be loaded or is not a subclass of
     *     {@code AbstractInvokable}
     * @throws FlinkException if the class has no public {@code (Environment)} constructor, or if
     *     instantiation fails for a reason other than the constructor itself throwing
     * @throws Throwable whatever the constructor throws, forwarded unwrapped
     */
    private static AbstractInvokable loadAndInstantiateInvokable(ClassLoader classLoader, String str, Environment environment) throws Throwable {
        // Stage 1: resolve the class. Only failures here are "could not load" failures.
        final Class<? extends AbstractInvokable> invokableClass;
        try {
            invokableClass = Class.forName(str, true, classLoader).asSubclass(AbstractInvokable.class);
        } catch (Throwable th) {
            throw new Exception("Could not load the task's invokable class.", th);
        }

        // Stage 2: look up the constructor and instantiate.
        try {
            return invokableClass.getConstructor(Environment.class).newInstance(environment);
        } catch (NoSuchMethodException e) {
            throw new FlinkException("Task misses proper constructor", e);
        } catch (InvocationTargetException e) {
            // Forward what the constructor itself threw, without the reflection wrapper.
            throw e.getTargetException();
        } catch (Exception e) {
            throw new FlinkException("Could not instantiate the task's invokable class.", e);
        }
    }
}
