package com.tongtech.client.consumer.service;

import com.tongtech.client.consumer.ConsumeMessageService;
import com.tongtech.client.consumer.MessageQueueLock;
import com.tongtech.client.consumer.PullResult;
import com.tongtech.client.consumer.PullStatus;
import com.tongtech.client.consumer.TLQConsumerPushInner;
import com.tongtech.client.consumer.common.ConsumeModel;
import com.tongtech.client.consumer.common.ProcessQueue;
import com.tongtech.client.consumer.listener.ConsumeOrderlyContext;
import com.tongtech.client.consumer.listener.ConsumeOrderlyStatus;
import com.tongtech.client.consumer.listener.MessageListenerOrderly;
import com.tongtech.client.exception.HTPException;
import com.tongtech.client.factory.TLQClientInstance;
import com.tongtech.client.factory.ThreadFactoryImpl;
import com.tongtech.client.message.MessageExt;
import com.tongtech.client.producer.TopicBrokerInfo;
import com.tongtech.client.remoting.common.IpUtils;
import com.tongtech.client.remoting.common.RemotingHelper;
import com.tongtech.client.trace.TraceDispatcherType;
import com.tongtech.client.trace.hook.ConsumeMessageContext;
import com.tongtech.client.utils.ThreadUtils;
import com.tongtech.slf4j.Logger;
import com.tongtech.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/* loaded from: input_file:com/tongtech/client/consumer/service/ConsumeMessageOrderlyService.class */
public class ConsumeMessageOrderlyService implements ConsumeMessageService {
    /** Logger shared by the service and its nested consume tasks. */
    private static final Logger log = LoggerFactory.getLogger(ConsumeMessageOrderlyService.class);

    /** Push-consumer facade that supplies configuration, trace hooks and send-back support. */
    private final TLQConsumerPushInner consumerInner;

    /** User callback invoked for every batch taken from a process queue. */
    private final MessageListenerOrderly messageListener;

    /** Pool that runs {@link ConsumeRequest} tasks; fed by {@link #consumeRequestQueue}. */
    private final ThreadPoolExecutor consumeExecutor;

    /** Consumer group name, read once from {@link #consumerInner} in the constructor. */
    private final String consumerGroup;

    /** Single-threaded scheduler used to re-submit consume requests after a delay. */
    private final ScheduledExecutorService scheduledExecutorService;

    /** Source of the monitor objects that consume tasks synchronize on. */
    private final MessageQueueLock messageQueueLock = new MessageQueueLock();

    /** Set to {@code true} by {@code shutdown(long)}; not read anywhere else in this class. */
    private volatile boolean stopped = false;

    /** Unbounded work queue handed to {@link #consumeExecutor}. */
    private final BlockingQueue<Runnable> consumeRequestQueue = new LinkedBlockingQueue<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/tongtech/client/consumer/service/ConsumeMessageOrderlyService$ConsumeRequest.class */
    /**
     * Task that repeatedly takes messages from a {@link ProcessQueue}, passes them to the orderly
     * listener and lets the enclosing service decide whether to keep going.
     *
     * <p>The whole loop runs while synchronized on the object returned by
     * {@code messageQueueLock.fetchLockObject(messageQueue)}.
     */
    public class ConsumeRequest implements Runnable {
        private final List<MessageExt> msgs;
        private final ProcessQueue processQueue;
        private final TopicBrokerInfo messageQueue;
        private final PullResult pullResult;
        private final TLQClientInstance mQClientFactory;

        /**
         * @param list              messages of the originating pull; only its size is used by {@link #run()}
         * @param processQueue      queue the messages are taken from
         * @param topicBrokerInfo   queue/broker descriptor used for locking, logging and tracing
         * @param pullResult        pull result whose metadata is copied into what the listener receives
         * @param tLQClientInstance client instance used for the client id and for acknowledgements
         */
        public ConsumeRequest(List<MessageExt> list, ProcessQueue processQueue, TopicBrokerInfo topicBrokerInfo, PullResult pullResult, TLQClientInstance tLQClientInstance) {
            this.msgs = list;
            this.processQueue = processQueue;
            this.messageQueue = topicBrokerInfo;
            this.pullResult = pullResult;
            this.mQClientFactory = tLQClientInstance;
        }

        /** @return the message list this request was created with */
        public List<MessageExt> getMsgs() {
            return this.msgs;
        }

        /** @return the process queue this request consumes from */
        public ProcessQueue getProcessQueue() {
            return this.processQueue;
        }

        /** @return the queue/broker descriptor this request is bound to */
        public TopicBrokerInfo getMessageQueue() {
            return this.messageQueue;
        }

