package com.tongtech.client.consumer.service;

import com.tongtech.client.common.ClientErrorCode;
import com.tongtech.client.common.CommunicationMode;
import com.tongtech.client.consumer.PullCallback;
import com.tongtech.client.consumer.PullFileCallback;
import com.tongtech.client.consumer.PullFileResult;
import com.tongtech.client.consumer.PullResult;
import com.tongtech.client.consumer.PullStatus;
import com.tongtech.client.consumer.TLQConsumerInner;
import com.tongtech.client.consumer.TLQConsumerPullInner;
import com.tongtech.client.consumer.common.ConsumeModel;
import com.tongtech.client.consumer.common.ConsumerCommon;
import com.tongtech.client.consumer.common.ConsumerRelationInfo;
import com.tongtech.client.consumer.common.DownloadRequest;
import com.tongtech.client.consumer.common.DownloadResponse;
import com.tongtech.client.consumer.common.PullMessageRequest;
import com.tongtech.client.exception.TLQBrokerException;
import com.tongtech.client.exception.TLQClientException;
import com.tongtech.client.factory.TLQClientInstance;
import com.tongtech.client.message.MessageOffset;
import com.tongtech.client.producer.TopicBrokerInfo;
import com.tongtech.client.remoting.common.IpUtils;
import com.tongtech.client.remoting.enums.ResponseCode;
import com.tongtech.client.remoting.exception.RemotingException;
import com.tongtech.client.remoting.exception.RemotingTooMuchRequestException;
import com.tongtech.client.remoting.version.VersionController;
import com.tongtech.client.utils.Validators;
import com.tongtech.htp.client.proto.CommonHeader;
import com.tongtech.slf4j.Logger;
import com.tongtech.slf4j.LoggerFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:com/tongtech/client/consumer/service/PullAPIWrapper.class */
/**
 * Assembles pull and file-download requests on behalf of a consumer and forwards them to the
 * broker through the remoting API exposed by the owning {@link TLQClientInstance}.
 *
 * <p>Every public entry point first checks that the supplied {@link TopicBrokerInfo} is non-null
 * and reports {@code ok()}; otherwise a {@link TLQClientException} carrying
 * {@link ClientErrorCode#NOT_FOUND_TOPIC_EXCEPTION} is thrown.
 */
public class PullAPIWrapper {
    private static final Logger log = LoggerFactory.getLogger(PullAPIWrapper.class);

    /** Minimum amount (ms) by which the request timeout must exceed the suspend timeout. */
    private static final long SUSPEND_TIMEOUT_MARGIN_MILLIS = 2000L;

    /** Divisor used to convert the millisecond suspend timeout into the seconds sent in the request. */
    private static final long MILLIS_PER_SECOND = 1000L;

    /** Pause (ms) between two consecutive pulls while the broker keeps answering "topic not exist". */
    private static final long TOPIC_RETRY_INTERVAL_MILLIS = 1000L;

    private final TLQClientInstance mQClientFactory;
    private final String consumerGroup;

    /**
     * @param tLQClientInstance client instance whose remoting API is used for all requests
     * @param str               consumer group name (stored; not read by any method of this class)
     */
    public PullAPIWrapper(TLQClientInstance tLQClientInstance, String str) {
        this.mQClientFactory = tLQClientInstance;
        this.consumerGroup = str;
    }

