/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.network.netty;

import java.nio.ByteBuffer;
import org.apache.spark.SecurityManager;
import org.apache.spark.SparkConf;
import org.apache.spark.network.BlockDataManager;
import org.apache.spark.network.BlockTransferService;
import org.apache.spark.network.TransportContext;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.client.TransportClientFactory;
import org.apache.spark.network.netty.NettyBlockRpcServer;
import org.apache.spark.network.netty.NettyBlockTransferService$;
import org.apache.spark.network.netty.SparkTransportConf$;
import org.apache.spark.network.sasl.SaslClientBootstrap;
import org.apache.spark.network.sasl.SaslServerBootstrap;
import org.apache.spark.network.sasl.SecretKeyHolder;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.server.TransportServerBootstrap;
import org.apache.spark.network.shuffle.BlockFetchingListener;
import org.apache.spark.network.shuffle.OneForOneBlockFetcher;
import org.apache.spark.network.shuffle.RetryingBlockFetcher;
import org.apache.spark.network.shuffle.protocol.UploadBlock;
import org.apache.spark.network.util.TransportConf;
import org.apache.spark.serializer.JavaSerializer;
import org.apache.spark.storage.BlockId;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.util.Utils$;
import scala.Function0;
import scala.Function1;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Serializable;
import scala.Some;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.immutable.List;
import scala.collection.mutable.StringBuilder;
import scala.concurrent.Future;
import scala.concurrent.Promise;
import scala.concurrent.Promise$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

