/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.akka;

import akka.actor.ActorNotFound;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.AddressFromURIString$;
import akka.actor.RobustActorSystem$;
import akka.pattern.AskableActorRef$;
import akka.pattern.package$;
import akka.util.Timeout;
import akka.util.Timeout$;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigMergeable;
import java.io.IOException;
import java.io.Serializable;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.UnknownHostException;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.akka.EscalatingSupervisorStrategy;
import org.apache.flink.runtime.akka.RemoteAddressExtension;
import org.apache.flink.runtime.akka.RemoteAddressExtension$;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.net.SSLUtils;
import org.apache.flink.shaded.akka.org.jboss.netty.channel.ChannelException;
import org.apache.flink.shaded.akka.org.jboss.netty.logging.InternalLoggerFactory;
import org.apache.flink.shaded.akka.org.jboss.netty.logging.Slf4JLoggerFactory;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.TimeUtils;
import org.apache.flink.util.function.FunctionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.PartialFunction;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayOps;
import scala.concurrent.Await$;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import scala.runtime.BoxesRunTime;
import scala.util.Failure;
import scala.util.Try;
import scala.util.Try$;

public final class AkkaUtils$ {
    public static AkkaUtils$ MODULE$;
    private final Logger LOG;
    private final String FLINK_ACTOR_SYSTEM_NAME;

    static {
        new AkkaUtils$();
    }

    public Logger LOG() {
        return this.LOG;
    }

    public String FLINK_ACTOR_SYSTEM_NAME() {
        return this.FLINK_ACTOR_SYSTEM_NAME;
    }

    public String getFlinkActorSystemName() {
        return this.FLINK_ACTOR_SYSTEM_NAME();
    }

    public ActorSystem createLocalActorSystem(Configuration configuration) {
        Config akkaConfig = this.getAkkaConfig(configuration, (Option<Tuple2<String, Object>>)None$.MODULE$);
        return this.createActorSystem(akkaConfig);
    }

    public ActorSystem createActorSystem(Configuration configuration, String hostname, int port) {
        return this.createActorSystem(configuration, (Option<Tuple2<String, Object>>)new Some((Object)new Tuple2((Object)hostname, (Object)BoxesRunTime.boxToInteger((int)port))));
    }

    public ActorSystem createActorSystem(Configuration configuration, Option<Tuple2<String, Object>> listeningAddress) {
        Config akkaConfig = this.getAkkaConfig(configuration, listeningAddress);
        return this.createActorSystem(akkaConfig);
    }

    public ActorSystem createActorSystem(Config akkaConfig) {
        return this.createActorSystem(this.FLINK_ACTOR_SYSTEM_NAME(), akkaConfig);
    }

