package com.tongtech.client.htp.producer;

import com.tongtech.client.common.BreakPointState;
import com.tongtech.client.common.BrokerSelector;
import com.tongtech.client.common.ClientErrorCode;
import com.tongtech.client.common.ClientRegisterType;
import com.tongtech.client.common.CommunicationMode;
import com.tongtech.client.common.MixAll;
import com.tongtech.client.common.ModeType;
import com.tongtech.client.common.ServiceState;
import com.tongtech.client.common.UtilAll;
import com.tongtech.client.exception.HTPClientException;
import com.tongtech.client.exception.HTPException;
import com.tongtech.client.exception.TLQClientException;
import com.tongtech.client.htp.producer.container.BatchMessageContainerBase;
import com.tongtech.client.htp.producer.container.BatchMessageContainerImpl;
import com.tongtech.client.htp.producer.container.ResultFuture;
import com.tongtech.client.message.Message;
import com.tongtech.client.producer.SendBatchCallback;
import com.tongtech.client.producer.SendBatchResult;
import com.tongtech.client.producer.SendCallback;
import com.tongtech.client.producer.SendResult;
import com.tongtech.client.producer.SendStatus;
import com.tongtech.client.producer.TLQProducerAbstract;
import com.tongtech.client.producer.TLQProducerInner;
import com.tongtech.client.producer.TopicBrokerInfo;
import com.tongtech.client.remoting.enums.ResponseCode;
import com.tongtech.client.remoting.exception.RemotingConnectException;
import com.tongtech.client.remoting.exception.RemotingSendRequestException;
import com.tongtech.client.remoting.exception.RemotingTimeoutException;
import com.tongtech.client.remoting.exception.RemotingTooMuchRequestException;
import com.tongtech.client.remoting.netty.NettyRemotingAbstract;
import com.tongtech.client.remoting.netty.ProtocolType;
import com.tongtech.client.trace.AsyncTraceDispatcher;
import com.tongtech.client.trace.TraceDispatcher;
import com.tongtech.client.trace.hook.SendMessageTraceHookImpl;
import com.tongtech.client.utils.MessageIdUtils;
import com.tongtech.client.utils.Validators;
import com.tongtech.commons.collections.CollectionUtils;
import com.tongtech.htp.client.proto.CommonHeader;
import com.tongtech.slf4j.Logger;
import com.tongtech.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

/* loaded from: input_file:com/tongtech/client/htp/producer/HtpProducerImpl.class */
public class HtpProducerImpl extends TLQProducerAbstract implements TLQProducerInner {
    /** Class-wide logger; never reassigned, so it is declared {@code final}. */
    private static final Logger log = LoggerFactory.getLogger(HtpProducerImpl.class);
    /** Public producer facade supplying client configuration (cluster, domain, timeouts...). */
    private final HtpProducer htpProducer;
    /** Accumulates messages and their result futures while inner batching is enabled. */
    private final BatchMessageContainerBase batchMessageContainer;
    /** Single-thread scheduler that periodically flushes the batch container. */
    private final ScheduledExecutorService sendMessageExecutorService;
    /** Constant reported by {@link #getPutGet()}. */
    private final int putGet = 0;
    /** Random identifier generated once per producer instance. */
    private final String producerId;
    private String producerGroup;
    /** Upper bound on a single message size; also handed to the batch container on start. */
    private int maxMsgSize;
    /** Batch capacity handed to the batch container on start. */
    private int maxBatchSize;
    private ModeType modeType;
    /** Default time budget (ms) for sends when inner batching is enabled. */
    private long batchTimeout;
    /** Default time budget (ms) for direct sends and for waiting on batch results. */
    private int sendMsgTimeout;
    /** Default topic applied to messages that carry a blank topic. */
    private String topic;
    /** Period (ms) of the scheduled batch flush. */
    private long batchDelayMs;
    private ClientRegisterType clientRegisterType;
    /** Whether client-side batching is enabled; only changeable before start. */
    private boolean innerBatch;
    /** Fair reentrant lock guarding the batch container. */
    private final Lock batchLock;
    /** Condition on {@link #batchLock}, signalled after every flush attempt. */
    private final Condition canSend;
    /** True while a flush of the batch container is in progress. */
    private volatile boolean sending;
    /** Message-trace dispatcher; stays {@code null} unless tracing is enabled on the name server. */
    private AsyncTraceDispatcher traceDispatcher;

    /**
     * Creates a producer with default settings and no default topic.
     *
     * @param htpProducer the public producer facade this implementation backs
     */
    public HtpProducerImpl(HtpProducer htpProducer) {
        // NOTE: putGet is a final field initialised at its declaration; assigning it
        // again here is rejected by the compiler, so no assignment is made.
        this.batchMessageContainer = new BatchMessageContainerImpl();
        this.producerId = MessageIdUtils.getRandomUUID(16);
        this.producerGroup = MixAll.DEFAULT_PRODUCER_GROUP;
        this.maxMsgSize = 4194304;
        this.maxBatchSize = 200;
        this.modeType = ModeType.TOPIC;
        this.batchTimeout = NettyRemotingAbstract.LOCK_TIMEOUT_MILLIS;
        this.sendMsgTimeout = CommonHeader.StatusCode.CB_PROTOCOL_VERSION_MISMATCH_VALUE;
        this.batchDelayMs = TimeUnit.MILLISECONDS.toMillis(30L);
        this.clientRegisterType = ClientRegisterType.PUBLISH_SUBSCRIBE;
        this.innerBatch = false;
        this.batchLock = new ReentrantLock(true);
        this.canSend = this.batchLock.newCondition();
        this.sending = false;
        this.traceDispatcher = null;
        this.htpProducer = htpProducer;
        this.sendMessageExecutorService = Executors.newSingleThreadScheduledExecutor(
                runnable -> new Thread(runnable, "HTPSendThread"));
    }

