package com.tongtech.client.consumer.impl;

import com.tongtech.client.common.ClientRegisterType;
import com.tongtech.client.common.ModeType;
import com.tongtech.client.consumer.AllocateMessageQueueStrategy;
import com.tongtech.client.consumer.PullResult;
import com.tongtech.client.consumer.TLQConsumerPushInner;
import com.tongtech.client.consumer.TLQPushConsumerAbstract;
import com.tongtech.client.consumer.common.ConsumeModel;
import com.tongtech.client.consumer.common.PullFromWhere;
import com.tongtech.client.consumer.common.PullRequest;
import com.tongtech.client.consumer.common.PullType;
import com.tongtech.client.consumer.common.SubscribeType;
import com.tongtech.client.consumer.common.SubscriptionData;
import com.tongtech.client.exception.TLQBrokerException;
import com.tongtech.client.exception.TLQClientException;
import com.tongtech.client.message.MessageExt;
import com.tongtech.client.message.MessageOffset;
import com.tongtech.client.message.TopicMapping;
import com.tongtech.client.producer.TopicBrokerInfo;
import com.tongtech.client.remoting.exception.RemotingConnectException;
import com.tongtech.client.remoting.exception.RemotingException;
import com.tongtech.client.remoting.exception.RemotingSendRequestException;
import com.tongtech.client.remoting.exception.RemotingTimeoutException;
import com.tongtech.client.remoting.netty.ProtocolType;
import com.tongtech.commons.collections.CollectionUtils;
import com.tongtech.slf4j.Logger;
import com.tongtech.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/* loaded from: input_file:com/tongtech/client/consumer/impl/TLQPushConsumerImpl.class */
/**
 * Push-consumer implementation that adapts a {@link TLQPushConsumer} configuration object to the
 * {@link TLQConsumerPushInner} contract.
 *
 * <p>Most accessors simply delegate to the wrapped {@code TLQPushConsumer}. Lifecycle, pulling and
 * send-back operations are forwarded to helpers that are not declared in this class (they come from
 * {@link TLQPushConsumerAbstract} or its ancestors).
 */
public class TLQPushConsumerImpl extends TLQPushConsumerAbstract implements TLQConsumerPushInner {
    private static final Logger log = LoggerFactory.getLogger(TLQPushConsumerImpl.class);

    /** Configuration facade every delegating getter in this class reads from. */
    private final TLQPushConsumer defaultMQPushConsumer;

    /** Not referenced anywhere else in this class; kept so the class layout stays unchanged. */
    private Map<TopicBrokerInfo, Long> pullConsumerOffset = new ConcurrentHashMap<>();

    /** Strategy handed to the rebalance implementation in {@link #start()}; null until set. */
    private AllocateMessageQueueStrategy allocateStrategy;

    /**
     * Creates the implementation for the given consumer and copies its configured
     * pull-delay-on-exception value into the inherited field.
     *
     * @param tLQPushConsumer consumer configuration to wrap; dereferenced immediately, so it must not be null
     */
    public TLQPushConsumerImpl(TLQPushConsumer tLQPushConsumer) {
        this.defaultMQPushConsumer = tLQPushConsumer;
        this.pullTimeDelayMillsWhenException = tLQPushConsumer.getPullTimeDelayMillsWhenException();
    }

    /**
     * @return the wrapped consumer configuration object
     */
    public TLQPushConsumer getDefaultMQPushConsumer() {
        return this.defaultMQPushConsumer;
    }

    /**
     * Forwards the pull request to the two-argument {@code pullMessage}, passing this instance as
     * the inner consumer.
     *
     * @param pullRequest request to execute
     */
    @Override // com.tongtech.client.consumer.TLQConsumerPushInner
    public void pullMessage(PullRequest pullRequest) {
        pullMessage(pullRequest, this);
    }

