/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.concurrent.impl;

import akka.dispatch.ExecutionContexts$;
import akka.dispatch.Futures;
import akka.dispatch.Mapper;
import akka.dispatch.OnComplete;
import akka.dispatch.Recover;
import akka.japi.Procedure;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.runtime.concurrent.AcceptFunction;
import org.apache.flink.runtime.concurrent.ApplyFunction;
import org.apache.flink.runtime.concurrent.BiFunction;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function1;
import scala.Option;
import scala.PartialFunction;
import scala.Tuple2;
import scala.concurrent.Await;
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;

/**
 * Implementation of {@link Future} which is backed by a {@link scala.concurrent.Future}.
 *
 * <p>All composition methods translate the given Flink function objects into the matching
 * Scala future combinator and wrap the resulting Scala future in a new {@code FlinkFuture}.
 *
 * @param <T> type of the future's value
 */
public class FlinkFuture<T>
implements Future<T> {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkFuture.class);

    /** The Scala future every operation of this class delegates to. */
    protected scala.concurrent.Future<T> scalaFuture;

    /**
     * Creates a future without a backing Scala future ({@link #scalaFuture} is left {@code null}).
     *
     * <p>NOTE(review): presumably meant for subclasses in this package which assign
     * {@link #scalaFuture} themselves -- confirm against the subclasses.
     */
    FlinkFuture() {
        this.scalaFuture = null;
    }

    /**
     * Wraps the given Scala future.
     *
     * @param scalaFuture Scala future to delegate to, must not be {@code null}
     */
    public FlinkFuture(scala.concurrent.Future<T> scalaFuture) {
        this.scalaFuture = Preconditions.checkNotNull(scalaFuture);
    }

    //-----------------------------------------------------------------------------------
    // Future's methods
    //-----------------------------------------------------------------------------------

    /**
     * @return {@code true} if the backing Scala future has been completed
     */
    @Override
    public boolean isDone() {
        return this.scalaFuture.isCompleted();
    }

    /**
     * This implementation performs no cancellation.
     *
     * @param mayInterruptIfRunning ignored
     * @return always {@code false}
     */
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return false;
    }

    /**
     * Waits without time limit for the backing future and returns its value.
     *
     * @return value of the future
     * @throws InterruptedException if the waiting thread is interrupted
     * @throws ExecutionException wrapping the failure the future was completed with
     */
    @Override
    public T get() throws InterruptedException, ExecutionException {
        Preconditions.checkNotNull(this.scalaFuture);
        try {
            return Await.result(this.scalaFuture, Duration.Inf());
        }
        catch (InterruptedException e) {
            throw e;
        }
        catch (Exception e) {
            throw new ExecutionException(FlinkFuture.stripWrapper(e));
        }
    }

    /**
     * Waits at most the given time for the backing future and returns its value.
     *
     * @param timeout maximum time to wait, must be larger than or equal to 0
     * @param unit time unit of {@code timeout}
     * @return value of the future
     * @throws InterruptedException if the waiting thread is interrupted
     * @throws ExecutionException wrapping the failure the future was completed with
     * @throws TimeoutException if waiting raised a {@code TimeoutException}
     */
    @Override
    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        Preconditions.checkNotNull(this.scalaFuture);
        Preconditions.checkArgument(timeout >= 0L, "The timeout value has to be larger or equal than 0.");
        try {
            return Await.result(this.scalaFuture, new FiniteDuration(timeout, unit));
        }
        catch (InterruptedException | TimeoutException e) {
            throw e;
        }
        catch (Exception e) {
            // Same unwrapping as in get(): callers should see the original Throwable,
            // not the internal ThrowableWrapperException.
            throw new ExecutionException(FlinkFuture.stripWrapper(e));
        }
    }

    /**
     * Returns the future's value if it is already completed, otherwise the given default.
     *
     * @param valueIfAbsent value returned when the future is not yet completed
     * @return the future's value or {@code valueIfAbsent}
     * @throws ExecutionException wrapping the failure if the future was completed with one
     */
    @Override
    public T getNow(T valueIfAbsent) throws ExecutionException {
        Preconditions.checkNotNull(this.scalaFuture);
        Option<Try<T>> value = this.scalaFuture.value();
        if (!value.isDefined()) {
            return valueIfAbsent;
        }
        Try<T> outcome = value.get();
        if (outcome instanceof Success) {
            return ((Success<T>) outcome).value();
        }
        throw new ExecutionException(FlinkFuture.stripWrapper(((Failure<T>) outcome).exception()));
    }

    /**
     * Applies the given function to the value of this future, using the given executor.
     *
     * @param applyFunction function to apply to the value
     * @param executor executor handed to the Scala future for running the function
     * @param <R> type of the function's result
     * @return future of the function's result
     */
    @Override
    public <R> Future<R> thenApplyAsync(final ApplyFunction<? super T, ? extends R> applyFunction, Executor executor) {
        Preconditions.checkNotNull(this.scalaFuture);
        Preconditions.checkNotNull(applyFunction);
        Preconditions.checkNotNull(executor);
        scala.concurrent.Future<R> mappedFuture = this.scalaFuture.map(new Mapper<T, R>() {

            @Override
            public R apply(T value) {
                return applyFunction.apply(value);
            }
        }, FlinkFuture.createExecutionContext(executor));
        return new FlinkFuture<R>(mappedFuture);
    }

    /**
     * Same as {@link #thenApplyAsync(ApplyFunction, Executor)} with {@code Executors.directExecutor()}.
     */
    @Override
    public <R> Future<R> thenApply(ApplyFunction<? super T, ? extends R> applyFunction) {
        return this.thenApplyAsync(applyFunction, Executors.directExecutor());
    }

    /**
     * Passes the value of this future to the given function, using the given executor.
     *
     * @param acceptFunction function consuming the value
     * @param executor executor handed to the Scala future for running the function
     * @return future which yields {@code null} once the function has been applied
     */
    @Override
    public Future<Void> thenAcceptAsync(final AcceptFunction<? super T> acceptFunction, Executor executor) {
        Preconditions.checkNotNull(this.scalaFuture);
        Preconditions.checkNotNull(acceptFunction);
        Preconditions.checkNotNull(executor);
        scala.concurrent.Future<Void> acceptedFuture = this.scalaFuture.map(new Mapper<T, Void>() {

            @Override
            public Void apply(T value) {
                acceptFunction.accept(value);
                return null;
            }
        }, FlinkFuture.createExecutionContext(executor));
        return new FlinkFuture<Void>(acceptedFuture);
    }

    /**
     * Same as {@link #thenAcceptAsync(AcceptFunction, Executor)} with {@code Executors.directExecutor()}.
     */
    @Override
    public Future<Void> thenAccept(AcceptFunction<? super T> acceptFunction) {
        return this.thenAcceptAsync(acceptFunction, Executors.directExecutor());
    }

    /**
     * Recovers from a failure of this future by applying the given function to it,
     * using the given executor.
     *
     * @param exceptionallyFunction function mapping the failure to a replacement value
     * @param executor executor handed to the Scala future for running the function
     * @param <R> type of the replacement value
     * @return future built from the Scala future's {@code recover} combinator
     */
    @Override
    public <R> Future<R> exceptionallyAsync(final ApplyFunction<Throwable, ? extends R> exceptionallyFunction, Executor executor) {
        Preconditions.checkNotNull(this.scalaFuture);
        Preconditions.checkNotNull(exceptionallyFunction);
        Preconditions.checkNotNull(executor);
        scala.concurrent.Future<R> recoveredFuture = this.scalaFuture.recover(new Recover<R>() {

            @Override
            public R recover(Throwable failure) throws Throwable {
                return exceptionallyFunction.apply(failure);
            }
        }, FlinkFuture.createExecutionContext(executor));
        return new FlinkFuture<R>(recoveredFuture);
    }

    /**
     * Same as {@link #exceptionallyAsync(ApplyFunction, Executor)} with {@code Executors.directExecutor()}.
     */
    @Override
    public <R> Future<R> exceptionally(ApplyFunction<Throwable, ? extends R> exceptionallyFunction) {
        return this.exceptionallyAsync(exceptionallyFunction, Executors.directExecutor());
    }

    /**
     * Applies the given future-returning function to the value of this future and flattens
     * the result, using the given executor.
     *
     * <p>If the function returns a {@code FlinkFuture}, its Scala future is used directly.
     * Any other {@link Future} is bridged by a task on the execution context which blocks
     * on {@link Future#get()}.
     *
     * @param applyFunction function producing the follow-up future
     * @param executor executor handed to the Scala future for running the function
     * @param <R> value type of the follow-up future
     * @return future of the follow-up future's value
     */
    @Override
    public <R> Future<R> thenComposeAsync(final ApplyFunction<? super T, ? extends Future<R>> applyFunction, Executor executor) {
        Preconditions.checkNotNull(this.scalaFuture);
        Preconditions.checkNotNull(applyFunction);
        Preconditions.checkNotNull(executor);
        final ExecutionContext executionContext = FlinkFuture.createExecutionContext(executor);
        scala.concurrent.Future<R> flatMappedFuture = this.scalaFuture.flatMap(new Mapper<T, scala.concurrent.Future<R>>() {

            @Override
            public scala.concurrent.Future<R> apply(T value) {
                Future<R> composed = applyFunction.apply(value);
                return FlinkFuture.toScalaFuture(composed, executionContext);
            }
        }, executionContext);
        return new FlinkFuture<R>(flatMappedFuture);
    }

    /**
     * Same as {@link #thenComposeAsync(ApplyFunction, Executor)} with {@code Executors.directExecutor()}.
     */
    @Override
    public <R> Future<R> thenCompose(ApplyFunction<? super T, ? extends Future<R>> composeFunction) {
        return this.thenComposeAsync(composeFunction, Executors.directExecutor());
    }

    /**
     * Applies the given function to the outcome of this future -- value and failure are
     * both handed to the function -- using the given executor.
     *
     * <p>If the function itself throws, the returned future is completed exceptionally
     * with that throwable instead of never being completed.
     *
     * @param biFunction function receiving the value and the failure of this future
     * @param executor executor handed to the Scala future for running the function
     * @param <R> type of the function's result
     * @return future of the function's result
     */
    @Override
    public <R> Future<R> handleAsync(final BiFunction<? super T, Throwable, ? extends R> biFunction, Executor executor) {
        Preconditions.checkNotNull(this.scalaFuture);
        Preconditions.checkNotNull(biFunction);
        Preconditions.checkNotNull(executor);
        ExecutionContext executionContext = FlinkFuture.createExecutionContext(executor);
        final FlinkCompletableFuture<R> resultFuture = new FlinkCompletableFuture<R>();
        this.scalaFuture.onComplete(new OnComplete<T>() {

            @Override
            public void onComplete(Throwable failure, T success) throws Throwable {
                R result;
                try {
                    result = biFunction.apply(success, failure);
                }
                catch (Throwable t) {
                    // Without this, a throwing handler would leave resultFuture
                    // incomplete and every waiter blocked forever.
                    resultFuture.completeExceptionally(t);
                    return;
                }
                resultFuture.complete(result);
            }
        }, executionContext);
        return resultFuture;
    }

    /**
     * Same as {@link #handleAsync(BiFunction, Executor)} with {@code Executors.directExecutor()}.
     */
    @Override
    public <R> Future<R> handle(BiFunction<? super T, Throwable, ? extends R> biFunction) {
        return this.handleAsync(biFunction, Executors.directExecutor());
    }

    /**
     * Combines the value of this future with the value of another future by applying the
     * given function to both, using the given executor.
     *
     * <p>If {@code other} is a {@code FlinkFuture}, its Scala future is used directly.
     * Any other {@link Future} is bridged by a task on the execution context which blocks
     * on {@link Future#get()}.
     *
     * @param other future whose value is combined with this future's value
     * @param biFunction function combining both values
     * @param executor executor handed to the Scala futures for running the function
     * @param <U> value type of the other future
     * @param <R> type of the function's result
     * @return future of the function's result
     */
    @Override
    public <U, R> Future<R> thenCombineAsync(final Future<U> other, final BiFunction<? super T, ? super U, ? extends R> biFunction, Executor executor) {
        Preconditions.checkNotNull(this.scalaFuture);
        Preconditions.checkNotNull(other);
        Preconditions.checkNotNull(biFunction);
        Preconditions.checkNotNull(executor);
        ExecutionContext executionContext = FlinkFuture.createExecutionContext(executor);
        scala.concurrent.Future<U> thatScalaFuture = FlinkFuture.toScalaFuture(other, executionContext);
        scala.concurrent.Future<R> result = this.scalaFuture.zip(thatScalaFuture).map(new Mapper<Tuple2<T, U>, R>() {

            @Override
            public R apply(Tuple2<T, U> tuple2) {
                return biFunction.apply(tuple2._1, tuple2._2);
            }
        }, executionContext);
        return new FlinkFuture<R>(result);
    }

    /**
     * Same as {@link #thenCombineAsync(Future, BiFunction, Executor)} with {@code Executors.directExecutor()}.
     */
    @Override
    public <U, R> Future<R> thenCombine(Future<U> other, BiFunction<? super T, ? super U, ? extends R> biFunction) {
        return this.thenCombineAsync(other, biFunction, Executors.directExecutor());
    }

    //-----------------------------------------------------------------------------------
    // Static factory methods
    //-----------------------------------------------------------------------------------

    /**
     * Creates a future whose value is computed by the given callable on the given executor.
     *
     * @param callable computation producing the value
     * @param executor executor to run the callable on
     * @param <T> type of the value
     * @return future of the callable's result
     */
    public static <T> Future<T> supplyAsync(Callable<T> callable, Executor executor) {
        Preconditions.checkNotNull(callable);
        Preconditions.checkNotNull(executor);
        scala.concurrent.Future<T> scalaFuture = Futures.future(callable, FlinkFuture.createExecutionContext(executor));
        return new FlinkFuture<T>(scalaFuture);
    }

    //-----------------------------------------------------------------------------------
    // Helper functions and classes
    //-----------------------------------------------------------------------------------

    /**
     * Returns the cause of a {@link ThrowableWrapperException}, or the given throwable
     * itself if it is of any other type.
     */
    private static Throwable stripWrapper(Throwable failure) {
        return failure instanceof ThrowableWrapperException ? failure.getCause() : failure;
    }

    /**
     * Obtains a Scala future for the given Flink future.
     *
     * <p>A {@code FlinkFuture} hands out its backing Scala future. For every other
     * implementation a task is started on the given execution context which blocks on
     * {@link Future#get()}; an {@link ExecutionException} raised there is unwrapped so that
     * the Scala future fails with the original cause.
     */
    private static <V> scala.concurrent.Future<V> toScalaFuture(final Future<V> future, ExecutionContext executionContext) {
        if (future instanceof FlinkFuture) {
            return ((FlinkFuture<V>) future).scalaFuture;
        }
        return Futures.future(new Callable<V>() {

            @Override
            public V call() throws Exception {
                try {
                    return future.get();
                }
                catch (ExecutionException e) {
                    Throwable cause = e.getCause();
                    if (cause instanceof Exception) {
                        throw (Exception) cause;
                    }
                    // call() may only throw Exceptions, so any other Throwable
                    // has to travel inside a wrapper.
                    throw new ThrowableWrapperException(cause);
                }
            }
        }, executionContext);
    }

    /**
     * Creates an execution context on top of the given executor. Failures reported to the
     * context are logged as warnings, except when the executor is an
     * {@link ExecutorService} that has already been shut down.
     */
    private static ExecutionContext createExecutionContext(final Executor executor) {
        return ExecutionContexts$.MODULE$.fromExecutor(executor, new Procedure<Throwable>() {

            @Override
            public void apply(Throwable throwable) throws Exception {
                boolean shutDown = executor instanceof ExecutorService
                    && ((ExecutorService) executor).isShutdown();
                if (!shutDown) {
                    LOG.warn("Uncaught exception in execution context.", throwable);
                }
            }
        });
    }

    /**
     * Exception used to carry a {@link Throwable} which is not an {@link Exception}
     * through code that can only throw exceptions. The carried throwable is the cause.
     */
    private static class ThrowableWrapperException
    extends Exception {
        private static final long serialVersionUID = 3855668690181179801L;

        ThrowableWrapperException(Throwable throwable) {
            super(Preconditions.checkNotNull(throwable));
        }
    }
}

