package org.apache.flink.runtime.minicluster;

import akka.actor.ActorSystem;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.client.ClientUtils;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.dispatcher.Dispatcher;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.DispatcherId;
import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
import org.apache.flink.runtime.dispatcher.StandaloneDispatcher;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.metrics.util.MetricUtils;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.ResourceManagerRunner;
import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.LeaderShipLostHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.taskexecutor.TaskExecutor;
import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
import org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever;
import org.apache.flink.util.AutoCloseableAsync;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/minicluster/MiniCluster.class */
public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
    private static final Logger LOG = LoggerFactory.getLogger(MiniCluster.class);
    private final MiniClusterConfiguration miniClusterConfiguration;

    @GuardedBy("lock")
    private MetricRegistryImpl metricRegistry;

    @GuardedBy("lock")
    private RpcService commonRpcService;

    @GuardedBy("lock")
    private RpcService jobManagerRpcService;

    @GuardedBy("lock")
    private RpcService[] taskManagerRpcServices;

    @GuardedBy("lock")
    private RpcService resourceManagerRpcService;

    @GuardedBy("lock")
    private HighAvailabilityServices haServices;

    @GuardedBy("lock")
    private BlobServer blobServer;

    @GuardedBy("lock")
    private HeartbeatServices heartbeatServices;

    @GuardedBy("lock")
    private BlobCacheService blobCacheService;

    @GuardedBy("lock")
    private ResourceManagerRunner resourceManagerRunner;
    private volatile TaskExecutor[] taskManagers;

    @GuardedBy("lock")
    private DispatcherRestEndpoint dispatcherRestEndpoint;

    @GuardedBy("lock")
    private URI restAddressURI;

    @GuardedBy("lock")
    private LeaderRetrievalService resourceManagerLeaderRetriever;

    @GuardedBy("lock")
    private LeaderRetrievalService dispatcherLeaderRetriever;

    @GuardedBy("lock")
    private StandaloneDispatcher dispatcher;

    @GuardedBy("lock")
    private JobManagerMetricGroup jobManagerMetricGroup;

    @GuardedBy("lock")
    private RpcGatewayRetriever<DispatcherId, DispatcherGateway> dispatcherGatewayRetriever;
    private final Object lock = new Object();
    private Map<JobID, CompletableFuture<JobResult>> jobResultMap = new HashMap();
    private final Time rpcTimeout = Time.seconds(10);
    private CompletableFuture<Void> terminationFuture = CompletableFuture.completedFuture(null);
    private volatile boolean running = false;

    /* loaded from: input_file:org/apache/flink/runtime/minicluster/MiniCluster$IgnoreLeaderShipLostHandler.class */
    private class IgnoreLeaderShipLostHandler implements LeaderShipLostHandler {
        private IgnoreLeaderShipLostHandler() {
        }

        @Override // org.apache.flink.runtime.rpc.LeaderShipLostHandler
        public void onLeaderShipLost(Throwable th) {
            MiniCluster.LOG.warn("Something in MiniCluster lost its leader ship", th);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/minicluster/MiniCluster$ShutDownFatalErrorHandler.class */
    private class ShutDownFatalErrorHandler implements FatalErrorHandler {
        private ShutDownFatalErrorHandler() {
        }

        @Override // org.apache.flink.runtime.rpc.FatalErrorHandler
        public void onFatalError(Throwable th) {
            MiniCluster.LOG.warn("Error in MiniCluster. Shutting the MiniCluster down.", th);
            MiniCluster.this.closeAsync();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/minicluster/MiniCluster$TerminatingFatalErrorHandler.class */
    public class TerminatingFatalErrorHandler implements FatalErrorHandler {
        private final int index;

        private TerminatingFatalErrorHandler(int i) {
            this.index = i;
        }

        @Override // org.apache.flink.runtime.rpc.FatalErrorHandler
        public void onFatalError(Throwable th) {
            if (MiniCluster.this.running) {
                MiniCluster.LOG.error("TaskManager #{} failed.", Integer.valueOf(this.index), th);
                TaskExecutor[] taskExecutorArr = MiniCluster.this.taskManagers;
                if (taskExecutorArr != null) {
                    taskExecutorArr[this.index].shutDown();
                }
            }
        }
    }

    public MiniCluster(MiniClusterConfiguration miniClusterConfiguration) {
        this.miniClusterConfiguration = (MiniClusterConfiguration) Preconditions.checkNotNull(miniClusterConfiguration, "config may not be null");
    }

    public URI getRestAddress() {
        URI uri;
        synchronized (this.lock) {
            Preconditions.checkState(this.running, "MiniCluster is not yet running.");
            uri = this.restAddressURI;
        }
        return uri;
    }

    public HighAvailabilityServices getHighAvailabilityServices() {
        HighAvailabilityServices highAvailabilityServices;
        synchronized (this.lock) {
            Preconditions.checkState(this.running, "MiniCluster is not yet running.");
            highAvailabilityServices = this.haServices;
        }
        return highAvailabilityServices;
    }

    public boolean isRunning() {
        return this.running;
    }

    public void start() throws Exception {
        RpcService createRpcService;
        RpcService createRpcService2;
        synchronized (this.lock) {
            Preconditions.checkState(!this.running, "FlinkMiniCluster is already running");
            LOG.info("Starting Flink Mini Cluster");
            LOG.debug("Using configuration {}", this.miniClusterConfiguration);
            UnmodifiableConfiguration configuration = this.miniClusterConfiguration.getConfiguration();
            Time rpcTimeout = this.miniClusterConfiguration.getRpcTimeout();
            int numTaskManagers = this.miniClusterConfiguration.getNumTaskManagers();
            boolean z = this.miniClusterConfiguration.getRpcServiceSharing() == MiniClusterConfiguration.RpcServiceSharing.SHARED;
            try {
                initializeIOFormatClasses(configuration);
                LOG.info("Starting Metrics Registry");
                this.metricRegistry = createMetricRegistry(configuration);
                RpcService[] rpcServiceArr = new RpcService[numTaskManagers];
                LOG.info("Starting RPC Service(s)");
                this.commonRpcService = createRpcService(configuration, rpcTimeout, false, null);
                ActorSystem actorSystem = ((AkkaRpcService) this.commonRpcService).getActorSystem();
                this.metricRegistry.startQueryService(actorSystem, null);
                if (z) {
                    for (int i = 0; i < numTaskManagers; i++) {
                        rpcServiceArr[i] = this.commonRpcService;
                    }
                    createRpcService = this.commonRpcService;
                    createRpcService2 = this.commonRpcService;
                    this.resourceManagerRpcService = null;
                    this.jobManagerRpcService = null;
                    this.taskManagerRpcServices = null;
                } else {
                    String jobManagerBindAddress = this.miniClusterConfiguration.getJobManagerBindAddress();
                    String taskManagerBindAddress = this.miniClusterConfiguration.getTaskManagerBindAddress();
                    String resourceManagerBindAddress = this.miniClusterConfiguration.getResourceManagerBindAddress();
                    createRpcService = createRpcService(configuration, rpcTimeout, true, jobManagerBindAddress);
                    createRpcService2 = createRpcService(configuration, rpcTimeout, true, resourceManagerBindAddress);
                    for (int i2 = 0; i2 < numTaskManagers; i2++) {
                        rpcServiceArr[i2] = createRpcService(configuration, rpcTimeout, true, taskManagerBindAddress);
                    }
                    this.jobManagerRpcService = createRpcService;
                    this.taskManagerRpcServices = rpcServiceArr;
                    this.resourceManagerRpcService = createRpcService2;
                }
                LOG.info("Starting high-availability services");
                this.haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(configuration, this.commonRpcService.getExecutor());
                this.blobServer = new BlobServer(configuration, this.haServices.createBlobStore());
                this.blobServer.start();
                this.heartbeatServices = HeartbeatServices.fromConfiguration(configuration);
                LOG.info("Starting ResourceManger");
                this.resourceManagerRunner = startResourceManager(configuration, this.haServices, this.heartbeatServices, this.metricRegistry, createRpcService2, new ClusterInformation("localhost", this.blobServer.getPort()));
                this.blobCacheService = new BlobCacheService(configuration, this.haServices.createBlobStore(), new InetSocketAddress(InetAddress.getLocalHost(), this.blobServer.getPort()));
                LOG.info("Starting {} TaskManger(s)", Integer.valueOf(numTaskManagers));
                this.taskManagers = startTaskManagers(configuration, this.haServices, this.heartbeatServices, this.metricRegistry, this.blobCacheService, numTaskManagers, rpcServiceArr);
                LOG.info("Starting dispatcher rest endpoint.");
                this.dispatcherGatewayRetriever = new RpcGatewayRetriever<>(createRpcService, DispatcherGateway.class, DispatcherId::fromUuid, 20, Time.milliseconds(20L));
                RpcGatewayRetriever rpcGatewayRetriever = new RpcGatewayRetriever(createRpcService, ResourceManagerGateway.class, ResourceManagerId::fromUuid, 20, Time.milliseconds(20L));
                this.dispatcherRestEndpoint = new DispatcherRestEndpoint(RestServerEndpointConfiguration.fromConfiguration(configuration), this.dispatcherGatewayRetriever, configuration, RestHandlerConfiguration.fromConfiguration(configuration), rpcGatewayRetriever, this.blobServer.getTransientBlobService(), this.commonRpcService.getExecutor(), new AkkaQueryServiceRetriever(actorSystem, Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT))), this.haServices.getWebMonitorLeaderElectionService(), new ShutDownFatalErrorHandler());
                this.dispatcherRestEndpoint.start();
                this.restAddressURI = new URI(this.dispatcherRestEndpoint.getRestBaseUrl());
                LOG.info("Starting job dispatcher(s) for JobManger");
                this.jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(this.metricRegistry, "localhost");
                this.dispatcher = new StandaloneDispatcher(createRpcService, Dispatcher.DISPATCHER_NAME + UUID.randomUUID(), configuration, this.haServices, this.resourceManagerRunner.getResourceManageGateway(), this.blobServer, this.heartbeatServices, this.jobManagerMetricGroup, this.metricRegistry.getMetricQueryServicePath(), new MemoryArchivedExecutionGraphStore(), Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE, new ShutDownFatalErrorHandler(), this.dispatcherRestEndpoint.getRestBaseUrl(), HistoryServerArchivist.createHistoryServerArchivist(configuration, this.dispatcherRestEndpoint), new IgnoreLeaderShipLostHandler());
                this.dispatcher.start();
                this.resourceManagerLeaderRetriever = this.haServices.getResourceManagerLeaderRetriever();
                this.dispatcherLeaderRetriever = this.haServices.getDispatcherLeaderRetriever();
                this.resourceManagerLeaderRetriever.start(rpcGatewayRetriever);
                this.dispatcherLeaderRetriever.start(this.dispatcherGatewayRetriever);
                this.terminationFuture = new CompletableFuture<>();
                this.running = true;
                LOG.info("Flink Mini Cluster started successfully");
            } catch (Exception e) {
                try {
                    close();
                } catch (Exception e2) {
                    e.addSuppressed(e2);
                }
                throw e;
            }
        }
    }

    public CompletableFuture<Void> closeAsync() {
        CompletableFuture<Void> completableFuture;
        synchronized (this.lock) {
            if (this.running) {
                LOG.info("Shutting down Flink Mini Cluster");
                try {
                    ArrayList arrayList = new ArrayList(2 + this.miniClusterConfiguration.getNumTaskManagers());
                    if (this.taskManagers != null) {
                        for (TaskExecutor taskExecutor : this.taskManagers) {
                            if (taskExecutor != null) {
                                taskExecutor.shutDown();
                                arrayList.add(taskExecutor.getTerminationFuture());
                            }
                        }
                        this.taskManagers = null;
                    }
                    arrayList.add(shutDownDispatcher());
                    if (this.resourceManagerRunner != null) {
                        arrayList.add(this.resourceManagerRunner.closeAsync());
                        this.resourceManagerRunner = null;
                    }
                    FutureUtils.runAfterwards(FutureUtils.runAfterwards(FutureUtils.completeAll(arrayList), () -> {
                        synchronized (this.lock) {
                            if (this.jobManagerMetricGroup != null) {
                                this.jobManagerMetricGroup.close();
                                this.jobManagerMetricGroup = null;
                            }
                            if (this.metricRegistry != null) {
                                this.metricRegistry.shutdown();
                                this.metricRegistry = null;
                            }
                        }
                    }).thenCompose(r3 -> {
                        return terminateRpcServices();
                    }), this::terminateMiniClusterServices).whenComplete((r4, th) -> {
                        if (th != null) {
                            this.terminationFuture.completeExceptionally(ExceptionUtils.stripCompletionException(th));
                        } else {
                            this.terminationFuture.complete(null);
                        }
                    });
                    this.running = false;
                } catch (Throwable th2) {
                    this.running = false;
                    throw th2;
                }
            }
            completableFuture = this.terminationFuture;
        }
        return completableFuture;
    }

    public CompletableFuture<Collection<JobStatusMessage>> listJobs() {
        try {
            return getDispatcherGateway().requestMultipleJobDetails(this.rpcTimeout).thenApply(multipleJobsDetails -> {
                return (List) multipleJobsDetails.getJobs().stream().map(jobDetails -> {
                    return new JobStatusMessage(jobDetails.getJobId(), jobDetails.getJobName(), jobDetails.getStatus(), jobDetails.getStartTime());
                }).collect(Collectors.toList());
            });
        } catch (InterruptedException | LeaderRetrievalException e) {
            return FutureUtils.completedExceptionally(new FlinkException("Could not retrieve job list.", e));
        }
    }

    public CompletableFuture<JobStatus> getJobStatus(JobID jobID) {
        try {
            return getDispatcherGateway().requestJobStatus(jobID, this.rpcTimeout);
        } catch (InterruptedException | LeaderRetrievalException e) {
            return FutureUtils.completedExceptionally(new FlinkException(String.format("Could not retrieve job status for job %s.", jobID), e));
        }
    }

    public CompletableFuture<Acknowledge> cancelJob(JobID jobID) {
        try {
            return getDispatcherGateway().cancelJob(jobID, this.rpcTimeout);
        } catch (InterruptedException | LeaderRetrievalException e) {
            return FutureUtils.completedExceptionally(new FlinkException(String.format("Could not cancel job %s.", jobID), e));
        }
    }

    public CompletableFuture<Acknowledge> stopJob(JobID jobID) {
        try {
            return getDispatcherGateway().stopJob(jobID, this.rpcTimeout);
        } catch (InterruptedException | LeaderRetrievalException e) {
            return FutureUtils.completedExceptionally(new FlinkException(String.format("Could not stop job %s.", jobID), e));
        }
    }

    public CompletableFuture<String> triggerSavepoint(JobID jobID, String str, boolean z) {
        try {
            return getDispatcherGateway().triggerSavepoint(jobID, str, z, this.rpcTimeout);
        } catch (InterruptedException | LeaderRetrievalException e) {
            return FutureUtils.completedExceptionally(new FlinkException(String.format("Could not trigger savepoint for job %s.", jobID), e));
        }
    }

    public CompletableFuture<Acknowledge> disposeSavepoint(String str) {
        try {
            return getDispatcherGateway().disposeSavepoint(str, this.rpcTimeout);
        } catch (InterruptedException | LeaderRetrievalException e) {
            ExceptionUtils.checkInterrupted(e);
            return FutureUtils.completedExceptionally(new FlinkException(String.format("Could not dispose savepoint %s.", str), e));
        }
    }

    public CompletableFuture<? extends AccessExecutionGraph> getExecutionGraph(JobID jobID) {
        try {
            return getDispatcherGateway().requestJob(jobID, this.rpcTimeout);
        } catch (InterruptedException | LeaderRetrievalException e) {
            return FutureUtils.completedExceptionally(new FlinkException(String.format("Could not retrieve job job %s.", jobID), e));
        }
    }

    public void runDetached(JobGraph jobGraph) throws JobExecutionException, InterruptedException {
        Preconditions.checkNotNull(jobGraph, "job is null");
        try {
            submitJob(jobGraph).get();
        } catch (ExecutionException e) {
            throw new JobExecutionException(jobGraph.getJobID(), ExceptionUtils.stripExecutionException(e));
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.flink.runtime.minicluster.JobExecutor
    public JobSubmissionResult executeJob(JobGraph jobGraph, boolean z) throws JobExecutionException, InterruptedException {
        Preconditions.checkNotNull(jobGraph, "job is null");
        CompletableFuture<JobSubmissionResult> submitJob = submitJob(jobGraph);
        if (z) {
            try {
                return submitJob.get();
            } catch (ExecutionException e) {
                throw new JobExecutionException(jobGraph.getJobID(), "Fail to submit Job", ExceptionUtils.stripExecutionException(e));
            }
        }
        CompletableFuture<U> thenCompose = submitJob.thenCompose(jobSubmissionResult -> {
            return requestJobResult(jobGraph.getJobID());
        });
        this.jobResultMap.put(jobGraph.getJobID(), thenCompose);
        try {
            try {
                return ((JobResult) thenCompose.get()).toJobExecutionResult(Thread.currentThread().getContextClassLoader());
            } catch (IOException | ClassNotFoundException e2) {
                throw new JobExecutionException(jobGraph.getJobID(), e2);
            } catch (JobResult.WrappedJobException e3) {
                throw new JobExecutionException(jobGraph.getJobID(), e3.getCause());
            }
        } catch (ExecutionException e4) {
            throw new JobExecutionException(jobGraph.getJobID(), "Could not retrieve JobResult.", ExceptionUtils.stripExecutionException(e4));
        }
    }

    public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) {
        try {
            DispatcherGateway dispatcherGateway = getDispatcherGateway();
            jobGraph.setAllowQueuedScheduling(true);
            return uploadAndSetJobFiles(createBlobServerAddress(dispatcherGateway), jobGraph).thenCompose(r7 -> {
                return dispatcherGateway.submitJob(jobGraph, this.rpcTimeout);
            }).thenApply((Function<? super U, ? extends U>) acknowledge -> {
                return new JobSubmissionResult(jobGraph.getJobID());
            });
        } catch (InterruptedException | LeaderRetrievalException e) {
            ExceptionUtils.checkInterrupted(e);
            return FutureUtils.completedExceptionally(e);
        }
    }

    @Override // org.apache.flink.runtime.minicluster.JobExecutor
    public void cancel(JobID jobID) throws Exception {
        try {
            getDispatcherGateway().cancelJob(jobID, Time.seconds(10L));
        } catch (InterruptedException | LeaderRetrievalException e) {
            ExceptionUtils.checkInterrupted(e);
            throw new Exception((Throwable) e);
        }
    }

    public CompletableFuture<JobResult> requestJobResult(JobID jobID) {
        try {
            return getDispatcherGateway().requestJobResult(jobID, RpcUtils.INF_TIMEOUT);
        } catch (InterruptedException | LeaderRetrievalException e) {
            ExceptionUtils.checkInterrupted(e);
            return FutureUtils.completedExceptionally(e);
        }
    }

    private DispatcherGateway getDispatcherGateway() throws LeaderRetrievalException, InterruptedException {
        DispatcherGateway dispatcherGateway;
        synchronized (this.lock) {
            Preconditions.checkState(this.running, "MiniCluster is not yet running.");
            try {
                dispatcherGateway = (DispatcherGateway) this.dispatcherGatewayRetriever.getFuture().get();
            } catch (ExecutionException e) {
                throw new LeaderRetrievalException("Could not retrieve the leading dispatcher.", ExceptionUtils.stripExecutionException(e));
            }
        }
        return dispatcherGateway;
    }

    private CompletableFuture<Void> uploadAndSetJobFiles(CompletableFuture<InetSocketAddress> completableFuture, JobGraph jobGraph) {
        return completableFuture.thenAccept(inetSocketAddress -> {
            try {
                ClientUtils.extractAndUploadJobGraphFiles(jobGraph, () -> {
                    return new BlobClient(inetSocketAddress, this.miniClusterConfiguration.getConfiguration());
                });
            } catch (FlinkException e) {
                throw new CompletionException((Throwable) e);
            }
        });
    }

    private CompletableFuture<InetSocketAddress> createBlobServerAddress(DispatcherGateway dispatcherGateway) {
        return dispatcherGateway.getBlobServerPort(this.rpcTimeout).thenApply(num -> {
            return new InetSocketAddress(dispatcherGateway.getHostname(), num.intValue());
        });
    }

    protected MetricRegistryImpl createMetricRegistry(Configuration configuration) {
        return new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration));
    }

    protected RpcService createRpcService(Configuration configuration, Time time, boolean z, String str) {
        return new AkkaRpcService(AkkaUtils.createActorSystem(AkkaUtils.testDispatcherConfig().withFallback(z ? AkkaUtils.getAkkaConfig(configuration, str, 0) : AkkaUtils.getAkkaConfig(configuration))), time);
    }

    protected ResourceManagerRunner startResourceManager(Configuration configuration, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, MetricRegistry metricRegistry, RpcService rpcService, ClusterInformation clusterInformation) throws Exception {
        ResourceManagerRunner resourceManagerRunner = new ResourceManagerRunner(ResourceID.generate(), "resourcemanager_" + UUID.randomUUID(), configuration, rpcService, highAvailabilityServices, heartbeatServices, metricRegistry, clusterInformation);
        resourceManagerRunner.start();
        return resourceManagerRunner;
    }

    protected TaskExecutor[] startTaskManagers(Configuration configuration, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, MetricRegistry metricRegistry, BlobCacheService blobCacheService, int i, RpcService[] rpcServiceArr) throws Exception {
        TaskExecutor[] taskExecutorArr = new TaskExecutor[i];
        boolean z = i == 1;
        for (int i2 = 0; i2 < i; i2++) {
            taskExecutorArr[i2] = TaskManagerRunner.startTaskManager(configuration, new ResourceID(UUID.randomUUID().toString()), rpcServiceArr[i2], highAvailabilityServices, heartbeatServices, metricRegistry, blobCacheService, Executors.newSingleThreadExecutor(), z, new TerminatingFatalErrorHandler(i2));
            taskExecutorArr[i2].start();
        }
        return taskExecutorArr;
    }

    @GuardedBy("lock")
    private CompletableFuture<Void> shutDownDispatcher() {
        ArrayList arrayList = new ArrayList(2);
        if (this.dispatcher != null) {
            this.dispatcher.shutDown();
            arrayList.add(this.dispatcher.getTerminationFuture());
            this.dispatcher = null;
        }
        if (this.dispatcherRestEndpoint != null) {
            arrayList.add(this.dispatcherRestEndpoint.closeAsync());
            this.dispatcherRestEndpoint = null;
        }
        return FutureUtils.runAfterwards(FutureUtils.completeAll(arrayList), () -> {
            Exception exc = null;
            synchronized (this.lock) {
                if (this.resourceManagerLeaderRetriever != null) {
                    try {
                        this.resourceManagerLeaderRetriever.stop();
                    } catch (Exception e) {
                        exc = (Exception) ExceptionUtils.firstOrSuppressed(e, (Throwable) null);
                    }
                    this.resourceManagerLeaderRetriever = null;
                }
                if (this.dispatcherLeaderRetriever != null) {
                    try {
                        this.dispatcherLeaderRetriever.stop();
                    } catch (Exception e2) {
                        exc = (Exception) ExceptionUtils.firstOrSuppressed(e2, exc);
                    }
                    this.dispatcherLeaderRetriever = null;
                }
            }
            if (exc != null) {
                throw exc;
            }
        });
    }

    private void terminateMiniClusterServices() throws Exception {
        Exception exc = null;
        synchronized (this.lock) {
            if (this.blobCacheService != null) {
                try {
                    this.blobCacheService.close();
                } catch (Exception e) {
                    exc = (Exception) ExceptionUtils.firstOrSuppressed(e, (Throwable) null);
                }
                this.blobCacheService = null;
            }
            if (this.blobServer != null) {
                try {
                    this.blobServer.close();
                } catch (Exception e2) {
                    exc = (Exception) ExceptionUtils.firstOrSuppressed(e2, exc);
                }
                this.blobServer = null;
            }
            if (this.haServices != null) {
                try {
                    this.haServices.closeAndCleanupAllData();
                } catch (Exception e3) {
                    exc = (Exception) ExceptionUtils.firstOrSuppressed(e3, exc);
                }
                this.haServices = null;
            }
            if (exc != null) {
                throw exc;
            }
        }
    }

    @Nonnull
    private CompletionStage<Void> terminateRpcServices() {
        ArrayList arrayList = new ArrayList(this.miniClusterConfiguration.getRpcServiceSharing() == MiniClusterConfiguration.RpcServiceSharing.SHARED ? 1 : 3 + this.miniClusterConfiguration.getNumTaskManagers());
        synchronized (this.lock) {
            arrayList.add(this.commonRpcService.stopService());
            if (this.miniClusterConfiguration.getRpcServiceSharing() != MiniClusterConfiguration.RpcServiceSharing.SHARED) {
                arrayList.add(this.jobManagerRpcService.stopService());
                arrayList.add(this.resourceManagerRpcService.stopService());
                for (RpcService rpcService : this.taskManagerRpcServices) {
                    arrayList.add(rpcService.stopService());
                }
            }
            this.commonRpcService = null;
            this.jobManagerRpcService = null;
            this.taskManagerRpcServices = null;
            this.resourceManagerRpcService = null;
        }
        return FutureUtils.completeAll(arrayList);
    }

    private void initializeIOFormatClasses(Configuration configuration) {
        FileOutputFormat.initDefaultsFromConfiguration(configuration);
    }

    private static Throwable shutDownRpc(RpcService rpcService, Throwable th) {
        if (rpcService != null) {
            try {
                rpcService.stopService().get();
            } catch (Throwable th2) {
                return ExceptionUtils.firstOrSuppressed(th2, th);
            }
        }
        return th;
    }

    private static Throwable shutDownRpcs(RpcService[] rpcServiceArr, Throwable th) {
        if (rpcServiceArr != null) {
            Throwable th2 = th;
            for (RpcService rpcService : rpcServiceArr) {
                if (rpcService != null) {
                    try {
                        rpcService.stopService().get();
                    } catch (Throwable th3) {
                        th2 = ExceptionUtils.firstOrSuppressed(th3, th2);
                    }
                }
            }
        }
        return th;
    }
}
