package org.apache.dolphinscheduler.extract.base;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Generated;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.extract.base.config.NettyClientConfig;
import org.apache.dolphinscheduler.extract.base.exception.RemotingException;
import org.apache.dolphinscheduler.extract.base.exception.RemotingTimeoutException;
import org.apache.dolphinscheduler.extract.base.exception.RemotingTooMuchRequestException;
import org.apache.dolphinscheduler.extract.base.future.InvokeCallback;
import org.apache.dolphinscheduler.extract.base.future.ReleaseSemaphore;
import org.apache.dolphinscheduler.extract.base.future.ResponseFuture;
import org.apache.dolphinscheduler.extract.base.protocal.Transporter;
import org.apache.dolphinscheduler.extract.base.protocal.TransporterDecoder;
import org.apache.dolphinscheduler.extract.base.protocal.TransporterEncoder;
import org.apache.dolphinscheduler.extract.base.utils.CallerThreadExecutePolicy;
import org.apache.dolphinscheduler.extract.base.utils.Constants;
import org.apache.dolphinscheduler.extract.base.utils.Host;
import org.apache.dolphinscheduler.extract.base.utils.NettyUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/dolphinscheduler/extract/base/NettyRemotingClient.class */
/**
 * Netty based remoting client.
 *
 * <p>The client keeps at most one cached {@link Channel} per remote {@link Host} and offers an
 * asynchronous ({@link #sendAsync}) and a blocking ({@link #sendSync}) way to send a
 * {@link Transporter}. The number of in-flight asynchronous requests is bounded by a fair
 * semaphore. The client is started from its constructor and must be released with
 * {@link #close()}.
 */
public class NettyRemotingClient implements AutoCloseable {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(NettyRemotingClient.class);

    /** Initial capacity of the host -> channel cache. */
    private static final int CHANNEL_CACHE_INITIAL_CAPACITY = 128;

    /** Maximum number of asynchronous requests that may be in flight at the same time. */
    private static final int MAX_ASYNC_PERMITS = 1024;

    /** Capacity of the work queue that feeds the callback executor. */
    private static final int CALLBACK_QUEUE_CAPACITY = 1000;

    /** Reader idle time handed to the {@link IdleStateHandler} installed on every channel. */
    private static final long READER_IDLE_TIME_MILLIS = 6000L;

    private final EventLoopGroup workerGroup;
    private final NettyClientConfig clientConfig;
    private final ExecutorService callbackExecutor;
    private final NettyClientHandler clientHandler;
    private final ScheduledExecutorService responseFutureExecutor;
    private final Bootstrap bootstrap = new Bootstrap();
    private final ConcurrentHashMap<Host, Channel> channels =
            new ConcurrentHashMap<>(CHANNEL_CACHE_INITIAL_CAPACITY);
    private final AtomicBoolean isStarted = new AtomicBoolean(false);
    private final Semaphore asyncSemaphore = new Semaphore(MAX_ASYNC_PERMITS, true);

    /**
     * Creates the client, its event loop group and executors, and starts it immediately.
     *
     * @param nettyClientConfig configuration providing the worker thread count and socket options
     */
    public NettyRemotingClient(NettyClientConfig nettyClientConfig) {
        this.clientConfig = nettyClientConfig;
        ThreadFactory workerThreadFactory = ThreadUtils.newDaemonThreadFactory("NettyClientThread-");
        // Use the native epoll transport when the platform offers it, otherwise fall back to NIO.
        if (Epoll.isAvailable()) {
            this.workerGroup = new EpollEventLoopGroup(nettyClientConfig.getWorkerThreads(), workerThreadFactory);
        } else {
            this.workerGroup = new NioEventLoopGroup(nettyClientConfig.getWorkerThreads(), workerThreadFactory);
        }
        this.callbackExecutor = new ThreadPoolExecutor(
                Constants.CPUS,
                Constants.CPUS,
                1L,
                TimeUnit.MINUTES,
                new LinkedBlockingQueue<>(CALLBACK_QUEUE_CAPACITY),
                ThreadUtils.newDaemonThreadFactory("NettyClientCallbackThread-"),
                new CallerThreadExecutePolicy());
        this.clientHandler = new NettyClientHandler(this, this.callbackExecutor);
        this.responseFutureExecutor = Executors.newSingleThreadScheduledExecutor(
                ThreadUtils.newDaemonThreadFactory("NettyClientResponseFutureThread-"));
        start();
    }