    /**
     * Pulls messages for a pull-style consumer.
     *
     * <p>If the broker answers {@link PullStatus#MSG_DELETED} and the consumer uses
     * {@link ConsumeModel#BROADCASTING}, the pull is re-issued once starting from the minimum
     * offset reported in the first result.
     *
     * @param topicBrokerInfo      broker/topic routing information; must be non-null and {@code ok()}
     * @param messageOffset        offset to start pulling from
     * @param str                  client id placed in the request
     * @param tLQConsumerPullInner consumer supplying id, group, buffer size, consume model and tag filter
     * @param i                    number of messages to pull
     * @param j                    timeout handed to the remoting call (presumably milliseconds)
     * @param communicationMode    communication mode handed to the remoting call
     * @param pullCallback         callback handed to the remoting call
     * @return the result returned by the remoting call (may be {@code null}; the code checks for it)
     * @throws TLQClientException if {@code topicBrokerInfo} is null or not {@code ok()}
     */
    public PullResult pullMessageKernelImpl(TopicBrokerInfo topicBrokerInfo, MessageOffset messageOffset, String str, TLQConsumerPullInner tLQConsumerPullInner, int i, long j, CommunicationMode communicationMode, PullCallback pullCallback) throws TLQClientException, RemotingException, TLQBrokerException, InterruptedException {
        if (topicBrokerInfo == null || !topicBrokerInfo.ok()) {
            throw new TLQClientException("The broker node corresponding to topic is not available!", (Throwable) null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
        }
        String addr = this.mQClientFactory.getAddr(topicBrokerInfo);
        PullMessageRequest request = buildPullConsumerRequest(topicBrokerInfo, messageOffset, str, tLQConsumerPullInner, i);
        PullResult result = this.mQClientFactory.getMQClientAPIImpl().pullMessage(addr, request, j, communicationMode, pullCallback);
        boolean deletedInBroadcast = result != null
                && result.getPullStatus() == PullStatus.MSG_DELETED
                && tLQConsumerPullInner.getMessageModel().equals(ConsumeModel.BROADCASTING);
        if (deletedInBroadcast) {
            // NOTE(review): the log prints getMinConsumeQueueOffset() while the retry uses
            // getMinConsumeOffset(); confirm both refer to the intended offset.
            log.info("repull offset=" + messageOffset + " minOffset=" + result.getMinConsumeQueueOffset());
            request.setOffset(result.getMinConsumeOffset());
            result = this.mQClientFactory.getMQClientAPIImpl().pullMessage(addr, request, j, communicationMode, pullCallback);
        }
        return result;
    }

    /**
     * Pulls messages for a pull-style consumer, asking the broker to suspend the request.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>a synchronous pull is sent with the suspend timeout (whole seconds) set on the request;
     *       the request timeout is widened to {@code j2 + 2000} when {@code j2 > j - 2000};</li>
     *   <li>if the result reports {@code aclErrorCode()}, the thread sleeps {@code j2} ms and that
     *       result is returned;</li>
     *   <li>while the broker answers "topic not exist", the pull is repeated once per second, at
     *       most as many times as the suspend timeout has seconds;</li>
     *   <li>a {@link PullStatus#FOUND} result is returned; otherwise one more pull is sent with the
     *       suspend timeout reset to 0, using the caller's {@code communicationMode}.</li>
     * </ol>
     *
     * @param topicBrokerInfo      broker/topic routing information; must be non-null and {@code ok()}
     * @param messageOffset        offset to start pulling from
     * @param str                  client id placed in the request
     * @param tLQConsumerPullInner consumer supplying id, group, buffer size, consume model and tag filter
     * @param i                    number of messages to pull
     * @param j                    request timeout (presumably milliseconds, as it is compared with {@code j2})
     * @param j2                   suspend timeout in milliseconds
     * @param communicationMode    communication mode used for the final, non-suspended pull only
     * @param pullCallback         callback handed to every remoting call
     * @return the pull result selected by the steps above
     * @throws TLQClientException   if {@code topicBrokerInfo} is null or not {@code ok()}
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    public PullResult pullSuspendMessageKernelImpl(TopicBrokerInfo topicBrokerInfo, MessageOffset messageOffset, String str, TLQConsumerPullInner tLQConsumerPullInner, int i, long j, long j2, CommunicationMode communicationMode, PullCallback pullCallback) throws TLQClientException, RemotingException, TLQBrokerException, InterruptedException {
        if (topicBrokerInfo == null || !topicBrokerInfo.ok()) {
            throw new TLQClientException("The broker node corresponding to topic is not available!", (Throwable) null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
        }
        String addr = this.mQClientFactory.getAddr(topicBrokerInfo);
        PullMessageRequest request = buildPullConsumerRequest(topicBrokerInfo, messageOffset, str, tLQConsumerPullInner, i);
        long requestTimeout = widenTimeoutForSuspend(j, j2);
        int suspendSeconds = toSuspendSeconds(j2);
        request.setSuspendTimeout(suspendSeconds);
        log.info("start SuspendTimeout Query suspendTimeout=" + suspendSeconds);
        PullResult firstResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(addr, request, requestTimeout, CommunicationMode.SYNC, pullCallback);
        log.info("-1 SuspendTimeout  Query result = " + firstResult.getStatueCode());
        if (firstResult.aclErrorCode()) {
            TimeUnit.MILLISECONDS.sleep(j2);
            return firstResult;
        }
        PullResult settled = topicNotExistSuspend(pullCallback, addr, request, requestTimeout, suspendSeconds, firstResult);
        if (PullStatus.FOUND.equals(settled.getPullStatus())) {
            log.info("msg count = " + settled.getMsgFoundList().size());
            return settled;
        }
        request.setSuspendTimeout(0);
        // The widened timeout is used here as well (the original reassigned its timeout parameter).
        return this.mQClientFactory.getMQClientAPIImpl().pullMessage(addr, request, requestTimeout, communicationMode, pullCallback);
    }

    /**
     * Pulls a file message for a pull-style consumer.
     *
     * <p>Unlike the plain message pulls, the request topic is always
     * {@code topicBrokerInfo.getTopicName()}; no queue-name fallback is applied.
     *
     * @param topicBrokerInfo      broker/topic routing information; must be non-null and {@code ok()}
     * @param str                  client id placed in the request
     * @param tLQConsumerPullInner consumer supplying id, group and receive buffer size
     * @param i                    number of messages to pull
     * @param messageOffset        offset to start pulling from
     * @param j                    timeout handed to the remoting call (presumably milliseconds)
     * @param communicationMode    communication mode handed to the remoting call
     * @param pullFileCallback     callback handed to the remoting call
     * @return the result of the remoting {@code pullFileMessage} call
     * @throws TLQClientException if {@code topicBrokerInfo} is null or not {@code ok()}
     */
    public PullFileResult pullFileMessageKernelImpl(TopicBrokerInfo topicBrokerInfo, String str, TLQConsumerPullInner tLQConsumerPullInner, int i, MessageOffset messageOffset, long j, CommunicationMode communicationMode, PullFileCallback pullFileCallback) throws TLQClientException, RemotingException, TLQBrokerException, InterruptedException {
        if (topicBrokerInfo == null || !topicBrokerInfo.ok()) {
            // Null-safe lookup: a null topicBrokerInfo must yield this exception, not an NPE.
            throw new TLQClientException("topic[" + topicNameOrNull(topicBrokerInfo) + "]对应的broker节点不可用!", (Throwable) null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
        }
        String addr = this.mQClientFactory.getAddr(topicBrokerInfo);
        PullMessageRequest request = getPullMessageRequest(str, tLQConsumerPullInner.getConsumerId(), topicBrokerInfo.getTopicName(), tLQConsumerPullInner.getConsumerGroupName(), topicBrokerInfo.getDomain(), i, tLQConsumerPullInner.getRecvBufSize(), messageOffset);
        return this.mQClientFactory.getMQClientAPIImpl().pullFileMessage(addr, request, j, communicationMode, pullFileCallback);
    }

    /**
     * Requests a byte range of a file from the broker.
     *
     * @param topicBrokerInfo broker/topic routing information; must be non-null and {@code ok()}
     * @param str             message id placed in the download request
     * @param str2            file name placed in the download request
     * @param str3            consumer id placed in the download request
     * @param j               begin offset of the requested range
     * @param j2              end offset of the requested range
     * @param j3              file id placed in the download request
     * @param j4              last argument of the remoting call (presumably a timeout; TODO confirm)
     * @return the response of the remoting {@code downloadFileMessage} call
     * @throws TLQClientException if {@code topicBrokerInfo} is null or not {@code ok()}
     */
    public DownloadResponse downloadFileRequest(TopicBrokerInfo topicBrokerInfo, String str, String str2, String str3, long j, long j2, long j3, long j4) throws TLQClientException, RemotingException, TLQBrokerException, InterruptedException {
        if (topicBrokerInfo == null || !topicBrokerInfo.ok()) {
            // Null-safe lookup: a null topicBrokerInfo must yield this exception, not an NPE.
            throw new TLQClientException("topic[" + topicNameOrNull(topicBrokerInfo) + "]对应的broker节点不可用!", (Throwable) null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
        }
        String addr = this.mQClientFactory.getAddr(topicBrokerInfo);
        DownloadRequest request = getDownloadRequest(str, str2, str3, j, j2, j3);
        return this.mQClientFactory.getMQClientAPIImpl().downloadFileMessage(addr, request, j4);
    }

    /**
     * Sends a single pull request for a {@link TLQConsumerInner}; the outcome is not returned to the
     * caller here (the remoting call's return value is discarded).
     *
     * <p>The broker address is resolved with {@link IpUtils#getAddr} rather than through the client
     * instance.
     *
     * @param topicBrokerInfo   broker/topic routing information; must be non-null and {@code ok()}
     * @param messageOffset     offset to start pulling from
     * @param str               client id placed in the request
     * @param tLQConsumerInner  consumer supplying id, buffer size, consume model and tag filter
     * @param str2              group name placed in the request
     * @param str3              topic placed in the request
     * @param str4              domain placed in the request (ignored when empty)
     * @param i                 number of messages to pull
     * @param j                 timeout handed to the remoting call (presumably milliseconds)
     * @param communicationMode communication mode handed to the remoting call
     * @param pullCallback      callback handed to the remoting call
     * @throws TLQClientException if {@code topicBrokerInfo} is null or not {@code ok()}
     */
    public void pullKernelImpl(TopicBrokerInfo topicBrokerInfo, MessageOffset messageOffset, String str, TLQConsumerInner tLQConsumerInner, String str2, String str3, String str4, int i, long j, CommunicationMode communicationMode, PullCallback pullCallback) throws TLQClientException, RemotingException, TLQBrokerException, InterruptedException {
        if (topicBrokerInfo == null || !topicBrokerInfo.ok()) {
            // The topic is interpolated into the message (it used to be emitted as literal text).
            throw new TLQClientException("topic[" + str3 + "] corresponding broker node is not available!", (Throwable) null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
        }
        String addr = IpUtils.getAddr(topicBrokerInfo);
        PullMessageRequest request = getPullMessageRequest(str, tLQConsumerInner.getConsumerId(), str3, str2, str4, i, tLQConsumerInner.getRecvBufSize(), messageOffset);
        request.setBrokerName(topicBrokerInfo.getBrokerName());
        request.setConsumeModeInt(ConsumerCommon.getConsumeModelInt(tLQConsumerInner.getMessageModel(), tLQConsumerInner.getPullType(), tLQConsumerInner.getAllocateStrategy()));
        request.setTagFilter(tLQConsumerInner.getTagFilter());
        this.mQClientFactory.getMQClientAPIImpl().pullMessage(addr, request, j, communicationMode, pullCallback);
    }

    /**
     * Schedules {@link #pullAsyncSuspendImpl} on {@code executorService}.
     *
     * <p>When the task starts later than {@code j} ms after submission, the callback receives a
     * {@link RemotingTooMuchRequestException} and no pull is sent. Any exception thrown by the pull
     * is delivered to {@code pullCallback.onException}.
     *
     * @param topicBrokerInfo   broker/topic routing information; must be non-null and {@code ok()}
     * @param messageOffset     offset to start pulling from
     * @param str               client id placed in the request
     * @param tLQConsumerInner  consumer supplying id, group, domain, buffer size, consume model and tag filter
     * @param str2              topic placed in the request
     * @param i                 number of messages to pull
     * @param j                 timeout in milliseconds (also the maximum queueing delay tolerated)
     * @param communicationMode communication mode used for the final, non-suspended pull
     * @param pullCallback      callback notified of success or failure
     * @param j2                suspend timeout in milliseconds
     * @param executorService   executor that runs the pull
     * @throws TLQClientException if {@code topicBrokerInfo} is null or not {@code ok()}, or if the
     *                            executor rejects the task
     */
    public void pullAsyncSuspendMessageKernelImpl(TopicBrokerInfo topicBrokerInfo, MessageOffset messageOffset, String str, TLQConsumerInner tLQConsumerInner, String str2, int i, long j, CommunicationMode communicationMode, PullCallback pullCallback, long j2, ExecutorService executorService) throws TLQClientException {
        if (topicBrokerInfo == null || !topicBrokerInfo.ok()) {
            throw new TLQClientException("The topic's corresponding broker node is unavailable!", (Throwable) null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
        }
        final long submittedAt = System.currentTimeMillis();
        try {
            executorService.submit(() -> {
                long queuedFor = System.currentTimeMillis() - submittedAt;
                if (j <= queuedFor) {
                    pullCallback.onException(new RemotingTooMuchRequestException("DEFAULT ASYNC pull call timeout"));
                    return;
                }
                try {
                    pullAsyncSuspendImpl(topicBrokerInfo, messageOffset, str, tLQConsumerInner, str2, i, j, communicationMode, pullCallback, j2);
                } catch (InterruptedException e) {
                    try {
                        pullCallback.onException(e);
                    } finally {
                        // Restore the interrupt status so the executor's thread can observe it.
                        Thread.currentThread().interrupt();
                    }
                } catch (Exception e) {
                    pullCallback.onException(e);
                }
            });
        } catch (RejectedExecutionException e) {
            throw new TLQClientException("executor rejected ", e);
        }
    }

    /**
     * Performs the suspended pull for a {@link TLQConsumerInner} on the calling thread and reports
     * the outcome through {@code pullCallback}.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>a synchronous pull is sent with the suspend timeout (whole seconds) set on the request;
     *       the request timeout is widened to {@code j2 + 2000} when {@code j2 > j - 2000};</li>
     *   <li>if the result reports {@code aclErrorCode()}, the thread sleeps {@code j2} ms and the
     *       result is passed to {@code onSuccess};</li>
     *   <li>if the status equals {@link ResponseCode#CB_MSG_WAITACK} and the server version looked
     *       up for the pull request code is 0, the thread sleeps {@code j2} ms;</li>
     *   <li>while the broker answers "topic not exist", the pull is repeated once per second, at
     *       most as many times as the suspend timeout has seconds;</li>
     *   <li>a {@link PullStatus#FOUND} result is passed to {@code onSuccess}; otherwise one more
     *       pull is sent with the suspend timeout reset to 0, using the caller's
     *       {@code communicationMode} and the original timeout {@code j}.</li>
     * </ol>
     *
     * @param topicBrokerInfo   broker/topic routing information (not validated here)
     * @param messageOffset     offset to start pulling from
     * @param str               client id placed in the request
     * @param tLQConsumerInner  consumer supplying id, group, domain, buffer size, consume model and tag filter
     * @param str2              topic placed in the request
     * @param i                 number of messages to pull
     * @param j                 request timeout (presumably milliseconds, as it is compared with {@code j2})
     * @param communicationMode communication mode used for the final, non-suspended pull only
     * @param pullCallback      callback notified of success; also handed to every remoting call
     * @param j2                suspend timeout in milliseconds
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    public void pullAsyncSuspendImpl(TopicBrokerInfo topicBrokerInfo, MessageOffset messageOffset, String str, TLQConsumerInner tLQConsumerInner, String str2, int i, long j, CommunicationMode communicationMode, PullCallback pullCallback, long j2) throws InterruptedException, TLQBrokerException, RemotingException, TLQClientException {
        int consumeModelInt = ConsumerCommon.getConsumeModelInt(tLQConsumerInner.getMessageModel(), tLQConsumerInner.getPullType(), tLQConsumerInner.getAllocateStrategy());
        String addr = IpUtils.getAddr(topicBrokerInfo);
        PullMessageRequest request = getPullMessageRequest(str, tLQConsumerInner.getConsumerId(), str2, tLQConsumerInner.getConsumerGroupName(), tLQConsumerInner.getDomain(), i, tLQConsumerInner.getRecvBufSize(), messageOffset);
        request.setBrokerName(topicBrokerInfo.getBrokerName());
        request.setConsumeModeInt(consumeModelInt);
        long requestTimeout = widenTimeoutForSuspend(j, j2);
        int suspendSeconds = toSuspendSeconds(j2);
        request.setSuspendTimeout(suspendSeconds);
        request.setTagFilter(tLQConsumerInner.getTagFilter());
        log.info("start SuspendTimeout Query suspendTimeout=" + suspendSeconds);
        PullResult firstResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(addr, request, requestTimeout, CommunicationMode.SYNC, pullCallback);
        log.info("-1 SuspendTimeout  Query result = " + firstResult.getStatueCode());
        if (firstResult.aclErrorCode()) {
            TimeUnit.MILLISECONDS.sleep(j2);
            pullCallback.onSuccess(firstResult);
            return;
        }
        boolean waitAckOnVersionZero = ResponseCode.CB_MSG_WAITACK.getStateCode() == firstResult.getStatueCode()
                && VersionController.getServerVersion(CommonHeader.HtpCode.CB_CONSUMER_PULL_REQ_VALUE, addr) == 0;
        if (waitAckOnVersionZero) {
            TimeUnit.MILLISECONDS.sleep(j2);
        }
        PullResult settled = topicNotExistSuspend(pullCallback, addr, request, requestTimeout, suspendSeconds, firstResult);
        if (PullStatus.FOUND.equals(settled.getPullStatus())) {
            log.info("msg count = " + settled.getMsgFoundList().size());
            pullCallback.onSuccess(settled);
            return;
        }
        request.setSuspendTimeout(0);
        // NOTE(review): this fallback pull uses the original timeout j, whereas
        // pullSuspendMessageKernelImpl uses the widened one; confirm which is intended.
        this.mQClientFactory.getMQClientAPIImpl().pullMessage(addr, request, j, communicationMode, pullCallback);
    }

    /**
     * Repeats a synchronous pull while the broker keeps answering "topic not exist".
     *
     * @param pullCallback  callback handed to every remoting call
     * @param brokerAddr    broker address
     * @param request       request to re-send unchanged
     * @param timeout       timeout handed to every remoting call
     * @param maxAttempts   maximum number of additional pulls
     * @param initialResult result of the pull already performed by the caller
     * @return {@code initialResult} if it is not "topic not exist"; otherwise the result of the last
     *         retry performed
     */
    private PullResult topicNotExistSuspend(PullCallback pullCallback, String brokerAddr, PullMessageRequest request, long timeout, int maxAttempts, PullResult initialResult) throws InterruptedException, RemotingException, TLQBrokerException, TLQClientException {
        final int topicNotExist = CommonHeader.StatusCode.CB_TOPIC_NOT_EXIST.getNumber();
        PullResult current = initialResult;
        int attempt = 0;
        while (topicNotExist == current.getStatueCode() && attempt < maxAttempts) {
            Thread.sleep(TOPIC_RETRY_INTERVAL_MILLIS);
            current = this.mQClientFactory.getMQClientAPIImpl().pullMessage(brokerAddr, request, timeout, CommunicationMode.SYNC, pullCallback);
            log.info(attempt + " SuspendTimeout  Query result = " + current.getStatueCode());
            attempt++;
        }
        return current;
    }

    /** Creates a download request populated with the given identifiers and byte range. */
    private DownloadRequest getDownloadRequest(String msgId, String fileName, String consumerId, long beginOffset, long endOffset, long fileId) {
        DownloadRequest request = new DownloadRequest();
        request.setMsgId(msgId);
        request.setFileName(fileName);
        request.setConsumerId(consumerId);
        request.setBeginOffset(beginOffset);
        request.setEndOffset(endOffset);
        request.setFileId(fileId);
        return request;
    }

    /**
     * Creates a pull request with the fields common to every pull. Consume flag and queue id are
     * always 0; the domain is set only when it is non-empty.
     */
    private PullMessageRequest getPullMessageRequest(String clientId, String consumerId, String topic, String groupName, String domain, int pullNum, int recvBufSize, MessageOffset messageOffset) {
        PullMessageRequest request = new PullMessageRequest();
        request.setClientId(clientId);
        request.setConsumerId(consumerId);
        request.setTopic(topic);
        request.setGroupName(groupName);
        request.setPullNum(pullNum);
        request.setRecvBufSize(recvBufSize);
        request.setConsumeFlag(0);
        request.setQueid(0);
        request.setOffset(messageOffset);
        if (!Validators.isEmpty(domain)) {
            request.setDomain(domain);
        }
        return request;
    }

    /**
     * Builds the request shared by the plain and the suspended pull of a pull-style consumer:
     * the common fields plus auto-commit, broker name, consume mode and tag filter.
     */
    private PullMessageRequest buildPullConsumerRequest(TopicBrokerInfo topicBrokerInfo, MessageOffset messageOffset, String clientId, TLQConsumerPullInner consumer, int pullNum) {
        PullMessageRequest request = getPullMessageRequest(clientId, consumer.getConsumerId(), topicOrQueueName(topicBrokerInfo), consumer.getConsumerGroupName(), topicBrokerInfo.getDomain(), pullNum, consumer.getRecvBufSize(), messageOffset);
        request.setAutoCommit(consumer.autoCommit());
        request.setBrokerName(topicBrokerInfo.getBrokerName());
        request.setConsumeModeInt(ConsumerCommon.getConsumeModelInt(consumer.getMessageModel(), consumer.getPullType(), consumer.getAllocateStrategy()));
        request.setTagFilter(consumer.getTagFilter());
        return request;
    }

    /** Returns the topic name, or the queue name when the topic name is empty. */
    private static String topicOrQueueName(TopicBrokerInfo topicBrokerInfo) {
        return Validators.isEmpty(topicBrokerInfo.getTopicName()) ? topicBrokerInfo.getQueueName() : topicBrokerInfo.getTopicName();
    }

    /** Returns the topic name for use in error messages, tolerating a null {@code topicBrokerInfo}. */
    private static String topicNameOrNull(TopicBrokerInfo topicBrokerInfo) {
        return topicBrokerInfo == null ? null : topicBrokerInfo.getTopicName();
    }

    /** Ensures the request timeout exceeds the suspend timeout by at least the safety margin. */
    private static long widenTimeoutForSuspend(long timeoutMillis, long suspendMillis) {
        if (suspendMillis > timeoutMillis - SUSPEND_TIMEOUT_MARGIN_MILLIS) {
            return suspendMillis + SUSPEND_TIMEOUT_MARGIN_MILLIS;
        }
        return timeoutMillis;
    }

    /** Converts the suspend timeout to whole seconds; divides before narrowing to avoid overflow. */
    private static int toSuspendSeconds(long suspendMillis) {
        return (int) (suspendMillis / MILLIS_PER_SECOND);
    }

    /**
     * Copies client id, group name and domain from {@code consumerRelationInfo} into
     * {@code pullResult}, and sets its topic to the topic name (or queue name when the topic name is
     * empty) of {@code topicBrokerInfo}. Does nothing when {@code consumerRelationInfo} is null.
     *
     * @param pullResult           result to enrich
     * @param consumerRelationInfo source of client id, group and domain; may be null
     * @param topicBrokerInfo      source of the topic/queue name
     */
    public void processPullResult(PullResult pullResult, ConsumerRelationInfo consumerRelationInfo, TopicBrokerInfo topicBrokerInfo) {
        if (consumerRelationInfo == null) {
            return;
        }
        pullResult.setClientId(consumerRelationInfo.getClientId());
        pullResult.setGroupName(consumerRelationInfo.getConsumerGroup());
        pullResult.setDomain(consumerRelationInfo.getDomain());
        pullResult.setTopic(topicOrQueueName(topicBrokerInfo));
    }
}
