package com.tongtech.client.htp.consumer.impl;

import com.tongtech.client.common.BrokerSelector;
import com.tongtech.client.common.CheckType;
import com.tongtech.client.common.ClientErrorCode;
import com.tongtech.client.common.ClientRegisterType;
import com.tongtech.client.common.CommunicationMode;
import com.tongtech.client.common.FilterAPI;
import com.tongtech.client.common.MixAll;
import com.tongtech.client.common.ModeType;
import com.tongtech.client.common.NodeRollPolicy;
import com.tongtech.client.common.ServiceState;
import com.tongtech.client.config.ClientConfig;
import com.tongtech.client.consumer.AllocateMessageQueueStrategy;
import com.tongtech.client.consumer.PullCallback;
import com.tongtech.client.consumer.PullMsgCallback;
import com.tongtech.client.consumer.PullResult;
import com.tongtech.client.consumer.PullStatus;
import com.tongtech.client.consumer.TLQConsumerPullInner;
import com.tongtech.client.consumer.common.ConsumeModel;
import com.tongtech.client.consumer.common.ConsumerCommon;
import com.tongtech.client.consumer.common.ConsumerRelationInfo;
import com.tongtech.client.consumer.common.PullFromWhere;
import com.tongtech.client.consumer.common.PullType;
import com.tongtech.client.consumer.common.SubscribeType;
import com.tongtech.client.consumer.common.SubscriptionData;
import com.tongtech.client.consumer.rebalance.RebalanceImpl;
import com.tongtech.client.consumer.rebalance.RebalancePullImpl;
import com.tongtech.client.consumer.service.PullAPIWrapper;
import com.tongtech.client.consumer.store.LocalFileOffsetStore;
import com.tongtech.client.consumer.store.OffsetStore;
import com.tongtech.client.consumer.store.ReadOffsetType;
import com.tongtech.client.exception.HTPException;
import com.tongtech.client.exception.TLQBrokerException;
import com.tongtech.client.exception.TLQClientException;
import com.tongtech.client.factory.TLQClientInstance;
import com.tongtech.client.factory.TLQClientManager;
import com.tongtech.client.factory.TLQResetConsumerOffset;
import com.tongtech.client.htp.consumer.HTPPullConsumer;
import com.tongtech.client.message.Message;
import com.tongtech.client.message.MessageExt;
import com.tongtech.client.message.MessageOffset;
import com.tongtech.client.message.TopicMapping;
import com.tongtech.client.producer.TopicBrokerInfo;
import com.tongtech.client.remoting.common.IpUtils;
import com.tongtech.client.remoting.enums.ResponseCode;
import com.tongtech.client.remoting.exception.RemotingConnectException;
import com.tongtech.client.remoting.netty.ProtocolType;
import com.tongtech.client.trace.AsyncTraceDispatcher;
import com.tongtech.client.trace.TraceDispatcherType;
import com.tongtech.client.trace.hook.ConsumeMessageContext;
import com.tongtech.client.utils.OffsetUtils;
import com.tongtech.client.utils.TopicUtils;
import com.tongtech.client.utils.Validators;
import com.tongtech.commons.cli.HelpFormatter;
import com.tongtech.commons.collections.CollectionUtils;
import com.tongtech.slf4j.Logger;
import com.tongtech.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