    /**
     * Configures the bootstrap (socket options and channel pipeline), schedules the periodic
     * {@code ResponseFuture.scanFutureTable} task and marks the client as started.
     */
    private void start() {
        this.bootstrap
                .group(this.workerGroup)
                .channel(NettyUtils.getSocketChannelClass())
                .option(ChannelOption.SO_KEEPALIVE, this.clientConfig.isSoKeepalive())
                .option(ChannelOption.TCP_NODELAY, this.clientConfig.isTcpNoDelay())
                .option(ChannelOption.SO_SNDBUF, this.clientConfig.getSendBufferSize())
                .option(ChannelOption.SO_RCVBUF, this.clientConfig.getReceiveBufferSize())
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, this.clientConfig.getConnectTimeoutMillis())
                .handler(new ChannelInitializer<SocketChannel>() {

                    @Override
                    public void initChannel(SocketChannel socketChannel) {
                        socketChannel.pipeline()
                                .addLast("client-idle-handler",
                                        new IdleStateHandler(READER_IDLE_TIME_MILLIS, 0L, 0L, TimeUnit.MILLISECONDS))
                                .addLast(new TransporterDecoder(), NettyRemotingClient.this.clientHandler,
                                        new TransporterEncoder());
                    }
                });
        // Scan the response future table once per second on the dedicated single thread executor.
        this.responseFutureExecutor.scheduleWithFixedDelay(ResponseFuture::scanFutureTable, 0L, 1L, TimeUnit.SECONDS);
        this.isStarted.compareAndSet(false, true);
    }

    /**
     * Sends a request without waiting for the response.
     *
     * <p>A permit of the async semaphore is acquired before the request is written. The permit is
     * handed to the {@link ResponseFuture} through a {@link ReleaseSemaphore}; this method calls
     * {@code responseFuture.release()} itself only when the write fails or throws.
     *
     * @param host           remote host to send to
     * @param transporter    request to send; its header opaque identifies the response future
     * @param j              timeout in milliseconds, used both for acquiring the semaphore permit
     *                       and as the timeout passed to the response future
     * @param invokeCallback callback passed to the response future
     * @throws InterruptedException            if interrupted while waiting for a semaphore permit
     * @throws RemotingTooMuchRequestException if no permit could be acquired within {@code j} ms
     * @throws RemotingException               if no channel is available or the write throws
     */
    public void sendAsync(Host host, Transporter transporter, long j, InvokeCallback invokeCallback) throws InterruptedException, RemotingException {
        Channel channel = getChannel(host);
        if (channel == null) {
            throw new RemotingException("network error");
        }
        long opaque = transporter.getHeader().getOpaque();
        if (!this.asyncSemaphore.tryAcquire(j, TimeUnit.MILLISECONDS)) {
            // availablePermits() reports the permits currently free, not the configured total.
            throw new RemotingTooMuchRequestException(String.format(
                    "try to acquire async semaphore timeout: %d, waiting thread num: %d, available permits: %d",
                    j, this.asyncSemaphore.getQueueLength(), this.asyncSemaphore.availablePermits()));
        }
        ResponseFuture responseFuture =
                new ResponseFuture(opaque, j, invokeCallback, new ReleaseSemaphore(this.asyncSemaphore));
        try {
            channel.writeAndFlush(transporter).addListener(future -> {
                if (future.isSuccess()) {
                    responseFuture.setSendOk(true);
                    return;
                }
                // The write failed: record the cause, complete the future with no response and
                // run the callback right away instead of waiting for a reply that cannot arrive.
                responseFuture.setSendOk(false);
                responseFuture.setCause(future.cause());
                responseFuture.putResponse(null);
                try {
                    responseFuture.executeInvokeCallback();
                } catch (Exception e) {
                    log.error("execute callback error", e);
                } finally {
                    responseFuture.release();
                }
            });
        } catch (Exception e) {
            responseFuture.release();
            throw new RemotingException(String.format("Send transporter to host: %s failed", host), e);
        }
    }

    /**
     * Sends a request and blocks until {@code responseFuture.waitResponse()} returns.
     *
     * @param host        remote host to send to
     * @param transporter request to send; its header opaque identifies the response future
     * @param j           timeout in milliseconds passed to the response future
     * @return the response, never {@code null}
     * @throws InterruptedException     declared for the blocking wait on the response
     * @throws RemotingTimeoutException if the request was written successfully but no response was obtained
     * @throws RemotingException        if no channel is available or the request could not be written
     */
    public IRpcResponse sendSync(Host host, Transporter transporter, long j) throws InterruptedException, RemotingException {
        Channel channel = getChannel(host);
        if (channel == null) {
            throw new RemotingException(String.format("connect to : %s fail", host));
        }
        ResponseFuture responseFuture = new ResponseFuture(transporter.getHeader().getOpaque(), j, null, null);
        channel.writeAndFlush(transporter).addListener(future -> {
            if (future.isSuccess()) {
                responseFuture.setSendOk(true);
                return;
            }
            responseFuture.setSendOk(false);
            responseFuture.setCause(future.cause());
            // Complete the future with no response so the waiting caller sees the failure.
            responseFuture.putResponse(null);
            log.error("Send Sync request {} to host {} failed", transporter, host, responseFuture.getCause());
        });
        IRpcResponse response = responseFuture.waitResponse();
        if (response != null) {
            return response;
        }
        // No response: distinguish "sent but not answered" from "could not be sent".
        if (responseFuture.isSendOK()) {
            throw new RemotingTimeoutException(host.toString(), j, responseFuture.getCause());
        }
        throw new RemotingException(host.toString(), responseFuture.getCause());
    }

    /**
     * Returns the cached channel for the host if it is active, otherwise connects synchronously.
     *
     * @param host remote host
     * @return an active or newly connected channel, or {@code null} if connecting failed
     */
    public Channel getChannel(Host host) {
        Channel cached = this.channels.get(host);
        if (cached != null && cached.isActive()) {
            return cached;
        }
        return createChannel(host, true);
    }

    /**
     * Connects to the host and caches the resulting channel, replacing any previous entry.
     *
     * <p>If the calling thread is interrupted while waiting for the connection, its interrupt
     * status is restored and {@code null} is returned.
     *
     * @param host remote host
     * @param z    whether to wait for the connect attempt to complete; when {@code false} the
     *             channel is only cached and returned if the attempt has already succeeded
     * @return the connected channel, or {@code null} if the connection is not (yet) established
     */
    public Channel createChannel(Host host, boolean z) {
        try {
            ChannelFuture connect;
            synchronized (this.bootstrap) {
                connect = this.bootstrap.connect(new InetSocketAddress(host.getIp(), host.getPort()));
            }
            if (z) {
                connect.sync();
            }
            if (!connect.isSuccess()) {
                return null;
            }
            Channel channel = connect.channel();
            this.channels.put(host, channel);
            return channel;
        } catch (InterruptedException e) {
            // Do not swallow the interruption: restore the flag so callers can observe it.
            Thread.currentThread().interrupt();
            log.warn(String.format("connect to %s error", host), e);
            return null;
        } catch (Exception e) {
            log.warn(String.format("connect to %s error", host), e);
            return null;
        }
    }

    /**
     * Closes all cached channels and shuts down the event loop group and executors.
     *
     * <p>Only the first call after a successful start has any effect; exceptions raised during
     * shutdown are logged and not propagated.
     */
    @Override
    public void close() {
        if (!this.isStarted.compareAndSet(true, false)) {
            return;
        }
        try {
            closeChannels();
            this.workerGroup.shutdownGracefully();
            this.callbackExecutor.shutdownNow();
            this.responseFutureExecutor.shutdownNow();
            log.info("netty client closed");
        } catch (Exception e) {
            log.error("netty client close exception", e);
        }
    }

    /** Closes every cached channel and empties the cache. */
    private void closeChannels() {
        for (Channel channel : this.channels.values()) {
            channel.close();
        }
        this.channels.clear();
    }

    /**
     * Removes the channel cached for the host, if any, and closes it.
     *
     * @param host remote host whose channel should be closed
     */
    public void closeChannel(Host host) {
        Channel removed = this.channels.remove(host);
        if (removed != null) {
            removed.close();
        }
    }
}