        /**
         * Consumes from the process queue until it is dropped, yields no more messages, or
         * {@code processConsumeResult} asks to stop.
         *
         * <p>The consume lock of the process queue is acquired before the listener is called and is
         * released exactly once in a {@code finally} block. A listener that throws is logged and
         * treated like a {@code null} result, i.e. {@link ConsumeOrderlyStatus#RECONSUME_LATER}.
         */
        @Override // java.lang.Runnable
        public void run() {
            if (this.processQueue.isDropped()) {
                log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                return;
            }
            final ConsumeMessageOrderlyService service = ConsumeMessageOrderlyService.this;
            synchronized (service.messageQueueLock.fetchLockObject(this.messageQueue)) {
                boolean continueConsume = true;
                while (continueConsume) {
                    if (this.processQueue.isDropped()) {
                        log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                        break;
                    }
                    // msgs.size() is passed as the argument; presumably a batch-size limit -- confirm in ProcessQueue.
                    List<MessageExt> batch = this.processQueue.takeMessags(this.msgs.size());
                    if (batch.isEmpty()) {
                        break;
                    }

                    ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);
                    ConsumeMessageContext traceContext = service.consumerInner.hasHook() ? startTrace(batch) : null;
                    ConsumeOrderlyStatus status = null;

                    // Acquire outside the try so that a failed lock() is never followed by unlock().
                    this.processQueue.getLockConsume().lock();
                    try {
                        if (this.processQueue.isDropped()) {
                            log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                            break; // the finally block below still releases the lock
                        }
                        status = service.messageListener.consumeMessage(service.buildNewPullResult(this.pullResult, batch), context);
                    } catch (Throwable th) {
                        // Leave status null so the batch is handled as RECONSUME_LATER below.
                        log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", RemotingHelper.exceptionSimpleDesc(th), service.consumerGroup, batch, this.messageQueue);
                    } finally {
                        this.processQueue.getLockConsume().unlock();
                    }

                    if (status == null) {
                        log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}", service.consumerGroup, batch, this.messageQueue);
                        status = ConsumeOrderlyStatus.RECONSUME_LATER;
                    }
                    if (traceContext != null && service.consumerInner.hasHook()) {
                        traceContext.setSuccess(ConsumeOrderlyStatus.CONSUME_SUCCESS == status);
                        traceContext.setStatus(status.name());
                        service.consumerInner.executeHookAfter(traceContext);
                    }
                    continueConsume = service.processConsumeResult(batch, status, context, this.messageQueue, this, this.pullResult, this.mQClientFactory);
                }
            }
        }

        /** Fills a trace context for {@code batch} and passes it to the consumer's before-hook. */
        private ConsumeMessageContext startTrace(List<MessageExt> batch) {
            final TLQConsumerPushInner consumer = ConsumeMessageOrderlyService.this.consumerInner;
            ConsumeMessageContext traceContext = new ConsumeMessageContext();
            traceContext.setClusterName(consumer.getClusterName());
            traceContext.setNamespace(consumer.getDomain());
            traceContext.setConsumerGroup(ConsumeMessageOrderlyService.this.consumerGroup);
            traceContext.setBrokerName(this.messageQueue.getBrokerName());
            traceContext.setBrokerAddr(IpUtils.getAddr(this.messageQueue));
            traceContext.setClientId(this.mQClientFactory.getClientId());
            traceContext.setMsgList(batch);
            traceContext.setConsumeModel(ConsumeModel.getConsumeModel(consumer.getMessageModel(), consumer.getAllocateStrategy()));
            traceContext.setPullType(consumer.getPullType());
            // Success/status initially reflect the pull; run() overwrites them after the listener returns.
            traceContext.setSuccess(PullStatus.FOUND.equals(this.pullResult.getPullStatus()));
            traceContext.setStatus(this.pullResult.getPullStatus().name());
            traceContext.setDispatcherType(TraceDispatcherType.PUSH_ORDERLY);
            traceContext.setRequestId(this.pullResult.getRequestId());
            consumer.executeHookBefore(traceContext);
            return traceContext;
        }
    }

    public ConsumeMessageOrderlyService(TLQConsumerPushInner tLQConsumerPushInner, MessageListenerOrderly messageListenerOrderly) {
        this.consumerInner = tLQConsumerPushInner;
        this.messageListener = messageListenerOrderly;
        this.consumerGroup = tLQConsumerPushInner.getConsumerGroupName();
        this.consumeExecutor = new ThreadPoolExecutor(this.consumerInner.getConsumeThreadMin(), this.consumerInner.getConsumeThreadMax(), 60000L, TimeUnit.MILLISECONDS, this.consumeRequestQueue, new ThreadFactoryImpl("ConsumeMessageThread_" + ((this.consumerGroup.length() > 100 ? this.consumerGroup.substring(0, 100) : this.consumerGroup) + "_")));
        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
    }

    /**
     * Marks the service as stopped, shuts down the delay scheduler and then hands the consume pool
     * to {@code ThreadUtils.shutdownGracefully}.
     *
     * @param j timeout in milliseconds passed on to {@code ThreadUtils.shutdownGracefully}
     */
    @Override // com.tongtech.client.consumer.ConsumeMessageService
    public void shutdown(long j) {
        final long timeoutMillis = j;
        this.stopped = true;
        // Scheduler first, then the pool that its delayed tasks would submit to.
        this.scheduledExecutorService.shutdown();
        ThreadUtils.shutdownGracefully(this.consumeExecutor, timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Wraps the arguments in a {@link ConsumeRequest} and submits it to the consume pool.
     *
     * @param list              messages of the pull being dispatched
     * @param processQueue      queue the request will consume from
     * @param topicBrokerInfo   queue/broker descriptor for the request
     * @param z                 when {@code false} the call does nothing
     * @param pullResult        pull result associated with the messages
     * @param tLQClientInstance client instance passed through to the request
     */
    @Override // com.tongtech.client.consumer.ConsumeMessageService
    public void submitConsumeRequest(List<MessageExt> list, ProcessQueue processQueue, TopicBrokerInfo topicBrokerInfo, boolean z, PullResult pullResult, TLQClientInstance tLQClientInstance) {
        if (!z) {
            return;
        }
        ConsumeRequest request = new ConsumeRequest(list, processQueue, topicBrokerInfo, pullResult, tLQClientInstance);
        this.consumeExecutor.submit(request);
    }

    /**
     * Schedules a fresh consume request for the given queue after a delay.
     *
     * @param j requested delay in milliseconds; {@code -1} selects the consumer's configured
     *          suspend time. The effective delay is clamped to the range 10..30000 ms.
     */
    private void submitConsumeRequestLater(final List<MessageExt> list, final ProcessQueue processQueue, final TopicBrokerInfo topicBrokerInfo, long j, final PullResult pullResult, final TLQClientInstance tLQClientInstance) {
        long requested = (j == -1) ? this.consumerInner.getSuspendCurrentQueueTimeMillis() : j;
        long delayMillis = Math.min(30000L, Math.max(10L, requested));

        Runnable resubmit = () -> submitConsumeRequest(list, processQueue, topicBrokerInfo, true, pullResult, tLQClientInstance);
        this.scheduledExecutorService.schedule(resubmit, delayMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Applies the listener's verdict for one batch and tells the caller whether to keep consuming.
     *
     * <ul>
     *   <li>{@code CONSUME_SUCCESS}: commits the process queue and, when auto-ack is off,
     *       acknowledges the request's messages.</li>
     *   <li>{@code RECONSUME_LATER}: keeps only the messages for which
     *       {@code findLocalOffsetStore} returns true. If any of them may still be retried they are
     *       put back and a delayed request is scheduled; otherwise the request's messages are
     *       acknowledged.</li>
     *   <li>Any other status: no action.</li>
     * </ul>
     *
     * <p>Acknowledgement failures are logged and do not abort processing.
     *
     * @param list                  batch that was handed to the listener
     * @param consumeOrderlyStatus  listener result; must not be {@code null}
     * @param consumeOrderlyContext context supplying the suspend time for a delayed retry
     * @param topicBrokerInfo       not used by this method
     * @param consumeRequest        request that produced the batch
     * @param pullResult            pull result forwarded to a delayed retry
     * @param tLQClientInstance     client used for offset lookup and acknowledgement
     * @return {@code false} only when a delayed retry was scheduled, {@code true} otherwise
     */
    public boolean processConsumeResult(List<MessageExt> list, ConsumeOrderlyStatus consumeOrderlyStatus, ConsumeOrderlyContext consumeOrderlyContext, TopicBrokerInfo topicBrokerInfo, ConsumeRequest consumeRequest, PullResult pullResult, TLQClientInstance tLQClientInstance) {
        boolean continueConsume = true;
        switch (consumeOrderlyStatus) {
            case CONSUME_SUCCESS:
                consumeRequest.getProcessQueue().commit();
                if (!this.consumerInner.isAutoAck()) {
                    acknowledgeQuietly(tLQClientInstance, consumeRequest.getMsgs());
                }
                break;
            case RECONSUME_LATER:
                List<MessageExt> retryCandidates = list.stream()
                        .filter(messageExt -> tLQClientInstance.findLocalOffsetStore(messageExt.getMsgidOffset()))
                        .collect(Collectors.toList());
                if (retryCandidates.isEmpty()) {
                    break;
                }
                if (checkReconsumeTimes(retryCandidates)) {
                    consumeRequest.getProcessQueue().makeMessageToCosumeAgain(retryCandidates);
                    submitConsumeRequestLater(retryCandidates, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue(), consumeOrderlyContext.getSuspendCurrentQueueMs(), pullResult, tLQClientInstance);
                    continueConsume = false;
                } else {
                    // NOTE(review): acknowledges the request's full message list, not just this batch -- confirm intended.
                    acknowledgeQuietly(tLQClientInstance, consumeRequest.getMsgs());
                }
                break;
            default:
                break;
        }
        return continueConsume;
    }

    /** Acknowledges {@code msgs} through {@code client}; an {@link HTPException} is logged, not rethrown. */
    private void acknowledgeQuietly(TLQClientInstance client, List<MessageExt> msgs) {
        try {
            client.acknowledge(msgs);
        } catch (HTPException e) {
            log.error("acknowledge exception, group: " + this.consumerGroup, e);
        }
    }

    /**
     * @return the consumer's configured retry limit, with {@code -1} mapped to
     *         {@link Integer#MAX_VALUE}
     */
    private int getMaxReconsumeTimes() {
        final int configured = this.consumerInner.getMaxReconsumeTimes();
        return configured == -1 ? Integer.MAX_VALUE : configured;
    }

    /**
     * Updates the reconsume counter of every message and reports whether any of them still has to
     * be retried locally.
     *
     * <p>A message that has reached the retry limit is passed to {@link #sendMessageBack}; if that
     * succeeds the message needs no local retry. Every other message (below the limit, or whose
     * send-back failed) gets its counter incremented and makes the result {@code true}.
     *
     * @param list messages to inspect; {@code null} or empty yields {@code false}
     * @return {@code true} if at least one message must be consumed again locally
     */
    private boolean checkReconsumeTimes(List<MessageExt> list) {
        if (list == null || list.isEmpty()) {
            return false;
        }
        boolean retryLocally = false;
        for (MessageExt msg : list) {
            if (msg.getReconsumeTimes() >= getMaxReconsumeTimes()) {
                // Re-sets the current value; kept as in the original code.
                msg.setReconsumeTimes(msg.getReconsumeTimes());
                if (sendMessageBack(msg)) {
                    continue;
                }
            }
            retryLocally = true;
            msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
        }
        return retryLocally;
    }

    /**
     * Increments the message's reconsume counter, sets its delay level to {@code -1} and hands it
     * to the consumer's {@code sendMessageBack}.
     *
     * @param messageExt message to send back
     * @return {@code true} on success, {@code false} if any exception was thrown (it is logged)
     */
    private boolean sendMessageBack(MessageExt messageExt) {
        boolean sent;
        try {
            // Same result as the "0 -> 1, otherwise +1" form: both are simply current + 1.
            messageExt.setReconsumeTimes(messageExt.getReconsumeTimes() + 1);
            messageExt.setDelayTimeLevel(-1);
            this.consumerInner.sendMessageBack(messageExt);
            sent = true;
        } catch (Exception e) {
            log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + messageExt.toString(), (Throwable) e);
            sent = false;
        }
        return sent;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds a new {@link PullResult} that carries the metadata of {@code pullResult} but exposes
     * {@code list} (wrapped as an unmodifiable view) as its found messages.
     *
     * @param pullResult source of the status, identifiers and offsets to copy
     * @param list       messages to expose through the new result
     * @return the newly created pull result
     */
    public PullResult buildNewPullResult(PullResult pullResult, List<MessageExt> list) {
        final PullResult copy = new PullResult(pullResult.getPullStatus());

        copy.setConsumeHistoryOffset(pullResult.getConsumeHistoryOffset());
        copy.setStatueCode(pullResult.getStatueCode());

        // Identity of the consumer and queue.
        copy.setConsumerId(pullResult.getConsumerId());
        copy.setQueueId(pullResult.getQueueId());
        copy.setClientId(pullResult.getClientId());
        copy.setDomain(pullResult.getDomain());
        copy.setTopic(pullResult.getTopic());
        copy.setGroupName(pullResult.getGroupName());

        // Offset bounds.
        copy.setMinConsumeQueueOffset(pullResult.getMinConsumeQueueOffset());
        copy.setMaxConsumeQueueOffset(pullResult.getMaxConsumeQueueOffset());
        copy.setMaxConsumeOffset(pullResult.getMaxConsumeOffset());

        copy.setMessageQueue(pullResult.getMessageQueue());
        copy.setMsgFoundList(Collections.unmodifiableList(list));
        return copy;
    }
}
