/*
 * Decompiled with CFR 0.152.
 */
package com.suning.api.push;

import com.suning.api.link.io.netty.bootstrap.Bootstrap;
import com.suning.api.link.io.netty.buffer.PooledByteBufAllocator;
import com.suning.api.link.io.netty.buffer.Unpooled;
import com.suning.api.link.io.netty.channel.AdaptiveRecvByteBufAllocator;
import com.suning.api.link.io.netty.channel.Channel;
import com.suning.api.link.io.netty.channel.ChannelFuture;
import com.suning.api.link.io.netty.channel.ChannelFutureListener;
import com.suning.api.link.io.netty.channel.ChannelHandlerContext;
import com.suning.api.link.io.netty.channel.ChannelInitializer;
import com.suning.api.link.io.netty.channel.ChannelOption;
import com.suning.api.link.io.netty.channel.ChannelPipeline;
import com.suning.api.link.io.netty.channel.ChannelPromise;
import com.suning.api.link.io.netty.channel.EventLoop;
import com.suning.api.link.io.netty.channel.EventLoopGroup;
import com.suning.api.link.io.netty.channel.SimpleChannelInboundHandler;
import com.suning.api.link.io.netty.channel.nio.NioEventLoopGroup;
import com.suning.api.link.io.netty.channel.socket.nio.NioSocketChannel;
import com.suning.api.link.io.netty.handler.codec.http.DefaultHttpHeaders;
import com.suning.api.link.io.netty.handler.codec.http.FullHttpResponse;
import com.suning.api.link.io.netty.handler.codec.http.HttpClientCodec;
import com.suning.api.link.io.netty.handler.codec.http.HttpObjectAggregator;
import com.suning.api.link.io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import com.suning.api.link.io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import com.suning.api.link.io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import com.suning.api.link.io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import com.suning.api.link.io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import com.suning.api.link.io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import com.suning.api.link.io.netty.handler.codec.http.websocketx.WebSocketFrame;
import com.suning.api.link.io.netty.handler.codec.http.websocketx.WebSocketVersion;
import com.suning.api.link.io.netty.handler.timeout.IdleState;
import com.suning.api.link.io.netty.handler.timeout.IdleStateEvent;
import com.suning.api.link.io.netty.handler.timeout.IdleStateHandler;
import com.suning.api.link.io.netty.util.CharsetUtil;
import com.suning.api.message.Message;
import com.suning.api.message.MessageType;
import com.suning.api.push.MessageListener;
import com.suning.api.push.MessagePushConfig;
import com.suning.api.push.NamedThreadFactory;
import com.suning.api.util.MessageUtils;
import com.suning.api.util.NetUtil;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Date;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * WebSocket client for the message push service.
 *
 * <p>The client opens a {@code ws://} connection, performs the WebSocket handshake, sends a JSON
 * authentication message and, once the server answers {@code OK}, dispatches every received binary
 * frame to the registered {@link MessageListener} on a worker thread pool. Each dispatched message
 * is acknowledged after the listener returns, whether or not the listener threw.
 *
 * <p>When the channel is lost, a reconnect is attempted 20 seconds later, unless the client has
 * been closed (explicitly through {@link #close()} or implicitly by an authentication failure).
 *
 * <p>The listener is captured when a connection is set up, so {@link #setMessageListener} must be
 * called before {@link #connect()} for the listener to be used by that connection.
 */
public class MessagePushClient {
    private URI uri;
    private String appKey;
    private String appSecret;
    private String groupName;
    private InnerClient client;
    private MessageListener listener;
    private MessagePushConfig config;
    private ThreadPoolExecutor threadPool;
    private int queueSize = 2000;
    private int threadCount = Runtime.getRuntime().availableProcessors() * 8;
    private int fetchPeriod = 30;

    /**
     * Creates a client that authenticates with an explicit key/secret pair.
     *
     * @param uri       server address; must be a syntactically valid URI
     * @param appKey    application key sent in the authentication message
     * @param appSecret application secret sent in the authentication message
     * @param groupName consumer group name sent in the authentication message
     * @throws IllegalArgumentException if {@code uri} is not a valid URI
     */
    public MessagePushClient(String uri, String appKey, String appSecret, String groupName) {
        this.uri = MessagePushClient.parseUri(uri);
        this.appKey = appKey;
        this.appSecret = appSecret;
        this.groupName = groupName;
        this.initThreadPool();
        this.client = new InnerClient();
    }

    /**
     * Creates a client that takes its credentials from a {@link MessagePushConfig}.
     *
     * @param uri       server address; must be a syntactically valid URI
     * @param config    source of the application key and secret; must not be {@code null}
     * @param groupName consumer group name sent in the authentication message
     * @throws IllegalArgumentException if {@code uri} is not a valid URI or {@code config} is null
     */
    public MessagePushClient(String uri, MessagePushConfig config, String groupName) {
        this.uri = MessagePushClient.parseUri(uri);
        if (null == config) {
            throw new IllegalArgumentException("MessagePushConfig is null");
        }
        // Keep the config itself so getConfig()/getAppKey()/getAppSecret() can actually see it.
        this.config = config;
        this.appKey = config.getAppKey();
        this.appSecret = config.getAppSecret();
        this.groupName = groupName;
        this.initThreadPool();
        this.client = new InnerClient();
    }

    /** Parses the server address, keeping the original parse failure as the cause. */
    private static URI parseUri(String uri) {
        try {
            return new URI(uri);
        }
        catch (URISyntaxException e) {
            throw new IllegalArgumentException("URISyntaxException: " + uri, e);
        }
    }

    /**
     * Creates the fixed-size worker pool on which listener callbacks run. The pool has
     * {@code threadCount} threads and a bounded queue of {@code queueSize} tasks.
     */
    public void initThreadPool() {
        // Core size equals maximum size and core-thread timeout is not enabled, so the keep-alive
        // value and its unit have no effect on this pool.
        this.threadPool = new ThreadPoolExecutor(this.threadCount, this.threadCount, (long)(this.fetchPeriod * 2), TimeUnit.MICROSECONDS, new ArrayBlockingQueue<Runnable>(this.queueSize), new NamedThreadFactory("push-worker"));
    }

    /**
     * @return the config passed to the config-based constructor, or {@code null} when the client
     *         was built from an explicit key/secret pair
     */
    public MessagePushConfig getConfig() {
        return this.config;
    }

    /** @return the application key, read from the config when one is present */
    public String getAppKey() {
        if (null != this.config) {
            return this.config.getAppKey();
        }
        return this.appKey;
    }

    /** @return the application secret, read from the config when one is present */
    public String getAppSecret() {
        if (null != this.config) {
            return this.config.getAppSecret();
        }
        return this.appSecret;
    }

    /**
     * Connects and authenticates if the client is not already active. Blocks until the handshake
     * and the authentication have both completed.
     *
     * @throws IllegalStateException if the client has been closed
     * @throws Exception             if connecting, the handshake or the authentication fails
     */
    public synchronized void connect() throws Exception {
        if (!this.isActive()) {
            this.client.run();
        }
    }

    /**
     * Shuts the client down: stops the worker pool, sends a WebSocket close frame, cancels the
     * periodic ping and shuts down the event loop group. No reconnect happens afterwards.
     */
    public synchronized void close() {
        if (this.threadPool != null) {
            this.threadPool.shutdown();
            this.threadPool = null;
        }
        // Flag the shutdown before the channel starts closing so that the resulting
        // channelUnregistered callback does not schedule a reconnect.
        this.client.closed = true;
        this.client.activeClose();
        this.client.close();
    }

    /** @return {@code true} when a channel exists and authentication has succeeded */
    public boolean isActive() {
        return this.client.isActive();
    }

    /**
     * Sends {@code content} to {@code topic}. Blocks until the current connection attempt has been
     * authenticated.
     *
     * @throws IllegalStateException if {@link #connect()} was never called, the calling thread is
     *                               interrupted while waiting, or no channel is available
     */
    public void send(String topic, String content) {
        this.client.send(topic, content);
    }

    /**
     * Registers the listener that receives pushed messages. Takes effect for connections that are
     * set up after this call.
     */
    public void setMessageListener(MessageListener listener) {
        this.listener = listener;
    }

    /** @return the worker pool, or {@code null} once {@link #close()} has been called */
    public ThreadPoolExecutor getThreadPool() {
        return this.threadPool;
    }

    /**
     * Sends a WebSocket close frame and closes the current channel without shutting the client
     * down; the regular reconnect logic still applies.
     */
    public synchronized void activeClose() {
        this.client.activeClose();
    }

    /**
     * Channel handler that drives the handshake, the authentication exchange and the dispatch of
     * pushed messages for one connection. A new instance is created for every connection attempt.
     */
    class MessageHandler
    extends SimpleChannelInboundHandler<Object> {
        private final Logger LOG = LoggerFactory.getLogger(MessageHandler.class);
        private final WebSocketClientHandshaker handshaker;
        private ChannelPromise handshakeFuture;
        private ChannelPromise authFuture;
        private MessageListener listener;
        private CountDownLatch latch;

        /**
         * @param latch counted down once authentication has succeeded and the first ping frame has
         *              been written
         */
        public MessageHandler(CountDownLatch latch) {
            DefaultHttpHeaders customHeaders = new DefaultHttpHeaders();
            this.handshaker = WebSocketClientHandshakerFactory.newHandshaker(MessagePushClient.this.uri, WebSocketVersion.V13, null, false, customHeaders);
            this.latch = latch;
        }

        /** @return the future completed when the WebSocket handshake finishes */
        public ChannelFuture handshakeFuture() {
            return this.handshakeFuture;
        }

        /** @return the future completed when the server answers the authentication message */
        public ChannelFuture authFuture() {
            return this.authFuture;
        }

        public void setListener(MessageListener listener) {
            this.listener = listener;
        }

        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            this.handshakeFuture = ctx.newPromise();
            this.authFuture = ctx.newPromise();
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            this.handshaker.handshake(ctx.channel());
        }

        /** On an all-idle event: sends a close frame and closes the channel five seconds later. */
        @Override
        public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception {
            if (!(evt instanceof IdleStateEvent)) {
                return;
            }
            if (((IdleStateEvent)evt).state() != IdleState.ALL_IDLE) {
                return;
            }
            this.LOG.error("channel {} trigger all idle event,5 sec later will be close!", (Object)ctx.channel());
            ctx.writeAndFlush(new CloseWebSocketFrame());
            ctx.executor().schedule(new Runnable(){

                @Override
                public void run() {
                    MessageHandler.this.LOG.error("channel {} is closing!", (Object)ctx.channel());
                    ctx.close();
                }
            }, 5L, TimeUnit.SECONDS);
        }

        /**
         * Routes an inbound object through three phases: handshake completion, authentication
         * acknowledgement, then regular frame handling (pong, close, ping, binary message).
         */
        @Override
        public void channelRead0(final ChannelHandlerContext ctx, Object msg) throws Exception {
            Channel ch = ctx.channel();
            if (!this.handshaker.isHandshakeComplete()) {
                this.handshaker.finishHandshake(ch, (FullHttpResponse)msg);
                this.LOG.debug(String.format("client %s handshake success!", ctx.channel()));
                this.handshakeFuture.setSuccess();
                ctx.writeAndFlush(new BinaryWebSocketFrame(Unpooled.copiedBuffer(MessagePushClient.this.client.authMessage(), CharsetUtil.UTF_8)));
                return;
            }
            // Test the type before casting so that a stray HTTP response reaches the dedicated
            // check below instead of failing with a ClassCastException here.
            if (!this.authFuture.isSuccess() && msg instanceof BinaryWebSocketFrame) {
                this.handleAuthAck(ctx, (BinaryWebSocketFrame)msg);
                return;
            }
            if (msg instanceof FullHttpResponse) {
                FullHttpResponse response = (FullHttpResponse)msg;
                throw new Exception("Unexpected FullHttpResponse (getStatus=" + response.getStatus() + ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
            }
            WebSocketFrame frame = (WebSocketFrame)msg;
            if (frame instanceof PongWebSocketFrame) {
                this.LOG.debug(String.format("client %s  received pong", ctx.channel()));
            } else if (frame instanceof CloseWebSocketFrame) {
                this.LOG.debug(String.format("client %s  received closing", ctx.channel()));
                ch.close();
            } else if (frame instanceof PingWebSocketFrame) {
                this.LOG.debug(String.format("client %s  received ping", ctx.channel()));
                // The frame is released when this method returns, so the echoed payload is retained.
                ctx.writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
            } else if (frame instanceof BinaryWebSocketFrame) {
                this.dispatch(ctx, (BinaryWebSocketFrame)frame);
            }
        }

        /** Completes the authentication future from the server's answer to the auth message. */
        private void handleAuthAck(ChannelHandlerContext ctx, BinaryWebSocketFrame authFrame) {
            Message authAckMsg = this.resolve(authFrame.content().toString(CharsetUtil.UTF_8));
            String ackText = null == authAckMsg ? null : authAckMsg.getMsg();
            if ("OK".equals(ackText)) {
                this.authFuture.setSuccess();
                MessagePushClient.this.client.setConnected(true);
                ctx.writeAndFlush(new PingWebSocketFrame(Unpooled.copiedBuffer(new byte[]{1}))).addListener(new ChannelFutureListener(){

                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (future.isSuccess()) {
                            // Releases callers blocked in send().
                            MessageHandler.this.latch.countDown();
                        }
                    }
                });
                return;
            }
            this.authFuture.setFailure(new RuntimeException(ackText));
            MessagePushClient.this.client.close();
        }

        /** Hands a pushed message to the listener on the worker pool and acknowledges it afterwards. */
        private void dispatch(final ChannelHandlerContext ctx, BinaryWebSocketFrame binaryFrame) {
            this.LOG.debug(String.format("client %s  received message %s", ctx.channel(), binaryFrame.content()));
            String respMsg = binaryFrame.content().toString(CharsetUtil.UTF_8);
            final Message message = this.resolve(respMsg);
            message.setRecevieDateTime(new Date());
            // Read the pool once: close() may null the field concurrently.
            ThreadPoolExecutor pool = MessagePushClient.this.getThreadPool();
            if (null == pool) {
                return;
            }
            pool.submit(new Runnable(){

                @Override
                public void run() {
                    try {
                        MessageHandler.this.listener.onMessage(message);
                    }
                    catch (Exception e) {
                        MessageHandler.this.LOG.error("handle message occur error,detail is ", (Throwable)e);
                    }
                    finally {
                        // The acknowledgement is sent whether or not the listener succeeded.
                        String ackMsg = MessageUtils.buildAckMsg(message);
                        ctx.writeAndFlush(new BinaryWebSocketFrame(Unpooled.copiedBuffer(ackMsg, CharsetUtil.UTF_8)));
                    }
                }
            });
        }

        private Message resolve(String msg) {
            return MessageUtils.toMessage(msg);
        }

        /**
         * Resets the connection state, releases any caller still waiting for the handshake or the
         * authentication, and schedules a reconnect in 20 seconds unless the client was closed.
         */
        @Override
        public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
            MessagePushClient.this.client.channel = null;
            MessagePushClient.this.client.setConnected(false);
            // A dead channel can never complete these; fail them so connect() does not block
            // forever. Both calls are no-ops when the futures are already done.
            IllegalStateException closedCause = new IllegalStateException("channel " + ctx.channel() + " closed before handshake/auth completed");
            this.handshakeFuture.tryFailure(closedCause);
            this.authFuture.tryFailure(closedCause);
            if (MessagePushClient.this.client.closed) {
                this.LOG.debug("{} disconnected after close, no reconnect", (Object)ctx.channel());
                return;
            }
            this.LOG.debug(String.format("%s disconnected... will try to reconnect in 20 sec...", ctx.channel()));
            EventLoop loop = ctx.channel().eventLoop();
            loop.schedule(new Runnable(){

                @Override
                public void run() {
                    MessageHandler.this.startReconnectThread();
                }
            }, 20L, TimeUnit.SECONDS);
        }

        /**
         * Runs the reconnect on its own thread: connect() blocks on future.sync(), which must not
         * be done on an event loop thread.
         */
        private void startReconnectThread() {
            if (MessagePushClient.this.client.closed) {
                return;
            }
            Thread reconnector = new Thread(new Runnable(){

                @Override
                public void run() {
                    try {
                        MessagePushClient.this.connect();
                    }
                    catch (Exception e) {
                        MessageHandler.this.LOG.error("reconnect client occur exception,detail is:{}", (Throwable)e);
                    }
                }
            }, "push-reconnect");
            reconnector.setDaemon(true);
            reconnector.start();
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            this.LOG.error("channel {} exceptionCaught:{}", (Object)ctx.channel(), (Object)cause);
            if (!this.handshakeFuture.isDone()) {
                this.handshakeFuture.setFailure(cause);
            }
            super.exceptionCaught(ctx, cause);
        }
    }

    /**
     * Builds the pipeline for a new channel: HTTP client codec, HTTP aggregator (8192 bytes),
     * a 45 second all-idle detector, and the message handler.
     */
    class MessageChannelInitializer
    extends ChannelInitializer<Channel> {
        private MessageHandler messageHandler;

        public MessageChannelInitializer(MessageHandler messageHandler) {
            this.messageHandler = messageHandler;
        }

        @Override
        protected void initChannel(Channel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast(new HttpClientCodec());
            pipeline.addLast(new HttpObjectAggregator(8192));
            pipeline.addLast(new IdleStateHandler(0, 0, 45));
            pipeline.addLast(this.messageHandler);
        }
    }

    /** Owns the event loop group, the current channel and the periodic ping task. */
    private class InnerClient {
        private final Logger log = LoggerFactory.getLogger(InnerClient.class);
        private volatile Channel channel = null;
        private ScheduledFuture<?> pingExecutor;
        private EventLoopGroup group = new NioEventLoopGroup();
        private volatile boolean connected = false;
        // Set once the client is shut down; blocks further connects and reconnects.
        private volatile boolean closed = false;
        // Written by the connecting thread, read by threads calling send().
        private volatile CountDownLatch latch;

        /**
         * Connects, then makes sure the periodic ping is running.
         *
         * @throws IllegalStateException if the client has been closed
         */
        public void run() throws Exception {
            if (this.closed) {
                throw new IllegalStateException("client has been closed");
            }
            this.connect();
            this.schedulePing();
        }

        /** Waits for the current connection to be authenticated, then writes the message. */
        private void send(String topic, String content) {
            CountDownLatch ready = this.latch;
            if (null == ready) {
                throw new IllegalStateException("client is not connected, call connect() first");
            }
            try {
                ready.await();
            }
            catch (InterruptedException e) {
                // Preserve the interrupt for the caller instead of sending on an unready channel.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for the connection to become ready", e);
            }
            Channel target = this.channel;
            if (null == target) {
                throw new IllegalStateException("connection is not available");
            }
            Message message = new Message(MessageType.TO);
            message.setAppKey(MessagePushClient.this.getAppKey());
            message.setMsg(content);
            message.setTopic(topic);
            String msg = MessageUtils.toMsg(message);
            target.writeAndFlush(new BinaryWebSocketFrame(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8)));
        }

        /** Writes a close frame and closes the channel once the write has completed. */
        private void activeClose() {
            Channel target = this.channel;
            if (null != target) {
                target.writeAndFlush(new CloseWebSocketFrame()).addListener(ChannelFutureListener.CLOSE);
            }
        }

        /**
         * Starts the ping task (first run after 1 second, then every 30 seconds). Does nothing
         * when a ping task is already running, so reconnects do not pile up duplicate tasks.
         */
        public void schedulePing() {
            if (null != this.pingExecutor && !this.pingExecutor.isDone()) {
                return;
            }
            EventLoopGroup loops = this.group;
            if (null == loops) {
                return;
            }
            this.pingExecutor = loops.scheduleAtFixedRate(new Runnable(){

                @Override
                public void run() {
                    InnerClient.this.sendPingWebSocketFrame();
                }
            }, 1L, 30L, TimeUnit.SECONDS);
        }

        /**
         * Opens the TCP connection and blocks until the WebSocket handshake and the authentication
         * have completed. Only the {@code ws} scheme is supported; port 80 is used when the URI
         * carries no port.
         */
        private void connect() throws Exception {
            String protocol = MessagePushClient.this.uri.getScheme();
            if (!"ws".equals(protocol)) {
                throw new IllegalArgumentException("Unsupported protocol: " + protocol);
            }
            this.latch = new CountDownLatch(1);
            MessageHandler handler = new MessageHandler(this.latch);
            handler.setListener(MessagePushClient.this.listener);
            this.log.debug(this.toString() + " start connect.");
            int uriPort = MessagePushClient.this.uri.getPort();
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.remoteAddress(MessagePushClient.this.uri.getHost(), uriPort == -1 ? 80 : uriPort);
            bootstrap.group(this.group);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.option(ChannelOption.TCP_NODELAY, true);
            bootstrap.option(ChannelOption.SO_KEEPALIVE, false);
            bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
            bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
            bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT);
            bootstrap.handler(new MessageChannelInitializer(handler));
            this.channel = bootstrap.connect().sync().channel();
            handler.handshakeFuture().sync();
            handler.authFuture().sync();
            this.log.debug(this.toString() + " end connect.");
        }

        public boolean isActive() {
            return this.channel != null && this.connected;
        }

        private void sendPingWebSocketFrame() {
            try {
                Channel target = this.channel;
                if (null != target && this.connected) {
                    target.writeAndFlush(new PingWebSocketFrame());
                }
            }
            catch (Throwable ex) {
                this.log.error("sendPingWebSocketFrame Exception:", ex);
            }
        }

        public void setConnected(boolean flag) {
            this.connected = flag;
        }

        /**
         * Marks the client as closed, cancels the ping task and shuts down the event loop group.
         * The client cannot be connected again afterwards.
         */
        public void close() {
            this.closed = true;
            if (null != this.pingExecutor) {
                this.pingExecutor.cancel(true);
                this.pingExecutor = null;
            }
            if (null != this.group) {
                this.group.shutdownGracefully();
                this.group = null;
            }
        }

        /**
         * Builds the JSON authentication message from the effective key and secret, the group
         * name and the local address. Values are concatenated as-is, without JSON escaping.
         */
        private String authMessage() {
            return "{\"appKey\":\"" + MessagePushClient.this.getAppKey() + "\",\"appSecret\":\"" + MessagePushClient.this.getAppSecret() + "\",\"groupName\":\"" + MessagePushClient.this.groupName + "\",\"clientIp\":\"" + NetUtil.getLocalAddress() + "\"}";
        }
    }
}