    public ActorSystem createActorSystem(String actorSystemName, Config akkaConfig) {
        InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory());
        return RobustActorSystem$.MODULE$.create(actorSystemName, akkaConfig);
    }

    public ActorSystem createDefaultActorSystem() {
        return this.createActorSystem(this.getDefaultAkkaConfig());
    }

    public Config getAkkaConfig(Configuration configuration, String hostname, int port, Config executorConfig) {
        return this.getAkkaConfig(configuration, (Option<Tuple2<String, Object>>)new Some((Object)new Tuple2((Object)hostname, (Object)BoxesRunTime.boxToInteger((int)port))), (Option<Tuple2<String, Object>>)None$.MODULE$, executorConfig);
    }

    public Config getAkkaConfig(Configuration configuration, String hostname, int port) {
        return this.getAkkaConfig(configuration, (Option<Tuple2<String, Object>>)new Some((Object)new Tuple2((Object)hostname, (Object)BoxesRunTime.boxToInteger((int)port))));
    }

    public Config getAkkaConfig(Configuration configuration) {
        return this.getAkkaConfig(configuration, (Option<Tuple2<String, Object>>)None$.MODULE$);
    }

    public Config getAkkaConfig(Configuration configuration, Option<Tuple2<String, Object>> externalAddress) throws UnknownHostException {
        return this.getAkkaConfig(configuration, externalAddress, (Option<Tuple2<String, Object>>)None$.MODULE$, this.getForkJoinExecutorConfig(BootstrapTools.ForkJoinExecutorConfiguration.fromConfiguration(configuration)));
    }

    public Config getAkkaConfig(Configuration configuration, Option<Tuple2<String, Object>> externalAddress, Option<Tuple2<String, Object>> bindAddress, Config executorConfig) throws UnknownHostException {
        Config config;
        Some some;
        Tuple2 tuple2;
        Config defaultConfig = this.getBasicAkkaConfig(configuration).withFallback((ConfigMergeable)executorConfig);
        Option<Tuple2<String, Object>> option = externalAddress;
        if (option instanceof Some && (tuple2 = (Tuple2)(some = (Some)option).value()) != null) {
            Config config2;
            Some some2;
            Tuple2 tuple22;
            String externalHostname = (String)tuple2._1();
            int externalPort = tuple2._2$mcI$sp();
            Option<Tuple2<String, Object>> option2 = bindAddress;
            if (option2 instanceof Some && (tuple22 = (Tuple2)(some2 = (Some)option2).value()) != null) {
                String bindHostname = (String)tuple22._1();
                int bindPort = tuple22._2$mcI$sp();
                Config remoteConfig = this.getRemoteAkkaConfig(configuration, bindHostname, bindPort, externalHostname, externalPort);
                config2 = remoteConfig.withFallback((ConfigMergeable)defaultConfig);
            } else if (None$.MODULE$.equals(option2)) {
                Config remoteConfig = this.getRemoteAkkaConfig(configuration, NetUtils.getWildcardIPAddress(), externalPort, externalHostname, externalPort);
                config2 = remoteConfig.withFallback((ConfigMergeable)defaultConfig);
            } else {
                throw new MatchError(option2);
            }
            config = config2;
        } else if (None$.MODULE$.equals(option)) {
            config = defaultConfig;
        } else {
            throw new MatchError(option);
        }
        return config;
    }

    public Config getDefaultAkkaConfig() {
        return this.getAkkaConfig(new Configuration(), (Option<Tuple2<String, Object>>)new Some((Object)new Tuple2((Object)"", (Object)BoxesRunTime.boxToInteger((int)0))));
    }

    private Config getBasicAkkaConfig(Configuration configuration) {
        int akkaThroughput = configuration.getInteger(AkkaOptions.DISPATCHER_THROUGHPUT);
        boolean lifecycleEvents = configuration.getBoolean(AkkaOptions.LOG_LIFECYCLE_EVENTS);
        String jvmExitOnFatalError = configuration.getBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR) ? "on" : "off";
        String logLifecycleEvents = lifecycleEvents ? "on" : "off";
        String logLevel = this.getLogLevel();
        String supervisorStrategy2 = EscalatingSupervisorStrategy.class.getCanonicalName();
        String config = new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(1005).append("\n        |akka {\n        | daemonic = off\n        |\n        | loggers = [\"akka.event.slf4j.Slf4jLogger\"]\n        | logging-filter = \"akka.event.slf4j.Slf4jLoggingFilter\"\n        | log-config-on-start = off\n        | logger-startup-timeout = 30s\n        |\n        | jvm-exit-on-fatal-error = ").append(jvmExitOnFatalError).append("\n        |\n        | serialize-messages = off\n        |\n        | loglevel = ").append(logLevel).append("\n        | stdout-loglevel = OFF\n        |\n        | log-dead-letters = ").append(logLifecycleEvents).append("\n        | log-dead-letters-during-shutdown = ").append(logLifecycleEvents).append("\n        |\n        | actor {\n        |   guardian-supervisor-strategy = ").append(supervisorStrategy2).append("\n        |\n        |   warn-about-java-serializer-usage = off\n        |\n        |   default-dispatcher {\n        |     throughput = ").append(akkaThroughput).append("\n        |   }\n        |\n        |   supervisor-dispatcher {\n        |     type = Dispatcher\n        |     executor = \"thread-pool-executor\"\n        |     thread-pool-executor {\n        |       core-pool-size-min = 1\n        |       core-pool-size-max = 1\n        |     }\n        |   }\n        | }\n        |}\n      ").toString())).stripMargin();
        return ConfigFactory.parseString((String)config);
    }

    public Config getThreadPoolExecutorConfig(BootstrapTools.FixedThreadPoolExecutorConfiguration configuration) {
        int threadPriority = configuration.getThreadPriority();
        int minNumThreads = configuration.getMinNumThreads();
        int maxNumThreads = configuration.getMaxNumThreads();
        String configString = new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(382).append("\n       |akka {\n       |  actor {\n       |    default-dispatcher {\n       |      type = akka.dispatch.PriorityThreadsDispatcher\n       |      executor = \"thread-pool-executor\"\n       |      thread-priority = ").append(threadPriority).append("\n       |      thread-pool-executor {\n       |        core-pool-size-min = ").append(minNumThreads).append("\n       |        core-pool-size-max = ").append(maxNumThreads).append("\n       |      }\n       |    }\n       |  }\n       |}\n        ").toString())).stripMargin();
        return ConfigFactory.parseString((String)configString);
    }

    public Config getForkJoinExecutorConfig(BootstrapTools.ForkJoinExecutorConfiguration configuration) {
        double forkJoinExecutorParallelismFactor = configuration.getParallelismFactor();
        int forkJoinExecutorParallelismMin = configuration.getMinParallelism();
        int forkJoinExecutorParallelismMax = configuration.getMaxParallelism();
        String configString = new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(307).append("\n       |akka {\n       |  actor {\n       |    default-dispatcher {\n       |      executor = \"fork-join-executor\"\n       |      fork-join-executor {\n       |        parallelism-factor = ").append(forkJoinExecutorParallelismFactor).append("\n       |        parallelism-min = ").append(forkJoinExecutorParallelismMin).append("\n       |        parallelism-max = ").append(forkJoinExecutorParallelismMax).append("\n       |      }\n       |    }\n       |  }\n       |}").toString())).stripMargin();
        return ConfigFactory.parseString((String)configString);
    }

    public Config testDispatcherConfig() {
        String config = new StringOps(Predef$.MODULE$.augmentString("\n         |akka {\n         |  actor {\n         |    default-dispatcher {\n         |      fork-join-executor {\n         |        parallelism-factor = 1.0\n         |        parallelism-min = 2\n         |        parallelism-max = 4\n         |      }\n         |    }\n         |  }\n         |}\n      ")).stripMargin();
        return ConfigFactory.parseString((String)config);
    }

    private Config getRemoteAkkaConfig(Configuration configuration, String bindAddress, int port, String externalHostname, int externalPort) {
        String normalizedExternalHostname = NetUtils.unresolvedHostToNormalizedString((String)externalHostname);
        java.time.Duration akkaAskTimeout = this.getTimeout(configuration);
        String startupTimeout = TimeUtils.getStringInMillis((java.time.Duration)TimeUtils.parseDuration((String)configuration.getString(AkkaOptions.STARTUP_TIMEOUT, TimeUtils.getStringInMillis((java.time.Duration)akkaAskTimeout.multipliedBy(10L)))));
        String akkaTCPTimeout = TimeUtils.getStringInMillis((java.time.Duration)TimeUtils.parseDuration((String)configuration.getString(AkkaOptions.TCP_TIMEOUT)));
        String akkaFramesize = configuration.getString(AkkaOptions.FRAMESIZE);
        boolean lifecycleEvents = configuration.getBoolean(AkkaOptions.LOG_LIFECYCLE_EVENTS);
        String logLifecycleEvents = lifecycleEvents ? "on" : "off";
        boolean akkaEnableSSLConfig = configuration.getBoolean(AkkaOptions.SSL_ENABLED) && SSLUtils.isInternalSSLEnabled(configuration);
        long retryGateClosedFor = configuration.getLong(AkkaOptions.RETRY_GATE_CLOSED_FOR);
        String akkaEnableSSL = akkaEnableSSLConfig ? "on" : "off";
        String akkaSSLKeyStore = configuration.getString(SecurityOptions.SSL_INTERNAL_KEYSTORE, configuration.getString(SecurityOptions.SSL_KEYSTORE));
        String akkaSSLKeyStorePassword = configuration.getString(SecurityOptions.SSL_INTERNAL_KEYSTORE_PASSWORD, configuration.getString(SecurityOptions.SSL_KEYSTORE_PASSWORD));
        String akkaSSLKeyPassword = configuration.getString(SecurityOptions.SSL_INTERNAL_KEY_PASSWORD, configuration.getString(SecurityOptions.SSL_KEY_PASSWORD));
        String akkaSSLTrustStore = configuration.getString(SecurityOptions.SSL_INTERNAL_TRUSTSTORE, configuration.getString(SecurityOptions.SSL_TRUSTSTORE));
        String akkaSSLTrustStorePassword = configuration.getString(SecurityOptions.SSL_INTERNAL_TRUSTSTORE_PASSWORD, configuration.getString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD));
        String akkaSSLCertFingerprintString = configuration.getString(SecurityOptions.SSL_INTERNAL_CERT_FINGERPRINT);
        String akkaSSLCertFingerprints = akkaSSLCertFingerprintString != null ? new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])akkaSSLCertFingerprintString.split(","))).toList().mkString("[\"", "\",\"", "\"]") : "[]";
        String akkaSSLProtocol = configuration.getString(SecurityOptions.SSL_PROTOCOL);
        String akkaSSLAlgorithmsString = configuration.getString(SecurityOptions.SSL_ALGORITHMS);
        String akkaSSLAlgorithms = new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])akkaSSLAlgorithmsString.split(","))).toList().mkString("[", ",", "]");
        int clientSocketWorkerPoolPoolSizeMin = configuration.getInteger(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MIN);
        int clientSocketWorkerPoolPoolSizeMax = configuration.getInteger(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MAX);
        double clientSocketWorkerPoolPoolSizeFactor = configuration.getDouble(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_FACTOR);
        int serverSocketWorkerPoolPoolSizeMin = configuration.getInteger(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MIN);
        int serverSocketWorkerPoolPoolSizeMax = configuration.getInteger(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MAX);
        double serverSocketWorkerPoolPoolSizeFactor = configuration.getDouble(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_FACTOR);
        String configString = new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(1322).append("\n         |akka {\n         |  actor {\n         |    provider = \"akka.remote.RemoteActorRefProvider\"\n         |  }\n         |\n         |  remote {\n         |    startup-timeout = ").append(startupTimeout).append("\n         |\n         |    # disable the transport failure detector by setting very high values\n         |    transport-failure-detector{\n         |      acceptable-heartbeat-pause = 6000 s\n         |      heartbeat-interval = 1000 s\n         |      threshold = 300\n         |    }\n         |\n         |    netty {\n         |      tcp {\n         |        transport-class = \"akka.remote.transport.netty.NettyTransport\"\n         |        port = ").append(externalPort).append("\n         |        bind-port = ").append(port).append("\n         |        connection-timeout = ").append(akkaTCPTimeout).append("\n         |        maximum-frame-size = ").append(akkaFramesize).append("\n         |        tcp-nodelay = on\n         |\n         |        client-socket-worker-pool {\n         |          pool-size-min = ").append(clientSocketWorkerPoolPoolSizeMin).append("\n         |          pool-size-max = ").append(clientSocketWorkerPoolPoolSizeMax).append("\n         |          pool-size-factor = ").append(clientSocketWorkerPoolPoolSizeFactor).append("\n         |        }\n         |\n         |        server-socket-worker-pool {\n         |          pool-size-min = ").append(serverSocketWorkerPoolPoolSizeMin).append("\n         |          pool-size-max = ").append(serverSocketWorkerPoolPoolSizeMax).append("\n         |          pool-size-factor = ").append(serverSocketWorkerPoolPoolSizeFactor).append("\n         |        }\n         |      }\n         |    }\n         |\n         |    log-remote-lifecycle-events = ").append(logLifecycleEvents).append("\n         |\n         |    retry-gate-closed-for = ").append(new StringBuilder(3).append(retryGateClosedFor).append(" ms").toString()).append("\n         |  }\n         |}\n       ").toString())).stripMargin();
        String effectiveHostname = normalizedExternalHostname != null && new StringOps(Predef$.MODULE$.augmentString(normalizedExternalHostname)).nonEmpty() ? normalizedExternalHostname : "";
        String hostnameConfigString = new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(219).append("\n         |akka {\n         |  remote {\n         |    netty {\n         |      tcp {\n         |        hostname = \"").append(effectiveHostname).append("\"\n         |        bind-hostname = \"").append(bindAddress).append("\"\n         |      }\n         |    }\n         |  }\n         |}\n       ").toString())).stripMargin();
        String sslConfigString = akkaEnableSSLConfig ? new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(907).append("\n         |akka {\n         |  remote {\n         |\n         |    enabled-transports = [\"akka.remote.netty.ssl\"]\n         |\n         |    netty {\n         |\n         |      ssl = ${akka.remote.netty.tcp}\n         |\n         |      ssl {\n         |\n         |        enable-ssl = ").append(akkaEnableSSL).append("\n         |        ssl-engine-provider = org.apache.flink.runtime.akka.CustomSSLEngineProvider\n         |        security {\n         |          key-store = \"").append(akkaSSLKeyStore).append("\"\n         |          key-store-password = \"").append(akkaSSLKeyStorePassword).append("\"\n         |          key-password = \"").append(akkaSSLKeyPassword).append("\"\n         |          trust-store = \"").append(akkaSSLTrustStore).append("\"\n         |          trust-store-password = \"").append(akkaSSLTrustStorePassword).append("\"\n         |          protocol = ").append(akkaSSLProtocol).append("\n         |          enabled-algorithms = ").append(akkaSSLAlgorithms).append("\n         |          random-number-generator = \"\"\n         |          require-mutual-authentication = on\n         |          cert-fingerprints = ").append(akkaSSLCertFingerprints).append("\n         |        }\n         |      }\n         |    }\n         |  }\n         |}\n       ").toString())).stripMargin() : "";
        return ConfigFactory.parseString((String)new StringBuilder(0).append(configString).append(hostnameConfigString).append(sslConfigString).toString()).resolve();
    }

    public String getLogLevel() {
        return this.LOG().isTraceEnabled() ? "TRACE" : (this.LOG().isDebugEnabled() ? "DEBUG" : (this.LOG().isInfoEnabled() ? "INFO" : (this.LOG().isWarnEnabled() ? "WARNING" : (this.LOG().isErrorEnabled() ? "ERROR" : "OFF"))));
    }

    public Future<ActorRef> getChild(ActorRef parent, String child, ActorSystem system, FiniteDuration timeout) {
        return system.actorSelection(parent.path().$div(child)).resolveOne(Timeout$.MODULE$.durationToTimeout(timeout));
    }

    public Future<ActorRef> getActorRefFuture(String path, ActorSystem system, FiniteDuration timeout) {
        return system.actorSelection(path).resolveOne(Timeout$.MODULE$.durationToTimeout(timeout));
    }

    public ActorRef getActorRef(String path, ActorSystem system, FiniteDuration timeout) throws IOException {
        ActorRef actorRef;
        try {
            Future<ActorRef> future = this.getActorRefFuture(path, system, timeout);
            actorRef = (ActorRef)Await$.MODULE$.result(future, (Duration)timeout);
        }
        catch (Throwable throwable) {
            Throwable throwable2 = throwable;
            boolean bl = throwable2 instanceof ActorNotFound ? true : throwable2 instanceof TimeoutException;
            if (bl) {
                throw new IOException(new StringBuilder(94).append("Actor at ").append(path).append(" not reachable. ").append("Please make sure that the actor is running and its port is reachable.").toString(), throwable2);
            }
            if (throwable2 instanceof IOException) {
                IOException iOException = (IOException)throwable2;
                throw new IOException(new StringBuilder(34).append("Could not connect to the actor at ").append(path).toString(), iOException);
            }
            throw throwable;
        }
        return actorRef;
    }

    public <T> Future<T> retry(Function0<T> body2, int tries, ExecutionContext executionContext) {
        return Future$.MODULE$.apply(body2, executionContext).recoverWith((PartialFunction)new scala.Serializable(body2, tries, executionContext){
            public static final long serialVersionUID = 0L;
            private final Function0 body$1;
            private final int tries$1;
            private final ExecutionContext executionContext$1;

            public final <A1 extends Throwable, B1> B1 applyOrElse(A1 x1, Function1<A1, B1> function1) {
                Object object;
                A1 A1 = x1;
                if (A1 != null) {
                    A1 A12 = A1;
                    object = this.tries$1 > 0 ? AkkaUtils$.MODULE$.retry(this.body$1, this.tries$1 - 1, this.executionContext$1) : Future$.MODULE$.failed(A12);
                } else {
                    object = function1.apply(x1);
                }
                return (B1)object;
            }

            public final boolean isDefinedAt(Throwable x1) {
                Throwable throwable = x1;
                boolean bl = throwable != null;
                return bl;
            }
            {
                this.body$1 = body$1;
                this.tries$1 = tries$1;
                this.executionContext$1 = executionContext$1;
            }
        }, executionContext);
    }

    public <T> Future<T> retry(Callable<T> callable, int tries, ExecutionContext executionContext) {
        return this.retry((Function0 & Serializable & scala.Serializable)() -> callable.call(), tries, executionContext);
    }

    public Future<Object> retry(ActorRef target, Object message, int tries, ExecutionContext executionContext, FiniteDuration timeout) {
        ActorRef qual$1 = package$.MODULE$.ask(target);
        Object x$1 = message;
        Timeout x$2 = Timeout$.MODULE$.durationToTimeout(timeout);
        ActorRef x$3 = AskableActorRef$.MODULE$.$qmark$default$3$extension(qual$1, x$1);
        return AskableActorRef$.MODULE$.$qmark$extension1(qual$1, x$1, x$2, x$3).recoverWith((PartialFunction)new scala.Serializable(target, message, tries, executionContext, timeout){
            public static final long serialVersionUID = 0L;
            private final ActorRef target$1;
            private final Object message$1;
            private final int tries$2;
            private final ExecutionContext executionContext$2;
            private final FiniteDuration timeout$1;

            public final <A1 extends Throwable, B1> B1 applyOrElse(A1 x2, Function1<A1, B1> function1) {
                Object object;
                A1 A1 = x2;
                if (A1 != null) {
                    A1 A12 = A1;
                    object = this.tries$2 > 0 ? AkkaUtils$.MODULE$.retry(this.target$1, this.message$1, this.tries$2 - 1, this.executionContext$2, this.timeout$1) : Future$.MODULE$.failed(A12);
                } else {
                    object = function1.apply(x2);
                }
                return (B1)object;
            }

            public final boolean isDefinedAt(Throwable x2) {
                Throwable throwable = x2;
                boolean bl = throwable != null;
                return bl;
            }
            {
                this.target$1 = target$1;
                this.message$1 = message$1;
                this.tries$2 = tries$2;
                this.executionContext$2 = executionContext$2;
                this.timeout$1 = timeout$1;
            }
        }, executionContext);
    }

    public java.time.Duration getTimeout(Configuration config) {
        return TimeUtils.parseDuration((String)config.getString(AkkaOptions.ASK_TIMEOUT));
    }

    public Time getTimeoutAsTime(Configuration config) {
        Time time;
        try {
            java.time.Duration duration = this.getTimeout(config);
            time = Time.milliseconds((long)duration.toMillis());
        }
        catch (NumberFormatException numberFormatException) {
            throw new IllegalConfigurationException(this.formatDurationParsingErrorMessage());
        }
        return time;
    }

    public Time getDefaultTimeout() {
        java.time.Duration duration = TimeUtils.parseDuration((String)((String)AkkaOptions.ASK_TIMEOUT.defaultValue()));
        return Time.milliseconds((long)duration.toMillis());
    }

    public java.time.Duration getLookupTimeout(Configuration config) {
        return TimeUtils.parseDuration((String)config.getString(AkkaOptions.LOOKUP_TIMEOUT));
    }

    public Address getAddress(ActorSystem system) {
        return ((RemoteAddressExtension)RemoteAddressExtension$.MODULE$.apply(system)).address();
    }

    public String getAkkaURL(ActorSystem system, ActorRef actor) {
        Address address = this.getAddress(system);
        return actor.path().toStringWithAddress(address);
    }

    public String getAkkaURL(ActorSystem system, String path) {
        Address address = this.getAddress(system);
        return new StringBuilder(0).append(address.toString()).append(path).toString();
    }

    public InetSocketAddress getInetSocketAddressFromAkkaURL(String akkaURL) throws Exception {
        InetSocketAddress inetSocketAddress;
        try {
            String hostname;
            Option option;
            block5: {
                block4: {
                    Address address = this.getAddressFromAkkaURL(akkaURL);
                    Tuple2 tuple2 = new Tuple2((Object)address.host(), (Object)address.port());
                    if (tuple2 == null) break block4;
                    Option option2 = (Option)tuple2._1();
                    option = (Option)tuple2._2();
                    if (!(option2 instanceof Some)) break block4;
                    Some some = (Some)option2;
                    hostname = (String)some.value();
                    if (option instanceof Some) break block5;
                }
                throw new MalformedURLException();
            }
            Some some = (Some)option;
            int portValue = BoxesRunTime.unboxToInt((Object)some.value());
            InetSocketAddress inetSocketAddress2 = new InetSocketAddress(hostname, portValue);
            inetSocketAddress = inetSocketAddress2;
        }
        catch (MalformedURLException malformedURLException) {
            throw new Exception(new StringBuilder(51).append("Could not retrieve InetSocketAddress from Akka URL ").append(akkaURL).toString());
        }
        return inetSocketAddress;
    }

    public Address getAddressFromAkkaURL(String akkaURL) throws MalformedURLException {
        return AddressFromURIString$.MODULE$.apply(akkaURL);
    }

    public String formatDurationParsingErrorMessage() {
        return "Duration format must be \"val unit\", where 'val' is a number and 'unit' is (d|day)|(h|hour)|(min|minute)|s|sec|second)|(ms|milli|millisecond)|(\u00b5s|micro|microsecond)|(ns|nano|nanosecond)";
    }

    public String getLocalAkkaURL(String actorName) {
        return new StringBuilder(18).append("akka://flink/user/").append(actorName).toString();
    }

    public <T> Try<T> retryOnBindException(Function0<T> fn, Function0<Object> stopCond, long maxSleepBetweenRetries) {
        Try try_;
        block4: {
            Try try_2;
            block5: {
                Failure failure;
                block7: {
                    Exception exception;
                    block6: {
                        while (true) {
                            Throwable x;
                            boolean bl = false;
                            Failure failure2 = null;
                            try_2 = Try$.MODULE$.apply(fn);
                            if (try_2 instanceof Failure) {
                                bl = true;
                                failure2 = (Failure)try_2;
                                Throwable x2 = failure2.exception();
                                if (x2 instanceof BindException) {
                                    BindException bindException = (BindException)x2;
                                    if (!stopCond.apply$mcZ$sp()) {
                                        this.sleepBeforeRetry$1(maxSleepBetweenRetries);
                                        maxSleepBetweenRetries = this.retryOnBindException$default$3();
                                        continue;
                                    }
                                    try_ = new Failure((Throwable)bindException);
                                    break block4;
                                }
                            }
                            if (!bl || !((x = failure2.exception()) instanceof Exception)) break block5;
                            exception = (Exception)x;
                            Throwable throwable = exception.getCause();
                            if (!(throwable instanceof ChannelException)) break block6;
                            if (stopCond.apply$mcZ$sp()) break;
                            this.sleepBeforeRetry$1(maxSleepBetweenRetries);
                            maxSleepBetweenRetries = this.retryOnBindException$default$3();
                        }
                        failure = new Failure((Throwable)new RuntimeException("Unable to do further retries starting the actor system"));
                        break block7;
                    }
                    failure = new Failure((Throwable)exception);
                }
                try_ = failure;
                break block4;
            }
            try_ = try_2;
        }
        return try_;
    }

    public <T> long retryOnBindException$default$3() {
        return 0L;
    }

    public CompletableFuture<Void> terminateActorSystem(ActorSystem actorSystem) {
        return FutureUtils.toJava(actorSystem.terminate()).thenAccept(FunctionUtils.ignoreFn());
    }

    private final void sleepBeforeRetry$1(long maxSleepBetweenRetries$1) {
        block0: {
            if (maxSleepBetweenRetries$1 <= 0L) break block0;
            long sleepTime = (long)(Math.random() * (double)maxSleepBetweenRetries$1);
            this.LOG().info(new StringBuilder(48).append("Retrying after bind exception. Sleeping for ").append(sleepTime).append(" ms.").toString());
            Thread.sleep(sleepTime);
        }
    }

    private AkkaUtils$() {
        MODULE$ = this;
        this.LOG = LoggerFactory.getLogger(this.getClass());
        this.FLINK_ACTOR_SYSTEM_NAME = "flink";
    }
}