/* loaded from: input_file:com/tongtech/client/htp/consumer/impl/HTPPullConsumerImpl.class */
public class HTPPullConsumerImpl implements TLQConsumerPullInner {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) HTPPullConsumerImpl.class);
    private final HTPPullConsumer htpPullConsumer;
    private volatile String subRule;
    protected TLQClientInstance mQClientFactory;
    protected PullAPIWrapper pullAPIWrapper;
    protected OffsetStore offsetStore;
    private TLQResetConsumerOffset tlqResetConsumerOffset;
    private static final long pollIntervalMillis = 100;
    private volatile List<String> tagFilter = new ArrayList(16);
    private volatile ServiceState serviceState = ServiceState.CREATE_JUST;
    private final RebalanceImpl rebalanceImpl = new RebalancePullImpl(this);
    private final int putGet = 1;
    private volatile boolean topicLevelRoundRobin = false;
    private AsyncTraceDispatcher traceDispatcher = null;
    private int recvBufSize = 4194304;

    public HTPPullConsumerImpl(HTPPullConsumer hTPPullConsumer) {
        this.htpPullConsumer = hTPPullConsumer;
    }

    public void start() throws HTPException {
        ClientConfig clientConfig = this.htpPullConsumer.getClientConfig();
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                if (!Validators.isEmpty(clientConfig.getNamesrvAddr())) {
                    log.info("the pull consumer [{}] start beginning.", this.htpPullConsumer.getConsumerId());
                    checkConfig();
                    this.mQClientFactory = TLQClientManager.getInstance().getOrCreateMQClientInstance(clientConfig, ClientRegisterType.PUBLISH_SUBSCRIBE);
                    log.info("clientid = {}", this.mQClientFactory.getClientId());
                    this.tlqResetConsumerOffset = new TLQResetConsumerOffset(this.mQClientFactory, clientConfig);
                    this.pullAPIWrapper = new PullAPIWrapper(this.mQClientFactory, this.htpPullConsumer.getConsumerGroup());
                    this.rebalanceImpl.setConsumerGroup(this.htpPullConsumer.getConsumerGroup());
                    this.rebalanceImpl.setMessageModel(this.htpPullConsumer.getConsumeModel());
                    this.rebalanceImpl.setAllocateMessageQueueStrategy(this.htpPullConsumer.getAllocateStrategy());
                    this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
                    if (!this.mQClientFactory.registerConsumer(this.htpPullConsumer.getConsumerGroup(), this)) {
                        log.error("The pull consumer consumerGroupName [" + this.htpPullConsumer.getConsumerGroup() + "] has been created before, specify another name please.");
                        this.serviceState = ServiceState.CREATE_JUST;
                        throw new TLQClientException("The pull consumer consumerGroupName[" + this.htpPullConsumer.getConsumerGroup() + "] has been created before, specify another name please.", (Throwable) null);
                    }
                    this.mQClientFactory.start();
                    this.mQClientFactory.setConsumerRelationTable(this.htpPullConsumer.getConsumerId(), new ConsumerRelationInfo(getSubscriptionInner().keySet(), this.htpPullConsumer.getDomain(), this.mQClientFactory.getClientId(), this.htpPullConsumer.getConsumerGroup()));
                    log.info("the pull consumer [{}] start OK", this.htpPullConsumer.getConsumerId());
                    this.serviceState = ServiceState.RUNNING;
                    break;
                } else {
                    throw new TLQClientException("NamesrvAddr cannot be empty!", (Throwable) null);
                }
        }
        if (ProtocolType.TCP == this.htpPullConsumer.getProtocolType()) {
            if (!clientConfig.isAllowCreateTopicDelayed() || ConsumeModel.BROADCASTING.equals(getMessageModel())) {
                TopicUtils.getTopicListByRegex(getSubscriptionInner(), this.subRule, clientConfig, this.mQClientFactory.getChooseNamesrv(), this.mQClientFactory.getMQClientAPIImpl());
                checkSub();
                ConcurrentMap<String, Set<TopicBrokerInfo>> updateTopicSubscribeInfoWhenSubscriptionChanged = updateTopicSubscribeInfoWhenSubscriptionChanged();
                initOffsetStore(updateTopicSubscribeInfoWhenSubscriptionChanged, clientConfig);
                if (updateTopicSubscribeInfoWhenSubscriptionChanged.isEmpty()) {
                    throw new TLQClientException("No route info of this topic" + getSubscriptionInner().keySet(), (Throwable) null);
                }
            }
            this.mQClientFactory.startSendHeartbeat();
            this.mQClientFactory.startOffsetTask();
            this.mQClientFactory.rebalanceStart();
            this.mQClientFactory.rebalanceImmediately();
            this.mQClientFactory.startWorkScheduledTask();
            checkAllocateMq();
            this.mQClientFactory.startListenChangeTask(this.htpPullConsumer.getConsumerGroup());
            this.mQClientFactory.updateSubscriptionScheduledTask(this.subRule, getSubscriptionInner());
            this.mQClientFactory.registerPullRequestProcessor();
        }
        this.traceDispatcher = enableTrace(this.mQClientFactory, this.htpPullConsumer.getConnectTimeoutMills());
    }

    public synchronized void shutdown(long j) {
        switch (this.serviceState) {
            case CREATE_JUST:
            case SHUTDOWN_ALREADY:
                break;
            case RUNNING:
                try {
                    persistConsumerOffset();
                    this.mQClientFactory.unregisterConsumer(this.htpPullConsumer.getConsumerGroup());
                    if (this.htpPullConsumer.getProtocolType() == ProtocolType.TCP) {
                        unRegisterConsumerAllBroker(this);
                    }
                    this.mQClientFactory.shutdown(j);
                    log.info("the consumer [{}] shutdown OK", this.htpPullConsumer.getConsumerGroup());
                    this.rebalanceImpl.destroy();
                    this.serviceState = ServiceState.SHUTDOWN_ALREADY;
                    if (this.tlqResetConsumerOffset != null) {
                        this.tlqResetConsumerOffset.clean();
                    }
                    break;
                } catch (Throwable th) {
                    log.error("", th);
                    break;
                }
            default:
                this.mQClientFactory.unregisterConsumer(this.htpPullConsumer.getConsumerGroup());
                if (this.htpPullConsumer.getProtocolType() == ProtocolType.TCP) {
                    unRegisterConsumerAllBroker(this);
                }
                this.mQClientFactory.shutdown(j);
                log.info("the consumer [{}] shutdown OK", this.htpPullConsumer.getConsumerGroup());
                this.serviceState = ServiceState.SHUTDOWN_ALREADY;
                if (this.tlqResetConsumerOffset != null) {
                    this.tlqResetConsumerOffset.clean();
                    break;
                }
                break;
        }
        if (null != this.traceDispatcher) {
            this.traceDispatcher.shutdown();
        }
    }

    public void subscribe(String str) throws TLQClientException {
        if (Validators.checkTopicIsRegex(str)) {
            this.subRule = str;
            Validators.checkTopicRegex(str);
        } else {
            Validators.checkTopic(str);
            getSubscriptionInner().put(str, FilterAPI.buildSubscriptionData(str, false));
        }
    }

    public void addTagFilter(String str) throws TLQClientException {
        isNotRunning();
        Validators.checkHtpCons(CheckType.GROUP_FILTER_RULE, str);
        String[] split = str.split("\\|\\|");
        if (this.tagFilter.size() + split.length > 16) {
            log.error("The number of specified group filter rules or tags exceeds 16");
            throw new TLQClientException("The number of specified group filter rules or tags exceeds 16", (Throwable) null);
        }
        this.tagFilter.addAll(Arrays.asList(split));
    }

    public void unRegisterConsumerAllBroker(TLQConsumerPullInner tLQConsumerPullInner) {
        this.mQClientFactory.unRegisterConsumerAllBroker(tLQConsumerPullInner);
    }

    public Message pull() throws HTPException {
        PullResult pullSyncImpl = pullSyncImpl(1, 0L);
        if (PullStatus.FOUND.equals(pullSyncImpl.getPullStatus())) {
            return pullSyncImpl.getMsgFoundList().get(0);
        }
        return null;
    }

    public PullResult pull(int i) throws HTPException {
        return pullSyncImpl(i, 0L);
    }

    public PullResult pull(int i, long j) throws HTPException {
        return pullSyncImpl(i, j);
    }

    public PullResult pullMessage(BrokerSelector brokerSelector, long j, int i, long j2) throws HTPException {
        return pullMessageImpl(brokerSelector, j, i, j2, null);
    }

    public void pullMessage(BrokerSelector brokerSelector, long j, int i, PullCallback pullCallback, long j2) throws HTPException {
        pullMessageImpl(brokerSelector, j, i, j2, pullCallback);
    }

    public void pullAsync(PullCallback pullCallback) throws HTPException {
        pullAsync(1, 0L, pullCallback);
    }

    public void pullAsync(final PullMsgCallback pullMsgCallback) throws HTPException {
        pullAsync(1, 0L, new PullCallback() { // from class: com.tongtech.client.htp.consumer.impl.HTPPullConsumerImpl.1
            @Override // com.tongtech.client.consumer.PullCallback
            public void onSuccess(PullResult pullResult) {
                if (PullStatus.FOUND.equals(pullResult.getPullStatus())) {
                    pullMsgCallback.onSuccess(pullResult.getMsgFoundList().get(0));
                } else {
                    pullMsgCallback.onSuccess(null);
                }
            }

            @Override // com.tongtech.client.consumer.PullCallback
            public void onException(Throwable th) {
                pullMsgCallback.onException(th);
            }
        });
    }

    public void pullAsync(int i, PullCallback pullCallback) throws HTPException {
        pullAsync(i, 0L, pullCallback);
    }

    public void pullAsync(int i, long j, PullCallback pullCallback) throws HTPException {
        pullAsyncImpl(i, j, pullCallback);
    }

    private PullResult pullSyncImpl(int i, long j) throws HTPException {
        return pullMessageImpl(i, j, null);
    }

    private void pullAsyncImpl(int i, long j, PullCallback pullCallback) throws HTPException {
        try {
            pullMessageImpl(i, j, pullCallback);
        } catch (Exception e) {
            pullCallback.onException(e);
        }
    }

    private PullResult pullMessageImpl(int i, long j, PullCallback pullCallback) throws HTPException {
        PullResult pullResult = null;
        isRunning();
        if (this.mQClientFactory.getClientConfig().isAllowCreateTopicDelayed() && !checkeTopic(j)) {
            PullResult pullResult2 = new PullResult(PullStatus.NO_NEW_MSG);
            if (pullCallback != null) {
                pullCallback.onSuccess(pullResult2);
            }
            return pullResult2;
        }
        ConcurrentMap<String, Set<TopicBrokerInfo>> fetchSubscribeMessageQueues = fetchSubscribeMessageQueues(j);
        checkPullMaxNums(i);
        checkPullType(fetchSubscribeMessageQueues);
        ConcurrentMap<String, BlockingQueue<TopicBrokerInfo>> consumerQueueMap = ConsumerCommon.getConsumerQueueMap(fetchSubscribeMessageQueues, this.topicLevelRoundRobin);
        ConsumeMessageContext consumeMessageContext = null;
        if (hasHook()) {
            consumeMessageContext = new ConsumeMessageContext();
            consumeMessageContext.setClusterName(getClusterName());
            consumeMessageContext.setNamespace(getDomain());
            consumeMessageContext.setConsumerGroup(getConsumerGroupName());
            consumeMessageContext.setClientId(this.mQClientFactory.getClientId());
            consumeMessageContext.setDispatcherType(pullCallback == null ? TraceDispatcherType.PULL_SYNC : TraceDispatcherType.PULL_ASYNC);
            consumeMessageContext.setConsumeModel(ConsumeModel.getConsumeModel(getMessageModel(), getAllocateStrategy()));
            consumeMessageContext.setPullType(this.htpPullConsumer.getPullType());
            consumeMessageContext.setSuccess(false);
            executeHookBefore(consumeMessageContext);
        }
        TopicBrokerInfo topicBrokerInfo = null;
        MessageOffset messageOffset = new MessageOffset(0L, 0L);
        for (String str : consumerQueueMap.keySet()) {
            BlockingQueue<TopicBrokerInfo> blockingQueue = consumerQueueMap.get(str);
            log.debug(str + ",queue size = " + blockingQueue.size());
            while (!blockingQueue.isEmpty()) {
                topicBrokerInfo = blockingQueue.poll();
                messageOffset = getConsumerOffset(topicBrokerInfo);
                log.debug(topicBrokerInfo.getBrokerName() + HelpFormatter.DEFAULT_LONG_OPT_SEPARATOR + topicBrokerInfo.getTopicName() + HelpFormatter.DEFAULT_LONG_OPT_SEPARATOR + topicBrokerInfo.getIp() + ":" + topicBrokerInfo.getPort() + " offset=" + messageOffset);
                pullResult = publicPullSync(messageOffset, topicBrokerInfo, this, i, getPullTimeoutMs(), 0L);
                if (pullResult != null && pullResult.getPullStatus() == PullStatus.FOUND) {
                    processPullResult(topicBrokerInfo, pullResult, consumeMessageContext);
                    if (pullCallback != null) {
                        pullCallback.onSuccess(pullResult);
                    }
                    return pullResult;
                }
            }
        }
        if (pullResult == null) {
            pullResult = new PullResult(PullStatus.NO_NEW_MSG);
        }
        if (PullStatus.NO_NEW_MSG.equals(pullResult.getPullStatus()) && j > 999 && topicBrokerInfo != null) {
            if (pullCallback != null) {
                publicSuspendPullAsync(messageOffset, topicBrokerInfo, this, i, j, pullCallback, consumeMessageContext);
                return null;
            }
            pullResult = publicPullSync(messageOffset, topicBrokerInfo, this, i, getPullTimeoutMs(), j);
            if (pullResult != null && pullResult.getPullStatus() == PullStatus.FOUND) {
                processPullResult(topicBrokerInfo, pullResult, consumeMessageContext);
            }
        }
        if (pullCallback != null) {
            pullCallback.onSuccess(pullResult);
        }
        return pullResult;
    }

    protected PullResult pullMessageImpl(BrokerSelector brokerSelector, long j, int i, long j2, PullCallback pullCallback) throws HTPException {
        PullResult pullResult = null;
        isRunning();
        if (this.mQClientFactory.getClientConfig().isAllowCreateTopicDelayed() && !checkeTopic(j2)) {
            return new PullResult(PullStatus.NO_NEW_MSG, this.subRule);
        }
        Validators.checkConsumerPullMode(this);
        ConcurrentMap<String, Set<TopicBrokerInfo>> fetchSubscribeMessageQueues = fetchSubscribeMessageQueues(j2);
        checkPullMaxNums(i);
        checkIsBroadcasting();
        checkOffset(j);
        MessageOffset messageOffset = new MessageOffset(j, 0L);
        ConsumeMessageContext consumeMessageContext = null;
        if (hasHook()) {
            consumeMessageContext = new ConsumeMessageContext();
            consumeMessageContext.setClusterName(getClusterName());
            consumeMessageContext.setNamespace(getDomain());
            consumeMessageContext.setConsumerGroup(getConsumerGroupName());
            consumeMessageContext.setClientId(this.mQClientFactory.getClientId());
            consumeMessageContext.setDispatcherType(pullCallback == null ? TraceDispatcherType.PULL_SYNC : TraceDispatcherType.PULL_ASYNC);
            consumeMessageContext.setConsumeModel(ConsumeModel.getConsumeModel(getMessageModel(), getAllocateStrategy()));
            consumeMessageContext.setPullType(this.htpPullConsumer.getPullType());
            consumeMessageContext.setSuccess(false);
            executeHookBefore(consumeMessageContext);
        }
        for (Map.Entry<String, Set<TopicBrokerInfo>> entry : fetchSubscribeMessageQueues.entrySet()) {
            if (pullCallback == null) {
                List<TopicBrokerInfo> list = (List) entry.getValue().stream().filter(topicBrokerInfo -> {
                    return Objects.equals(topicBrokerInfo.getBrokerName(), brokerSelector.getBrokerName()) || Objects.equals(new StringBuilder().append(MixAll.TCP_PREFIX).append(IpUtils.getAddr(topicBrokerInfo)).toString(), brokerSelector.getBrokerAddr());
                }).collect(Collectors.toList());
                if (CollectionUtils.isEmpty(list)) {
                    log.error("consumer No route info of namespace [" + getDomain() + "] and topic/queue [" + entry.getKey() + "]");
                } else {
                    for (TopicBrokerInfo topicBrokerInfo2 : list) {
                        pullResult = publicPullSync(messageOffset, topicBrokerInfo2, this, i, j2, 0L);
                        if (pullResult != null && pullResult.getPullStatus() == PullStatus.FOUND) {
                            processPullResult(topicBrokerInfo2, pullResult, consumeMessageContext);
                            return pullResult;
                        }
                    }
                }
            } else {
                publicPullAsync(messageOffset, entry.getValue().stream().filter(topicBrokerInfo3 -> {
                    return Objects.equals(topicBrokerInfo3.getBrokerName(), brokerSelector.getBrokerName());
                }).findAny().orElseThrow(() -> {
                    return new TLQClientException(ClientErrorCode.NOT_FIND_ROUTER, "consumer No route info of namespace [" + getDomain() + "] and topic/queue [" + ((String) entry.getKey()) + "] and brokerName [" + brokerSelector.getBrokerName() + "]");
                }), this, i, j2, pullCallback, consumeMessageContext);
            }
        }
        if (pullResult == null) {
            pullResult = new PullResult(PullStatus.NO_NEW_MSG);
        }
        if (pullCallback != null) {
            pullCallback.onSuccess(pullResult);
        }
        return pullResult;
    }

    private PullResult publicPullSync(MessageOffset messageOffset, TopicBrokerInfo topicBrokerInfo, TLQConsumerPullInner tLQConsumerPullInner, int i, long j, long j2) throws HTPException {
        try {
            PullResult pullSuspendMessageKernelImpl = j2 > 0 ? this.pullAPIWrapper.pullSuspendMessageKernelImpl(topicBrokerInfo, messageOffset, this.mQClientFactory.getClientId(), tLQConsumerPullInner, i, j, j2, CommunicationMode.SYNC, null) : this.pullAPIWrapper.pullMessageKernelImpl(topicBrokerInfo, messageOffset, this.mQClientFactory.getClientId(), tLQConsumerPullInner, i, j, CommunicationMode.SYNC, null);
            if (pullSuspendMessageKernelImpl != null && pullSuspendMessageKernelImpl.getPullStatus() == PullStatus.FOUND) {
                handleRaftMessageHeaderInfo(pullSuspendMessageKernelImpl, topicBrokerInfo);
                pullSuspendMessageKernelImpl.setGroupName(tLQConsumerPullInner.getConsumerGroupName());
                pullSuspendMessageKernelImpl.setConsumerId(tLQConsumerPullInner.getConsumerId());
            }
            return pullSuspendMessageKernelImpl;
        } catch (Exception e) {
            if ((e instanceof TLQBrokerException) && ResponseCode.CB_CONSUMER_NOT_REGISTER.getStateCode() == ((TLQBrokerException) e).getResponseCode()) {
                this.mQClientFactory.removeConsumerRegisterBrokerTableBroker(tLQConsumerPullInner.getConsumerId(), topicBrokerInfo);
            }
            log.error("consumer pull message exception {}", e.getMessage());
            if (e instanceof RemotingConnectException) {
                return null;
            }
            throw new HTPException("Failed to get message", e);
        }
    }

    private void publicPullAsync(MessageOffset messageOffset, TopicBrokerInfo topicBrokerInfo, TLQConsumerPullInner tLQConsumerPullInner, int i, long j, PullCallback pullCallback, ConsumeMessageContext consumeMessageContext) {
        try {
            this.pullAPIWrapper.pullKernelImpl(topicBrokerInfo, messageOffset, this.mQClientFactory.getClientId(), tLQConsumerPullInner, tLQConsumerPullInner.getConsumerGroupName(), topicBrokerInfo.getTopicName(), tLQConsumerPullInner.getDomain(), i, j, CommunicationMode.ASYNC, buildPullCallback(topicBrokerInfo, tLQConsumerPullInner, pullCallback, consumeMessageContext));
        } catch (Exception e) {
            log.error("consumer pullAsync unknow exception", (Throwable) e);
            pullCallback.onException(e);
        }
    }

    private void publicSuspendPullAsync(MessageOffset messageOffset, TopicBrokerInfo topicBrokerInfo, TLQConsumerPullInner tLQConsumerPullInner, int i, long j, PullCallback pullCallback, ConsumeMessageContext consumeMessageContext) {
        try {
            this.pullAPIWrapper.pullAsyncSuspendImpl(topicBrokerInfo, messageOffset, this.mQClientFactory.getClientId(), tLQConsumerPullInner, topicBrokerInfo.getTopicName(), i, tLQConsumerPullInner.getPullTimeoutMs(), CommunicationMode.ASYNC, buildPullCallback(topicBrokerInfo, tLQConsumerPullInner, pullCallback, consumeMessageContext), j);
        } catch (Exception e) {
            log.error("consumer suspend pull message exception {}", e.getMessage());
            pullCallback.onException(e);
        }
    }

    private PullCallback buildPullCallback(final TopicBrokerInfo topicBrokerInfo, final TLQConsumerPullInner tLQConsumerPullInner, final PullCallback pullCallback, final ConsumeMessageContext consumeMessageContext) {
        return new PullCallback() { // from class: com.tongtech.client.htp.consumer.impl.HTPPullConsumerImpl.2
            @Override // com.tongtech.client.consumer.PullCallback
            public void onSuccess(PullResult pullResult) {
                if (pullResult != null && pullResult.getPullStatus() == PullStatus.FOUND) {
                    HTPPullConsumerImpl.this.handleRaftMessageHeaderInfo(pullResult, topicBrokerInfo);
                    pullResult.setGroupName(tLQConsumerPullInner.getConsumerGroupName());
                    pullResult.setConsumerId(tLQConsumerPullInner.getConsumerId());
                    try {
                        HTPPullConsumerImpl.this.processPullResult(topicBrokerInfo, pullResult, consumeMessageContext);
                    } catch (TLQClientException e) {
                        pullCallback.onException(e);
                        return;
                    }
                }
                pullCallback.onSuccess(pullResult);
            }

            @Override // com.tongtech.client.consumer.PullCallback
            public void onException(Throwable th) {
                pullCallback.onException(th);
            }
        };
    }

    private void executeConsumeMessageHookAfter(PullResult pullResult, ConsumeMessageContext consumeMessageContext, TopicBrokerInfo topicBrokerInfo) {
        if (consumeMessageContext != null) {
            consumeMessageContext.setSuccess(pullResult.getPullStatus() == PullStatus.FOUND);
            consumeMessageContext.setRequestId(pullResult.getRequestId());
            consumeMessageContext.setStatus(pullResult.getPullStatus().name());
            consumeMessageContext.setBrokerName(topicBrokerInfo.getBrokerName());
            consumeMessageContext.setBrokerAddr(IpUtils.getAddr(topicBrokerInfo));
            consumeMessageContext.setTopic(pullResult.getTopic());
            consumeMessageContext.setMsgList(pullResult.getMsgFoundList());
            executeHookAfter(consumeMessageContext);
        }
    }

    private void executeConsumeMessageHookAck(PullResult pullResult, ConsumeMessageContext consumeMessageContext, TopicBrokerInfo topicBrokerInfo) {
        if (consumeMessageContext != null) {
            consumeMessageContext.setBrokerName(topicBrokerInfo.getBrokerName());
            consumeMessageContext.setBrokerAddr(IpUtils.getAddr(topicBrokerInfo));
            consumeMessageContext.setDispatcherType(TraceDispatcherType.ACK);
            List<MessageOffset> buildOffsetList = pullResult.buildOffsetList();
            consumeMessageContext.setTopic(pullResult.getTopic());
            consumeMessageContext.setOffsetList(buildOffsetList);
            consumeMessageContext.setSuccess(true);
            consumeMessageContext.setStatus("ACK");
            executeHookAck(consumeMessageContext);
        }
    }

    public void acknowledge(Message message) throws HTPException {
        if (ConsumeModel.CLUSTERING.equals(this.htpPullConsumer.getConsumeModel())) {
            this.mQClientFactory.acknowledge(message);
        }
    }

    public void acknowledge(PullResult pullResult) throws HTPException {
        if (ConsumeModel.CLUSTERING.equals(this.htpPullConsumer.getConsumeModel())) {
            this.mQClientFactory.acknowledge(pullResult);
        }
    }

    public void unsubscribe(String str) {
        getSubscriptionInner().remove(str);
    }

    public MessageOffset fetchConsumeOffset(TopicBrokerInfo topicBrokerInfo) throws HTPException {
        checkIsBroadcasting();
        return fetchConsumeOffset(topicBrokerInfo, false);
    }

    public MessageOffset fetchConsumeOffset(TopicBrokerInfo topicBrokerInfo, boolean z) throws TLQClientException {
        isRunning();
        return this.offsetStore.readOffset(topicBrokerInfo, z ? ReadOffsetType.READ_FROM_STORE : ReadOffsetType.MEMORY_FIRST_THEN_STORE);
    }

    public void updateConsumeOffset(TopicBrokerInfo topicBrokerInfo, MessageOffset messageOffset) throws TLQClientException {
        isRunning();
        log.info("update offset = " + messageOffset.getConsumequeueOffset() + " index=" + messageOffset.getRaftEntryIndex());
        this.offsetStore.updateOffset(topicBrokerInfo, messageOffset);
    }

    public ConcurrentMap<String, Set<TopicBrokerInfo>> fetchSubscribeMessageQueues() throws TLQClientException {
        return fetchSubscribeMessageQueues(0L);
    }

    public ConcurrentMap<String, Set<TopicBrokerInfo>> fetchSubscribeMessageQueues(long j) throws TLQClientException {
        isRunning();
        long currentTimeMillis = System.currentTimeMillis();
        while (j > 0 && System.currentTimeMillis() - currentTimeMillis < j) {
            ConcurrentMap<String, Set<TopicBrokerInfo>> topicBrokerInfoBalance = getTopicBrokerInfoBalance(this.rebalanceImpl.getProcessQueueTable());
            if (!topicBrokerInfoBalance.isEmpty()) {
                return topicBrokerInfoBalance;
            }
            try {
                Thread.sleep(pollIntervalMillis);
            } catch (InterruptedException e) {
                log.warn("fetchSubscribeMessageQueues sleep fail", (Throwable) e);
                Thread.currentThread().interrupt();
            }
        }
        return getTopicBrokerInfoBalance(this.rebalanceImpl.getProcessQueueTable());
    }

    protected void isNotRunning() throws TLQClientException {
        if (this.serviceState == ServiceState.RUNNING) {
            log.error("The consumer is in running status, " + this.serviceState);
            throw new TLQClientException("The consumer is in running status, " + this.serviceState, (Throwable) null);
        }
    }

    protected void isRunning() throws TLQClientException {
        if (this.serviceState != ServiceState.RUNNING) {
            log.error("The consumer is not in running status, " + this.serviceState);
            throw new TLQClientException("The consumer is not in running status, " + this.serviceState, (Throwable) null);
        }
    }

    public MessageOffset getConsumerOffset(TopicBrokerInfo topicBrokerInfo) throws TLQClientException {
        MessageOffset messageOffset = null;
        if (this.htpPullConsumer.getPullType() == null) {
            throw new TLQClientException("PullType cannot be empty ", (Throwable) null);
        }
        switch (getMessageModel()) {
            case BROADCASTING:
                messageOffset = fetchConsumeOffset(topicBrokerInfo, false);
                break;
            case CLUSTERING:
                messageOffset = new MessageOffset(ConsumerCommon.getPullTypeInt(this.htpPullConsumer.getPullType()), 0L);
                break;
        }
        if (null == messageOffset) {
            throw new RuntimeException(" consumerOffset  is not supported  ");
        }
        return messageOffset;
    }

    private boolean checkeTopic() throws TLQClientException {
        if (getSubscriptionInner() != null && getSubscriptionInner().size() != 0) {
            return true;
        }
        if (!this.mQClientFactory.getClientConfig().isAllowCreateTopicDelayed()) {
            throw new TLQClientException("The subscription information cannot be empty! ", (Throwable) null);
        }
        log.info("The subscription information is empty!");
        return false;
    }

    private boolean checkeTopic(long j) throws TLQClientException {
        long currentTimeMillis = System.currentTimeMillis();
        while (j > 0 && System.currentTimeMillis() - currentTimeMillis < j) {
            boolean checkeTopic = checkeTopic();
            if (checkeTopic) {
                return checkeTopic;
            }
            try {
                Thread.sleep(pollIntervalMillis);
            } catch (InterruptedException e) {
                log.warn("checkTopic sleep fail", (Throwable) e);
                Thread.currentThread().interrupt();
            }
        }
        return checkeTopic();
    }

    public static void checkPullMaxNums(int i) throws TLQClientException {
        if (i < 1 || i > 2000) {
            log.error("the consumer pullMaxSize Out of range [1, 2000]");
            throw new TLQClientException("the consumer pullMaxSize Out of range [1, 2000]", (Throwable) null);
        }
    }

    private void checkIsBroadcasting() throws TLQClientException {
        if (!ConsumeModel.BROADCASTING.equals(this.htpPullConsumer.getConsumeModel())) {
            throw new TLQClientException("only support BROADCASTING", (Throwable) null);
        }
        if (!this.htpPullConsumer.getPullType().equals(PullType.PullOffset)) {
            throw new TLQClientException("BROADCASTING only support PullOffset", (Throwable) null);
        }
    }

    private void checkOffset(long j) throws TLQClientException {
        if (j < 0) {
            throw new TLQClientException("consumerOffset can't be less than 0", (Throwable) null);
        }
    }

    private void checkPullType(ConcurrentMap<String, Set<TopicBrokerInfo>> concurrentMap) throws TLQClientException {
        switch (this.htpPullConsumer.getConsumeModel()) {
            case BROADCASTING:
                if (!this.htpPullConsumer.getPullType().equals(PullType.PullOffset)) {
                    throw new TLQClientException("BROADCASTING only support PullOffset", (Throwable) null);
                }
                return;
            case CLUSTERING:
                if (!this.htpPullConsumer.getPullType().equals(PullType.PullEndContinue) || concurrentMap == null) {
                    return;
                }
                Iterator<Set<TopicBrokerInfo>> it = concurrentMap.values().iterator();
                while (it.hasNext()) {
                    for (TopicBrokerInfo topicBrokerInfo : it.next()) {
                        if (topicBrokerInfo != null) {
                            this.tlqResetConsumerOffset.resetConsumerOffset(topicBrokerInfo, getConsumerGroupName(), getConsumerOffset(topicBrokerInfo).getConsumequeueOffset());
                        }
                    }
                }
                return;
            default:
                return;
        }
    }

    private void checkAllocateMq() {
        for (int i = 0; i < 5 && this.rebalanceImpl.getProcessQueueTable().size() <= 0; i++) {
            try {
                Thread.sleep(20L);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

    private void checkSub() throws TLQClientException {
        if (null == getSubscriptionInner() || (getSubscriptionInner().size() == 0 && Validators.isEmpty(this.subRule))) {
            log.error("Consumer subscription information is null");
            throw new TLQClientException("Consumer subscription information is null", (Throwable) null);
        }
    }

    private void checkConfig() throws TLQClientException {
        Validators.checkConsumerId(this.htpPullConsumer.getConsumerId());
        Validators.checkGroup(this.htpPullConsumer.getConsumerGroup());
        Validators.checkWaitInterval(this.htpPullConsumer.getWaitInterval());
        if (CollectionUtils.isNotEmpty(this.tagFilter)) {
            Validators.checkHtpCons(CheckType.GROUP_FILTER_RULE, String.join("||", this.tagFilter));
        }
        switch (this.htpPullConsumer.getConsumeModel()) {
            case BROADCASTING:
                if (PullType.PullOffset.equals(this.htpPullConsumer.getPullType())) {
                    return;
                }
                log.error("BROADCASTING is not supported " + this.htpPullConsumer.getPullType());
                throw new TLQClientException("BROADCASTING is not supported " + this.htpPullConsumer.getPullType(), (Throwable) null);
            case CLUSTERING:
                if (PullType.PullOffset.equals(this.htpPullConsumer.getPullType())) {
                    log.error("CLUSTERING is not supported " + this.htpPullConsumer.getPullType());
                    throw new TLQClientException("CLUSTERING is not supported  " + this.htpPullConsumer.getPullType(), (Throwable) null);
                }
                return;
            default:
                return;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void processPullResult(TopicBrokerInfo topicBrokerInfo, PullResult pullResult, ConsumeMessageContext consumeMessageContext) throws TLQClientException {
        executeConsumeMessageHookAfter(pullResult, consumeMessageContext, topicBrokerInfo);
        switch (getMessageModel()) {
            case BROADCASTING:
                updateConsumeOffset(topicBrokerInfo, new MessageOffset(pullResult.getMaxBatchConsumeOffset().getConsumequeueOffset(), pullResult.getMaxBatchConsumeOffset().getRaftEntryIndex()));
                executeConsumeMessageHookAck(pullResult, consumeMessageContext, topicBrokerInfo);
                return;
            case CLUSTERING:
                if (this.htpPullConsumer.isAutoAck()) {
                    this.mQClientFactory.consumerCommitAck(pullResult, topicBrokerInfo);
                    return;
                } else {
                    this.mQClientFactory.saveLocalOffsetStore(pullResult);
                    return;
                }
            default:
                return;
        }
    }

    public HTPPullConsumer getHtpPullConsumer() {
        return this.htpPullConsumer;
    }

    private ConcurrentMap<String, Set<TopicBrokerInfo>> updateTopicSubscribeInfoWhenSubscriptionChanged() throws TLQClientException {
        return this.mQClientFactory.getTopicBrokerInfo(getSubscriptionInner(), this.htpPullConsumer.getDomain(), 1, this.htpPullConsumer.getConsumerId(), this.htpPullConsumer.getConsumerGroup());
    }

    public void initOffsetStore(ConcurrentMap<String, Set<TopicBrokerInfo>> concurrentMap, ClientConfig clientConfig) throws HTPException {
        if (ConsumeModel.BROADCASTING.equals(getMessageModel())) {
            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.htpPullConsumer.getConsumerGroup());
            if (PullFromWhere.RemoteOffset.equals(this.htpPullConsumer.getPullFromWhere())) {
                OffsetUtils.loadRemoteOffset(this.mQClientFactory.getChooseNamesrv(), concurrentMap, clientConfig, this.offsetStore);
            }
            try {
                this.offsetStore.load();
            } catch (IOException e) {
                throw new HTPException("Failed to load the offset file", e);
            }
        }
    }

    @Override // com.tongtech.client.consumer.TLQConsumerInner
    public void doRebalance() {
        this.rebalanceImpl.doRebalance(false, this);
    }

    @Override // com.tongtech.client.consumer.TLQConsumerInner
    public String getConsumerGroupName() {
        return this.htpPullConsumer.getConsumerGroup();
    }

    @Override // com.tongtech.client.consumer.TLQConsumerInner
    public int getPutGet() {
        return 1;
    }

    @Override // com.tongtech.client.consumer.TLQConsumerInner
    public String getConsumerId() {
        return this.htpPullConsumer.getConsumerId();
    }

    @Override // com.tongtech.client.consumer.TLQConsumerInner
    public String getClientId() {
        return this.htpPullConsumer.getClientId();
    }

    @Override // com.tongtech.client.consumer.TLQConsumerInner
    public ModeType getModeType() {
        return ModeType.TOPIC;
    }

    public void setModeType(ModeType modeType) {
    }

    @Override // com.tongtech.client.consumer.TLQConsumerInner
    public String getDomain() {
        return this.htpPullConsumer.getDomain();
    }

    @Override // com.tongtech.client.consumer.TLQConsumerInner
    public PullType getPullType() {
        return this.htpPullConsumer.getPullType();
    }

    @Override // com.tongtech.client.consumer.TLQConsumerInner
    public AllocateMessageQueueStrategy getAllocateStrategy() {
        return this.htpPullConsumer.getAllocateStrategy();
    }

    @Override // com.tongtech.client.consumer.TLQConsumerInner
    public boolean isAutoAck() {
        boolean z = true;
        if (ConsumeModel.CLUSTERING.equals(getMessageModel())) {
            z = this.htpPullConsumer.isAutoAck();
        }
        return z;
    }

    @Override // com.tongtech.client.consumer.TLQConsumerInner
    public ConcurrentMap<String, SubscriptionData> getSubscriptionInner() {
        return this.rebalanceImpl.getSubscriptionInnerMap();
    }

    @Override // com.tongtech.client.consumer.TLQConsumerInner
    public String getClusterName() {
        return this.htpPullConsumer.getCluster();
    }

    @Override // com.tongtech.client.consumer.TLQConsumerInner
    public int getRecvBufSize() {
        return this.recvBufSize;
    }

    @Override // com.tongtech.client.consumer.TLQConsumerInner
    public SubscribeType getSubscribeType() {
        return SubscribeType.TLQ_SUB_DURABLE;
    }

    public void setSubscribeType(SubscribeType subscribeType) {
    }

    @Override // com.tongtech.client.consumer.TLQConsumerInner
    public ProtocolType getProtocolType() {
        return ProtocolType.TCP;
    }

    @Override // com.tongtech.client.consumer.TLQConsumerInner
    public ClientRegisterType getClientRegisterType() {
        return ClientRegisterType.PUBLISH_SUBSCRIBE;
    }

    @Override // com.tongtech.client.consumer.TLQConsumerInner
    public ConsumeModel getMessageModel() {
        return this.htpPullConsumer.getConsumeModel();
    }

    @Override // com.tongtech.client.consumer.TLQConsumerInner
    public OffsetStore getOffsetStore() {
        return this.offsetStore;
    }

    @Override // com.tongtech.client.consumer.TLQConsumerInner
    public long getWaitInterval() {
        return this.htpPullConsumer.getWaitInterval();
    }

    @Override // com.tongtech.client.consumer.TLQConsumerInner
    public void persistConsumerOffset() {
        try {
            if (ConsumeModel.BROADCASTING.equals(getMessageModel())) {
                isRunning();
                this.offsetStore.persistAll(new HashSet(this.rebalanceImpl.getProcessQueueTable().keySet()));
            }
        } catch (Exception e) {
        }
    }

    @Override // com.tongtech.client.consumer.TLQConsumerInner
    public void updateTopicSubscribeInfo(TopicMapping topicMapping, Set<TopicBrokerInfo> set) {
        ConcurrentMap<String, SubscriptionData> subscriptionInner = getSubscriptionInner();
        if (subscriptionInner == null || !subscriptionInner.containsKey(topicMapping.getTopic())) {
            return;
        }
        this.rebalanceImpl.getTopicSubscribeInfoTable().put(topicMapping.getTopic(), set);
    }

    @Override // com.tongtech.client.consumer.TLQConsumerInner
    public boolean isSubscribeTopicNeedUpdate(String str) {
        return false;
    }

    @Override // com.tongtech.client.consumer.TLQConsumerPullInner
    public boolean autoCommit() {
        return ConsumeModel.BROADCASTING.equals(getMessageModel());
    }

    @Override // com.tongtech.client.consumer.TLQConsumerPullInner
    public long getPullTimeoutMs() {
        return this.htpPullConsumer.getPullTimeoutMs();
    }

    @Override // com.tongtech.client.consumer.TLQConsumerPullInner
    public void setBroadCasting() {
    }

    public void setPullNodeRollPolicy(NodeRollPolicy nodeRollPolicy) {
        if (nodeRollPolicy.equals(NodeRollPolicy.BY_TOPIC)) {
            this.topicLevelRoundRobin = true;
        } else {
            this.topicLevelRoundRobin = false;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleRaftMessageHeaderInfo(PullResult pullResult, TopicBrokerInfo topicBrokerInfo) {
        if (pullResult != null) {
            if (CollectionUtils.isNotEmpty(pullResult.getMsgFoundList())) {
                long minConsumeQueueOffset = pullResult.getMinConsumeQueueOffset();
                for (MessageExt messageExt : pullResult.getMsgFoundList()) {
                    messageExt.setBrokerName(topicBrokerInfo.getBrokerName());
                    messageExt.setDomain(topicBrokerInfo.getDomain());
                    messageExt.setTopic(topicBrokerInfo.getTopicName());
                    messageExt.setCluster(topicBrokerInfo.getCluster());
                    long j = minConsumeQueueOffset;
                    minConsumeQueueOffset = j + 1;
                    messageExt.setConsumeQueueOffset(j);
                }
            }
            pullResult.setMessageQueue(topicBrokerInfo);
            pullResult.setTopic(topicBrokerInfo.getTopicName());
            pullResult.setDomain(topicBrokerInfo.getDomain());
            pullResult.setClientId(this.mQClientFactory.getClientId());
        }
    }

    @Override // com.tongtech.client.consumer.TLQConsumerInner
    public List<String> getTagFilter() {
        return this.tagFilter;
    }

    public ServiceState getServiceState() {
        return this.serviceState;
    }
}
