/*
 * Decompiled with CFR 0.152.
 */
package com.xxl.mq.core.bootstrap;

import com.xxl.mq.core.consumer.IConsumer;
import com.xxl.mq.core.consumer.annotation.XxlMq;
import com.xxl.mq.core.consumer.impl.MethodConsumer;
import com.xxl.mq.core.openapi.BrokerService;
import com.xxl.mq.core.thread.ConsumerThread;
import com.xxl.mq.core.thread.MessageThread;
import com.xxl.mq.core.thread.PullThread;
import com.xxl.mq.core.thread.RegistryThread;
import com.xxl.tool.core.StringTool;
import com.xxl.tool.exception.BizException;
import com.xxl.tool.jsonrpc.JsonRpcClient;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Client-side bootstrap for xxl-mq.
 *
 * <p>Holds the client configuration (broker address list, access token, app name, timeouts and
 * pull settings), builds the broker RPC clients, keeps the registry of topic consumers, and owns
 * the background threads ({@link MessageThread}, {@link RegistryThread}, {@link PullThread} and
 * the per-topic {@link ConsumerThread}s).
 *
 * <p>The instance on which {@link #start()} was most recently called is reachable through
 * {@link #getInstance()}.
 */
public class XxlMqBootstrap {
    private static final Logger logger = LoggerFactory.getLogger(XxlMqBootstrap.class);

    /** Instance published by {@link #start()}; volatile so readers on other threads see it. */
    private static volatile XxlMqBootstrap xxlMqBootstrap;

    // ---------------------- configuration ----------------------

    /** Comma-separated list of broker base URLs. */
    private String address;
    private String accesstoken;
    private String appname;
    /** RPC client timeout; values outside [500, 3000] are replaced by 3000 in buildBrokerClient(). */
    private int timeout;
    /** Consumers are disabled only when this is explicitly {@code false}; {@code null} means enabled. */
    private Boolean consumerEnabled;
    private int pullBatchsize;
    private int pullInterval;

    // ---------------------- runtime state ----------------------

    /** Random identifier generated on each {@link #start()}. */
    private String instanceUuid;
    private MessageThread messageThread = null;
    private RegistryThread registryThread = null;
    private PullThread pullThread = null;

    /** Service name used when creating the broker RPC proxies. */
    private final String service = "brokerService";
    private final List<BrokerService> clientList = new ArrayList<>();
    /** topic -> consumer. */
    private final ConcurrentMap<String, IConsumer> consumerRepository = new ConcurrentHashMap<>();
    /** topic -> lazily created consumer thread. */
    private final Map<String, ConsumerThread> consumerThreadRepository = new ConcurrentHashMap<>();

    /**
     * Returns the bootstrap published by {@link #start()}.
     *
     * @return the started bootstrap instance
     * @throws BizException if no bootstrap has been started yet
     */
    public static XxlMqBootstrap getInstance() {
        XxlMqBootstrap instance = xxlMqBootstrap;
        if (instance == null) {
            throw new BizException("xxl-mq bootstrap not init.");
        }
        return instance;
    }

    // ---------------------- accessors ----------------------

    public String getAddress() {
        return this.address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    public String getAccesstoken() {
        return this.accesstoken;
    }

    public void setAccesstoken(String accesstoken) {
        this.accesstoken = accesstoken;
    }

    public String getAppname() {
        return this.appname;
    }

    public void setAppname(String appname) {
        this.appname = appname;
    }

    public Boolean getConsumerEnabled() {
        return this.consumerEnabled;
    }

    public void setConsumerEnabled(Boolean consumerEnabled) {
        this.consumerEnabled = consumerEnabled;
    }

    public int getTimeout() {
        return this.timeout;
    }

    public void setTimeout(int timeout) {
        this.timeout = timeout;
    }

    public int getPullBatchsize() {
        return this.pullBatchsize;
    }

    public void setPullBatchsize(int pullBatchsize) {
        this.pullBatchsize = pullBatchsize;
    }

    public int getPullInterval() {
        return this.pullInterval;
    }

    public void setPullInterval(int pullInterval) {
        this.pullInterval = pullInterval;
    }

    public String getInstanceUuid() {
        return this.instanceUuid;
    }

    public MessageThread getMessageThread() {
        return this.messageThread;
    }

    // ---------------------- lifecycle ----------------------

    /**
     * Publishes this instance, validates the configuration, builds the broker clients and starts
     * the message thread. Unless consumers are explicitly disabled, the registry and pull threads
     * are started as well.
     *
     * @throws BizException if address, accesstoken or appname is blank, or if the address list
     *                      contains no usable entry
     */
    public void start() {
        xxlMqBootstrap = this;
        this.instanceUuid = UUID.randomUUID().toString().replaceAll("-", "");
        this.buildBrokerClient();

        this.messageThread = new MessageThread(this);
        this.messageThread.start();

        // Only an explicit FALSE disables the consumer side; null keeps it enabled.
        if (Boolean.FALSE.equals(this.consumerEnabled)) {
            logger.info(">>>>>>>>>>> xxl-mq XxlMqBootstrap consumerEnabled = {}, consumers will not work.", this.consumerEnabled);
            return;
        }

        this.registryThread = new RegistryThread(this);
        this.registryThread.start();
        this.pullThread = new PullThread(this);
        this.pullThread.start();
        logger.info(">>>>>>>>>>> xxl-mq XxlMqBootstrap started, instanceUuid = {}", this.instanceUuid);
    }

    /**
     * Stops every thread that was started: registry, message and pull threads first, then all
     * consumer threads currently registered.
     */
    public void stop() {
        if (this.registryThread != null) {
            this.registryThread.stop();
        }
        if (this.messageThread != null) {
            this.messageThread.stop();
        }
        if (this.pullThread != null) {
            this.pullThread.stop();
        }
        // Consumer threads can be created through the public lazyInitConsumerThread(), so they are
        // stopped regardless of whether the pull thread was ever started. Iterate over a snapshot
        // because stopConsumerThread() removes entries from the map.
        for (String topic : new ArrayList<>(this.consumerThreadRepository.keySet())) {
            this.stopConsumerThread(topic);
        }
        logger.info(">>>>>>>>>>> xxl-mq XxlMqBootstrap stopped.");
    }

    // ---------------------- broker client ----------------------

    /**
     * Validates the configuration and creates one {@link BrokerService} proxy per address entry,
     * each targeting {@code <address>/openapi}.
     *
     * @throws BizException if address, accesstoken or appname is blank, or if the address list
     *                      contains no non-blank entry
     */
    private void buildBrokerClient() {
        if (StringTool.isBlank(this.address)) {
            throw new BizException("xxl-mq bootstrap address can not be empty.");
        }
        if (StringTool.isBlank(this.accesstoken)) {
            throw new BizException("xxl-mq bootstrap accesstoken can not be empty.");
        }
        if (StringTool.isBlank(this.appname)) {
            throw new BizException("xxl-mq bootstrap appname can not be empty.");
        }
        if (this.timeout < 500 || this.timeout > 3000) {
            this.timeout = 3000;
        }

        // Trim each entry so that "a, b" does not yield a URL with a leading space.
        List<String> addressList = Arrays.stream(this.address.split(","))
                .filter(StringTool::isNotBlank)
                .map(String::trim)
                .collect(Collectors.toList());
        // An address such as "," is not blank but contains no usable entry; fail here instead of
        // letting loadBrokerClient() fail later on an empty client list.
        if (addressList.isEmpty()) {
            throw new BizException("xxl-mq bootstrap address invalid, no usable broker address found.");
        }

        for (String url : addressList) {
            String finalUrl = url + "/openapi";
            JsonRpcClient jsonRpcClient = new JsonRpcClient(finalUrl, this.timeout);
            BrokerService brokerService = (BrokerService) jsonRpcClient.proxy(this.service, BrokerService.class);
            this.clientList.add(brokerService);
        }
    }

    /**
     * Picks one of the broker clients at random.
     *
     * @return a randomly selected broker client
     * @throws IllegalArgumentException if no client has been built (i.e. {@link #start()} has not
     *                                  completed the client setup)
     */
    public BrokerService loadBrokerClient() {
        return this.clientList.get(ThreadLocalRandom.current().nextInt(this.clientList.size()));
    }

    // ---------------------- consumer registry ----------------------

    /**
     * Looks up the consumer registered for a topic.
     *
     * @param topic topic name
     * @return the registered consumer, or {@code null} if none is registered
     */
    public IConsumer loadConsumer(String topic) {
        return this.consumerRepository.get(topic);
    }

    /**
     * @return a new list containing every topic that currently has a registered consumer
     */
    public List<String> getAllConsumerTopicList() {
        return new ArrayList<>(this.consumerRepository.keySet());
    }

    /**
     * Registers (or replaces) the consumer for a topic.
     *
     * @param topic    topic name
     * @param consumer consumer to register
     * @return the consumer previously registered for the topic, or {@code null} if there was none
     */
    public IConsumer registryConsumer(String topic, IConsumer consumer) {
        // Store first so that "success" is only logged once the registration actually happened.
        IConsumer previous = this.consumerRepository.put(topic, consumer);
        logger.info(">>>>>>>>>>> xxl-mq register consumer success, topic:{}, consumer:{}", topic, consumer);
        return previous;
    }

    /**
     * Registers a method annotated with {@link XxlMq} as the consumer of the annotation's topic.
     * Optional init/destroy methods named by the annotation are resolved as no-arg methods
     * declared on the bean's class.
     *
     * @param xxlMq         annotation found on the method; nothing happens when {@code null}
     * @param bean          object that owns the method
     * @param executeMethod method to invoke for each message
     * @throws RuntimeException if the topic is blank, the topic is already registered, or a named
     *                          init/destroy method does not exist
     */
    protected void registryMethodConsumer(XxlMq xxlMq, Object bean, Method executeMethod) {
        if (xxlMq == null) {
            return;
        }
        String topic = xxlMq.value();
        Class<?> clazz = bean.getClass();
        String methodName = executeMethod.getName();

        if (topic.trim().isEmpty()) {
            throw new RuntimeException("xxl-mq method-consumer name invalid, for[" + clazz + "#" + methodName + "] .");
        }
        if (this.loadConsumer(topic) != null) {
            throw new RuntimeException("xxl-mq consumer[" + topic + "] naming conflicts.");
        }
        executeMethod.setAccessible(true);

        Method initMethod = null;
        Method destroyMethod = null;
        if (!xxlMq.init().trim().isEmpty()) {
            try {
                initMethod = clazz.getDeclaredMethod(xxlMq.init());
                initMethod.setAccessible(true);
            } catch (NoSuchMethodException e) {
                // Keep the original exception as the cause so the missing method name is not lost.
                throw new RuntimeException("xxl-mq method-consumer initMethod invalid, for[" + clazz + "#" + methodName + "] .", e);
            }
        }
        if (!xxlMq.destroy().trim().isEmpty()) {
            try {
                destroyMethod = clazz.getDeclaredMethod(xxlMq.destroy());
                destroyMethod.setAccessible(true);
            } catch (NoSuchMethodException e) {
                throw new RuntimeException("xxl-mq method-consumer destroyMethod invalid, for[" + clazz + "#" + methodName + "] .", e);
            }
        }
        this.registryConsumer(topic, new MethodConsumer(bean, executeMethod, initMethod, destroyMethod));
    }

    // ---------------------- consumer threads ----------------------

    /**
     * Returns the consumer thread for a topic, creating and registering it on first use.
     *
     * @param topic topic name
     * @return the existing or newly created consumer thread
     * @throws BizException if no consumer is registered for the topic
     */
    public ConsumerThread lazyInitConsumerThread(String topic) {
        ConsumerThread consumerThread = this.consumerThreadRepository.get(topic);
        if (consumerThread != null) {
            return consumerThread;
        }
        synchronized (XxlMqBootstrap.class) {
            // Re-check under the lock: another caller may have created the thread meanwhile.
            consumerThread = this.consumerThreadRepository.get(topic);
            if (consumerThread != null) {
                return consumerThread;
            }
            IConsumer consumer = this.loadConsumer(topic);
            if (consumer == null) {
                throw new BizException("xxl-mq lazyInitConsumerThread fail, IConsumer[topic=" + topic + "] not found.");
            }
            // Bind the thread to this instance rather than to the static reference, which is null
            // before start() and may point at a different bootstrap.
            consumerThread = new ConsumerThread(this, consumer);
            this.consumerThreadRepository.put(topic, consumerThread);
            return consumerThread;
        }
    }

    /**
     * Stops and unregisters the consumer thread of a topic, if one exists.
     *
     * @param topic topic name
     */
    public void stopConsumerThread(String topic) {
        // Cheap unlocked check to skip the lock when there is nothing to stop.
        if (this.consumerThreadRepository.get(topic) == null) {
            return;
        }
        synchronized (XxlMqBootstrap.class) {
            // Look the thread up again under the lock so that the instance being stopped is the
            // one being removed; a reference read before locking may already have been replaced.
            ConsumerThread consumerThread = this.consumerThreadRepository.get(topic);
            if (consumerThread == null) {
                return;
            }
            consumerThread.stop();
            this.consumerThreadRepository.remove(topic);
        }
    }

    /**
     * Lists the registered topics whose consumer thread is either absent or does not report
     * {@code isBusy()}.
     *
     * @return topics that are not currently busy
     */
    public List<String> getFreeConsumerTopicList() {
        return this.getAllConsumerTopicList().stream()
                .filter(topic -> {
                    ConsumerThread consumerThread = this.consumerThreadRepository.get(topic);
                    return consumerThread == null || !consumerThread.isBusy();
                })
                .collect(Collectors.toList());
    }

    /**
     * Stops every consumer thread that reports {@code isIdle()}.
     */
    public void stopIdleConsumerThead() {
        for (Map.Entry<String, ConsumerThread> entry : this.consumerThreadRepository.entrySet()) {
            ConsumerThread consumerThread = entry.getValue();
            if (consumerThread != null && consumerThread.isIdle()) {
                this.stopConsumerThread(entry.getKey());
            }
        }
    }
}

