/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rpc;

import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ScheduledFutureAdapter;
import org.apache.flink.runtime.rpc.MainThreadExecutable;
import org.apache.flink.runtime.rpc.MainThreadValidatorUtil;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcServer;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.AutoCloseableAsync;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class RpcEndpoint
implements RpcGateway,
AutoCloseableAsync {
    protected final Logger log = LoggerFactory.getLogger(this.getClass());
    private final RpcService rpcService;
    private final String endpointId;
    protected final RpcServer rpcServer;
    final AtomicReference<Thread> currentMainThread = new AtomicReference<Object>(null);
    private final MainThreadExecutor mainThreadExecutor;
    private final CloseableRegistry resourceRegistry;
    private boolean isRunning;

    protected RpcEndpoint(RpcService rpcService, String endpointId) {
        this.rpcService = (RpcService)Preconditions.checkNotNull((Object)rpcService, (String)"rpcService");
        this.endpointId = (String)Preconditions.checkNotNull((Object)endpointId, (String)"endpointId");
        this.rpcServer = rpcService.startServer(this);
        this.resourceRegistry = new CloseableRegistry();
        this.mainThreadExecutor = new MainThreadExecutor((MainThreadExecutable)this.rpcServer, this::validateRunsInMainThread, endpointId);
        this.registerResource(this.mainThreadExecutor);
    }

    protected RpcEndpoint(RpcService rpcService) {
        this(rpcService, UUID.randomUUID().toString());
    }

    public String getEndpointId() {
        return this.endpointId;
    }

    protected boolean isRunning() {
        this.validateRunsInMainThread();
        return this.isRunning;
    }

    public final void start() {
        this.rpcServer.start();
    }

    public final void internalCallOnStart() throws Exception {
        this.validateRunsInMainThread();
        this.isRunning = true;
        this.onStart();
    }

    protected void onStart() throws Exception {
    }

    protected final void stop() {
        this.rpcServer.stop();
    }

    public final CompletableFuture<Void> internalCallOnStop() {
        this.validateRunsInMainThread();
        CompletableFuture<Void> stopFuture = new CompletableFuture();
        try {
            this.resourceRegistry.close();
            stopFuture.complete(null);
        }
        catch (IOException e) {
            stopFuture.completeExceptionally(new RuntimeException("Close resource registry fail", e));
        }
        stopFuture = CompletableFuture.allOf(stopFuture, this.onStop());
        this.isRunning = false;
        return stopFuture;
    }

    protected void registerResource(Closeable closeableResource) {
        try {
            this.resourceRegistry.registerCloseable((AutoCloseable)closeableResource);
        }
        catch (IOException e) {
            throw new RuntimeException("Registry closeable resource " + closeableResource + " fail", e);
        }
    }

    protected boolean unregisterResource(Closeable closeableResource) {
        return this.resourceRegistry.unregisterCloseable((AutoCloseable)closeableResource);
    }

    protected CompletableFuture<Void> onStop() {
        return CompletableFuture.completedFuture(null);
    }

    public final CompletableFuture<Void> closeAsync() {
        this.rpcService.stopServer(this.rpcServer);
        return this.getTerminationFuture();
    }

    public <C extends RpcGateway> C getSelfGateway(Class<C> selfGatewayType) {
        return this.rpcService.getSelfGateway(selfGatewayType, this.rpcServer);
    }

    @Override
    public String getAddress() {
        return this.rpcServer.getAddress();
    }

    @Override
    public String getHostname() {
        return this.rpcServer.getHostname();
    }

    protected MainThreadExecutor getMainThreadExecutor() {
        return this.mainThreadExecutor;
    }

    public RpcService getRpcService() {
        return this.rpcService;
    }

    public CompletableFuture<Void> getTerminationFuture() {
        return this.rpcServer.getTerminationFuture();
    }

    protected void runAsync(Runnable runnable) {
        this.rpcServer.runAsync(runnable);
    }

    protected void scheduleRunAsync(Runnable runnable, Duration delay) {
        this.scheduleRunAsync(runnable, delay.toMillis(), TimeUnit.MILLISECONDS);
    }

    protected void scheduleRunAsync(Runnable runnable, long delay, TimeUnit unit) {
        this.rpcServer.scheduleRunAsync(runnable, unit.toMillis(delay));
    }

    protected <V> CompletableFuture<V> callAsync(Callable<V> callable, Duration timeout) {
        return this.rpcServer.callAsync(callable, timeout);
    }

    public void validateRunsInMainThread() {
        assert (MainThreadValidatorUtil.isRunningInExpectedThread(this.currentMainThread.get()));
    }

    boolean validateResourceClosed() {
        return this.mainThreadExecutor.validateScheduledExecutorClosed() && this.resourceRegistry.isClosed();
    }

    protected static class MainThreadExecutor
    implements ComponentMainThreadExecutor,
    Closeable {
        private static final Logger log = LoggerFactory.getLogger(MainThreadExecutor.class);
        private final MainThreadExecutable gateway;
        private final Runnable mainThreadCheck;
        private final ScheduledExecutorService mainScheduledExecutor;

        MainThreadExecutor(MainThreadExecutable gateway, Runnable mainThreadCheck, String endpointId) {
            this(gateway, mainThreadCheck, Executors.newSingleThreadScheduledExecutor((ThreadFactory)new ExecutorThreadFactory(endpointId + "-main-scheduler")));
        }

        @VisibleForTesting
        MainThreadExecutor(MainThreadExecutable gateway, Runnable mainThreadCheck, ScheduledExecutorService mainScheduledExecutor) {
            this.gateway = (MainThreadExecutable)Preconditions.checkNotNull((Object)gateway);
            this.mainThreadCheck = (Runnable)Preconditions.checkNotNull((Object)mainThreadCheck);
            this.mainScheduledExecutor = mainScheduledExecutor;
        }

        public void execute(@Nonnull Runnable command) {
            this.gateway.runAsync(command);
        }

        public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
            long delayMillis = TimeUnit.MILLISECONDS.convert(delay, unit);
            FutureTask<Object> ft = new FutureTask<Object>(command, null);
            if (this.mainScheduledExecutor.isShutdown()) {
                log.warn("The scheduled executor service is shutdown and ignores the command {}", (Object)command);
            } else {
                this.mainScheduledExecutor.schedule(() -> this.gateway.runAsync(ft), delayMillis, TimeUnit.MILLISECONDS);
            }
            return new ScheduledFutureAdapter<Object>(ft, delayMillis, TimeUnit.MILLISECONDS);
        }

        public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
            long delayMillis = TimeUnit.MILLISECONDS.convert(delay, unit);
            FutureTask ft = new FutureTask(callable);
            if (this.mainScheduledExecutor.isShutdown()) {
                log.warn("The scheduled executor service is shutdown and ignores the callable {}", callable);
            } else {
                this.mainScheduledExecutor.schedule(() -> this.gateway.runAsync(ft), delayMillis, TimeUnit.MILLISECONDS);
            }
            return new ScheduledFutureAdapter<V>(ft, delayMillis, TimeUnit.MILLISECONDS);
        }

        public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
            throw new UnsupportedOperationException("Not implemented because the method is currently not required.");
        }

        public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
            throw new UnsupportedOperationException("Not implemented because the method is currently not required.");
        }

        @Override
        public void assertRunningInMainThread() {
            this.mainThreadCheck.run();
        }

        @Override
        public void close() {
            if (!this.mainScheduledExecutor.isShutdown()) {
                this.mainScheduledExecutor.shutdownNow();
            }
        }

        final boolean validateScheduledExecutorClosed() {
            return this.mainScheduledExecutor.isShutdown();
        }
    }
}

