/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.kubernetes.KubernetesWorkerNode;
import org.apache.flink.kubernetes.configuration.KubernetesResourceManagerDriverConfiguration;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.factory.KubernetesTaskManagerFactory;
import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesTaskManagerParameters;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesTooOldResourceVersionException;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.externalresource.ExternalResourceUtils;
import org.apache.flink.runtime.resourcemanager.active.AbstractResourceManagerDriver;
import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;

public class KubernetesResourceManagerDriver
extends AbstractResourceManagerDriver<KubernetesWorkerNode> {
    private static final String TASK_MANAGER_POD_FORMAT = "%s-taskmanager-%d-%d";
    private final String clusterId;
    private final Time podCreationRetryInterval;
    private final FlinkKubeClient flinkKubeClient;
    private final Map<String, CompletableFuture<KubernetesWorkerNode>> requestResourceFutures;
    private long currentMaxAttemptId = 0L;
    private long currentMaxPodId = 0L;
    private Optional<KubernetesWatch> podsWatchOpt;
    private volatile boolean running;
    private CompletableFuture<Void> podCreationCoolDown;

    public KubernetesResourceManagerDriver(Configuration flinkConfig, FlinkKubeClient flinkKubeClient, KubernetesResourceManagerDriverConfiguration configuration) {
        super(flinkConfig, GlobalConfiguration.loadConfiguration());
        this.clusterId = (String)Preconditions.checkNotNull((Object)configuration.getClusterId());
        this.podCreationRetryInterval = (Time)Preconditions.checkNotNull((Object)configuration.getPodCreationRetryInterval());
        this.flinkKubeClient = (FlinkKubeClient)Preconditions.checkNotNull((Object)flinkKubeClient);
        this.requestResourceFutures = new HashMap<String, CompletableFuture<KubernetesWorkerNode>>();
        this.podCreationCoolDown = FutureUtils.completedVoidFuture();
        this.running = false;
    }

    protected void initializeInternal() throws Exception {
        this.podsWatchOpt = this.watchTaskManagerPods();
        this.recoverWorkerNodesFromPreviousAttempts();
        this.running = true;
    }

    public CompletableFuture<Void> terminate() {
        if (!this.running) {
            return FutureUtils.completedVoidFuture();
        }
        this.running = false;
        Exception exception = null;
        try {
            this.podsWatchOpt.ifPresent(KubernetesWatch::close);
        }
        catch (Exception e) {
            exception = e;
        }
        try {
            this.flinkKubeClient.close();
        }
        catch (Exception e) {
            exception = (Exception)ExceptionUtils.firstOrSuppressed((Throwable)e, (Throwable)exception);
        }
        return exception == null ? FutureUtils.completedVoidFuture() : FutureUtils.completedExceptionally((Throwable)exception);
    }

    public void deregisterApplication(ApplicationStatus finalStatus, @Nullable String optionalDiagnostics) {
        this.log.info("Deregistering Flink Kubernetes cluster, clusterId: {}, diagnostics: {}", (Object)this.clusterId, (Object)(optionalDiagnostics == null ? "" : optionalDiagnostics));
        this.flinkKubeClient.stopAndCleanupCluster(this.clusterId);
    }

    public CompletableFuture<KubernetesWorkerNode> requestResource(TaskExecutorProcessSpec taskExecutorProcessSpec) {
        KubernetesTaskManagerParameters parameters = this.createKubernetesTaskManagerParameters(taskExecutorProcessSpec);
        KubernetesPod taskManagerPod = KubernetesTaskManagerFactory.buildTaskManagerKubernetesPod(parameters);
        String podName = taskManagerPod.getName();
        CompletableFuture<KubernetesWorkerNode> requestResourceFuture = new CompletableFuture<KubernetesWorkerNode>();
        this.requestResourceFutures.put(podName, requestResourceFuture);
        this.log.info("Creating new TaskManager pod with name {} and resource <{},{}>.", new Object[]{podName, parameters.getTaskManagerMemoryMB(), parameters.getTaskManagerCPU()});
        CompletionStage createPodFuture = this.podCreationCoolDown.thenCompose(ignore -> this.flinkKubeClient.createTaskManagerPod(taskManagerPod));
        FutureUtils.assertNoException((CompletableFuture)((CompletableFuture)createPodFuture).handleAsync((ignore, exception) -> {
            if (exception != null) {
                this.log.warn("Could not create pod {}, exception: {}", (Object)podName, exception);
                this.tryResetPodCreationCoolDown();
                CompletableFuture<KubernetesWorkerNode> future = this.requestResourceFutures.remove(taskManagerPod.getName());
                if (future != null) {
                    future.completeExceptionally((Throwable)exception);
                }
            } else {
                this.log.info("Pod {} is created.", (Object)podName);
            }
            return null;
        }, (Executor)this.getMainThreadExecutor()));
        return requestResourceFuture;
    }

    public void releaseResource(KubernetesWorkerNode worker) {
        String podName = worker.getResourceID().toString();
        this.log.info("Stopping TaskManager pod {}.", (Object)podName);
        this.stopPod(podName);
    }

    private void recoverWorkerNodesFromPreviousAttempts() throws ResourceManagerException {
        List<KubernetesPod> podList = this.flinkKubeClient.getPodsWithLabels(KubernetesUtils.getTaskManagerLabels(this.clusterId));
        ArrayList<KubernetesWorkerNode> recoveredWorkers = new ArrayList<KubernetesWorkerNode>();
        for (KubernetesPod pod : podList) {
            KubernetesWorkerNode worker = new KubernetesWorkerNode(new ResourceID(pod.getName()));
            long attempt = worker.getAttempt();
            if (attempt > this.currentMaxAttemptId) {
                this.currentMaxAttemptId = attempt;
            }
            if (pod.isTerminated()) {
                this.stopPod(pod.getName());
                continue;
            }
            recoveredWorkers.add(worker);
        }
        this.log.info("Recovered {} pods from previous attempts, current attempt id is {}.", (Object)recoveredWorkers.size(), (Object)(++this.currentMaxAttemptId));
        this.getResourceEventHandler().onPreviousAttemptWorkersRecovered(recoveredWorkers);
    }

    private KubernetesTaskManagerParameters createKubernetesTaskManagerParameters(TaskExecutorProcessSpec taskExecutorProcessSpec) {
        String podName = String.format(TASK_MANAGER_POD_FORMAT, this.clusterId, this.currentMaxAttemptId, ++this.currentMaxPodId);
        ContaineredTaskManagerParameters taskManagerParameters = ContaineredTaskManagerParameters.create((Configuration)this.flinkConfig, (TaskExecutorProcessSpec)taskExecutorProcessSpec);
        Configuration taskManagerConfig = new Configuration(this.flinkConfig);
        taskManagerConfig.set(TaskManagerOptions.TASK_MANAGER_RESOURCE_ID, (Object)podName);
        String dynamicProperties = BootstrapTools.getDynamicPropertiesAsString((Configuration)this.flinkClientConfig, (Configuration)taskManagerConfig);
        return new KubernetesTaskManagerParameters(this.flinkConfig, podName, dynamicProperties, taskManagerParameters, ExternalResourceUtils.getExternalResources((Configuration)this.flinkConfig, (String)"kubernetes.config-key"));
    }

    private void tryResetPodCreationCoolDown() {
        if (this.podCreationCoolDown.isDone()) {
            this.log.info("Pod creation failed. Will not retry creating pods in {}.", (Object)this.podCreationRetryInterval);
            this.podCreationCoolDown = new CompletableFuture();
            this.getMainThreadExecutor().schedule(() -> this.podCreationCoolDown.complete(null), this.podCreationRetryInterval.getSize(), this.podCreationRetryInterval.getUnit());
        }
    }

    private void terminatedPodsInMainThread(List<KubernetesPod> pods) {
        this.getMainThreadExecutor().execute(() -> {
            for (KubernetesPod pod : pods) {
                if (!pod.isTerminated()) continue;
                String podName = pod.getName();
                this.log.debug("TaskManager pod {} is terminated.", (Object)podName);
                CompletableFuture<KubernetesWorkerNode> requestResourceFuture = this.requestResourceFutures.remove(podName);
                if (requestResourceFuture != null) {
                    this.log.warn("Pod {} is terminated before receiving the ADDED event.", (Object)podName);
                    requestResourceFuture.completeExceptionally((Throwable)new FlinkException("Pod is terminated."));
                }
                this.getResourceEventHandler().onWorkerTerminated(new ResourceID(podName), pod.getTerminatedDiagnostics());
                this.stopPod(podName);
            }
        });
    }

    private void stopPod(String podName) {
        this.flinkKubeClient.stopPod(podName).whenComplete((ignore, throwable) -> {
            if (throwable != null) {
                this.log.warn("Could not remove TaskManager pod {}, exception: {}", (Object)podName, throwable);
            }
        });
    }

    private Optional<KubernetesWatch> watchTaskManagerPods() {
        return Optional.of(this.flinkKubeClient.watchPodsAndDoCallback(KubernetesUtils.getTaskManagerLabels(this.clusterId), new PodCallbackHandlerImpl()));
    }

    private class PodCallbackHandlerImpl
    implements FlinkKubeClient.WatchCallbackHandler<KubernetesPod> {
        private PodCallbackHandlerImpl() {
        }

        @Override
        public void onAdded(List<KubernetesPod> pods) {
            KubernetesResourceManagerDriver.this.getMainThreadExecutor().execute(() -> {
                for (KubernetesPod pod : pods) {
                    String podName = pod.getName();
                    CompletableFuture requestResourceFuture = (CompletableFuture)KubernetesResourceManagerDriver.this.requestResourceFutures.remove(podName);
                    if (requestResourceFuture == null) {
                        KubernetesResourceManagerDriver.this.log.debug("Ignore TaskManager pod that is already added: {}", (Object)podName);
                        continue;
                    }
                    KubernetesResourceManagerDriver.this.log.info("Received new TaskManager pod: {}", (Object)podName);
                    requestResourceFuture.complete(new KubernetesWorkerNode(new ResourceID(podName)));
                }
            });
        }

        @Override
        public void onModified(List<KubernetesPod> pods) {
            KubernetesResourceManagerDriver.this.terminatedPodsInMainThread(pods);
        }

        @Override
        public void onDeleted(List<KubernetesPod> pods) {
            KubernetesResourceManagerDriver.this.terminatedPodsInMainThread(pods);
        }

        @Override
        public void onError(List<KubernetesPod> pods) {
            KubernetesResourceManagerDriver.this.terminatedPodsInMainThread(pods);
        }

        @Override
        public void handleError(Throwable throwable) {
            if (throwable instanceof KubernetesTooOldResourceVersionException) {
                KubernetesResourceManagerDriver.this.getMainThreadExecutor().execute(() -> {
                    if (KubernetesResourceManagerDriver.this.running) {
                        KubernetesResourceManagerDriver.this.podsWatchOpt.ifPresent(KubernetesWatch::close);
                        KubernetesResourceManagerDriver.this.log.info("Creating a new watch on TaskManager pods.");
                        KubernetesResourceManagerDriver.this.podsWatchOpt = KubernetesResourceManagerDriver.this.watchTaskManagerPods();
                    }
                });
            } else {
                KubernetesResourceManagerDriver.this.getResourceEventHandler().onError(throwable);
            }
        }
    }
}

