/*
 * Decompiled with CFR 0.152.
 */
package com.xxl.mq.core.thread;

import com.xxl.mq.core.bootstrap.XxlMqBootstrap;
import com.xxl.mq.core.openapi.model.MessageData;
import com.xxl.mq.core.openapi.model.PullRequest;
import com.xxl.mq.core.thread.ConsumerThread;
import com.xxl.tool.concurrent.CyclicThread;
import com.xxl.tool.core.CollectionTool;
import com.xxl.tool.response.Response;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PullThread {
    private static final Logger logger = LoggerFactory.getLogger(PullThread.class);
    private final XxlMqBootstrap xxlMqBootstrap;

    public PullThread(XxlMqBootstrap xxlMqBootstrap) {
        this.xxlMqBootstrap = xxlMqBootstrap;
    }

    public void start() {
        int pullBatchsize = this.xxlMqBootstrap.getPullBatchsize();
        int pullInterval = this.xxlMqBootstrap.getPullInterval();
        if (pullBatchsize < 20 || pullBatchsize > 500) {
            pullBatchsize = 100;
        }
        if (pullInterval < 1000 || pullInterval > 30000) {
            pullInterval = 1000;
        }
        final int finalPullBatchsize = pullBatchsize;
        CyclicThread pullThread = new CyclicThread("pullThread", new Runnable(){

            @Override
            public void run() {
                PullThread.this.xxlMqBootstrap.stopIdleConsumerThead();
                List<String> freeConsumerTopicList = PullThread.this.xxlMqBootstrap.getFreeConsumerTopicList();
                if (CollectionTool.isEmpty(freeConsumerTopicList)) {
                    return;
                }
                PullRequest pullRequest = new PullRequest();
                pullRequest.setAccesstoken(PullThread.this.xxlMqBootstrap.getAccesstoken());
                pullRequest.setAppname(PullThread.this.xxlMqBootstrap.getAppname());
                pullRequest.setInstanceUuid(PullThread.this.xxlMqBootstrap.getInstanceUuid());
                pullRequest.setTopicList(freeConsumerTopicList);
                pullRequest.setBatchsize(finalPullBatchsize);
                Response<String> pullPreCheckResponse = PullThread.this.xxlMqBootstrap.loadBrokerClient().pullPreCheck(pullRequest);
                if (!pullPreCheckResponse.isSuccess()) {
                    logger.debug(">>>>>>>>>>> xxl-mq PullThread pullPreCheck fail, pullRequest:{}, pullPreCheckResponse:{}", (Object)pullRequest, pullPreCheckResponse);
                    try {
                        TimeUnit.SECONDS.sleep(10L);
                    }
                    catch (InterruptedException e) {
                        logger.error(">>>>>>>>>>> xxl-mq PullThread pullPreCheck fail and sleep interrupted.", (Throwable)e);
                    }
                    return;
                }
                Response<List<MessageData>> pullResponse = PullThread.this.xxlMqBootstrap.loadBrokerClient().pullAndLock(pullRequest);
                if (!pullResponse.isSuccess()) {
                    if (pullResponse.getCode() == 402) {
                        logger.debug(">>>>>>>>>>> xxl-mq PullThread pullAndLock fail, pullRequest:{}, pullResponse:{}", (Object)pullRequest, pullResponse);
                    } else {
                        logger.error(">>>>>>>>>>> xxl-mq PullThread pullAndLock fail, pullRequest:{}, pullResponse:{}", (Object)pullRequest, pullResponse);
                    }
                    return;
                }
                PullThread.this.dispatchConsumer((List)pullResponse.getData());
            }
        }, (long)pullInterval, true);
        pullThread.start();
    }

    public void stop() {
    }

    private void dispatchConsumer(List<MessageData> messageDataList) {
        if (CollectionTool.isEmpty(messageDataList)) {
            return;
        }
        for (MessageData messageData : messageDataList) {
            try {
                ConsumerThread consumerThread = this.xxlMqBootstrap.lazyInitConsumerThread(messageData.getTopic());
                consumerThread.accept(messageData);
            }
            catch (Exception e) {
                logger.error(">>>>>>>>>>> xxl-mq PullThread message-accept error, messageData:{}", (Object)messageData, (Object)e);
            }
        }
    }
}

