package com.tongtech.client.consumer.service;

import com.tongtech.client.consumer.ConsumeMessageService;
import com.tongtech.client.consumer.PullResult;
import com.tongtech.client.consumer.PullStatus;
import com.tongtech.client.consumer.TLQConsumerPushInner;
import com.tongtech.client.consumer.common.ConsumeModel;
import com.tongtech.client.consumer.common.ProcessQueue;
import com.tongtech.client.consumer.listener.ConsumeConcurrentlyContext;
import com.tongtech.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.tongtech.client.consumer.listener.MessageListenerConcurrently;
import com.tongtech.client.exception.HTPException;
import com.tongtech.client.factory.TLQClientInstance;
import com.tongtech.client.factory.ThreadFactoryImpl;
import com.tongtech.client.message.MessageExt;
import com.tongtech.client.producer.TopicBrokerInfo;
import com.tongtech.client.remoting.common.IpUtils;
import com.tongtech.client.remoting.common.RemotingHelper;
import com.tongtech.client.trace.TraceDispatcherType;
import com.tongtech.client.trace.hook.ConsumeMessageContext;
import com.tongtech.client.utils.ThreadUtils;
import com.tongtech.slf4j.Logger;
import com.tongtech.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/* loaded from: input_file:com/tongtech/client/consumer/service/ConsumeMessageConcurrentlyService.class */
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
    private static Logger log = LoggerFactory.getLogger((Class<?>) ConsumeMessageConcurrentlyService.class);
    private final TLQConsumerPushInner consumerInner;
    private final MessageListenerConcurrently messageListener;
    private final BlockingQueue<Runnable> consumeRequestQueue = new LinkedBlockingQueue();
    private final ThreadPoolExecutor consumeExecutor;
    private final String consumerGroup;
    private final ScheduledExecutorService scheduledExecutorService;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/tongtech/client/consumer/service/ConsumeMessageConcurrentlyService$ConsumeRequest.class */
    public class ConsumeRequest implements Runnable {
        private final List<MessageExt> msgs;
        private final ProcessQueue processQueue;
        private final TopicBrokerInfo messageQueue;
        private final PullResult pullResult;
        private final TLQClientInstance mQClientFactory;

        public ConsumeRequest(List<MessageExt> list, ProcessQueue processQueue, TopicBrokerInfo topicBrokerInfo, PullResult pullResult, TLQClientInstance tLQClientInstance) {
            this.msgs = list;
            this.processQueue = processQueue;
            this.messageQueue = topicBrokerInfo;
            this.pullResult = pullResult;
            this.mQClientFactory = tLQClientInstance;
        }

        public List<MessageExt> getMsgs() {
            return this.msgs;
        }

        public ProcessQueue getProcessQueue() {
            return this.processQueue;
        }

        @Override // java.lang.Runnable
        public void run() {
            if (this.processQueue.isDropped()) {
                ConsumeMessageConcurrentlyService.log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
                return;
            }
            MessageListenerConcurrently messageListenerConcurrently = ConsumeMessageConcurrentlyService.this.messageListener;
            ConsumeConcurrentlyContext consumeConcurrentlyContext = new ConsumeConcurrentlyContext(this.messageQueue);
            ConsumeConcurrentlyStatus consumeConcurrentlyStatus = null;
            ConsumeMessageContext consumeMessageContext = null;
            if (ConsumeMessageConcurrentlyService.this.consumerInner.hasHook()) {
                consumeMessageContext = new ConsumeMessageContext();
                consumeMessageContext.setClusterName(ConsumeMessageConcurrentlyService.this.consumerInner.getClusterName());
                consumeMessageContext.setNamespace(ConsumeMessageConcurrentlyService.this.consumerInner.getDomain());
                consumeMessageContext.setConsumerGroup(ConsumeMessageConcurrentlyService.this.consumerGroup);
                consumeMessageContext.setBrokerName(this.messageQueue.getBrokerName());
                consumeMessageContext.setBrokerAddr(IpUtils.getAddr(this.messageQueue));
                consumeMessageContext.setClientId(this.mQClientFactory.getClientId());
                consumeMessageContext.setMsgList(this.msgs);
                consumeMessageContext.setConsumeModel(ConsumeModel.getConsumeModel(ConsumeMessageConcurrentlyService.this.consumerInner.getMessageModel(), ConsumeMessageConcurrentlyService.this.consumerInner.getAllocateStrategy()));
                consumeMessageContext.setPullType(ConsumeMessageConcurrentlyService.this.consumerInner.getPullType());
                consumeMessageContext.setSuccess(PullStatus.FOUND.equals(this.pullResult.getPullStatus()));
                consumeMessageContext.setStatus(this.pullResult.getPullStatus().name());
                consumeMessageContext.setDispatcherType(TraceDispatcherType.PUSH_CONCURRENTLY);
                consumeMessageContext.setRequestId(this.pullResult.getRequestId());
                ConsumeMessageConcurrentlyService.this.consumerInner.executeHookBefore(consumeMessageContext);
            }
            try {
                PullResult pullResult = new PullResult(this.pullResult.getPullStatus());
                ConsumeMessageConcurrentlyService.this.setPullResultInfo(pullResult, this.pullResult);
                pullResult.setMsgFoundList(Collections.unmodifiableList(this.msgs));
                consumeConcurrentlyStatus = messageListenerConcurrently.consumeMessage(pullResult, consumeConcurrentlyContext);
            } catch (Throwable th) {
                ConsumeMessageConcurrentlyService.log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", RemotingHelper.exceptionSimpleDesc(th), ConsumeMessageConcurrentlyService.this.consumerGroup, this.msgs, this.messageQueue);
            }
            if (null == consumeConcurrentlyStatus) {
                ConsumeMessageConcurrentlyService.log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.msgs, this.messageQueue);
                consumeConcurrentlyStatus = ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            if (consumeMessageContext != null && ConsumeMessageConcurrentlyService.this.consumerInner.hasHook()) {
                consumeMessageContext.setStatus(consumeConcurrentlyStatus.name());
                consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == consumeConcurrentlyStatus);
                ConsumeMessageConcurrentlyService.this.consumerInner.executeHookAfter(consumeMessageContext);
            }
            ConsumeMessageConcurrentlyService.this.processConsumeResult(consumeConcurrentlyStatus, consumeConcurrentlyContext, this, this.pullResult, this.mQClientFactory);
        }

        public TopicBrokerInfo getMessageQueue() {
            return this.messageQueue;
        }
    }

    public ConsumeMessageConcurrentlyService(TLQConsumerPushInner tLQConsumerPushInner, MessageListenerConcurrently messageListenerConcurrently) {
        this.consumerInner = tLQConsumerPushInner;
        this.messageListener = messageListenerConcurrently;
        this.consumerGroup = tLQConsumerPushInner.getConsumerGroupName();
        this.consumeExecutor = new ThreadPoolExecutor(this.consumerInner.getConsumeThreadMin(), this.consumerInner.getConsumeThreadMax(), 60000L, TimeUnit.MILLISECONDS, this.consumeRequestQueue, new ThreadFactoryImpl("ConsumeMessageThread_" + ((this.consumerGroup.length() > 100 ? this.consumerGroup.substring(0, 100) : this.consumerGroup) + "_")));
        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
    }

    @Override // com.tongtech.client.consumer.ConsumeMessageService
    public void shutdown(long j) {
        this.scheduledExecutorService.shutdown();
        ThreadUtils.shutdownGracefully(this.consumeExecutor, j, TimeUnit.MILLISECONDS);
    }

    @Override // com.tongtech.client.consumer.ConsumeMessageService
    public void submitConsumeRequest(List<MessageExt> list, ProcessQueue processQueue, TopicBrokerInfo topicBrokerInfo, boolean z, PullResult pullResult, TLQClientInstance tLQClientInstance) {
        int consumeMessageBatchMaxSize = this.consumerInner.getConsumeMessageBatchMaxSize();
        if (list.size() <= consumeMessageBatchMaxSize) {
            ConsumeRequest consumeRequest = new ConsumeRequest(list, processQueue, topicBrokerInfo, pullResult, tLQClientInstance);
            try {
                this.consumeExecutor.submit(consumeRequest);
                return;
            } catch (RejectedExecutionException e) {
                submitConsumeRequestLater(consumeRequest);
                return;
            }
        }
        int i = 0;
        while (i < list.size()) {
            ArrayList arrayList = new ArrayList(consumeMessageBatchMaxSize);
            int i2 = 0;
            while (i2 < consumeMessageBatchMaxSize && i < list.size()) {
                arrayList.add(list.get(i));
                i2++;
                i++;
            }
            ConsumeRequest consumeRequest2 = new ConsumeRequest(arrayList, processQueue, topicBrokerInfo, pullResult, tLQClientInstance);
            try {
                this.consumeExecutor.submit(consumeRequest2);
            } catch (RejectedExecutionException e2) {
                while (i < list.size()) {
                    arrayList.add(list.get(i));
                    i++;
                }
                submitConsumeRequestLater(consumeRequest2);
            }
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v48, types: [java.util.List] */
    public void processConsumeResult(ConsumeConcurrentlyStatus consumeConcurrentlyStatus, ConsumeConcurrentlyContext consumeConcurrentlyContext, ConsumeRequest consumeRequest, PullResult pullResult, TLQClientInstance tLQClientInstance) {
        boolean z = true;
        ArrayList<MessageExt> arrayList = new ArrayList();
        switch (consumeConcurrentlyStatus) {
            case CONSUME_SUCCESS:
                if (!this.consumerInner.isAutoAck()) {
                    try {
                        tLQClientInstance.acknowledge(consumeRequest.getMsgs());
                        break;
                    } catch (HTPException e) {
                        e.printStackTrace();
                        break;
                    }
                }
                break;
            case RECONSUME_LATER:
                arrayList = (List) consumeRequest.getMsgs().stream().filter(messageExt -> {
                    return tLQClientInstance.findLocalOffsetStore(messageExt.getMsgidOffset());
                }).collect(Collectors.toList());
                if (arrayList.size() > 0) {
                    z = false;
                    break;
                }
                break;
        }
        if (!z) {
            switch (this.consumerInner.getMessageModel()) {
                case BROADCASTING:
                    Iterator it = arrayList.iterator();
                    while (it.hasNext()) {
                        log.warn("BROADCASTING, the message consume failed, drop it, {}", ((MessageExt) it.next()).toString());
                    }
                    break;
                case CLUSTERING:
                    ArrayList arrayList2 = new ArrayList(arrayList.size());
                    try {
                        for (MessageExt messageExt2 : arrayList) {
                            if (!consumeRequest.getProcessQueue().containsMessage(messageExt2)) {
                                log.info("Message is not found in its process queue; skip send-back-procedure, topic={}, brokerName={}", messageExt2.getTopic(), messageExt2.getBrokerName());
                            } else if (sendMessageBackBroker(messageExt2, consumeConcurrentlyContext)) {
                                tLQClientInstance.acknowledge(messageExt2);
                            } else {
                                arrayList2.add(messageExt2);
                            }
                        }
                    } catch (HTPException e2) {
                        e2.printStackTrace();
                    }
                    if (!arrayList2.isEmpty()) {
                        consumeRequest.getMsgs().removeAll(arrayList2);
                        submitConsumeRequestLater(arrayList2, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue(), true, pullResult, tLQClientInstance);
                        break;
                    }
                    break;
            }
        }
        consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
    }

    private boolean sendMessageBackBroker(MessageExt messageExt, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        messageExt.setReconsumeTimes(messageExt.getReconsumeTimes() == 0 ? 1 : messageExt.getReconsumeTimes());
        int nextDelayLevel = consumeConcurrentlyContext.getNextDelayLevel();
        try {
            log.debug("sendMessageBackBroker,delayLeve:{},msg:{}", Integer.valueOf(nextDelayLevel), messageExt.toString());
            this.consumerInner.sendMessageBack(messageExt, nextDelayLevel, consumeConcurrentlyContext.getMessageQueue());
            return true;
        } catch (Exception e) {
            log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + messageExt, (Throwable) e);
            return false;
        }
    }

    private void submitConsumeRequestLater(final List<MessageExt> list, final ProcessQueue processQueue, final TopicBrokerInfo topicBrokerInfo, final boolean z, final PullResult pullResult, final TLQClientInstance tLQClientInstance) {
        this.scheduledExecutorService.schedule(new Runnable() { // from class: com.tongtech.client.consumer.service.ConsumeMessageConcurrentlyService.1
            @Override // java.lang.Runnable
            public void run() {
                ConsumeMessageConcurrentlyService.this.submitConsumeRequest(list, processQueue, topicBrokerInfo, z, pullResult, tLQClientInstance);
            }
        }, 5000L, TimeUnit.MILLISECONDS);
    }

    private void submitConsumeRequestLater(final ConsumeRequest consumeRequest) {
        this.scheduledExecutorService.schedule(new Runnable() { // from class: com.tongtech.client.consumer.service.ConsumeMessageConcurrentlyService.2
            @Override // java.lang.Runnable
            public void run() {
                ConsumeMessageConcurrentlyService.this.consumeExecutor.submit(consumeRequest);
            }
        }, 5000L, TimeUnit.MILLISECONDS);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void setPullResultInfo(PullResult pullResult, PullResult pullResult2) {
        pullResult.setConsumerId(pullResult2.getConsumerId());
        pullResult.setGroupName(pullResult2.getGroupName());
        pullResult.setClientId(pullResult2.getClientId());
        pullResult.setTopic(pullResult2.getTopic());
        pullResult.setDomain(pullResult2.getDomain());
        pullResult.setMinConsumeQueueOffset(pullResult2.getMinConsumeQueueOffset());
        pullResult.setMaxConsumeQueueOffset(pullResult2.getMaxConsumeQueueOffset());
        pullResult.setMaxConsumeOffset(pullResult2.getMaxConsumeOffset());
        pullResult.setQueueId(pullResult2.getQueueId());
        pullResult.setPullStatus(pullResult2.getPullStatus());
        pullResult.setStatueCode(pullResult2.getStatueCode());
        pullResult.setConsumeHistoryOffset(pullResult2.getConsumeHistoryOffset());
        pullResult.setMessageQueue(pullResult2.getMessageQueue());
    }
}