    /**
     * Creates a producer with default settings and a default topic.
     *
     * @param htpProducer the public producer facade this implementation backs
     * @param str         default topic applied to messages whose own topic is blank
     */
    public HtpProducerImpl(HtpProducer htpProducer, String str) {
        // Delegate so the defaults live in one place (and the final putGet field is
        // not illegally re-assigned here).
        this(htpProducer);
        this.topic = str;
    }

    /**
     * Starts the producer: validates the default topic (when set), starts the
     * underlying client, schedules the periodic batch flush when inner batching is
     * enabled, and finally tries to enable message tracing.
     *
     * <p>Failures while setting up message tracing are logged and do not abort start-up.
     *
     * @throws HTPException if start-up fails or the thread is interrupted
     */
    public void start() throws HTPException {
        try {
            if (UtilAll.isNotBlank(this.topic)) {
                Validators.checkTopic(this.topic);
            }
            start(true, this.htpProducer.getClientConfig(), this);
            if (this.innerBatch) {
                scheduleInnerBatchFlush();
            }
            initMessageTrace();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers up the stack can still observe it.
            Thread.currentThread().interrupt();
            log.error("Client Abnormal Interruptions", e);
            throw new HTPException("Client Abnormal Interruptions", e);
        }
    }

    /** Configures the batch container and schedules the fixed-rate flush task. */
    private void scheduleInnerBatchFlush() {
        this.batchMessageContainer.setProducer(this.maxBatchSize, this.maxMsgSize);
        this.sendMessageExecutorService.scheduleAtFixedRate(() -> {
            try {
                if (getServiceState() != ServiceState.RUNNING) {
                    return;
                }
                batchMessageAndSend();
            } catch (Exception e) {
                // Must not escape: an exception would cancel all further scheduled runs.
                log.error("ScheduledTask batchMessageAndSend exception", e);
            }
        }, 0L, this.batchDelayMs, TimeUnit.MILLISECONDS);
        log.info("Init Inner Batch Container=>maxBatchSize:{}，maxMsgSize:{}，batchDelayMs:{}", this.maxBatchSize, this.maxMsgSize, this.batchDelayMs);
    }

