package com.tongtech.client.high.producer;

import com.tongtech.client.common.BreakPointState;
import com.tongtech.client.common.ClientRegisterType;
import com.tongtech.client.common.CommunicationMode;
import com.tongtech.client.common.ModeType;
import com.tongtech.client.common.ServiceState;
import com.tongtech.client.exception.TLQClientException;
import com.tongtech.client.high.batch.BatchMessageContainerBase;
import com.tongtech.client.high.batch.BatchMessageContainerImpl;
import com.tongtech.client.message.Message;
import com.tongtech.client.producer.SendBatchResult;
import com.tongtech.client.producer.SendCallback;
import com.tongtech.client.producer.SendResult;
import com.tongtech.client.producer.SendStatus;
import com.tongtech.client.producer.TLQProducerAbstract;
import com.tongtech.client.producer.TLQProducerInner;
import com.tongtech.client.producer.TopicBrokerInfo;
import com.tongtech.client.remoting.exception.RemotingConnectException;
import com.tongtech.client.remoting.exception.RemotingException;
import com.tongtech.client.remoting.exception.RemotingSendRequestException;
import com.tongtech.client.remoting.exception.RemotingTimeoutException;
import com.tongtech.client.remoting.netty.ProtocolType;
import com.tongtech.client.utils.Validators;
import com.tongtech.slf4j.Logger;
import com.tongtech.slf4j.LoggerFactory;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Batching producer implementation that backs a {@link TLQHighLevelProducer}.
 *
 * <p>Messages passed to {@link #send(Message, SendCallback)} are collected in a
 * {@link BatchMessageContainerBase}. The collected batch is handed to the asynchronous sender
 * executor when the container's {@code add} returns {@code true}, when the next message does not
 * fit into the current batch, when the periodic task scheduled by {@link #start()} fires, or when
 * {@link #shutdown()} flushes what is left. Every access to the container is serialized on this
 * instance's monitor.
 */
@Deprecated
/* loaded from: input_file:com/tongtech/client/high/producer/TLQHighLevelProducerImpl.class */
public class TLQHighLevelProducerImpl extends TLQProducerAbstract implements TLQProducerInner {
    private static final Logger log = LoggerFactory.getLogger(TLQHighLevelProducerImpl.class);

    /** Public-facing producer whose configuration this implementation reads. */
    private final TLQHighLevelProducer tlqTopicProducer;

    /** Accumulates messages and their callbacks until the next flush; guarded by {@code this}. */
    private final BatchMessageContainerBase batchMessageContainer = new BatchMessageContainerImpl();

    /** Single-threaded timer that periodically flushes the pending batch. */
    private final ScheduledExecutorService sendMessageExecutorService = Executors.newSingleThreadScheduledExecutor(runnable -> {
        return new Thread(runnable, "TLQHighLevelSendThread");
    });

    /**
     * Creates the implementation for the given producer facade.
     *
     * @param tLQHighLevelProducer producer whose settings (timeouts, group, domain, ...) are used
     */
    public TLQHighLevelProducerImpl(TLQHighLevelProducer tLQHighLevelProducer) {
        this.tlqTopicProducer = tLQHighLevelProducer;
    }

    /**
     * Starts the underlying producer and schedules the periodic batch flush.
     *
     * <p>The flush task runs every {@code getBatchingMaxPublishDelayMs()} milliseconds and only
     * flushes while the service state is {@link ServiceState#RUNNING}.
     */
    public void start() throws TLQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
        this.batchMessageContainer.setProducer(this.tlqTopicProducer);
        start(true, this.tlqTopicProducer.getClientConfig(), this);
        this.sendMessageExecutorService.scheduleAtFixedRate(() -> {
            try {
                synchronized (this) {
                    if (getServiceState() != ServiceState.RUNNING) {
                        return;
                    }
                    batchMessageAndSend();
                }
            } catch (Exception e) {
                // An exception escaping a fixed-rate task cancels all future runs, so log and continue.
                log.error("ScheduledTask batchMessageAndSend exception", e);
            }
        }, 0L, this.tlqTopicProducer.getBatchingMaxPublishDelayMs(), TimeUnit.MILLISECONDS);
    }

    /**
     * Stops the periodic flush, hands any still-buffered batch to the sender executor and then
     * shuts down the underlying producer.
     *
     * <p>The timer is stopped first so it cannot race with the shutdown of the sender executor,
     * and so its thread is released even if the parent shutdown fails.
     */
    public void shutdown() {
        this.sendMessageExecutorService.shutdown();
        try {
            synchronized (this) {
                // Without this flush, messages still waiting in the batch would be dropped
                // without their callbacks ever being invoked.
                // NOTE(review): assumes the parent shutdown lets already-submitted tasks run - confirm.
                batchMessageAndSend();
            }
        } catch (RuntimeException e) {
            // The final flush is best effort; it must never prevent the actual shutdown below.
            log.error("Failed to flush pending batch during shutdown", e);
        }
        shutdown(true, this, getAsyncSenderExecutor());
    }

    /**
     * Validates the message and adds it to the current batch, flushing the batch when needed.
     *
     * @param message      message to send
     * @param sendCallback callback notified once the batch containing the message was sent or failed
     * @throws TLQClientException if the producer is not in the {@link ServiceState#RUNNING} state
     */
    public void send(Message message, SendCallback sendCallback) throws TLQClientException, RemotingException, InterruptedException {
        if (getServiceState() != ServiceState.RUNNING) {
            throw new TLQClientException("The producer service state not OK, " + getServiceState(), (Throwable) null);
        }
        Validators.checkMessage(message, this);
        setMessageInfo(message, this);
        synchronized (this) {
            batchAndSendMessage(message, sendCallback);
        }
    }

    /** Sets the executor used by the remoting client for callbacks. */
    public void setCallbackExecutor(ExecutorService executorService) {
        getmQClientFactory().getMQClientAPIImpl().getRemotingClient().setCallbackExecutor(executorService);
    }

    /** Returns the producer facade this implementation was created for. */
    public TLQHighLevelProducer getTLQTopicProducer() {
        return this.tlqTopicProducer;
    }

    /** Forwards the flag to the fault strategy's {@code setSendLatencyFaultEnable}. */
    public void setSendLatencyFaultEnable(boolean z) {
        getMqFaultStrategy().setSendLatencyFaultEnable(z);
    }

    /**
     * Adds the message to the current batch. If it does not fit, the current batch is flushed
     * first; if {@code add} returns {@code true}, the batch is flushed right after adding.
     * Callers must hold this instance's monitor.
     */
    private void batchAndSendMessage(Message message, SendCallback sendCallback) {
        if (!canAddToCurrentBatch(message)) {
            doBatchSendAndAdd(message, sendCallback);
        } else if (this.batchMessageContainer.add(message, sendCallback)) {
            batchMessageAndSend();
        }
    }

    /** Returns whether the container reports enough space for the message. */
    private boolean canAddToCurrentBatch(Message message) {
        return this.batchMessageContainer.haveEnoughSpace(message);
    }

    /** Flushes the current batch and then starts a new one with the given message. */
    private void doBatchSendAndAdd(Message message, SendCallback sendCallback) {
        batchMessageAndSend();
        this.batchMessageContainer.add(message, sendCallback);
    }

    /**
     * Takes the pending messages and callbacks out of the container, clears it and submits the
     * batch for asynchronous sending. Does nothing when the container is empty.
     */
    private void batchMessageAndSend() {
        if (this.batchMessageContainer.isEmpty()) {
            return;
        }
        List<Message> sendMessages = this.batchMessageContainer.getSendMessages();
        Map<String, SendCallback> sendCallbackMap = this.batchMessageContainer.getSendCallbackMap();
        this.batchMessageContainer.clear();
        asyncSendBatch(sendMessages, sendCallbackMap);
    }

    /**
     * Submits the batch to the asynchronous sender executor.
     *
     * <p>If the executor rejects the task, the callbacks are failed immediately on the calling
     * thread; the batch has already been removed from the container at this point, so silently
     * dropping it would leave callers waiting forever.
     */
    private void asyncSendBatch(List<Message> list, Map<String, SendCallback> map) {
        try {
            getAsyncSenderExecutor().submit(() -> {
                sendBatchAndNotify(list, map);
            });
        } catch (RejectedExecutionException e) {
            log.error("Batch send task was rejected by the sender executor", e);
            notifyCallbacks(list, map, null, new TLQClientException("Failed to send message", e));
        }
    }

    /** Sends the batch synchronously and reports the outcome to every registered callback. */
    private void sendBatchAndNotify(List<Message> messages, Map<String, SendCallback> callbacks) {
        SendBatchResult sendBatchResult = null;
        Exception failure = null;
        boolean interrupted = false;
        try {
            sendBatchResult = sendBatchDefaultImpl(messages, CommunicationMode.SYNC, null, this.tlqTopicProducer.getSendMsgTimeout(), this);
        } catch (Exception e) {
            interrupted = e instanceof InterruptedException;
            failure = e;
        }
        try {
            notifyCallbacks(messages, callbacks, sendBatchResult, failure);
        } finally {
            if (interrupted) {
                // Restore the flag only after the callbacks ran so they are not disturbed by it.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Invokes the callback of every message in the batch.
     *
     * <p>Each callback is isolated: an exception thrown by one callback is logged and does not
     * prevent the remaining callbacks from being invoked.
     *
     * @param sendBatchResult result of the send, or {@code null} if the send failed
     * @param failure         exception raised by the send, or {@code null} if none was raised
     */
    private void notifyCallbacks(List<Message> messages, Map<String, SendCallback> callbacks, SendBatchResult sendBatchResult, Exception failure) {
        Exception cause = failure;
        if (sendBatchResult == null && cause == null) {
            cause = new TLQClientException("Failed to send message", (Throwable) null);
        }
        for (Message message : messages) {
            SendCallback sendCallback = callbacks.get(message.getMsgId());
            if (sendCallback == null) {
                continue;
            }
            try {
                if (sendBatchResult != null) {
                    sendCallback.onSuccess(new SendResult(SendStatus.SEND_OK, message.getMsgId(), sendBatchResult.getBrokerId()));
                } else {
                    sendCallback.onException(cause);
                }
            } catch (Exception e) {
                log.error("SendCallback threw an exception, msgId=" + message.getMsgId(), e);
            }
        }
    }

    /** Delegates to the producer facade. */
    @Override // com.tongtech.client.producer.TLQProducerInner
    public int getPutGet() {
        return getTLQTopicProducer().getPutGet();
    }

    /** Returns the producer group configured on the producer facade. */
    @Override // com.tongtech.client.producer.TLQProducerInner
    public String getProducerGroupName() {
        return this.tlqTopicProducer.getProducerGroup();
    }

    /** Delegates to the producer facade. */
    @Override // com.tongtech.client.producer.TLQProducerInner
    public ModeType getModeType() {
        return this.tlqTopicProducer.getModeType();
    }

    /** Delegates to the producer facade. */
    @Override // com.tongtech.client.producer.TLQProducerInner
    public String getDomain() {
        return this.tlqTopicProducer.getDomain();
    }

    /** Returns the cluster configured on the producer facade. */
    @Override // com.tongtech.client.producer.TLQProducerInner
    public String getClusterName() {
        return this.tlqTopicProducer.getCluster();
    }

    /** Delegates to the producer facade. */
    @Override // com.tongtech.client.producer.TLQProducerInner
    public int getRetryTimesWhenSendFailed() {
        return this.tlqTopicProducer.getRetryTimesWhenSendFailed();
    }

    /** Delegates to the producer facade. */
    @Override // com.tongtech.client.producer.TLQProducerInner
    public String getProducerId() {
        return this.tlqTopicProducer.getProducerId();
    }

    /** Asks the client factory to remove and unregister the broker for this producer's id. */
    @Override // com.tongtech.client.producer.TLQProducerInner
    public void removeProducerTable(TopicBrokerInfo topicBrokerInfo) {
        getmQClientFactory().removeAndUnRegisterBroker(this.tlqTopicProducer.getProducerId(), topicBrokerInfo);
    }

    /** Delegates to the producer facade. */
    @Override // com.tongtech.client.producer.TLQProducerInner
    public ProtocolType getProtocolType() {
        return getTLQTopicProducer().getProtocolType();
    }

    /** Delegates to the producer facade. */
    @Override // com.tongtech.client.producer.TLQProducerInner
    public int getMaxMessageSize() {
        return this.tlqTopicProducer.getMaxMessageSize();
    }

    /** Delegates to the producer facade. */
    @Override // com.tongtech.client.producer.TLQProducerInner
    public String getNamesrvAddr() {
        return this.tlqTopicProducer.getNamesrvAddr();
    }

    /** Always {@code 0} in this implementation. */
    @Override // com.tongtech.client.producer.TLQProducerInner
    public int getRetryTimesWhenSendFileFailed() {
        return 0;
    }

    /** Always {@code 0} in this implementation. */
    @Override // com.tongtech.client.producer.TLQProducerInner
    public long getSendFileTimeout() {
        return 0L;
    }

    /** Always {@code null} in this implementation. */
    @Override // com.tongtech.client.producer.TLQProducerInner
    public BreakPointState getBreakPointTrans() {
        return null;
    }

    /** Always {@code 0} in this implementation. */
    @Override // com.tongtech.client.producer.TLQProducerInner
    public int getUdpMaxMessageSize() {
        return 0;
    }

    /** Delegates to the producer facade. */
    @Override // com.tongtech.client.producer.TLQProducerInner
    public ClientRegisterType getClientRegisterType() {
        return this.tlqTopicProducer.getClientRegisterType();
    }

    /** Delegates to {@code isPublishTopicNeedUpdateAbstract}. */
    @Override // com.tongtech.client.producer.TLQProducerInner
    public boolean isPublishTopicNeedUpdate(String str) {
        return isPublishTopicNeedUpdateAbstract(str);
    }

    /** Delegates to {@code getPublishTopicListAbstract}. */
    @Override // com.tongtech.client.producer.TLQProducerInner
    public Set<String> getPublishTopicList() {
        return getPublishTopicListAbstract();
    }

    /** Delegates to {@code updateTopicPublishInfoAbstract}. */
    @Override // com.tongtech.client.producer.TLQProducerInner
    public void updateTopicPublishInfo(String str, Boolean bool) {
        updateTopicPublishInfoAbstract(str, bool);
    }
}
