/*
 * Decompiled with CFR 0.152.
 */
package com.xxl.mq.core.thread;

import com.xxl.mq.core.bootstrap.XxlMqBootstrap;
import com.xxl.mq.core.openapi.model.ConsumeRequest;
import com.xxl.mq.core.openapi.model.MessageData;
import com.xxl.mq.core.openapi.model.ProduceRequest;
import com.xxl.tool.concurrent.MessageQueue;
import com.xxl.tool.core.StringTool;
import com.xxl.tool.exception.BizException;
import com.xxl.tool.response.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MessageThread {
    private static final Logger logger = LoggerFactory.getLogger(MessageThread.class);
    private final XxlMqBootstrap xxlMqBootstrap;
    private volatile MessageQueue<MessageData> produceMessageQueue;
    private volatile MessageQueue<MessageData> consumeMessageQueue;

    public MessageThread(XxlMqBootstrap xxlMqBootstrap) {
        this.xxlMqBootstrap = xxlMqBootstrap;
    }

    public void start() {
        this.produceMessageQueue = new MessageQueue("produceMessageQueue", messages -> {
            ProduceRequest produceRequest = new ProduceRequest();
            produceRequest.setAccesstoken(this.xxlMqBootstrap.getAccesstoken());
            produceRequest.setMessageList(messages);
            for (int i = 0; i < 3; ++i) {
                try {
                    Response<String> produceResponse = this.xxlMqBootstrap.loadBrokerClient().produce(produceRequest);
                    if (produceResponse.isSuccess()) {
                        logger.debug(">>>>>>>>>>> xxl-mq MessageThread-produceMessageQueue produce success, produceRequest:{}, produceResponse:{}", (Object)produceRequest, produceResponse);
                        break;
                    }
                    logger.error(">>>>>>>>>>> xxl-mq MessageThread-produceMessageQueue produce fail, produceRequest:{}, produceResponse:{}", (Object)produceRequest, produceResponse);
                    continue;
                }
                catch (Exception e) {
                    logger.error(">>>>>>>>>>> xxl-mq MessageThread-produceMessageQueue produce error, produceRequest:{}", (Object)produceRequest, (Object)e);
                }
            }
        }, 5, 50);
        this.consumeMessageQueue = new MessageQueue("consumeMessageQueue", messages -> {
            ConsumeRequest consumeRequest = new ConsumeRequest();
            consumeRequest.setAccesstoken(this.xxlMqBootstrap.getAccesstoken());
            consumeRequest.setMessageList(messages);
            for (int i = 0; i < 3; ++i) {
                try {
                    Response<String> consumeResponse = this.xxlMqBootstrap.loadBrokerClient().consume(consumeRequest);
                    if (consumeResponse.isSuccess()) {
                        logger.debug(">>>>>>>>>>> xxl-mq MessageThread-consumeMessageQueue consume success, consumeRequest:{}, consumeResponse:{}", (Object)consumeRequest, consumeResponse);
                        break;
                    }
                    logger.error(">>>>>>>>>>> xxl-mq MessageThread-consumeMessageQueue consume fail, consumeRequest:{}, consumeResponse:{}", (Object)consumeRequest, consumeResponse);
                    continue;
                }
                catch (Exception e) {
                    logger.error(">>>>>>>>>>> xxl-mq MessageThread-consumeMessageQueue consume error, consumeRequest:{}", (Object)consumeRequest, (Object)e);
                }
            }
        }, 5, 50);
    }

    public void stop() {
        if (this.produceMessageQueue != null) {
            this.produceMessageQueue.stop();
        }
        if (this.consumeMessageQueue != null) {
            this.consumeMessageQueue.stop();
        }
    }

    public boolean produceSend(MessageData messageData) {
        if (messageData == null || StringTool.isBlank((String)messageData.getTopic())) {
            logger.error(">>>>>>>>>>> xxl-mq MessageThread-produceSend fail, message topic is null", (Throwable)new BizException("message topic is null"));
            return false;
        }
        if (messageData.getTopic().length() > 100) {
            logger.error(">>>>>>>>>>> xxl-mq MessageThread-produceSend fail, message topic is too long\uff08>100): topic = {}", (Object)messageData.getTopic(), (Object)new BizException("message topic is too long"));
            return false;
        }
        if (messageData.getData() == null) {
            messageData.setData("");
        }
        if (messageData.getData().length() > 4000) {
            logger.error(">>>>>>>>>>> xxl-mq MessageThread-produceSend fail, message data is too long\uff08>4000): topic = {}", (Object)messageData.getTopic(), (Object)new BizException("message data is too long"));
            return false;
        }
        if (messageData.getEffectTime() < System.currentTimeMillis()) {
            messageData.setEffectTime(System.currentTimeMillis());
        }
        if (messageData.getBizId() <= 0L) {
            messageData.setBizId(0L);
        }
        return this.produceMessageQueue.produce((Object)messageData);
    }

    public boolean consumeCallback(MessageData messageData) {
        return this.consumeMessageQueue.produce((Object)messageData);
    }
}