    /** Queries the name server's trace settings and, if enabled, starts the trace dispatcher (best effort). */
    private void initMessageTrace() {
        try {
            if (checkNamesvrVersion()) {
                return;
            }
            Properties properties = (Properties) getmQClientFactory().getMQClientAPIImpl().queryNameserverConfig(getChooseNamesrv(), this.htpProducer.getConnectTimeoutMills()).getData();
            String traceEnabled = properties.getProperty("msg_trace.enable");
            String clientAddr = properties.getProperty("msg_trace.client_addr");
            String serverAddr = properties.getProperty("msg_trace.server_addr");
            if (!"1".equals(traceEnabled)) {
                log.info("message trace not enabled!");
                return;
            }
            log.info("Checking URL reachability for client: {}, broker: {}", clientAddr, serverAddr);
            if (UtilAll.isBlank(clientAddr) && UtilAll.isBlank(serverAddr)) {
                log.warn("Both client and broker addresses are blank");
            }
            try {
                this.traceDispatcher = new AsyncTraceDispatcher(this.producerGroup, TraceDispatcher.Type.PRODUCE, (int) this.htpProducer.getConnectTimeoutMills());
                registerSendMessageHook(new SendMessageTraceHookImpl(this.traceDispatcher));
                this.traceDispatcher.start(clientAddr, serverAddr);
                log.info("registerSendMessageHook and start traceDispatcher success!");
            } catch (Exception e) {
                log.error("trace dispatcher init failed ", e);
            }
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Tracing is optional, but the interruption itself must not be lost.
                Thread.currentThread().interrupt();
            }
            log.warn("Failed to query trace configuration items", e);
        }
    }

    /**
     * Checks whether the chosen name server is too old for the trace-configuration query.
     *
     * @return {@code true} when the server is reported as an HTP 2.0.2 name server or does not
     *         answer the protocol-version query (a warning is logged in both cases);
     *         {@code false} otherwise
     */
    private boolean checkNamesvrVersion() throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, TLQClientException {
        // Short-circuit keeps the original order: the protocol query is only issued
        // when the first probe did not already classify the server as too old.
        boolean serverTooOld =
                getmQClientFactory().getMQClientAPIImpl().isHTP202Nameserver(getChooseNamesrv(), this.htpProducer.getConnectTimeoutMills())
                || getmQClientFactory().getMQClientAPIImpl().getNameSrvProtocolVersion(17, getChooseNamesrv(), this.htpProducer.getConnectTimeoutMills()) == null;
        if (serverTooOld) {
            log.warn(String.valueOf(ResponseCode.SERVER_VER_LOW));
        }
        return serverTooOld;
    }

    /**
     * Starts the underlying client through the inherited three-argument {@code start},
     * passing this producer's client configuration and this instance.
     *
     * @param z flag forwarded unchanged to the inherited {@code start}
     */
    public void start(boolean z) throws TLQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
        start(z, this.htpProducer.getClientConfig(), this);
    }

    /**
     * Shuts the producer down; equivalent to {@code shutdown(true)}.
     */
    public void shutdown() {
        shutdown(true);
    }

    /**
     * Stops the batch flush scheduler (when inner batching is on), shuts the underlying
     * client down and then stops the trace dispatcher if one was created.
     *
     * @param z flag forwarded unchanged to the inherited {@code shutdown}
     */
    public void shutdown(boolean z) {
        boolean batching = this.innerBatch;
        if (batching) {
            this.sendMessageExecutorService.shutdown();
        }
        shutdown(z, this, getAsyncSenderExecutor());
        if (this.traceDispatcher == null) {
            return;
        }
        this.traceDispatcher.shutdown();
    }

    /**
     * Same as {@link #shutdown()} but forwards an extra {@code long} argument to the
     * inherited {@code shutdown}.
     *
     * @param j value forwarded unchanged to the inherited {@code shutdown}
     */
    public void shutdown(long j) {
        boolean batching = this.innerBatch;
        if (batching) {
            this.sendMessageExecutorService.shutdown();
        }
        shutdown(true, this, getAsyncSenderExecutor(), j);
        if (this.traceDispatcher == null) {
            return;
        }
        this.traceDispatcher.shutdown();
    }

    /**
     * Sends a message synchronously using the default time budget: the batch timeout
     * when inner batching is enabled, otherwise the send timeout.
     */
    public SendResult send(Message message) throws HTPException {
        long defaultTimeout;
        if (this.innerBatch) {
            defaultTimeout = this.batchTimeout;
        } else {
            defaultTimeout = this.sendMsgTimeout;
        }
        return send(message, defaultTimeout);
    }

    /**
     * Sends a message synchronously within {@code j} milliseconds.
     *
     * <p>The message is validated first; the time that takes is deducted from the budget.
     * With inner batching enabled, non-delayed messages go through the batch container;
     * everything else is sent directly.
     */
    public SendResult send(Message message, long j) throws HTPException {
        long spentOnValidation = checkMsgAndSet(message, j);
        long remaining = j - spentOnValidation;
        if (this.innerBatch && !Validators.delayMsg(message)) {
            return batchAndSendMessageSync(message, new ResultFuture(remaining));
        }
        return sendImpl(message, CommunicationMode.SYNC, null, remaining, this);
    }

    /**
     * Delegates to the inherited {@code sendDefaultImpl} and converts every failure into
     * an {@link HTPException} carrying the original exception as its cause.
     *
     * @throws HTPException wrapping the underlying failure or interruption
     */
    private SendResult sendImpl(Message message, CommunicationMode communicationMode, SendCallback sendCallback, long j, TLQProducerInner tLQProducerInner) throws HTPException {
        try {
            return sendDefaultImpl(message, communicationMode, sendCallback, j, tLQProducerInner);
        } catch (InterruptedException e) {
            // Keep the interruption visible to the caller's thread instead of losing it.
            Thread.currentThread().interrupt();
            log.error("Send message interrupted", e);
            throw new HTPException(e);
        } catch (HTPException e) {
            // Report through the logger rather than printStackTrace().
            log.error("Send message failed", e);
            throw new HTPException(e);
        }
    }

    /**
     * Sends a message asynchronously using the default time budget: the batch timeout
     * when inner batching is enabled, otherwise the send timeout.
     */
    public void send(Message message, SendCallback sendCallback) throws HTPException {
        long defaultTimeout;
        if (this.innerBatch) {
            defaultTimeout = this.batchTimeout;
        } else {
            defaultTimeout = this.sendMsgTimeout;
        }
        send(message, sendCallback, defaultTimeout);
    }

    /**
     * Sends a message asynchronously within {@code j} milliseconds, reporting the outcome
     * to {@code sendCallback}.
     *
     * <p>Validation time is deducted from the budget. With inner batching enabled,
     * non-delayed messages are queued in the batch container; all others are sent directly.
     */
    public void send(Message message, SendCallback sendCallback, long j) throws HTPException {
        long spentOnValidation = checkMsgAndSet(message, j);
        long remaining = j - spentOnValidation;
        if (this.innerBatch && !Validators.delayMsg(message)) {
            batchAndSendMessageAsync(message, new ResultFuture(remaining, sendCallback));
            return;
        }
        sendAsync(message, sendCallback, remaining, this);
    }

    /**
     * Sends a message in one-way mode.
     *
     * <p>Validation is always checked against the batch timeout. The direct path then uses
     * the send timeout minus validation time, while the batching path uses the batch timeout
     * minus validation time.
     * NOTE(review): the other send overloads pick the budget by {@code innerBatch}; confirm
     * that validating against the batch timeout on the direct path is intended.
     */
    public void sendOneway(Message message) throws HTPException {
        long spentOnValidation = checkMsgAndSet(message, getBatchTimeout());
        if (this.innerBatch && !Validators.delayMsg(message)) {
            batchAndSendMessageOneWay(message, new ResultFuture(this.batchTimeout - spentOnValidation));
            return;
        }
        sendImpl(message, CommunicationMode.ONEWAY, null, this.sendMsgTimeout - spentOnValidation, this);
    }

    /**
     * Sends a message synchronously within {@code j} milliseconds, optionally letting
     * {@code brokerSelector} (may be {@code null}) influence broker choice. Always bypasses
     * the batch container.
     */
    public SendResult send(Message message, BrokerSelector brokerSelector, long j) throws HTPException, InterruptedException {
        long remaining = j - checkMsgAndSet(message, j);
        return sendDefaultImpl(message, CommunicationMode.SYNC, null, remaining, this, Optional.ofNullable(brokerSelector));
    }

    /**
     * Sends a caller-assembled batch synchronously within {@code j} milliseconds;
     * validation time is deducted from the budget.
     */
    public SendBatchResult sendBatch(Collection<Message> collection, long j) throws HTPException, InterruptedException {
        long remaining = j - checkMsgAndSet(collection, j);
        return sendBatchDefaultImpl(collection, CommunicationMode.SYNC, null, remaining, this);
    }

    /**
     * Sends a caller-assembled batch asynchronously within {@code j} milliseconds,
     * reporting the outcome to {@code sendBatchCallback}.
     */
    public void sendBatch(Collection<Message> collection, SendBatchCallback sendBatchCallback, long j) throws HTPException, InterruptedException {
        long remaining = j - checkMsgAndSet(collection, j);
        sendBatchAsync(collection, sendBatchCallback, remaining, this);
    }

    /**
     * Sends a caller-assembled batch asynchronously within {@code j} milliseconds, optionally
     * letting {@code brokerSelector} (may be {@code null}) influence broker choice.
     */
    public void sendBatch(Collection<Message> collection, SendBatchCallback sendBatchCallback, long j, BrokerSelector brokerSelector) throws HTPException, InterruptedException {
        long remaining = j - checkMsgAndSet(collection, j);
        sendBatchAsync(collection, sendBatchCallback, remaining, this, Optional.ofNullable(brokerSelector));
    }

    /**
     * Validates and prepares a single message before sending.
     *
     * @param message message to prepare; gets the default topic when its own is blank
     * @param j       total time budget in milliseconds
     * @return milliseconds spent in this method (never more than {@code j})
     * @throws HTPClientException              if the producer is not in the RUNNING state
     * @throws RemotingTooMuchRequestException if preparation alone exceeded {@code j}
     */
    private long checkMsgAndSet(Message message, long j) throws HTPClientException, RemotingTooMuchRequestException {
        final long startedAt = System.currentTimeMillis();
        if (getServiceState() != ServiceState.RUNNING) {
            throw new HTPClientException("The producer service state not OK, " + getServiceState(), (Throwable) null);
        }
        setDefaultTopic(message);
        Validators.checkMessage(message, this);
        setMessageInfo(message, this);
        final long elapsed = System.currentTimeMillis() - startedAt;
        if (elapsed > j) {
            log.error("HtpProducerImpl call timeout");
            throw new RemotingTooMuchRequestException("HtpProducerImpl call timeout");
        }
        return elapsed;
    }

    /**
     * Validates and prepares every message of a caller-assembled batch.
     *
     * <p>A batch must be non-empty and must not mix delayed and normal messages. The
     * accumulated size of all messages is checked after the per-message checks.
     *
     * @param collection messages to prepare
     * @param j          total time budget in milliseconds
     * @return milliseconds spent in this method (never more than {@code j})
     * @throws HTPClientException              if the producer is not RUNNING or the batch is illegal
     * @throws RemotingTooMuchRequestException if preparation alone exceeded {@code j}
     */
    private long checkMsgAndSet(Collection<Message> collection, long j) throws HTPClientException, RemotingTooMuchRequestException {
        final long startedAt = System.currentTimeMillis();
        if (getServiceState() != ServiceState.RUNNING) {
            throw new HTPClientException("The producer service state not OK, " + getServiceState(), (Throwable) null);
        }
        if (collection == null || collection.isEmpty()) {
            log.error("the batch message is null!");
            throw new TLQClientException(ClientErrorCode.MESSAGE_ILLEGAL, "the batch message is null");
        }
        int totalSize = 0;
        boolean sawDelayed = false;
        boolean sawNormal = false;
        for (Message current : collection) {
            setDefaultTopic(current);
            boolean delayed = Validators.delayMsg(current);
            sawDelayed = sawDelayed || delayed;
            sawNormal = sawNormal || !delayed;
            if (sawDelayed && sawNormal) {
                log.error("Batch messages do not support both normal and delayed messages!");
                throw new TLQClientException(ClientErrorCode.MESSAGE_ILLEGAL, "Batch messages do not support both normal and delayed messages");
            }
            setMessageInfo(current, this);
            Validators.checkMessage(current, this);
            totalSize += getMessageSize(current);
        }
        Validators.checkBatchMessage(totalSize, this);
        final long elapsed = System.currentTimeMillis() - startedAt;
        if (elapsed > j) {
            log.error("HtpProducerImpl sendBatch call timeout");
            throw new RemotingTooMuchRequestException("HtpProducerImpl sendBatch call timeout");
        }
        return elapsed;
    }

    /**
     * Applies this producer's default topic to a message whose own topic is blank.
     *
     * <p>A {@code null} message is left alone so that the validation step that follows
     * can report it, instead of failing here with a bare {@link NullPointerException}.
     * NOTE(review): assumes the subsequent validator rejects {@code null} messages — confirm.
     *
     * @param message message to patch; may be {@code null}
     */
    private void setDefaultTopic(Message message) {
        if (message == null) {
            return;
        }
        if (UtilAll.isBlank(message.getTopic())) {
            message.setTopic(this.topic);
        }
    }

    /**
     * Returns the body length of the message plus the attribute size reported by
     * {@code Validators.getAttrMessageSize}.
     */
    private int getMessageSize(Message message) {
        int bodyLength = message.getBody().length;
        int attributeSize = Validators.getAttrMessageSize(message);
        return bodyLength + attributeSize;
    }

    /**
     * Installs {@code executorService} as the callback executor of the remoting client
     * obtained from the client factory.
     */
    public void setCallbackExecutor(ExecutorService executorService) {
        getmQClientFactory().getMQClientAPIImpl().getRemotingClient().setCallbackExecutor(executorService);
    }

    /** Returns the producer facade this implementation was constructed with. */
    public HtpProducer getHtpProducer() {
        return this.htpProducer;
    }

    /** Forwards the send-latency-fault switch to the fault strategy returned by {@code getMqFaultStrategy()}. */
    public void setSendLatencyFaultEnable(boolean z) {
        getMqFaultStrategy().setSendLatencyFaultEnable(z);
    }

    /**
     * Queues a one-way message in the batch container, flushing the container when the
     * message does not fit or when adding it reports that a flush is due, then waits for
     * the batch outcome.
     *
     * <p>The batch lock is always released in a {@code finally} block, so a failure while
     * queueing or flushing cannot leave the lock held.
     *
     * @throws HTPException if the lock cannot be acquired in time or the flush fails
     */
    private void batchAndSendMessageOneWay(Message message, ResultFuture resultFuture) throws HTPException {
        boolean locked = false;
        try {
            locked = tryLockWithRetry(resultFuture.getTimeoutMillis());
            if (!locked) {
                throw new HTPException("Unable to acquire lock within timeout");
            }
            if (!canAddToCurrentBatch(message)) {
                // No room left: ship the current batch first, then start a new one.
                batchMessageAndSend();
                this.batchMessageContainer.addMsg(message, resultFuture, CommunicationMode.ONEWAY);
            } else if (this.batchMessageContainer.addMsg(message, resultFuture, CommunicationMode.ONEWAY)) {
                batchMessageAndSend();
            }
        } finally {
            if (locked) {
                this.batchLock.unlock();
            }
        }
        // Wait outside the lock so other senders can keep filling the container.
        resultFuture.waitResponse(this.sendMsgTimeout, getRetryTimesWhenSendFailed());
    }

    /**
     * Queues a synchronous message in the batch container, flushing the container when the
     * message does not fit or when adding it reports that a flush is due, then blocks until
     * the batch outcome is available.
     *
     * <p>The batch lock is always released in a {@code finally} block, so a failure while
     * queueing or flushing cannot leave the lock held.
     *
     * @return the per-message result, or a {@code SEND_FAILED} result when the lock was not
     *         obtained or no result arrived
     * @throws HTPException if lock acquisition or the flush fails
     */
    private SendResult batchAndSendMessageSync(Message message, ResultFuture resultFuture) throws HTPException {
        boolean locked = false;
        try {
            locked = tryLockWithRetry(resultFuture.getTimeoutMillis());
            if (!locked) {
                return new SendResult(SendStatus.SEND_FAILED, message.getMsgId());
            }
            if (!canAddToCurrentBatch(message)) {
                // No room left: ship the current batch first, then start a new one.
                batchMessageAndSend();
                this.batchMessageContainer.addMsg(message, resultFuture, CommunicationMode.SYNC);
            } else if (this.batchMessageContainer.addMsg(message, resultFuture, CommunicationMode.SYNC)) {
                batchMessageAndSend();
            }
        } finally {
            if (locked) {
                this.batchLock.unlock();
            }
        }
        // Wait outside the lock so other senders can keep filling the container.
        SendResult outcome = (SendResult) resultFuture.waitResponse(this.sendMsgTimeout, getRetryTimesWhenSendFailed());
        if (outcome == null) {
            return new SendResult(SendStatus.SEND_FAILED, message.getMsgId());
        }
        return outcome;
    }

    /**
     * Queues an asynchronous message in the batch container, flushing the container when
     * the message does not fit or when adding it reports that a flush is due.
     *
     * <p>{@link HTPException}s are delivered to {@code resultFuture} instead of being thrown.
     * The batch lock is always released in a {@code finally} block, so a failure while
     * queueing or flushing cannot leave the lock held.
     */
    private void batchAndSendMessageAsync(Message message, ResultFuture resultFuture) {
        boolean locked = false;
        try {
            locked = tryLockWithRetry(resultFuture.getTimeoutMillis());
            if (!locked) {
                resultFuture.onException(new HTPException("Unable to acquire lock within timeout"));
                return;
            }
            if (!canAddToCurrentBatch(message)) {
                // No room left: ship the current batch first, then start a new one.
                batchMessageAndSend();
                this.batchMessageContainer.addMsg(message, resultFuture, CommunicationMode.ASYNC);
            } else if (this.batchMessageContainer.addMsg(message, resultFuture, CommunicationMode.ASYNC)) {
                batchMessageAndSend();
            }
        } catch (HTPException e) {
            resultFuture.onException(e);
        } finally {
            if (locked) {
                this.batchLock.unlock();
            }
        }
    }

    /** Returns whether the batch container reports enough space for {@code message}. */
    private boolean canAddToCurrentBatch(Message message) {
        return this.batchMessageContainer.haveEnoughSpace(message);
    }

    /**
     * Tries to acquire the batch lock within {@code j} milliseconds.
     *
     * @return {@code true} once the lock is held (this method never returns {@code false})
     * @throws HTPException if the timeout elapses, or if the thread is interrupted while
     *                      waiting (the interrupt flag is restored first)
     */
    private boolean tryLockWithRetry(long j) throws HTPException {
        final boolean acquired;
        try {
            acquired = this.batchLock.tryLock(j, TimeUnit.MILLISECONDS);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new HTPException("Thread interrupted while acquiring locks", interrupted);
        }
        if (!acquired) {
            throw new HTPException("Unable to acquire lock within timeout");
        }
        return true;
    }

    /**
     * Flushes the batch container: under the batch lock, snapshots the pending messages and
     * their futures, clears the container and hands the snapshot to {@code asyncSendBatch}.
     *
     * <p>Returns immediately when a flush is already in progress or nothing is pending.
     * The {@code sending} flag is reset and {@code canSend} is signalled on every exit path.
     */
    private void batchMessageAndSend() throws HTPException {
        boolean nothingToDo = this.sending || this.batchMessageContainer.isEmpty();
        if (nothingToDo) {
            return;
        }
        this.batchLock.lock();
        try {
            this.sending = true;
            // Re-check under the lock: another thread may have flushed in the meantime.
            if (this.batchMessageContainer.isEmpty()) {
                return;
            }
            List<Message> pendingMessages = this.batchMessageContainer.getSendMessages();
            Map<String, ResultFuture> pendingFutures = this.batchMessageContainer.getSendResultFutureMap();
            this.batchMessageContainer.clear();
            asyncSendBatch(pendingMessages, pendingFutures);
        } finally {
            this.sending = false;
            this.canSend.signalAll();
            this.batchLock.unlock();
        }
    }

    /**
     * Ships a flushed batch: messages are grouped by topic and then by communication mode,
     * and each group is sent on the async sender executor. After the send attempt every
     * message's {@link ResultFuture} is completed with the batch outcome.
     *
     * @param list messages taken from the batch container; nothing happens when empty
     * @param map  result futures keyed by message id
     */
    private void asyncSendBatch(List<Message> list, Map<String, ResultFuture> map) {
        if (CollectionUtils.isEmpty(list)) {
            return;
        }
        Map<String, Map<CommunicationMode, List<Message>>> groupedByTopicAndMode = list.stream()
                .collect(Collectors.groupingBy(Message::getTopic, Collectors.groupingBy(Message::getCommunicationMode)));
        groupedByTopicAndMode.forEach((topicName, byMode) ->
                byMode.forEach((communicationMode, groupMessages) ->
                        getAsyncSenderExecutor().execute(() -> sendGroupAndComplete(communicationMode, groupMessages, map))));
    }

    /**
     * Sends one (topic, mode) group and then completes the futures of its messages.
     * Send failures are captured and handed to the futures rather than thrown, because
     * this runs as an executor task.
     */
    private void sendGroupAndComplete(CommunicationMode communicationMode, List<Message> groupMessages, Map<String, ResultFuture> futures) {
        final AtomicReference<SendBatchResult> batchResult = new AtomicReference<>();
        final AtomicReference<Throwable> failure = new AtomicReference<>();
        try {
            switch (communicationMode) {
                case SYNC:
                    // Drop messages whose future is missing or already timed out.
                    preHandleMessage(groupMessages, futures, communicationMode);
                    if (CollectionUtils.isEmpty(groupMessages)) {
                        return;
                    }
                    batchResult.set(sendBatchDefaultImpl(groupMessages, communicationMode, null, getSendMsgTimeout(), this));
                    break;
                case ASYNC: {
                    preHandleMessage(groupMessages, futures, communicationMode);
                    if (CollectionUtils.isEmpty(groupMessages)) {
                        return;
                    }
                    final CountDownLatch completed = new CountDownLatch(1);
                    try {
                        sendBatchDefaultImpl(groupMessages, communicationMode, new SendBatchCallback() {
                            @Override
                            public void onSuccess(SendBatchResult sendBatchResult) {
                                batchResult.set(sendBatchResult);
                                completed.countDown();
                            }

                            @Override
                            public void onException(Throwable th) {
                                log.error("Async aggregation mode onException：", th);
                                failure.set(new HTPException("Async aggregation mode onException：", th));
                                completed.countDown();
                            }
                        }, getSendMsgTimeout(), this);
                    } catch (Exception e) {
                        log.error("Async aggregation mode send exception", e);
                        failure.set(new HTPException(communicationMode.name() + "Async aggregation mode send exception", e));
                        // The callback will not fire after a synchronous failure.
                        completed.countDown();
                    }
                    // Block until the callback (or the failure path above) reports an outcome.
                    completed.await();
                    break;
                }
                case ONEWAY:
                    if (CollectionUtils.isEmpty(groupMessages)) {
                        return;
                    }
                    try {
                        batchResult.set(sendBatchDefaultImpl(groupMessages, communicationMode, null, getSendMsgTimeout(), this));
                    } catch (Exception e) {
                        log.error("Oneway aggregation mode send exception", e);
                        failure.set(new HTPException(communicationMode.name() + "Oneway aggregation mode send exception", e));
                    }
                    break;
                default:
                    break;
            }
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            failure.set(new HTPException(communicationMode.name() + "aggregation mode send exception", e));
        }
        completeFutures(groupMessages, futures, batchResult.get(), failure.get());
    }

    /**
     * Completes the future of every message in the group: callback-style futures get
     * {@code onSuccess}/{@code onException}, blocking futures get {@code putResult}.
     */
    private void completeFutures(List<Message> groupMessages, Map<String, ResultFuture> futures, SendBatchResult batchResult, Throwable failure) {
        Throwable cause = failure;
        for (Message message : groupMessages) {
            ResultFuture resultFuture = futures.get(message.getMsgId());
            if (resultFuture == null) {
                continue;
            }
            resultFuture.setSendResultOK(true);
            if (resultFuture.isAsync()) {
                if (batchResult != null) {
                    resultFuture.onSuccess(toSendResult(batchResult, message));
                } else {
                    if (cause == null) {
                        // No result and no recorded error: report a generic failure.
                        cause = new HTPClientException("Failed to send message", (Throwable) null);
                    }
                    resultFuture.onException(cause);
                }
            } else if (batchResult != null) {
                resultFuture.putResult(toSendResult(batchResult, message), cause);
            } else {
                resultFuture.putResult(new SendResult(SendStatus.SEND_FAILED, message.getMsgId()), cause);
            }
        }
    }

    /** Builds the per-message result from the batch-level result. */
    private SendResult toSendResult(SendBatchResult batchResult, Message message) {
        return new SendResult(batchResult.getSendStatus(), message.getMsgId(), getClusterName(), batchResult.getBrokerName(), batchResult.getOffsetMsgId(message.getMsgId()));
    }

    /**
     * Removes from {@code list} every message that should not be sent: those with no
     * registered future, and those whose future reports a batch timeout.
     *
     * <p>For timed-out messages a {@code SEND_FAILED} result is delivered first: via
     * {@code putResult} in SYNC mode and via {@code onSuccess} in ASYNC mode; other modes
     * get no notification.
     */
    private void preHandleMessage(List<Message> list, Map<String, ResultFuture> map, CommunicationMode communicationMode) {
        list.removeIf(candidate -> {
            ResultFuture future = map.get(candidate.getMsgId());
            if (future == null) {
                return true;
            }
            if (!future.isBatchTimeout()) {
                return false;
            }
            if (communicationMode == CommunicationMode.SYNC) {
                future.putResult(new SendResult(SendStatus.SEND_FAILED, candidate.getMsgId()), null);
            } else if (communicationMode == CommunicationMode.ASYNC) {
                future.onSuccess(new SendResult(SendStatus.SEND_FAILED, candidate.getMsgId()));
            }
            return true;
        });
    }

    /** Returns the constant {@code putGet} value of this producer, which is always 0. */
    @Override // com.tongtech.client.producer.TLQProducerInner
    public int getPutGet() {
        return this.putGet;
    }

    /** Returns the producer group name (initially {@code MixAll.DEFAULT_PRODUCER_GROUP}). */
    @Override // com.tongtech.client.producer.TLQProducerInner
    public String getProducerGroupName() {
        return this.producerGroup;
    }

    /** Sets the producer group name. */
    public void setProducerGroupName(String str) {
        this.producerGroup = str;
    }

    /** Returns the configured mode type (initially {@code ModeType.TOPIC}). */
    @Override // com.tongtech.client.producer.TLQProducerInner
    public ModeType getModeType() {
        return this.modeType;
    }

    /** Sets the mode type. */
    public void setModeType(ModeType modeType) {
        this.modeType = modeType;
    }

    /** Returns the domain configured on the producer facade. */
    @Override // com.tongtech.client.producer.TLQProducerInner
    public String getDomain() {
        return getHtpProducer().getDomain();
    }

    /** Returns the cluster configured on the producer facade. */
    @Override // com.tongtech.client.producer.TLQProducerInner
    public String getClusterName() {
        return getHtpProducer().getCluster();
    }

    /** Returns the send-retry count configured on the producer facade. */
    @Override // com.tongtech.client.producer.TLQProducerInner
    public int getRetryTimesWhenSendFailed() {
        return getHtpProducer().getRetryTimesWhenSendFailed();
    }

    /** Returns the random identifier generated for this instance at construction time. */
    @Override // com.tongtech.client.producer.TLQProducerInner
    public String getProducerId() {
        return this.producerId;
    }

    /**
     * Asks the client factory to remove and unregister {@code topicBrokerInfo} for the
     * producer id reported by the producer facade.
     */
    @Override // com.tongtech.client.producer.TLQProducerInner
    public void removeProducerTable(TopicBrokerInfo topicBrokerInfo) {
        getmQClientFactory().removeAndUnRegisterBroker(getHtpProducer().getProducerId(), topicBrokerInfo);
    }

    /** Returns the protocol type configured on the producer facade. */
    @Override // com.tongtech.client.producer.TLQProducerInner
    public ProtocolType getProtocolType() {
        return getHtpProducer().getProtocolType();
    }

    /** Returns the configured maximum message size (initially 4194304). */
    @Override // com.tongtech.client.producer.TLQProducerInner
    public int getMaxMessageSize() {
        return this.maxMsgSize;
    }

    /** Returns the name server address configured on the producer facade. */
    @Override // com.tongtech.client.producer.TLQProducerInner
    public String getNamesrvAddr() {
        return getHtpProducer().getNamesrvAddr();
    }

    /** Always returns 0 in this implementation. */
    @Override // com.tongtech.client.producer.TLQProducerInner
    public int getRetryTimesWhenSendFileFailed() {
        return 0;
    }

    /** Always returns 0 in this implementation. */
    @Override // com.tongtech.client.producer.TLQProducerInner
    public long getSendFileTimeout() {
        return 0L;
    }

    /** Always returns {@code null} in this implementation. */
    @Override // com.tongtech.client.producer.TLQProducerInner
    public BreakPointState getBreakPointTrans() {
        return null;
    }

    /** Always returns 0 in this implementation. */
    @Override // com.tongtech.client.producer.TLQProducerInner
    public int getUdpMaxMessageSize() {
        return 0;
    }

    /** Returns the client register type (set to {@code PUBLISH_SUBSCRIBE} at construction). */
    @Override // com.tongtech.client.producer.TLQProducerInner
    public ClientRegisterType getClientRegisterType() {
        return this.clientRegisterType;
    }

    /** Delegates to the inherited {@code isPublishTopicNeedUpdateAbstract}. */
    @Override // com.tongtech.client.producer.TLQProducerInner
    public boolean isPublishTopicNeedUpdate(String str) {
        return isPublishTopicNeedUpdateAbstract(str);
    }

    /** Delegates to the inherited {@code getPublishTopicListAbstract}. */
    @Override // com.tongtech.client.producer.TLQProducerInner
    public Set<String> getPublishTopicList() {
        return getPublishTopicListAbstract();
    }

    /** Delegates to the inherited {@code updateTopicPublishInfoAbstract}. */
    @Override // com.tongtech.client.producer.TLQProducerInner
    public void updateTopicPublishInfo(String str, Boolean bool) {
        updateTopicPublishInfoAbstract(str, bool);
    }

    /** Returns the default topic applied to messages whose own topic is blank; may be {@code null}. */
    public String getTopic() {
        return this.topic;
    }

    /** Sets the default topic applied to messages whose own topic is blank. */
    public void setTopic(String str) {
        this.topic = str;
    }

    /** Sets the maximum message size; the batch container reads it when {@code start()} runs. */
    public void setMaxMsgSize(int i) {
        this.maxMsgSize = i;
    }

    /** Sets the maximum batch size; the batch container reads it when {@code start()} runs. */
    public void setMaxBatchSize(int i) {
        this.maxBatchSize = i;
    }

    /** Returns the batch timeout in milliseconds. */
    public long getBatchTimeout() {
        return this.batchTimeout;
    }

    /** Sets the batch timeout in milliseconds (accepted as {@code int}, stored as {@code long}). */
    public void setBatchTimeout(int i) {
        this.batchTimeout = i;
    }

    /** Returns the send timeout in milliseconds. */
    public int getSendMsgTimeout() {
        return this.sendMsgTimeout;
    }

    /** Sets the send timeout in milliseconds. */
    public void setSendMsgTimeout(int i) {
        this.sendMsgTimeout = i;
    }

    /** Returns the period, in milliseconds, of the scheduled batch flush. */
    public long getBatchDelayMs() {
        return this.batchDelayMs;
    }

    /** Sets the period, in milliseconds, of the scheduled batch flush; read when {@code start()} schedules the task. */
    public void setBatchDelayMs(long j) {
        this.batchDelayMs = j;
    }

    /**
     * Enables or disables client-side batching. Only permitted while the producer is still
     * in the {@code CREATE_JUST} state.
     *
     * @throws HTPException if the producer is in any other state
     */
    public void setInnerBatch(boolean z) throws HTPException {
        if (getServiceState() == ServiceState.CREATE_JUST) {
            this.innerBatch = z;
            return;
        }
        throw new HTPClientException("The producer service state not OK, " + getServiceState(), (Throwable) null);
    }
}
