/*
 * Decompiled with CFR 0.152.
 */
package com.xxl.mq.core.thread;

import com.xxl.mq.core.XxlMqHelper;
import com.xxl.mq.core.bootstrap.XxlMqBootstrap;
import com.xxl.mq.core.constant.MessageStatusEnum;
import com.xxl.mq.core.consumer.IConsumer;
import com.xxl.mq.core.context.XxlMqContext;
import com.xxl.mq.core.openapi.model.MessageData;
import com.xxl.tool.http.IPTool;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Worker that consumes messages for one {@link IConsumer} on a dedicated
 * single-threaded {@link ScheduledThreadPoolExecutor}.
 *
 * <p>Each accepted message is scheduled to run once its effect time is reached. For every
 * message the worker reports exactly one result to the bootstrap's message thread via
 * {@code consumeCallback}, whether the consumer returned normally, timed out, or threw.
 *
 * <p>{@link #accept(MessageData)}, {@link #isBusy()} and {@link #isIdle()} only touch the
 * executor and a {@code volatile} timestamp, so they may be called from other threads than
 * the worker thread itself.
 */
public class ConsumerThread {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerThread.class);

    /** Consume logs longer than this many characters are truncated before being reported. */
    private static final int MAX_CONSUME_LOG_LENGTH = 500;
    /** The worker counts as busy when more than this many messages are queued. */
    private static final int BUSY_QUEUE_THRESHOLD = 10;
    /** The worker counts as idle when no message has started executing for this long (ms). */
    private static final long IDLE_THRESHOLD_MILLIS = 180000L;
    /** How long {@link #stop()} waits for queued work before forcing termination (seconds). */
    private static final long STOP_AWAIT_SECONDS = 10L;

    private final XxlMqBootstrap xxlMqBootstrap;
    private final IConsumer consumer;
    private final ScheduledThreadPoolExecutor scheduledExecutorService;
    /** Wall-clock time (ms) at which the most recent message started executing. */
    private volatile long lastExecuteTime;

    /**
     * Creates the worker together with its single-threaded scheduler.
     *
     * @param xxlMqBootstrap bootstrap used to reach the message thread and the instance uuid
     * @param consumer       consumer invoked once per accepted message
     */
    public ConsumerThread(XxlMqBootstrap xxlMqBootstrap, IConsumer consumer) {
        this.xxlMqBootstrap = xxlMqBootstrap;
        this.consumer = consumer;
        this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
        this.lastExecuteTime = System.currentTimeMillis();
    }

    /**
     * Shuts the scheduler down, waiting up to {@value #STOP_AWAIT_SECONDS} seconds for
     * outstanding work before forcing termination with {@code shutdownNow()}.
     *
     * <p>If the calling thread is interrupted while waiting, termination is forced and the
     * caller's interrupt status is restored.
     */
    public void stop() {
        try {
            scheduledExecutorService.shutdown();
            if (!scheduledExecutorService.awaitTermination(STOP_AWAIT_SECONDS, TimeUnit.SECONDS)) {
                scheduledExecutorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            scheduledExecutorService.shutdownNow();
            // Do not swallow the interruption: let the caller observe it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Schedules {@code message} for consumption once its effect time is reached. The delay is
     * {@code message.getEffectTime() - now}, in milliseconds.
     *
     * @param message message to consume
     */
    public void accept(final MessageData message) {
        Runnable task = () -> process(message);
        long delayMillis = message.getEffectTime() - System.currentTimeMillis();
        scheduledExecutorService.schedule(task, delayMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Consumes a single message on the scheduler thread and reports the outcome.
     *
     * <p>If the scheduler has already been shut down the message is not consumed; a callback
     * with status {@code 0} is sent instead. Otherwise a fresh {@link XxlMqContext} is
     * installed, the consumer is invoked (under a time limit when the message defines a
     * positive execution timeout), and the resulting status and log are reported exactly once
     * from the {@code finally} block.
     */
    private void process(MessageData message) {
        lastExecuteTime = System.currentTimeMillis();

        if (scheduledExecutorService.isShutdown()) {
            String consumeLog = "<br>consumer-thread terminated, the message not consumed and message-status change to\uff1a0";
            xxlMqBootstrap.getMessageThread().consumeCallback(
                    new MessageData(message.getId(), message.getTopic(), 0, consumeLog, null));
            return;
        }

        XxlMqContext.setContext(new XxlMqContext(message.getId(), message.getData()));
        try {
            int executeTimeout = message.getExecutionTimeout() != null ? message.getExecutionTimeout() : 0;
            if (executeTimeout > 0) {
                consumeWithTimeout(message, executeTimeout);
            } else {
                consumer.consume();
            }
        } catch (Exception e) {
            logger.error(">>>>>>>>>>> ConsumerThread consume error, message:{}", message, e);
            XxlMqHelper.consumeFail("consume error: " + e.getMessage());
        } finally {
            // Single reporting point: runs after success, after a handled failure, and also
            // when an Error (or a failure inside the catch block) is propagating.
            reportResult(message);
        }
    }

    /**
     * Runs the consumer on a separate thread and waits at most {@code timeoutSeconds} for it.
     * On timeout {@link XxlMqHelper#consumeTimeout(String)} is invoked on the calling thread;
     * in every case the helper thread is interrupted before returning.
     *
     * <p>NOTE(review): the consumer runs with a context installed on the helper thread, while
     * the result is later read via {@code XxlMqContext.getContext()} on the scheduler thread.
     * Whether both see the same state depends on how {@code XxlMqContext} stores it - confirm.
     *
     * @throws Exception if waiting is interrupted or the consumer itself failed (the latter
     *                   arrives wrapped by {@link FutureTask#get(long, TimeUnit)})
     */
    private void consumeWithTimeout(MessageData message, int timeoutSeconds) throws Exception {
        FutureTask<Boolean> futureTask = new FutureTask<Boolean>(() -> {
            XxlMqContext.setContext(new XxlMqContext(message.getId(), message.getData()));
            consumer.consume();
            return true;
        });
        // Created outside the try block so the finally clause never sees a null thread.
        Thread futureThread = new Thread(futureTask);
        try {
            futureThread.start();
            futureTask.get(timeoutSeconds, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            XxlMqHelper.consumeTimeout("consume fail, execute timeout.");
        } finally {
            futureThread.interrupt();
        }
    }

    /**
     * Reads status and consume log from the current {@link XxlMqContext}, truncates the log to
     * {@value #MAX_CONSUME_LOG_LENGTH} characters, appends host/instance/status details and
     * hands the result to the message thread's {@code consumeCallback}.
     */
    private void reportResult(MessageData message) {
        String consumeLog = XxlMqContext.getContext().getConsumeLog();
        if (consumeLog != null && consumeLog.length() > MAX_CONSUME_LOG_LENGTH) {
            consumeLog = consumeLog.substring(0, MAX_CONSUME_LOG_LENGTH) + "...";
        }

        MessageStatusEnum messageStatus = MessageStatusEnum.match(XxlMqContext.getContext().getStatus(), null);
        // Fall back to the raw status value when it does not map to a known enum constant.
        String statusText = messageStatus != null ? messageStatus.name() : "" + XxlMqContext.getContext().getStatus();
        consumeLog = consumeLog + "<br>Other : IP = " + IPTool.getIp()
                + ", instanceUuid = " + xxlMqBootstrap.getInstanceUuid()
                + " , message-status change to = " + statusText;

        xxlMqBootstrap.getMessageThread().consumeCallback(new MessageData(
                message.getId(),
                message.getTopic(),
                XxlMqContext.getContext().getStatus(),
                consumeLog,
                xxlMqBootstrap.getInstanceUuid()));
    }

    /**
     * @return {@code true} when more than {@value #BUSY_QUEUE_THRESHOLD} messages are waiting
     *         in the scheduler queue
     */
    public boolean isBusy() {
        return scheduledExecutorService.getQueue().size() > BUSY_QUEUE_THRESHOLD;
    }

    /**
     * @return {@code true} when the worker is not busy and no message has started executing
     *         within the last {@value #IDLE_THRESHOLD_MILLIS} milliseconds
     */
    public boolean isIdle() {
        if (isBusy()) {
            return false;
        }
        return System.currentTimeMillis() - lastExecuteTime > IDLE_THRESHOLD_MILLIS;
    }
}