    /**
     * Handles a pull result according to the current consume model.
     *
     * <ul>
     *   <li>{@code BROADCASTING}: stores a copy of the result's max batch consume offset in the offset store.</li>
     *   <li>{@code CLUSTERING}: commits an ack through the client factory when auto-commit is enabled,
     *       otherwise saves the result to the local offset store.</li>
     *   <li>Any other model: does nothing.</li>
     * </ul>
     *
     * @param pullResult      result returned by the pull
     * @param topicBrokerInfo queue the result was pulled from
     */
    @Override // com.tongtech.client.consumer.TLQConsumerPushInner
    public void processPullResult(PullResult pullResult, TopicBrokerInfo topicBrokerInfo) {
        switch (getMessageModel()) {
            case BROADCASTING:
                updateConsumeOffset(topicBrokerInfo, new MessageOffset(pullResult.getMaxBatchConsumeOffset().getConsumequeueOffset(), pullResult.getMaxBatchConsumeOffset().getRaftEntryIndex()));
                return;
            case CLUSTERING:
                if (this.defaultMQPushConsumer.getAutoCommit()) {
                    this.mQClientFactory.consumerCommitAck(pullResult, topicBrokerInfo);
                } else {
                    this.mQClientFactory.saveLocalOffsetStore(pullResult);
                }
                return;
            default:
                return;
        }
    }

    /**
     * Writes the given offset for the given queue into the offset store after checking consumer state.
     *
     * @param topicBrokerInfo queue whose offset is updated
     * @param messageOffset   offset to record
     * @throws RuntimeException wrapping the {@link TLQClientException} raised by the state check or the store
     */
    public void updateConsumeOffset(TopicBrokerInfo topicBrokerInfo, MessageOffset messageOffset) {
        try {
            makeSureStateOK();
            getOffsetStore().updateOffset(topicBrokerInfo, messageOffset);
        } catch (TLQClientException e) {
            // The method signature has no checked exceptions, so the cause is preserved in an unchecked wrapper.
            throw new RuntimeException(e);
        }
    }

    /**
     * @return the consume-concurrently max span configured on the wrapped consumer
     */
    @Override // com.tongtech.client.consumer.TLQConsumerPushInner
    public long getConsumeConcurrentlyMaxSpan() {
        return this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan();
    }

    /** Forwards to the one-argument {@code shutdown}, passing this instance. */
    public void shutdown() {
        shutdown(this);
    }

