package com.tongtech.client.consumer.rebalance;

import com.tongtech.client.consumer.AllocateMessageQueueStrategy;
import com.tongtech.client.consumer.TLQConsumerInner;
import com.tongtech.client.consumer.common.ConsumeModel;
import com.tongtech.client.consumer.common.ProcessQueue;
import com.tongtech.client.consumer.common.PullRequest;
import com.tongtech.client.consumer.common.SubscriptionData;
import com.tongtech.client.factory.TLQClientInstance;
import com.tongtech.client.producer.TopicBrokerInfo;
import com.tongtech.commons.collections.CollectionUtils;
import com.tongtech.commons.collections.MapUtils;
import com.tongtech.slf4j.Logger;
import com.tongtech.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/* loaded from: input_file:com/tongtech/client/consumer/rebalance/RebalanceImpl.class */
public abstract class RebalanceImpl {
    private static Logger log = LoggerFactory.getLogger((Class<?>) RebalanceImpl.class);
    protected final ConcurrentMap<TopicBrokerInfo, ProcessQueue> processQueueTable = new ConcurrentHashMap(64);
    protected final ConcurrentMap<String, SubscriptionData> subscriptionInnerMap = new ConcurrentHashMap();
    protected final ConcurrentMap<String, Set<TopicBrokerInfo>> topicSubscribeInfoTable = new ConcurrentHashMap();
    protected TLQClientInstance mQClientFactory;
    protected AllocateMessageQueueStrategy allocateMessageQueueStrategy;
    protected ConsumeModel consumeModel;
    protected String consumerGroup;

    public RebalanceImpl(TLQClientInstance tLQClientInstance, AllocateMessageQueueStrategy allocateMessageQueueStrategy, ConsumeModel consumeModel, String str) {
        this.mQClientFactory = tLQClientInstance;
        this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
        this.consumeModel = consumeModel;
        this.consumerGroup = str;
    }

    public void unlock(TopicBrokerInfo topicBrokerInfo, boolean z) {
    }

    public void doRebalance(boolean z, TLQConsumerInner tLQConsumerInner) {
        if (tLQConsumerInner.getSubscriptionInner() != null && MapUtils.isNotEmpty(this.topicSubscribeInfoTable)) {
            Iterator<Map.Entry<String, Set<TopicBrokerInfo>>> it = this.topicSubscribeInfoTable.entrySet().iterator();
            while (it.hasNext()) {
                rebalanceByTopic(it.next().getKey(), z, tLQConsumerInner);
            }
        }
        truncateMessageQueueNotMyTopic(tLQConsumerInner.getConsumerId());
    }

    public void unSubscriptionInner(String str) {
        if (getSubscriptionInnerMap().get(str) != null) {
            getSubscriptionInnerMap().get(str).setUnSubscribe(true);
        }
    }

    private void rebalanceByTopic(String str, boolean z, TLQConsumerInner tLQConsumerInner) {
        log.debug("---------------------------------------rebalance Topic 【{}】", str);
        log.info(str + " topic.count=" + this.topicSubscribeInfoTable.size());
        Set<TopicBrokerInfo> set = this.topicSubscribeInfoTable.get(str);
        if (set == null) {
            topicBrokerChanged(str, Collections.emptySet(), Collections.emptySet());
            log.warn("doRebalance, {}, but the topic[{}] not exist.", this.consumerGroup, str);
            return;
        }
        log.info(str + " broker.count=" + set.size());
        switch (this.consumeModel) {
            case BROADCASTING:
                if (updateProcessQueueTableInRebalance(str, z, tLQConsumerInner.getConsumerId(), set, tLQConsumerInner.getConsumerGroupName())) {
                    topicBrokerChanged(str, set, set);
                    return;
                }
                return;
            case CLUSTERING:
                List<String> findConsumerIdList = this.mQClientFactory.findConsumerIdList(this.consumerGroup);
                if (CollectionUtils.isNotEmpty(set) && CollectionUtils.isNotEmpty(findConsumerIdList)) {
                    log.info(this.consumerGroup + " consumer.count =" + findConsumerIdList.size());
                    ArrayList arrayList = new ArrayList();
                    arrayList.addAll(set);
                    Collections.sort(arrayList);
                    Collections.sort(findConsumerIdList);
                    log.debug("---------------------------------------start");
                    arrayList.forEach(topicBrokerInfo -> {
                        log.debug("brokername = " + topicBrokerInfo.getBrokerName());
                    });
                    findConsumerIdList.forEach(str2 -> {
                        log.debug("cid = " + str2);
                    });
                    AllocateMessageQueueStrategy allocateMessageQueueStrategy = this.allocateMessageQueueStrategy;
                    log.debug("AllocateMessageName = " + allocateMessageQueueStrategy.getName());
                    try {
                        List<TopicBrokerInfo> allocate = allocateMessageQueueStrategy.allocate(this.consumerGroup, this.mQClientFactory.getClientId(), arrayList, findConsumerIdList);
                        HashSet hashSet = new HashSet();
                        if (allocate != null) {
                            hashSet.addAll(allocate);
                        }
                        log.info(this.mQClientFactory.getClientId() + " after allocate.count = " + hashSet.size());
                        hashSet.forEach(topicBrokerInfo2 -> {
                            log.debug(this.mQClientFactory.getClientId() + " -> " + topicBrokerInfo2.getBrokerName());
                        });
                        log.debug("-----------------------------------------end");
                        if (updateProcessQueueTableInRebalance(str, z, tLQConsumerInner.getConsumerId(), hashSet, tLQConsumerInner.getConsumerGroupName())) {
                            topicBrokerChanged(str, set, hashSet);
                            return;
                        }
                        return;
                    } catch (Throwable th) {
                        log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", allocateMessageQueueStrategy.getName(), th);
                        return;
                    }
                }
                return;
            default:
                return;
        }
    }

