package org.apache.dolphinscheduler.extract.base;

import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
import java.util.concurrent.ExecutorService;
import lombok.Generated;
import org.apache.dolphinscheduler.extract.base.future.ResponseFuture;
import org.apache.dolphinscheduler.extract.base.protocal.HeartBeatTransporter;
import org.apache.dolphinscheduler.extract.base.protocal.Transporter;
import org.apache.dolphinscheduler.extract.base.serialize.JsonSerializer;
import org.apache.dolphinscheduler.extract.base.utils.ChannelUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ChannelHandler.Sharable
/* loaded from: input_file:org/apache/dolphinscheduler/extract/base/NettyClientHandler.class */
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(NettyClientHandler.class);
    private final NettyRemotingClient nettyRemotingClient;
    private final ExecutorService callbackExecutor;

    public NettyClientHandler(NettyRemotingClient nettyRemotingClient, ExecutorService executorService) {
        this.nettyRemotingClient = nettyRemotingClient;
        this.callbackExecutor = executorService;
    }

    public void channelInactive(ChannelHandlerContext channelHandlerContext) {
        this.nettyRemotingClient.closeChannel(ChannelUtils.toAddress(channelHandlerContext.channel()));
        channelHandlerContext.channel().close();
    }

    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) {
        processReceived((Transporter) obj);
    }

    private void processReceived(Transporter transporter) {
        ResponseFuture future = ResponseFuture.getFuture(transporter.getHeader().getOpaque());
        if (future == null) {
            log.warn("Cannot find the ResponseFuture if transporter: {}", transporter);
            return;
        }
        StandardRpcResponse standardRpcResponse = (StandardRpcResponse) JsonSerializer.deserialize(transporter.getBody(), StandardRpcResponse.class);
        future.setIRpcResponse(standardRpcResponse);
        future.release();
        if (future.getInvokeCallback() == null) {
            future.putResponse(standardRpcResponse);
            return;
        }
        future.removeFuture();
        ExecutorService executorService = this.callbackExecutor;
        future.getClass();
        executorService.execute(future::executeInvokeCallback);
    }

    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
        log.error("NettyClientHandler catch an exception : {}", th.getMessage(), th);
        this.nettyRemotingClient.closeChannel(ChannelUtils.toAddress(channelHandlerContext.channel()));
        channelHandlerContext.channel().close();
    }

    public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        if (!(obj instanceof IdleStateEvent)) {
            super.userEventTriggered(channelHandlerContext, obj);
            return;
        }
        channelHandlerContext.channel().writeAndFlush(HeartBeatTransporter.getHeartBeatTransporter()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
        if (log.isDebugEnabled()) {
            log.debug("Client send heart beat to: {}", ChannelUtils.getRemoteAddress(channelHandlerContext.channel()));
        }
    }
}
