/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rpc.akka;

import akka.actor.ActorRef;
import akka.pattern.AskTimeoutException;
import akka.pattern.Patterns;
import java.io.IOException;
import java.lang.annotation.Annotation;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rpc.FencedRpcGateway;
import org.apache.flink.runtime.rpc.MainThreadExecutable;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcServer;
import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.rpc.StartStoppable;
import org.apache.flink.runtime.rpc.akka.AkkaBasedEndpoint;
import org.apache.flink.runtime.rpc.akka.AkkaRpcSerializedValue;
import org.apache.flink.runtime.rpc.akka.ControlMessages;
import org.apache.flink.runtime.rpc.exceptions.RpcException;
import org.apache.flink.runtime.rpc.messages.CallAsync;
import org.apache.flink.runtime.rpc.messages.LocalRpcInvocation;
import org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation;
import org.apache.flink.runtime.rpc.messages.RpcInvocation;
import org.apache.flink.runtime.rpc.messages.RunAsync;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class AkkaInvocationHandler
implements InvocationHandler,
AkkaBasedEndpoint,
RpcServer {
    private static final Logger LOG = LoggerFactory.getLogger(AkkaInvocationHandler.class);
    private final String address;
    private final String hostname;
    private final ActorRef rpcEndpoint;
    protected final boolean isLocal;
    private final Time timeout;
    private final long maximumFramesize;
    @Nullable
    private final CompletableFuture<Void> terminationFuture;
    private final boolean captureAskCallStack;

    AkkaInvocationHandler(String address, String hostname, ActorRef rpcEndpoint, Time timeout, long maximumFramesize, @Nullable CompletableFuture<Void> terminationFuture, boolean captureAskCallStack) {
        this.address = (String)Preconditions.checkNotNull((Object)address);
        this.hostname = (String)Preconditions.checkNotNull((Object)hostname);
        this.rpcEndpoint = (ActorRef)Preconditions.checkNotNull((Object)rpcEndpoint);
        this.isLocal = this.rpcEndpoint.path().address().hasLocalScope();
        this.timeout = (Time)Preconditions.checkNotNull((Object)timeout);
        this.maximumFramesize = maximumFramesize;
        this.terminationFuture = terminationFuture;
        this.captureAskCallStack = captureAskCallStack;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Object result;
        Class<?> declaringClass = method.getDeclaringClass();
        if (declaringClass.equals(AkkaBasedEndpoint.class) || declaringClass.equals(Object.class) || declaringClass.equals(RpcGateway.class) || declaringClass.equals(StartStoppable.class) || declaringClass.equals(MainThreadExecutable.class) || declaringClass.equals(RpcServer.class)) {
            result = method.invoke((Object)this, args);
        } else {
            if (declaringClass.equals(FencedRpcGateway.class)) {
                throw new UnsupportedOperationException("AkkaInvocationHandler does not support the call FencedRpcGateway#" + method.getName() + ". This indicates that you retrieved a FencedRpcGateway without specifying a fencing token. Please use RpcService#connect(RpcService, F, Time) with F being the fencing token to retrieve a properly FencedRpcGateway.");
            }
            result = this.invokeRpc(method, args);
        }
        return result;
    }

    @Override
    public ActorRef getActorRef() {
        return this.rpcEndpoint;
    }

    @Override
    public void runAsync(Runnable runnable) {
        this.scheduleRunAsync(runnable, 0L);
    }

    @Override
    public void scheduleRunAsync(Runnable runnable, long delayMillis) {
        Preconditions.checkNotNull((Object)runnable, (String)"runnable");
        Preconditions.checkArgument((delayMillis >= 0L ? 1 : 0) != 0, (Object)"delay must be zero or greater");
        if (!this.isLocal) {
            throw new RuntimeException("Trying to send a Runnable to a remote actor at " + this.rpcEndpoint.path() + ". This is not supported.");
        }
        long atTimeNanos = delayMillis == 0L ? 0L : System.nanoTime() + delayMillis * 1000000L;
        this.tell(new RunAsync(runnable, atTimeNanos));
    }

    @Override
    public <V> CompletableFuture<V> callAsync(Callable<V> callable, Time callTimeout) {
        if (this.isLocal) {
            CompletableFuture<?> resultFuture = this.ask(new CallAsync(callable), callTimeout);
            return resultFuture;
        }
        throw new RuntimeException("Trying to send a Callable to a remote actor at " + this.rpcEndpoint.path() + ". This is not supported.");
    }

    @Override
    public void start() {
        this.rpcEndpoint.tell((Object)ControlMessages.START, ActorRef.noSender());
    }

    @Override
    public void stop() {
        this.rpcEndpoint.tell((Object)ControlMessages.STOP, ActorRef.noSender());
    }

    private Object invokeRpc(Method method, Object[] args) throws Exception {
        CompletableFuture result;
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        Annotation[][] parameterAnnotations = method.getParameterAnnotations();
        Time futureTimeout = AkkaInvocationHandler.extractRpcTimeout(parameterAnnotations, args, this.timeout);
        RpcInvocation rpcInvocation = this.createRpcInvocationMessage(methodName, parameterTypes, args);
        Class<?> returnType = method.getReturnType();
        if (Objects.equals(returnType, Void.TYPE)) {
            this.tell(rpcInvocation);
            result = null;
        } else {
            Throwable callStackCapture = this.captureAskCallStack ? new Throwable() : null;
            CompletableFuture<?> resultFuture = this.ask(rpcInvocation, futureTimeout);
            CompletableFuture completableFuture = new CompletableFuture();
            resultFuture.whenComplete((resultValue, failure) -> {
                if (failure != null) {
                    completableFuture.completeExceptionally(AkkaInvocationHandler.resolveTimeoutException(failure, callStackCapture, method));
                } else {
                    completableFuture.complete(AkkaInvocationHandler.deserializeValueIfNeeded(resultValue, method));
                }
            });
            if (Objects.equals(returnType, CompletableFuture.class)) {
                result = completableFuture;
            } else {
                try {
                    result = completableFuture.get(futureTimeout.getSize(), futureTimeout.getUnit());
                }
                catch (ExecutionException ee) {
                    throw new RpcException("Failure while obtaining synchronous RPC result.", ExceptionUtils.stripExecutionException((Throwable)ee));
                }
            }
        }
        return result;
    }

    protected RpcInvocation createRpcInvocationMessage(String methodName, Class<?>[] parameterTypes, Object[] args) throws IOException {
        RpcInvocation rpcInvocation;
        if (this.isLocal) {
            rpcInvocation = new LocalRpcInvocation(methodName, parameterTypes, args);
        } else {
            try {
                RemoteRpcInvocation remoteRpcInvocation = new RemoteRpcInvocation(methodName, parameterTypes, args);
                if (remoteRpcInvocation.getSize() > this.maximumFramesize) {
                    throw new IOException(String.format("The rpc invocation size %d exceeds the maximum akka framesize.", remoteRpcInvocation.getSize()));
                }
                rpcInvocation = remoteRpcInvocation;
            }
            catch (IOException e) {
                LOG.warn("Could not create remote rpc invocation message. Failing rpc invocation because...", (Throwable)e);
                throw e;
            }
        }
        return rpcInvocation;
    }

    private static Time extractRpcTimeout(Annotation[][] parameterAnnotations, Object[] args, Time defaultTimeout) {
        if (args != null) {
            Preconditions.checkArgument((parameterAnnotations.length == args.length ? 1 : 0) != 0);
            for (int i = 0; i < parameterAnnotations.length; ++i) {
                if (!AkkaInvocationHandler.isRpcTimeout(parameterAnnotations[i])) continue;
                if (args[i] instanceof Time) {
                    return (Time)args[i];
                }
                throw new RuntimeException("The rpc timeout parameter must be of type " + Time.class.getName() + ". The type " + args[i].getClass().getName() + " is not supported.");
            }
        }
        return defaultTimeout;
    }

    private static boolean isRpcTimeout(Annotation[] annotations) {
        for (Annotation annotation : annotations) {
            if (!annotation.annotationType().equals(RpcTimeout.class)) continue;
            return true;
        }
        return false;
    }

    protected void tell(Object message) {
        this.rpcEndpoint.tell(message, ActorRef.noSender());
    }

    protected CompletableFuture<?> ask(Object message, Time timeout) {
        return FutureUtils.toJava(Patterns.ask((ActorRef)this.rpcEndpoint, (Object)message, (long)timeout.toMilliseconds()));
    }

    @Override
    public String getAddress() {
        return this.address;
    }

    @Override
    public String getHostname() {
        return this.hostname;
    }

    @Override
    public CompletableFuture<Void> getTerminationFuture() {
        return this.terminationFuture;
    }

    static Object deserializeValueIfNeeded(Object o, Method method) {
        if (o instanceof AkkaRpcSerializedValue) {
            try {
                return ((AkkaRpcSerializedValue)o).deserializeValue(AkkaInvocationHandler.class.getClassLoader());
            }
            catch (IOException | ClassNotFoundException e) {
                throw new CompletionException(new RpcException("Could not deserialize the serialized payload of RPC method : " + method.getName(), e));
            }
        }
        return o;
    }

    static Throwable resolveTimeoutException(Throwable exception, @Nullable Throwable callStackCapture, Method method) {
        if (!(exception instanceof AskTimeoutException)) {
            return exception;
        }
        TimeoutException newException = new TimeoutException("Invocation of " + method + " timed out.");
        newException.initCause(exception);
        if (callStackCapture != null) {
            StackTraceElement[] stackTrace = callStackCapture.getStackTrace();
            newException.setStackTrace(Arrays.copyOfRange(stackTrace, 3, stackTrace.length));
        }
        return newException;
    }
}

