package org.apache.flink.streaming.api.environment;

import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.JobListener;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.JobWithJars;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link StreamExecutionEnvironment} that submits streaming programs to a remote cluster
 * identified by a host and a port, shipping the configured JAR files and classpath URLs
 * together with the job.
 *
 * <p>Besides job submission, this environment can stop a job, cancel it (optionally with a
 * savepoint) and trigger a savepoint. Every operation creates its own cluster client and shuts
 * it down again before returning.
 */
@Public
public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
    private static final Logger LOG = LoggerFactory.getLogger(RemoteStreamEnvironment.class);

    /** Name handed to every {@link RestClusterClient} created by this environment. */
    private static final String REST_CLIENT_NAME = "RemoteStreamEnvironment";

    /** Host written into {@link JobManagerOptions#ADDRESS}. */
    private final String host;

    /** Port written into {@link JobManagerOptions#PORT} (and {@link RestOptions#PORT} for REST-only calls). */
    private final int port;

    /** User supplied client configuration; never {@code null}. */
    private final Configuration clientConfiguration;

    /** URLs of the JAR files that are shipped with the job. */
    private final List<URL> jarFiles;

    /** Additional classpath URLs handed to the user-code class loader and the cluster client. */
    private final List<URL> globalClasspaths;

    /**
     * Creates a remote environment with an empty client configuration.
     *
     * @param host host of the remote cluster
     * @param port port of the remote cluster
     * @param jarFiles paths of the JAR files to ship with the job
     */
    public RemoteStreamEnvironment(String host, int port, String... jarFiles) {
        this(host, port, (Configuration) null, jarFiles);
    }

    /**
     * Creates a remote environment without additional global classpaths.
     *
     * @param host host of the remote cluster
     * @param port port of the remote cluster
     * @param clientConfiguration client configuration, or {@code null} for an empty one
     * @param jarFiles paths of the JAR files to ship with the job
     */
    public RemoteStreamEnvironment(String host, int port, Configuration clientConfiguration, String... jarFiles) {
        this(host, port, clientConfiguration, jarFiles, (URL[]) null);
    }

    /**
     * Creates a remote environment.
     *
     * @param host host of the remote cluster; must not be {@code null}
     * @param port port of the remote cluster
     * @param clientConfiguration client configuration, or {@code null} for an empty one
     * @param jarFiles paths of the JAR files to ship with the job; {@code null} is treated as
     *     "no JAR files"
     * @param globalClasspaths additional classpath URLs; {@code null} is treated as empty
     * @throws InvalidProgramException if explicit environments are currently not allowed
     * @throws NullPointerException if {@code host} is {@code null}
     * @throws IllegalArgumentException if the port is out of range or a JAR path cannot be
     *     turned into a URL
     * @throws RuntimeException if checking one of the JAR files fails with an I/O error
     */
    public RemoteStreamEnvironment(
            String host, int port, Configuration clientConfiguration, String[] jarFiles, URL[] globalClasspaths) {
        if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
            throw new InvalidProgramException("The RemoteEnvironment cannot be used when submitting a program through a client, or running in a TestEnvironment context.");
        }
        if (host == null) {
            throw new NullPointerException("Host must not be null.");
        }
        // NOTE(review): this rejects 65535 although it is a valid TCP port; left unchanged
        // because sibling environments outside this file may apply the same bound - confirm.
        if (port < 1 || port >= 65535) {
            throw new IllegalArgumentException("Port out of range");
        }
        this.host = host;
        this.port = port;
        this.clientConfiguration = clientConfiguration == null ? new Configuration() : clientConfiguration;

        // A null array previously failed with a NullPointerException on ".length"; treat it the
        // same way a null classpath array is treated: as empty.
        final String[] jarPaths = jarFiles == null ? new String[0] : jarFiles;
        this.jarFiles = new ArrayList<>(jarPaths.length);
        for (String jarPath : jarPaths) {
            try {
                URL jarUrl = new File(jarPath).getAbsoluteFile().toURI().toURL();
                this.jarFiles.add(jarUrl);
                JobWithJars.checkJarFile(jarUrl);
            } catch (MalformedURLException e) {
                throw new IllegalArgumentException("JAR file path is invalid '" + jarPath + "'", e);
            } catch (IOException e) {
                throw new RuntimeException("Problem with jar file " + jarPath, e);
            }
        }

        if (globalClasspaths == null) {
            this.globalClasspaths = Collections.emptyList();
        } else {
            this.globalClasspaths = Arrays.asList(globalClasspaths);
        }
    }

    /**
     * Builds the stream graph of the current program, names it, clears the recorded
     * transformations and submits the graph to the remote cluster.
     *
     * @param jobName name assigned to the stream graph
     * @param detached whether the job is submitted in detached mode
     * @param savepointRestoreSettings savepoint settings passed on to the cluster client
     * @return the result returned by {@link #executeRemotely}
     * @throws ProgramInvocationException if the submission fails
     */
    @Override // org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
    /* renamed from: executeInternal */
    protected JobSubmissionResult mo15executeInternal(String jobName, boolean detached, SavepointRestoreSettings savepointRestoreSettings) throws ProgramInvocationException {
        StreamGraph streamGraph = getStreamGraph();
        streamGraph.setJobName(jobName);
        this.transformations.clear();
        return executeRemotely(streamGraph, this.jarFiles, detached, savepointRestoreSettings);
    }

    /**
     * Clears the recorded transformations and submits the given stream graph in attached
     * (non-detached) mode without savepoint restore settings.
     *
     * @param streamGraph the graph to execute
     * @return the execution result of the job
     * @throws Exception if the submission fails
     */
    @Override // org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
    public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
        this.transformations.clear();
        // executeRemotely is declared to return the JobSubmissionResult super type, so the
        // result has to be converted explicitly to satisfy this method's return type.
        return executeRemotely(streamGraph, this.jarFiles, false, SavepointRestoreSettings.none())
                .getJobExecutionResult();
    }

    /**
     * Submits the stream graph to the remote cluster through a freshly created cluster client.
     * The client is shut down before this method returns, whether it succeeds or fails.
     *
     * @param streamGraph the graph to execute
     * @param jars URLs of the JAR files shipped with the job
     * @param detached whether the job is submitted in detached mode
     * @param savepointRestoreSettings savepoint settings passed on to the cluster client
     * @return the raw submission result in detached mode, otherwise the job execution result
     * @throws ProgramInvocationException if creating the client or running the job fails; other
     *     exceptions are wrapped, a {@code ProgramInvocationException} is rethrown unchanged
     */
    protected JobSubmissionResult executeRemotely(StreamGraph streamGraph, List<URL> jars, boolean detached, SavepointRestoreSettings savepointRestoreSettings) throws ProgramInvocationException {
        LOG.info("Running remotely at {}:{}", this.host, this.port);

        ClassLoader userCodeClassLoader = JobWithJars.buildUserCodeClassLoader(jars, this.globalClasspaths, getClass().getClassLoader());
        ClusterClient<?> client = null;
        try {
            client = prepareClusterClient(detached);
            JobSubmissionResult result = client.run(streamGraph, jars, this.globalClasspaths, userCodeClassLoader, savepointRestoreSettings, detached);
            return detached ? result : result.getJobExecutionResult();
        } catch (ProgramInvocationException e) {
            throw e;
        } catch (Exception e) {
            String detail = e.getMessage() == null ? "." : ": " + e.getMessage();
            throw new ProgramInvocationException("The program execution failed" + detail, e);
        } finally {
            // client is still null when prepareClusterClient failed; the helper tolerates that.
            shutDownQuietly(client);
        }
    }

    /**
     * Creates a cluster client that points at this environment's host and port. A
     * {@link StandaloneClusterClient} is used when {@link CoreOptions#MODE} is {@code "legacy"},
     * a {@link RestClusterClient} otherwise.
     *
     * @param detached detached flag set on the created client
     * @return the configured client
     * @throws ProgramInvocationException if the client cannot be created or configured
     */
    private ClusterClient<?> prepareClusterClient(boolean detached) throws Exception {
        Configuration configuration = new Configuration();
        configuration.addAll(this.clientConfiguration);
        configuration.setString(JobManagerOptions.ADDRESS, this.host);
        configuration.setInteger(JobManagerOptions.PORT, this.port);
        try {
            // Declared as the common super type: either implementation may be chosen below.
            final ClusterClient<?> client;
            if ("legacy".equals(configuration.getString(CoreOptions.MODE))) {
                client = new StandaloneClusterClient(configuration);
            } else {
                client = new RestClusterClient<>(configuration, REST_CLIENT_NAME);
            }
            client.setJobListeners(getJobListeners());
            client.setDetached(detached);
            client.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());
            return client;
        } catch (Exception e) {
            throw new ProgramInvocationException("Cannot establish connection to JobManager: " + e.getMessage(), e);
        }
    }

    /**
     * Creates a detached {@link RestClusterClient} for this environment's host and port. Unlike
     * {@link #prepareClusterClient(boolean)} this also writes the port into
     * {@link RestOptions#PORT}.
     *
     * @return the new client; the caller is responsible for shutting it down
     * @throws Exception if the client cannot be created
     */
    private RestClusterClient<?> createDetachedRestClient() throws Exception {
        Configuration configuration = new Configuration();
        configuration.addAll(this.clientConfiguration);
        configuration.setString(JobManagerOptions.ADDRESS, this.host);
        configuration.setInteger(JobManagerOptions.PORT, this.port);
        configuration.setInteger(RestOptions.PORT, this.port);

        RestClusterClient<?> client = new RestClusterClient<>(configuration, REST_CLIENT_NAME);
        client.setDetached(true);
        return client;
    }

    /**
     * Shuts the given client down and logs, rather than propagates, any failure so that a
     * clean-up problem never replaces the outcome of the actual operation.
     *
     * @param client the client to shut down; {@code null} is ignored
     */
    private static void shutDownQuietly(ClusterClient<?> client) {
        if (client == null) {
            return;
        }
        try {
            client.shutdown();
        } catch (Exception e) {
            LOG.warn("Could not properly shut down the cluster client.", e);
        }
    }

    /**
     * Stops the job with the given ID.
     *
     * @param jobID ID of the job to stop
     * @throws Exception if creating the client or stopping the job fails; the failure is logged
     *     and rethrown unchanged
     */
    @Override // org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
    public void stopJob(JobID jobID) throws Exception {
        ClusterClient<?> client = null;
        try {
            client = prepareClusterClient(true);
            client.stop(jobID);
        } catch (Exception e) {
            LOG.error("Stop Job Fails with JobID = {}", jobID, e);
            throw e;
        } finally {
            // Exactly one shutdown attempt, on success and on failure alike.
            shutDownQuietly(client);
        }
    }

    /**
     * Cancels the job with the given ID and afterwards notifies all registered job listeners
     * with a {@code null} savepoint path.
     *
     * @param jobId hexadecimal string form of the job ID
     * @throws ProgramInvocationException if any step (client creation, ID parsing, cancellation,
     *     listener notification) fails
     */
    @Override // org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
    public void cancel(String jobId) throws Exception {
        LOG.info("Running remotely at {}:{}", this.host, this.port);
        // NOTE(review): the returned class loader is unused; the call is kept only to preserve
        // the existing behavior - confirm whether it can be dropped.
        JobWithJars.buildUserCodeClassLoader(this.jarFiles, this.globalClasspaths, getClass().getClassLoader());

        RestClusterClient<?> client = null;
        try {
            client = createDetachedRestClient();
            LOG.info("Cancel Job: {}", jobId);
            JobID parsedJobId = JobID.fromHexString(jobId);
            client.cancel(parsedJobId);
            for (JobListener listener : getJobListeners()) {
                listener.onJobCanceled(parsedJobId, (String) null);
            }
        } catch (Exception e) {
            throw new ProgramInvocationException("Cannot establish connection to JobManager: " + e.getMessage(), e);
        } finally {
            // The client was previously never shut down and leaked on every call.
            shutDownQuietly(client);
        }
    }

    /**
     * Cancels the job with the given ID while taking a savepoint, then notifies all registered
     * job listeners with the savepoint path.
     *
     * @param jobId hexadecimal string form of the job ID
     * @param savepointDirectory savepoint target passed to the cluster client
     * @return the value returned by the cluster client's {@code cancelWithSavepoint}
     * @throws ProgramInvocationException if any step fails
     */
    @Override // org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
    public String cancelWithSavepoint(String jobId, String savepointDirectory) throws Exception {
        LOG.info("Running remotely at {}:{}", this.host, this.port);
        // NOTE(review): result unused, call kept to preserve existing behavior - confirm.
        JobWithJars.buildUserCodeClassLoader(this.jarFiles, this.globalClasspaths, getClass().getClassLoader());

        RestClusterClient<?> client = null;
        try {
            client = createDetachedRestClient();
            LOG.info("CancelWithSavePoint Job: {}", jobId);
            JobID parsedJobId = JobID.fromHexString(jobId);
            String savepointPath = client.cancelWithSavepoint(parsedJobId, savepointDirectory);
            for (JobListener listener : getJobListeners()) {
                listener.onJobCanceled(parsedJobId, savepointPath);
            }
            return savepointPath;
        } catch (Exception e) {
            throw new ProgramInvocationException("Cannot establish connection to JobManager: " + e.getMessage(), e);
        } finally {
            shutDownQuietly(client);
        }
    }

    /**
     * Triggers a savepoint for the job with the given ID and waits for the result.
     *
     * @param jobId hexadecimal string form of the job ID
     * @param savepointDirectory savepoint target passed to the cluster client
     * @return the value the cluster client's savepoint future completes with
     * @throws ProgramInvocationException if any step fails
     */
    @Override // org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
    public String triggerSavepoint(String jobId, String savepointDirectory) throws Exception {
        LOG.info("Running remotely at {}:{}", this.host, this.port);
        // NOTE(review): result unused, call kept to preserve existing behavior - confirm.
        JobWithJars.buildUserCodeClassLoader(this.jarFiles, this.globalClasspaths, getClass().getClassLoader());

        RestClusterClient<?> client = null;
        try {
            client = createDetachedRestClient();
            LOG.info("Trigger Savepoint for Job: {}", jobId);
            // Block until the savepoint future completes so the client can be shut down safely.
            return client.triggerSavepoint(JobID.fromHexString(jobId), savepointDirectory).get();
        } catch (Exception e) {
            throw new ProgramInvocationException("Cannot establish connection to JobManager: " + e.getMessage(), e);
        } finally {
            shutDownQuietly(client);
        }
    }

    /**
     * Describes this environment by host, port and parallelism; a parallelism of {@code -1} is
     * rendered as {@code default}.
     */
    @Override
    public String toString() {
        int parallelism = getParallelism();
        String parallelismText = parallelism == -1 ? "default" : String.valueOf(parallelism);
        return "Remote Environment (" + this.host + ":" + this.port + " - parallelism = " + parallelismText + ")";
    }

    /**
     * Returns the host this environment was created with.
     *
     * @return the remote host
     */
    public String getHost() {
        return this.host;
    }

    /**
     * Returns the port this environment was created with.
     *
     * @return the remote port
     */
    public int getPort() {
        return this.port;
    }

    /**
     * Returns the client configuration; this is the instance passed to the constructor, or an
     * empty configuration if {@code null} was passed.
     *
     * @return the client configuration, never {@code null}
     */
    public Configuration getClientConfiguration() {
        return this.clientConfiguration;
    }
}
