/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.cloud.stream.binder.rocketmq.integration.outbound;

import com.alibaba.cloud.stream.binder.rocketmq.custom.RocketMQBeanContainerCache;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
import com.alibaba.cloud.stream.binder.rocketmq.utils.RocketMQUtils;
import java.lang.reflect.Field;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.hook.CheckForbiddenHook;
import org.apache.rocketmq.client.hook.SendMessageHook;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
import org.apache.rocketmq.client.trace.TraceDispatcher;
import org.apache.rocketmq.client.trace.hook.SendMessageTraceHookImpl;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.remoting.RPCHook;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

public final class RocketMQProduceFactory {
    private static final Logger log = LoggerFactory.getLogger(RocketMQProduceFactory.class);
    private static final Map<String, DefaultMQProducer> PRODUCER_REUSABLE_MAP = new ConcurrentHashMap<String, DefaultMQProducer>();

    private RocketMQProduceFactory() {
    }

    public static DefaultMQProducer initRocketMQProducer(String topic, RocketMQProducerProperties producerProperties) {
        SendMessageHook sendMessageHook;
        Object producer;
        if (!StringUtils.hasLength((String)producerProperties.getGroup())) {
            producerProperties.setGroup("anonymous");
        }
        Assert.notNull((Object)producerProperties.getNameServer(), (String)"Property 'nameServer' is required");
        AclClientRPCHook rpcHook = null;
        if (StringUtils.hasLength((String)producerProperties.getAccessKey()) && StringUtils.hasLength((String)producerProperties.getSecretKey())) {
            rpcHook = new AclClientRPCHook(new SessionCredentials(producerProperties.getAccessKey(), producerProperties.getSecretKey()));
        }
        if (RocketMQProducerProperties.ProducerType.Trans.equalsName(producerProperties.getProducerType())) {
            producer = new TransactionMQProducer(producerProperties.getNamespace(), producerProperties.getGroup(), rpcHook);
            if (producerProperties.getEnableMsgTrace()) {
                try {
                    AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerProperties.getGroup(), TraceDispatcher.Type.PRODUCE, producerProperties.getCustomizedTraceTopic(), (RPCHook)rpcHook);
                    dispatcher.setHostProducer(producer.getDefaultMQProducerImpl());
                    Field field = DefaultMQProducer.class.getDeclaredField("traceDispatcher");
                    field.setAccessible(true);
                    field.set(producer, dispatcher);
                    producer.getDefaultMQProducerImpl().registerSendMessageHook((SendMessageHook)new SendMessageTraceHookImpl((TraceDispatcher)dispatcher));
                }
                catch (Throwable e) {
                    log.error("system mq-trace hook init failed ,maybe can't send msg trace data");
                }
            }
        } else {
            String key = RocketMQProduceFactory.getKey(producerProperties);
            if (PRODUCER_REUSABLE_MAP.containsKey(key)) {
                return PRODUCER_REUSABLE_MAP.get(key);
            }
            producer = new ReusableMQProducer(producerProperties.getNamespace(), producerProperties.getGroup(), (RPCHook)rpcHook, producerProperties.getEnableMsgTrace(), producerProperties.getCustomizedTraceTopic(), key);
            PRODUCER_REUSABLE_MAP.put(key, (DefaultMQProducer)producer);
        }
        producer.setVipChannelEnabled(null == rpcHook && producerProperties.getVipChannelEnabled());
        producer.setInstanceName(RocketMQUtils.getInstanceName((RPCHook)rpcHook, topic + "|" + UtilAll.getPid()));
        producer.setNamesrvAddr(producerProperties.getNameServer());
        producer.setSendMsgTimeout(producerProperties.getSendMsgTimeout());
        producer.setRetryTimesWhenSendFailed(producerProperties.getRetryTimesWhenSendFailed());
        producer.setRetryTimesWhenSendAsyncFailed(producerProperties.getRetryTimesWhenSendAsyncFailed());
        producer.setCompressMsgBodyOverHowmuch(producerProperties.getCompressMsgBodyThreshold());
        producer.setRetryAnotherBrokerWhenNotStoreOK(producerProperties.getRetryAnotherBroker());
        producer.setMaxMessageSize(producerProperties.getMaxMessageSize());
        producer.setUseTLS(producerProperties.getUseTLS());
        producer.setUnitName(producerProperties.getUnitName());
        CheckForbiddenHook checkForbiddenHook = RocketMQBeanContainerCache.getBean(producerProperties.getCheckForbiddenHook(), CheckForbiddenHook.class);
        if (null != checkForbiddenHook) {
            producer.getDefaultMQProducerImpl().registerCheckForbiddenHook(checkForbiddenHook);
        }
        if (null != (sendMessageHook = RocketMQBeanContainerCache.getBean(producerProperties.getSendMessageHook(), SendMessageHook.class))) {
            producer.getDefaultMQProducerImpl().registerSendMessageHook(sendMessageHook);
        }
        return producer;
    }

    private static String getKey(RocketMQProducerProperties producerProperties) {
        return producerProperties.getNameServer() + "," + producerProperties.getGroup() + producerProperties.getSendCallBack();
    }

    protected static class ReusableMQProducer
    extends DefaultMQProducer {
        private final AtomicInteger atomicInteger = new AtomicInteger();
        private final String key;

        public ReusableMQProducer(String namespace, String group, RPCHook rpcHook, boolean enableMsgTrace, String customizedTraceTopic, String key) {
            super(namespace, group, rpcHook, enableMsgTrace, customizedTraceTopic);
            this.key = key;
        }

        public void start() throws MQClientException {
            if (this.atomicInteger.getAndIncrement() == 0) {
                super.start();
            }
        }

        public void shutdown() {
            if (this.atomicInteger.decrementAndGet() == 0) {
                PRODUCER_REUSABLE_MAP.remove(this.key);
                super.shutdown();
            }
        }
    }
}