    private void truncateMessageQueueNotMyTopic(String str) {
        ConcurrentMap<String, SubscriptionData> subscriptionInnerMap = getSubscriptionInnerMap();
        for (TopicBrokerInfo topicBrokerInfo : this.processQueueTable.keySet()) {
            if ((subscriptionInnerMap != null && !subscriptionInnerMap.containsKey(TLQClientInstance.getTopicOrQueue(topicBrokerInfo))) || (subscriptionInnerMap != null && subscriptionInnerMap.get(TLQClientInstance.getTopicOrQueue(topicBrokerInfo)).isUnSubscribe() && subscriptionInnerMap.containsKey(TLQClientInstance.getTopicOrQueue(topicBrokerInfo)))) {
                ProcessQueue remove = this.processQueueTable.remove(topicBrokerInfo);
                if (remove != null) {
                    remove.setDropped(true);
                    log.info("doRebalance, {}, truncateMessageQueueNotMyTopic remove unnecessary mq, {}", str, topicBrokerInfo);
                }
            }
        }
    }

    private boolean updateProcessQueueTableInRebalance(String str, boolean z, String str2, Set<TopicBrokerInfo> set, String str3) {
        boolean z2 = false;
        Iterator<Map.Entry<TopicBrokerInfo, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<TopicBrokerInfo, ProcessQueue> next = it.next();
            TopicBrokerInfo key = next.getKey();
            ProcessQueue value = next.getValue();
            if (key.getTopicName().equals(str) && !set.contains(key)) {
                value.setDropped(true);
                if (removeUnnecessaryTopicBrokerInfo(key, value)) {
                    it.remove();
                    z2 = true;
                    log.info("doRebalance, {}, remove unnecessary mq, {}", this.consumerGroup, key);
                }
            }
        }
        ArrayList arrayList = new ArrayList();
        for (TopicBrokerInfo topicBrokerInfo : set) {
            if (!this.processQueueTable.containsKey(topicBrokerInfo)) {
                ProcessQueue processQueue = new ProcessQueue();
                processQueue.setLocked(true);
                if (this.processQueueTable.putIfAbsent(topicBrokerInfo, processQueue) != null) {
                    log.info("doRequest, {}, mq already exists, {}", this.consumerGroup, topicBrokerInfo);
                } else {
                    log.info("doRequest, {}, add a new mq, {}", this.consumerGroup, topicBrokerInfo);
                    PullRequest pullRequest = new PullRequest();
                    pullRequest.setConsumerId(str2);
                    pullRequest.setMessageQueue(topicBrokerInfo);
                    pullRequest.setProcessQueue(processQueue);
                    pullRequest.setGroup(str3);
                    arrayList.add(pullRequest);
                    z2 = true;
                }
            }
        }
        log.info(str + " allocate size = " + arrayList.size());
        arrayList.forEach(pullRequest2 -> {
            log.info(pullRequest2.toString());
        });
        dispatchPullRequest(arrayList);
        return z2;
    }

    public abstract boolean removeUnnecessaryTopicBrokerInfo(TopicBrokerInfo topicBrokerInfo, ProcessQueue processQueue);

    public abstract void topicBrokerChanged(String str, Set<TopicBrokerInfo> set, Set<TopicBrokerInfo> set2);

    public abstract void dispatchPullRequest(List<PullRequest> list);

    public ConcurrentMap<TopicBrokerInfo, ProcessQueue> getProcessQueueTable() {
        return this.processQueueTable;
    }

    public TLQClientInstance getmQClientFactory() {
        return this.mQClientFactory;
    }

    public void setmQClientFactory(TLQClientInstance tLQClientInstance) {
        this.mQClientFactory = tLQClientInstance;
    }

    public void destroy() {
        Iterator<Map.Entry<TopicBrokerInfo, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
        while (it.hasNext()) {
            it.next().getValue().setDropped(true);
        }
        this.processQueueTable.clear();
    }

    public AllocateMessageQueueStrategy getAllocateMessageQueueStrategy() {
        return this.allocateMessageQueueStrategy;
    }

    public void setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
        this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
    }

    public ConsumeModel getMessageModel() {
        return this.consumeModel;
    }

    public void setMessageModel(ConsumeModel consumeModel) {
        this.consumeModel = consumeModel;
    }

    public String getConsumerGroup() {
        return this.consumerGroup;
    }

    public void setConsumerGroup(String str) {
        this.consumerGroup = str;
    }

    public ConcurrentMap<String, Set<TopicBrokerInfo>> getTopicSubscribeInfoTable() {
        return this.topicSubscribeInfoTable;
    }

    public ConcurrentMap<String, SubscriptionData> getSubscriptionInnerMap() {
        return this.subscriptionInnerMap;
    }
}