    /**
     * Hands the current allocate strategy to the rebalance implementation and then forwards to the
     * two-argument {@code start}.
     *
     * <p>NOTE(review): the strategy is passed on even when it was never set (null) — confirm the
     * rebalance implementation tolerates that.
     *
     * @throws TLQClientException           propagated from the delegated start
     * @throws InterruptedException         propagated from the delegated start
     * @throws RemotingTimeoutException     propagated from the delegated start
     * @throws RemotingSendRequestException propagated from the delegated start
     * @throws RemotingConnectException     propagated from the delegated start
     */
    public void start() throws TLQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
        this.rebalanceImpl.setAllocateMessageQueueStrategy(getAllocateStrategy());
        start(this.defaultMQPushConsumer, this);
    }

    /**
     * @return the minimum consume thread count configured on the wrapped consumer
     */
    @Override // com.tongtech.client.consumer.TLQConsumerPushInner
    public int getConsumeThreadMin() {
        return this.defaultMQPushConsumer.getConsumeThreadMin();
    }

    /**
     * @return the maximum consume thread count configured on the wrapped consumer
     */
    @Override // com.tongtech.client.consumer.TLQConsumerPushInner
    public int getConsumeThreadMax() {
        return this.defaultMQPushConsumer.getConsumeThreadMax();
    }

    /**
     * @return the suspend-current-queue time (milliseconds, per the getter name) from the wrapped consumer
     */
    @Override // com.tongtech.client.consumer.TLQConsumerPushInner
    public long getSuspendCurrentQueueTimeMillis() {
        return this.defaultMQPushConsumer.getSuspendCurrentQueueTimeMillis();
    }

    /**
     * @return the per-queue pull threshold configured on the wrapped consumer
     */
    @Override // com.tongtech.client.consumer.TLQConsumerPushInner
    public int getPullThresholdForQueue() {
        return this.defaultMQPushConsumer.getPullThresholdForQueue();
    }

    /**
     * @return the per-queue pull size threshold configured on the wrapped consumer
     */
    @Override // com.tongtech.client.consumer.TLQConsumerPushInner
    public int getPullThresholdSizeForQueue() {
        return this.defaultMQPushConsumer.getPullThresholdSizeForQueue();
    }

    /**
     * @return the pull interval configured on the wrapped consumer
     */
    @Override // com.tongtech.client.consumer.TLQConsumerPushInner
    public long getPullInterval() {
        return this.defaultMQPushConsumer.getPullInterval();
    }

    /**
     * @return the maximum consume batch size configured on the wrapped consumer
     */
    @Override // com.tongtech.client.consumer.TLQConsumerPushInner
    public int getConsumeMessageBatchMaxSize() {
        return this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
    }

    /**
     * @return the client id of the wrapped consumer
     */
    @Override // com.tongtech.client.consumer.TLQConsumerInner
    public String getClientId() {
        return this.defaultMQPushConsumer.getClientId();
    }

    /**
     * Returns the pull batch size, forced to 1 when the consumer runs in {@link ModeType#QUEUE} mode.
     *
     * @return 1 in queue mode, otherwise the batch size configured on the wrapped consumer
     */
    @Override // com.tongtech.client.consumer.TLQConsumerPushInner
    public int getPullBatchSize() {
        if (this.defaultMQPushConsumer.getModeType() == ModeType.QUEUE) {
            return 1;
        }
        return this.defaultMQPushConsumer.getPullBatchSize();
    }

    /**
     * @return the pull type configured on the wrapped consumer
     */
    @Override // com.tongtech.client.consumer.TLQConsumerPushInner, com.tongtech.client.consumer.TLQConsumerInner
    public PullType getPullType() {
        return this.defaultMQPushConsumer.getPullType();
    }

    /**
     * @return the allocate strategy previously set via {@link #setAllocateStrategy}, or null if none was set
     */
    @Override // com.tongtech.client.consumer.TLQConsumerInner
    public AllocateMessageQueueStrategy getAllocateStrategy() {
        return this.allocateStrategy;
    }

    /**
     * @param allocateMessageQueueStrategy strategy to pass to the rebalance implementation on {@link #start()}
     */
    public void setAllocateStrategy(AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
        this.allocateStrategy = allocateMessageQueueStrategy;
    }

    /**
     * Reports whether acknowledgements are automatic.
     *
     * @return the wrapped consumer's auto-commit flag in {@code CLUSTERING} mode; {@code true} for any other model
     */
    @Override // com.tongtech.client.consumer.TLQConsumerInner
    public boolean isAutoAck() {
        if (ConsumeModel.CLUSTERING.equals(getMessageModel())) {
            return this.defaultMQPushConsumer.getAutoCommit();
        }
        return true;
    }

    /**
     * @return the subscription table held by the rebalance implementation
     */
    @Override // com.tongtech.client.consumer.TLQConsumerInner
    public ConcurrentMap<String, SubscriptionData> getSubscriptionInner() {
        return this.rebalanceImpl.getSubscriptionInnerMap();
    }

    /**
     * Removes a subscription through the rebalance implementation.
     *
     * @param str value forwarded to {@code unSubscriptionInner}; presumably the topic name — confirm against callers
     */
    public void unsubscribe(String str) {
        this.rebalanceImpl.unSubscriptionInner(str);
    }

    /**
     * @return the inherited {@code consumeOrderly} flag
     */
    @Override // com.tongtech.client.consumer.TLQPushConsumerAbstract
    public boolean isConsumeOrderly() {
        return this.consumeOrderly;
    }

    /**
     * @param z new value for the inherited {@code consumeOrderly} flag
     */
    public void setConsumeOrderly(boolean z) {
        this.consumeOrderly = z;
    }

    /**
     * @return the consumer group of the wrapped consumer
     */
    @Override // com.tongtech.client.consumer.TLQConsumerInner
    public String getConsumerGroupName() {
        return this.defaultMQPushConsumer.getConsumerGroup();
    }

    /** Triggers a rebalance through the rebalance implementation unless the consumer is paused. */
    @Override // com.tongtech.client.consumer.TLQConsumerInner
    public void doRebalance() {
        if (this.pause) {
            return;
        }
        this.rebalanceImpl.doRebalance(isConsumeOrderly(), this);
    }

    /** Clears the pause flag, triggers a rebalance, and logs the consumer group. */
    public void resume() {
        this.pause = false;
        doRebalance();
        log.info("resume this consumer, {}", this.defaultMQPushConsumer.getConsumerGroup());
    }

    /**
     * @return the put/get value configured on the wrapped consumer
     */
    @Override // com.tongtech.client.consumer.TLQConsumerInner
    public int getPutGet() {
        return getDefaultMQPushConsumer().getPutGet();
    }

    /**
     * @return the consumer id of the wrapped consumer
     */
    @Override // com.tongtech.client.consumer.TLQConsumerInner
    public String getConsumerId() {
        return this.defaultMQPushConsumer.getConsumerId();
    }

    /**
     * @return the mode type configured on the wrapped consumer
     */
    @Override // com.tongtech.client.consumer.TLQConsumerInner
    public ModeType getModeType() {
        return this.defaultMQPushConsumer.getModeType();
    }

    /**
     * @return the domain configured on the wrapped consumer
     */
    @Override // com.tongtech.client.consumer.TLQConsumerInner
    public String getDomain() {
        return this.defaultMQPushConsumer.getDomain();
    }

    /**
     * @return the cluster configured on the wrapped consumer
     */
    @Override // com.tongtech.client.consumer.TLQConsumerInner
    public String getClusterName() {
        return this.defaultMQPushConsumer.getCluster();
    }

    /**
     * @return the receive buffer size configured on the wrapped consumer
     */
    @Override // com.tongtech.client.consumer.TLQConsumerInner
    public int getRecvBufSize() {
        return this.defaultMQPushConsumer.getRecvBufSize();
    }

    /**
     * @return the subscribe type configured on the wrapped consumer
     */
    @Override // com.tongtech.client.consumer.TLQConsumerInner
    public SubscribeType getSubscribeType() {
        return this.defaultMQPushConsumer.getSubscribeType();
    }

    /**
     * @return the protocol type configured on the wrapped consumer
     */
    @Override // com.tongtech.client.consumer.TLQConsumerInner
    public ProtocolType getProtocolType() {
        return this.defaultMQPushConsumer.getProtocolType();
    }

    /**
     * @return the client register type configured on the wrapped consumer
     */
    @Override // com.tongtech.client.consumer.TLQConsumerInner
    public ClientRegisterType getClientRegisterType() {
        return this.defaultMQPushConsumer.getClientRegisterType();
    }

    /**
     * @return the consume model configured on the wrapped consumer
     */
    @Override // com.tongtech.client.consumer.TLQPushConsumerAbstract, com.tongtech.client.consumer.TLQConsumerPushInner, com.tongtech.client.consumer.TLQConsumerInner
    public ConsumeModel getMessageModel() {
        return this.defaultMQPushConsumer.getConsumeModel();
    }

    /**
     * @return the next offset configured on the wrapped consumer
     */
    @Override // com.tongtech.client.consumer.TLQConsumerPushInner
    public long getNextOffset() {
        return this.defaultMQPushConsumer.getNextOffset();
    }

    /**
     * @return the pull-from-where setting configured on the wrapped consumer
     */
    @Override // com.tongtech.client.consumer.TLQConsumerPushInner
    public PullFromWhere getPullFromWhere() {
        return this.defaultMQPushConsumer.getPullFromWhere();
    }

    /**
     * @return the wrapped consumer configuration object (same instance as {@link #getDefaultMQPushConsumer()})
     */
    @Override // com.tongtech.client.consumer.TLQPushConsumerAbstract
    public TLQPushConsumer getTLQPushConsumer() {
        return this.defaultMQPushConsumer;
    }

    /**
     * Persists offsets for a snapshot of the currently processed queues, in {@code BROADCASTING}
     * mode only. Any exception is logged and not rethrown.
     */
    @Override // com.tongtech.client.consumer.TLQConsumerInner
    public void persistConsumerOffset() {
        try {
            if (ConsumeModel.BROADCASTING.equals(getMessageModel())) {
                makeSureStateOK();
                // Copy the key set so the store works on a snapshot rather than the live table view.
                // NOTE(review): raw HashSet kept because persistAll's parameter type is not visible here.
                getOffsetStore().persistAll(new HashSet(this.rebalanceImpl.getProcessQueueTable().keySet()));
            }
        } catch (Exception e) {
            log.error("group: " + this.defaultMQPushConsumer.getConsumerGroup() + " persistConsumerOffset exception", e);
        }
    }

    /**
     * Records the queue set for a topic in the rebalance implementation's topic table, but only
     * when the topic is currently subscribed or when there are no subscriptions at all.
     *
     * @param topicMapping mapping whose topic name is used as the table key
     * @param set          queues to store for that topic
     */
    @Override // com.tongtech.client.consumer.TLQConsumerInner
    public void updateTopicSubscribeInfo(TopicMapping topicMapping, Set<TopicBrokerInfo> set) {
        ConcurrentMap<String, SubscriptionData> subscriptions = getSubscriptionInnerMap();
        if (subscriptions == null) {
            return;
        }
        if (subscriptions.containsKey(topicMapping.getTopic()) || subscriptions.isEmpty()) {
            this.rebalanceImpl.getTopicSubscribeInfoTable().put(topicMapping.getTopic(), set);
        }
    }

    /**
     * @return the subscription table held by the rebalance implementation (same source as {@link #getSubscriptionInner()})
     */
    public ConcurrentMap<String, SubscriptionData> getSubscriptionInnerMap() {
        return this.rebalanceImpl.getSubscriptionInnerMap();
    }

    /**
     * @param str ignored
     * @return always {@code false}
     */
    @Override // com.tongtech.client.consumer.TLQConsumerInner
    public boolean isSubscribeTopicNeedUpdate(String str) {
        return false;
    }

    /**
     * @return always an empty, immutable list
     */
    @Override // com.tongtech.client.consumer.TLQConsumerInner
    public List<String> getTagFilter() {
        return Collections.emptyList();
    }

    /**
     * @return the max reconsume times configured on the wrapped consumer
     */
    @Override // com.tongtech.client.consumer.TLQConsumerPushInner
    public int getMaxReconsumeTimes() {
        return this.defaultMQPushConsumer.getMaxReconsumeTimes();
    }

    /** Sets the pause flag (which makes {@link #doRebalance()} a no-op) and logs the consumer group. */
    public void suspend() {
        this.pause = true;
        log.info("suspend this consumer, {}", this.defaultMQPushConsumer.getConsumerGroup());
    }

    /**
     * @param j new value for the inherited pull-delay-on-exception field
     */
    public void setPullTimeDelayMillsWhenException(long j) {
        this.pullTimeDelayMillsWhenException = j;
    }

    /**
     * Verifies that at least one subscription exists.
     *
     * @throws TLQClientException if the subscription table is null or has no keys
     */
    private void checkeTopic() throws TLQClientException {
        if (getSubscriptionInner() == null || CollectionUtils.isEmpty(getSubscriptionInner().keySet())) {
            throw new TLQClientException("The subscription information cannot be empty! ", (Throwable) null);
        }
    }

    /**
     * Fetches the queues of the single subscribed topic.
     *
     * @return the queues reported for the only subscribed topic
     * @throws TLQClientException           if there is no subscription, if more than one topic is
     *                                      subscribed, or if the delegated fetch fails
     * @throws InterruptedException         propagated from the delegated fetch
     * @throws RemotingTimeoutException     propagated from the delegated fetch
     * @throws RemotingSendRequestException propagated from the delegated fetch
     * @throws RemotingConnectException     propagated from the delegated fetch
     */
    public Set<TopicBrokerInfo> fetchSubscribeMessageQueues() throws TLQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
        // Validate up front: otherwise a missing table fails with NullPointerException on size() and an
        // empty one with NoSuchElementException on iterator().next(), instead of the declared TLQClientException.
        checkeTopic();
        ConcurrentMap<String, SubscriptionData> subscriptions = getSubscriptionInner();
        if (subscriptions.size() > 1) {
            throw new TLQClientException("When subscribing to multiple topics, specify the topic to return routing information!", (Throwable) null);
        }
        return fetchSubscribeMessageQueues(subscriptions.keySet().iterator().next());
    }

    /**
     * Fetches the queues of the given topic from the client factory's routing information.
     *
     * @param str topic name used as the lookup key in the returned routing table
     * @return the entry stored under {@code str} in the routing table; may be null when the table has no such key
     * @throws TLQClientException           if the state check fails, no subscription exists, or the lookup fails
     * @throws InterruptedException         propagated from the client factory
     * @throws RemotingTimeoutException     propagated from the client factory
     * @throws RemotingSendRequestException propagated from the client factory
     * @throws RemotingConnectException     propagated from the client factory
     */
    public Set<TopicBrokerInfo> fetchSubscribeMessageQueues(String str) throws TLQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
        makeSureStateOK();
        checkeTopic();
        return this.mQClientFactory.getTopicBrokerInfo(getSubscriptionInner(), this.defaultMQPushConsumer.getDomain(), this.defaultMQPushConsumer.getPutGet(), this.defaultMQPushConsumer.getConsumerId(), this.defaultMQPushConsumer.getConsumerGroup()).get(str);
    }

    /**
     * Resolves one queue for the single subscribed topic via the client factory. Not called from
     * within this class.
     *
     * @return the queue chosen by the client factory; never null
     * @throws TLQBrokerException if the client factory returns no queue
     * @throws TLQClientException propagated from {@link #fetchSubscribeMessageQueues()}
     */
    private TopicBrokerInfo getTopicBrokerInfo() throws TLQClientException, TLQBrokerException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
        // NOTE(review): raw ArrayList kept because findBrokerAddrByTopic's parameter type is not visible here.
        TopicBrokerInfo selected = this.mQClientFactory.findBrokerAddrByTopic(new ArrayList(fetchSubscribeMessageQueues()));
        if (selected == null) {
            throw new TLQBrokerException(0, "Route is empty!");
        }
        return selected;
    }

    /**
     * @return the wait interval configured on the wrapped consumer
     */
    @Override // com.tongtech.client.consumer.TLQConsumerInner
    public long getWaitInterval() {
        return this.defaultMQPushConsumer.getWaitInterval();
    }

    /**
     * Builds a new message from the given one and sends it back through {@code sendMessageBackAbstract}.
     *
     * @param messageExt      message to send back
     * @param i               value passed both to {@code buildNewMsg} and to the send-back call;
     *                        presumably the delay level — confirm against the superclass
     * @param topicBrokerInfo queue passed to the send-back call
     */
    @Override // com.tongtech.client.consumer.TLQConsumerPushInner
    public void sendMessageBack(MessageExt messageExt, int i, TopicBrokerInfo topicBrokerInfo) throws TLQBrokerException, RemotingException, TLQClientException, InterruptedException {
        sendMessageBackAbstract(buildNewMsg(messageExt, i), i, topicBrokerInfo);
    }

    /**
     * Builds a new message from the given one, using the message's own delay time level, and sends
     * it through {@code sendMessageBackAsNormalMessage}.
     *
     * @param messageExt message to send back
     */
    @Override // com.tongtech.client.consumer.TLQConsumerPushInner
    public void sendMessageBack(MessageExt messageExt) throws TLQBrokerException, RemotingException, TLQClientException, InterruptedException {
        sendMessageBackAsNormalMessage(buildNewMsg(messageExt, messageExt.getDelayTimeLevel()));
    }

    /**
     * @return the dead-letter topic configured on the wrapped consumer
     */
    @Override // com.tongtech.client.consumer.TLQConsumerPushInner
    public String getDeadLetterTopic() {
        return this.defaultMQPushConsumer.getDeadLetterTopic();
    }

    /** Forwards to {@code setBroadCasting()} on the wrapped consumer. */
    @Override // com.tongtech.client.consumer.TLQPushConsumerAbstract
    public void setBroadCasting() {
        this.defaultMQPushConsumer.setBroadCasting();
    }
}