@ScalaSignature(bytes="\u0006\u0001\u00055e\u0001B\u0001\u0003\u00015\u0011\u0011DT3uif\u0014En\\2l)J\fgn\u001d4feN+'O^5dK*\u00111\u0001B\u0001\u0006]\u0016$H/\u001f\u0006\u0003\u000b\u0019\tqA\\3uo>\u00148N\u0003\u0002\b\u0011\u0005)1\u000f]1sW*\u0011\u0011BC\u0001\u0007CB\f7\r[3\u000b\u0003-\t1a\u001c:h\u0007\u0001\u0019\"\u0001\u0001\b\u0011\u0005=\u0001R\"\u0001\u0003\n\u0005E!!\u0001\u0006\"m_\u000e\\GK]1og\u001a,'oU3sm&\u001cW\r\u0003\u0005\u0014\u0001\t\u0005\t\u0015!\u0003\u0015\u0003\u0011\u0019wN\u001c4\u0011\u0005U1R\"\u0001\u0004\n\u0005]1!!C*qCJ\\7i\u001c8g\u0011!I\u0002A!A!\u0002\u0013Q\u0012aD:fGV\u0014\u0018\u000e^=NC:\fw-\u001a:\u0011\u0005UY\u0012B\u0001\u000f\u0007\u0005=\u0019VmY;sSRLX*\u00198bO\u0016\u0014\b\u0002\u0003\u0010\u0001\u0005\u0003\u0005\u000b\u0011B\u0010\u0002\u00119,XnQ8sKN\u0004\"\u0001I\u0012\u000e\u0003\u0005R\u0011AI\u0001\u0006g\u000e\fG.Y\u0005\u0003I\u0005\u00121!\u00138u\u0011\u00151\u0003\u0001\"\u0001(\u0003\u0019a\u0014N\\5u}Q!\u0001FK\u0016-!\tI\u0003!D\u0001\u0003\u0011\u0015\u0019R\u00051\u0001\u0015\u0011\u0015IR\u00051\u0001\u001b\u0011\u0015qR\u00051\u0001 \u0011\u001dq\u0003A1A\u0005\n=\n!b]3sS\u0006d\u0017N_3s+\u0005\u0001\u0004CA\u00194\u001b\u0005\u0011$B\u0001\u0018\u0007\u0013\t!$G\u0001\bKCZ\f7+\u001a:jC2L'0\u001a:\t\rY\u0002\u0001\u0015!\u00031\u0003-\u0019XM]5bY&TXM\u001d\u0011\t\u000fa\u0002!\u0019!C\u0005s\u0005Y\u0011-\u001e;i\u000b:\f'\r\\3e+\u0005Q\u0004C\u0001\u0011<\u0013\ta\u0014EA\u0004C_>dW-\u00198\t\ry\u0002\u0001\u0015!\u0003;\u00031\tW\u000f\u001e5F]\u0006\u0014G.\u001a3!\u0011\u001d\u0001\u0005A1A\u0005\n\u0005\u000bQ\u0002\u001e:b]N\u0004xN\u001d;D_:4W#\u0001\"\u0011\u0005\r3U\"\u0001#\u000b\u0005\u0015#\u0011\u0001B;uS2L!a\u0012#\u0003\u001bQ\u0013\u0018M\\:q_J$8i\u001c8g\u0011\u0019I\u0005\u0001)A\u0005\u0005\u0006qAO]1ogB|'\u000f^\"p]\u001a\u0004\u0003\"C&\u0001\u0001\u0004\u0005\t\u0015)\u0003M\u0003A!(/\u00198ta>\u0014HoQ8oi\u0016DH\u000f\u0005\u0002\u0010\u001b&\u0011a\n\u0002\u0002\u0011)J\fgn\u001d9peR\u001cuN\u001c;fqRD\u0011\u0002\u0015\u0001A\u0002\u0003\u0005\u000b\u0015B)\u0002\rM,'O^3s!\t\u0011F+D\u0001T\u0015\t\u0001F!\u0003\u0002V'\nyAK]1ogB|'\u000f^*feZ,'\u000fC\u0005X\u0001\u0001\u0007\t\u0011)Q\u00051\u0006i1\r\\5f]R4\u0015m\u0019;pef\u0004\"!\u0017/\u000e\u0003iS!a\u0017\u0003\u0002\r\rd\u0017.\u001a8u\u0013\ti&L\u0001\fUe\u0006t7\u000f]8si\u000ec\u0017.\u001a8u\r\u0006\u001cGo\u001c:z\u0011%y\u0006\u00011A\u0001B\u0003&\u0001-A\u0003baBLE\r\u0005\u0002bI:\u0011\u0001EY\u0005\u0003G\u0006\na\u0001\u0015:fI\u00164\u0017BA3g\u0005\u0019\u0019FO]5oO*\u00111-\t\u0005\u0006Q\u0002!\t%[\u0001\u0005S:LG\u000f\u0006\u0002k[B\u0011\u0001e[\u0005\u0003Y\u0006\u0012A!\u00168ji\")an\u001aa\u0001_\u0006\u0001\"\r\\8dW\u0012\u000bG/Y'b]\u0006<WM\u001d\t\u0003\u001fAL!!\u001d\u0003\u0003!\tcwnY6ECR\fW*\u00198bO\u0016\u0014\b\"B:\u0001\t\u0013!\u0018\u0001D2sK\u0006$XmU3sm\u0016\u0014HCA)v\u0011\u00151(\u000f1\u0001x\u0003)\u0011wn\u001c;tiJ\f\u0007o\u001d\t\u0006q\u0006\u0005\u0011q\u0001\b\u0003szt!A_?\u000e\u0003mT!\u0001 \u0007\u0002\rq\u0012xn\u001c;?\u0013\u0005\u0011\u0013BA@\"\u0003\u001d\u0001\u0018mY6bO\u0016LA!a\u0001\u0002\u0006\t!A*[:u\u0015\ty\u0018\u0005E\u0002S\u0003\u0013I1!a\u0003T\u0005a!&/\u00198ta>\u0014HoU3sm\u0016\u0014(i\\8ugR\u0014\u0018\r\u001d\u0005\b\u0003\u001f\u0001A\u0011IA\t\u0003-1W\r^2i\u00052|7m[:\u0015\u0017)\f\u0019\"a\u0006\u0002\u001c\u0005}\u0011\u0011\u0006\u0005\b\u0003+\ti\u00011\u0001a\u0003\u0011Awn\u001d;\t\u000f\u0005e\u0011Q\u0002a\u0001?\u0005!\u0001o\u001c:u\u0011\u001d\ti\"!\u0004A\u0002\u0001\fa!\u001a=fG&#\u0007\u0002CA\u0011\u0003\u001b\u0001\r!a\t\u0002\u0011\tdwnY6JIN\u0004B\u0001IA\u0013A&\u0019\u0011qE\u0011\u0003\u000b\u0005\u0013(/Y=\t\u0011\u0005-\u0012Q\u0002a\u0001\u0003[\t\u0001\u0002\\5ti\u0016tWM\u001d\t\u0005\u0003_\t)$\u0004\u0002\u00022)\u0019\u00111\u0007\u0003\u0002\u000fMDWO\u001a4mK&!\u0011qGA\u0019\u0005U\u0011En\\2l\r\u0016$8\r[5oO2K7\u000f^3oKJDq!a\u000f\u0001\t\u0003\ni$\u0001\u0005i_N$h*Y7f+\u0005\u0001\u0007bBA\r\u0001\u0011\u0005\u0013\u0011I\u000b\u0002?!9\u0011Q\t\u0001\u0005B\u0005\u001d\u0013aC;qY>\fGM\u00117pG.$b\"!\u0013\u0002V\u0005e\u00131LA/\u0003[\ni\bE\u0003\u0002L\u0005E#.\u0004\u0002\u0002N)\u0019\u0011qJ\u0011\u0002\u0015\r|gnY;se\u0016tG/\u0003\u0003\u0002T\u00055#A\u0002$viV\u0014X\rC\u0004\u0002X\u0005\r\u0003\u0019\u00011\u0002\u0011!|7\u000f\u001e8b[\u0016Dq!!\u0007\u0002D\u0001\u0007q\u0004C\u0004\u0002\u001e\u0005\r\u0003\u0019\u00011\t\u0011\u0005}\u00131\ta\u0001\u0003C\nqA\u00197pG.LE\r\u0005\u0003\u0002d\u0005%TBAA3\u0015\r\t9GB\u0001\bgR|'/Y4f\u0013\u0011\tY'!\u001a\u0003\u000f\tcwnY6JI\"A\u0011qNA\"\u0001\u0004\t\t(A\u0005cY>\u001c7\u000eR1uCB!\u00111OA=\u001b\t\t)HC\u0002\u0002x\u0011\taAY;gM\u0016\u0014\u0018\u0002BA>\u0003k\u0012Q\"T1oC\u001e,GMQ;gM\u0016\u0014\b\u0002CA@\u0003\u0007\u0002\r!!!\u0002\u000b1,g/\u001a7\u0011\t\u0005\r\u00141Q\u0005\u0005\u0003\u000b\u000b)G\u0001\u0007Ti>\u0014\u0018mZ3MKZ,G\u000eC\u0004\u0002\n\u0002!\t%a#\u0002\u000b\rdwn]3\u0015\u0003)\u0004")
public class NettyBlockTransferService
extends BlockTransferService {
    private final SparkConf conf;
    private final SecurityManager securityManager;
    private final JavaSerializer serializer;
    private final boolean authEnabled;
    private final TransportConf transportConf;
    private TransportContext transportContext;
    public TransportServer org$apache$spark$network$netty$NettyBlockTransferService$$server;
    public TransportClientFactory org$apache$spark$network$netty$NettyBlockTransferService$$clientFactory;
    public String org$apache$spark$network$netty$NettyBlockTransferService$$appId;

    private JavaSerializer serializer() {
        return this.serializer;
    }

    private boolean authEnabled() {
        return this.authEnabled;
    }

    private TransportConf transportConf() {
        return this.transportConf;
    }

    @Override
    public void init(BlockDataManager blockDataManager) {
        NettyBlockRpcServer rpcHandler = new NettyBlockRpcServer(this.conf.getAppId(), this.serializer(), blockDataManager);
        None$ serverBootstrap = None$.MODULE$;
        None$ clientBootstrap = None$.MODULE$;
        if (this.authEnabled()) {
            serverBootstrap = new Some((Object)new SaslServerBootstrap(this.transportConf(), (SecretKeyHolder)this.securityManager));
            clientBootstrap = new Some((Object)new SaslClientBootstrap(this.transportConf(), this.conf.getAppId(), (SecretKeyHolder)this.securityManager, this.securityManager.isSaslEncryptionEnabled()));
        }
        this.transportContext = new TransportContext(this.transportConf(), (RpcHandler)rpcHandler);
        this.org$apache$spark$network$netty$NettyBlockTransferService$$clientFactory = this.transportContext.createClientFactory((java.util.List)JavaConverters$.MODULE$.seqAsJavaListConverter(Option$.MODULE$.option2Iterable((Option)clientBootstrap).toSeq()).asJava());
        this.org$apache$spark$network$netty$NettyBlockTransferService$$server = this.createServer((List<TransportServerBootstrap>)serverBootstrap.toList());
        this.org$apache$spark$network$netty$NettyBlockTransferService$$appId = this.conf.getAppId();
        this.logInfo((Function0<String>)new Serializable(this){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ NettyBlockTransferService $outer;

            public final String apply() {
                return new StringBuilder().append((Object)"Server created on ").append((Object)BoxesRunTime.boxToInteger((int)this.$outer.org$apache$spark$network$netty$NettyBlockTransferService$$server.getPort())).toString();
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
            }
        });
    }

    private TransportServer createServer(List<TransportServerBootstrap> bootstraps) {
        int portToTry = this.conf.getInt("spark.blockManager.port", 0);
        return (TransportServer)Utils$.MODULE$.startServiceOnPort(portToTry, new Serializable(this, bootstraps){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ NettyBlockTransferService $outer;
            private final List bootstraps$1;

            public final Tuple2<TransportServer, Object> apply(int port) {
                return this.$outer.org$apache$spark$network$netty$NettyBlockTransferService$$startService$1(port, this.bootstraps$1);
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
                this.bootstraps$1 = bootstraps$1;
            }
        }, this.conf, this.getClass().getName())._1();
    }

    @Override
    public void fetchBlocks(String host, int port, String execId, String[] blockIds, BlockFetchingListener listener) {
        this.logTrace((Function0<String>)new Serializable(this, host, port, execId){
            public static final long serialVersionUID = 0L;
            private final String host$1;
            private final int port$1;
            private final String execId$1;

            public final String apply() {
                return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Fetch blocks from ", ":", " (executor id ", ")"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.host$1, BoxesRunTime.boxToInteger((int)this.port$1), this.execId$1}));
            }
            {
                this.host$1 = host$1;
                this.port$1 = port$1;
                this.execId$1 = execId$1;
            }
        });
        try {
            RetryingBlockFetcher.BlockFetchStarter blockFetchStarter = new RetryingBlockFetcher.BlockFetchStarter(this, host, port, execId){
                private final /* synthetic */ NettyBlockTransferService $outer;
                private final String host$1;
                private final int port$1;
                private final String execId$1;

                public void createAndStart(String[] blockIds, BlockFetchingListener listener) {
                    TransportClient client = this.$outer.org$apache$spark$network$netty$NettyBlockTransferService$$clientFactory.createClient(this.host$1, this.port$1);
                    new OneForOneBlockFetcher(client, this.$outer.org$apache$spark$network$netty$NettyBlockTransferService$$appId, this.execId$1, (String[])Predef$.MODULE$.refArrayOps((Object[])blockIds).toArray(ClassTag$.MODULE$.apply(String.class)), listener).start();
                }
                {
                    if ($outer == null) {
                        throw null;
                    }
                    this.$outer = $outer;
                    this.host$1 = host$1;
                    this.port$1 = port$1;
                    this.execId$1 = execId$1;
                }
            };
            int maxRetries = this.transportConf().maxIORetries();
            if (maxRetries > 0) {
                new RetryingBlockFetcher(this.transportConf(), blockFetchStarter, blockIds, listener).start();
            } else {
                blockFetchStarter.createAndStart(blockIds, listener);
            }
        }
        catch (Exception exception2) {
            this.logError((Function0<String>)new Serializable(this){
                public static final long serialVersionUID = 0L;

                public final String apply() {
                    return "Exception while beginning fetchBlocks";
                }
            }, exception2);
            Predef$.MODULE$.refArrayOps((Object[])blockIds).foreach((Function1)new Serializable(this, listener, exception2){
                public static final long serialVersionUID = 0L;
                private final BlockFetchingListener listener$1;
                private final Exception e$1;

                public final void apply(String x$1) {
                    this.listener$1.onBlockFetchFailure(x$1, (Throwable)this.e$1);
                }
                {
                    this.listener$1 = listener$1;
                    this.e$1 = e$1;
                }
            });
        }
    }

    @Override
    public String hostName() {
        return Utils$.MODULE$.localHostName();
    }

    @Override
    public int port() {
        return this.org$apache$spark$network$netty$NettyBlockTransferService$$server.getPort();
    }

    @Override
    public Future<BoxedUnit> uploadBlock(String hostname, int port, String execId, BlockId blockId, ManagedBuffer blockData, StorageLevel level) {
        byte[] byArray;
        Promise result = Promise$.MODULE$.apply();
        TransportClient client = this.org$apache$spark$network$netty$NettyBlockTransferService$$clientFactory.createClient(hostname, port);
        byte[] levelBytes = this.serializer().newInstance().serialize(level, ClassTag$.MODULE$.apply(StorageLevel.class)).array();
        ByteBuffer nioBuffer = blockData.nioByteBuffer();
        if (nioBuffer.hasArray()) {
            byArray = nioBuffer.array();
        } else {
            byte[] data = new byte[nioBuffer.remaining()];
            nioBuffer.get(data);
            byArray = data;
        }
        byte[] array = byArray;
        client.sendRpc(new UploadBlock(this.org$apache$spark$network$netty$NettyBlockTransferService$$appId, execId, blockId.toString(), levelBytes, array).toByteBuffer(), new RpcResponseCallback(this, blockId, result){
            private final /* synthetic */ NettyBlockTransferService $outer;
            public final BlockId blockId$1;
            private final Promise result$1;

            public void onSuccess(ByteBuffer response) {
                this.$outer.logTrace((Function0<String>)new Serializable(this){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ $anon$2 $outer;

                    public final String apply() {
                        return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Successfully uploaded block ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.$outer.blockId$1}));
                    }
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                    }
                });
                this.result$1.success((Object)BoxedUnit.UNIT);
            }

            public void onFailure(Throwable e) {
                this.$outer.logError((Function0<String>)new Serializable(this){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ $anon$2 $outer;

                    public final String apply() {
                        return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Error while uploading block ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.$outer.blockId$1}));
                    }
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                    }
                }, e);
                this.result$1.failure(e);
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
                this.blockId$1 = blockId$1;
                this.result$1 = result$1;
            }
        });
        return result.future();
    }

    @Override
    public void close() {
        if (this.org$apache$spark$network$netty$NettyBlockTransferService$$server != null) {
            this.org$apache$spark$network$netty$NettyBlockTransferService$$server.close();
        }
        if (this.org$apache$spark$network$netty$NettyBlockTransferService$$clientFactory != null) {
            this.org$apache$spark$network$netty$NettyBlockTransferService$$clientFactory.close();
        }
    }

    public final Tuple2 org$apache$spark$network$netty$NettyBlockTransferService$$startService$1(int port, List bootstraps$1) {
        TransportServer server = this.transportContext.createServer(port, (java.util.List)JavaConverters$.MODULE$.seqAsJavaListConverter((Seq)bootstraps$1).asJava());
        return new Tuple2((Object)server, (Object)BoxesRunTime.boxToInteger((int)server.getPort()));
    }

    public NettyBlockTransferService(SparkConf conf, SecurityManager securityManager, int numCores) {
        this.conf = conf;
        this.securityManager = securityManager;
        this.serializer = new JavaSerializer(conf);
        this.authEnabled = securityManager.isAuthenticationEnabled();
        this.transportConf = SparkTransportConf$.MODULE$.fromSparkConf(conf, "shuffle", numCores);
    }
}

