/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.cloud.stream.binder.rocketmq.integration;

import com.alibaba.cloud.stream.binder.rocketmq.metrics.Instrumentation;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQHeaderMapper;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binding.MessageConverterConfigurer;
import org.springframework.context.Lifecycle;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.integration.support.DefaultErrorMessageStrategy;
import org.springframework.integration.support.ErrorMessageStrategy;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

/**
 * Message handler that publishes Spring {@link Message}s to a RocketMQ topic through a
 * {@link RocketMQTemplate}.
 *
 * <p>Depending on configuration a message is sent transactionally, synchronously or
 * asynchronously. When the message carries the {@code scst_partition} header the
 * "orderly" send variants of the template are used. Failed sends are reported to the
 * optional send-failure channel.
 *
 * <p>For non-transactional handlers {@link #start()} initializes the template and
 * {@link #stop()} destroys it.
 */
public class RocketMQMessageHandler
extends AbstractMessageHandler
implements Lifecycle {
    private static final Logger log = LoggerFactory.getLogger(RocketMQMessageHandler.class);

    /** Header whose value, when non-empty, is appended to the destination as {@code topic:tags}. */
    private static final String TAGS_HEADER = "TAGS";
    /** Header whose value is handed to the template as the transaction argument. */
    private static final String TRANSACTIONAL_ARG_HEADER = "TRANSACTIONAL_ARG";
    /** Header holding the delay level (a {@link Number} or a numeric {@link String}). */
    private static final String DELAY_HEADER = "DELAY";
    /** Header whose presence selects the orderly send variants. */
    private static final String PARTITION_HEADER = "scst_partition";

    private ErrorMessageStrategy errorMessageStrategy = new DefaultErrorMessageStrategy();
    private MessageChannel sendFailureChannel;
    private final RocketMQTemplate rocketMQTemplate;
    private RocketMQHeaderMapper headerMapper;
    private final Boolean transactional;
    private final String destination;
    private final String groupName;
    private final InstrumentationManager instrumentationManager;
    private boolean sync = false;
    private volatile boolean running = false;
    private ExtendedProducerProperties<RocketMQProducerProperties> producerProperties;
    private MessageConverterConfigurer.PartitioningInterceptor partitioningInterceptor;

    /**
     * Creates a handler bound to a single destination.
     *
     * @param rocketMQTemplate template used for every send
     * @param destination topic name the messages are sent to
     * @param groupName group name passed to the template for transactional sends
     * @param transactional whether messages are sent in a transaction; must not be {@code null}
     *        because it is unboxed in {@link #start()}, {@link #stop()} and on every send
     * @param instrumentationManager registry that receives the health instrumentation of this handler
     * @param producerProperties producer properties; the partition count is updated in {@link #start()}
     * @param partitioningInterceptor interceptor whose partition count is kept in sync with the topic
     */
    public RocketMQMessageHandler(RocketMQTemplate rocketMQTemplate, String destination, String groupName, Boolean transactional, InstrumentationManager instrumentationManager, ExtendedProducerProperties<RocketMQProducerProperties> producerProperties, MessageConverterConfigurer.PartitioningInterceptor partitioningInterceptor) {
        this.rocketMQTemplate = rocketMQTemplate;
        this.destination = destination;
        this.groupName = groupName;
        this.transactional = transactional;
        this.instrumentationManager = instrumentationManager;
        this.producerProperties = producerProperties;
        this.partitioningInterceptor = partitioningInterceptor;
    }

    /**
     * Starts the handler.
     *
     * <p>For a non-transactional handler the template is initialized and the outcome is
     * recorded on the health instrumentation of the destination. For a partitioned producer
     * the configured partition count is aligned with the number of publish queues of the
     * topic; a failure to fetch the queues is only logged.
     *
     * @throws MessagingException if the template fails to start
     */
    @Override
    public void start() {
        if (!this.transactional) {
            this.instrumentationManager.addHealthInstrumentation(new Instrumentation(this.destination));
            try {
                this.rocketMQTemplate.afterPropertiesSet();
                this.instrumentationManager.getHealthInstrumentation(this.destination).markStartedSuccessfully();
            }
            catch (Exception e) {
                this.instrumentationManager.getHealthInstrumentation(this.destination).markStartFailed(e);
                // Pass the throwable as well so the stack trace is not lost.
                log.error("RocketMQTemplate startup failed, Caused by {}", e.getMessage(), e);
                throw new MessagingException(
                        org.springframework.integration.support.MessageBuilder
                                .withPayload("RocketMQTemplate startup failed, Caused by " + e.getMessage())
                                .build(),
                        e);
            }
        }
        if (this.producerProperties.isPartitioned()) {
            try {
                List<?> messageQueues = this.rocketMQTemplate.getProducer().fetchPublishMessageQueues(this.destination);
                int queueCount = messageQueues.size();
                if (this.producerProperties.getPartitionCount() != queueCount) {
                    log.info("The partition count of topic '{}' will change from '{}' to '{}'",
                            this.destination, this.producerProperties.getPartitionCount(), queueCount);
                    this.producerProperties.setPartitionCount(queueCount);
                    this.partitioningInterceptor.setPartitionCount(this.producerProperties.getPartitionCount());
                }
            }
            catch (MQClientException e) {
                // Best effort: the handler still starts with the configured partition count.
                log.error("fetch publish message queues fail", e);
            }
        }
        this.running = true;
    }

    /**
     * Stops the handler; a non-transactional handler also destroys its template.
     */
    @Override
    public void stop() {
        if (!this.transactional) {
            this.rocketMQTemplate.destroy();
        }
        this.running = false;
    }

    /**
     * @return {@code true} between a completed {@link #start()} and the next {@link #stop()}
     */
    @Override
    public boolean isRunning() {
        return this.running;
    }

    /**
     * Sends the message to RocketMQ.
     *
     * <p>The headers are first mapped through the configured {@link RocketMQHeaderMapper} and
     * copied onto the outgoing message. The message is then sent transactionally, synchronously
     * or asynchronously. If a send result is available and its status is not
     * {@link SendStatus#SEND_OK}, the message is forwarded to the send-failure channel when one
     * is set; otherwise a {@link MessagingException} is raised.
     *
     * @param message the message to send
     * @throws MessagingException if anything fails while preparing or sending the message; the
     *         failure is also published to the send-failure channel when one is set
     */
    @Override
    protected void handleMessageInternal(Message<?> message) {
        try {
            Map<String, String> jsonHeaders = this.headerMapper.fromHeaders(message.getHeaders());
            message = MessageBuilder.fromMessage(message).copyHeaders(jsonHeaders).build();
            String topicWithTags = buildTopicWithTags(message);

            // Declared as SendResult: only the transactional send returns the
            // TransactionSendResult subtype, the synchronous sends return SendResult.
            SendResult sendRes = null;
            if (this.transactional) {
                sendRes = this.rocketMQTemplate.sendMessageInTransaction(this.groupName, topicWithTags, message, message.getHeaders().get(TRANSACTIONAL_ARG_HEADER));
                log.debug("transactional send to topic {} {}", topicWithTags, sendRes);
            } else {
                int delayLevel = resolveDelayLevel(message);
                boolean needSelectQueue = message.getHeaders().containsKey(PARTITION_HEADER);
                if (this.sync) {
                    sendRes = needSelectQueue
                            ? this.rocketMQTemplate.syncSendOrderly(topicWithTags, message, "", sendTimeout())
                            : this.rocketMQTemplate.syncSend(topicWithTags, message, sendTimeout(), delayLevel);
                    log.debug("sync send to topic {} {}", topicWithTags, sendRes);
                } else {
                    SendCallback sendCallback = newSendCallback(topicWithTags, message);
                    if (needSelectQueue) {
                        this.rocketMQTemplate.asyncSendOrderly(topicWithTags, message, "", sendCallback, sendTimeout());
                    } else {
                        this.rocketMQTemplate.asyncSend(topicWithTags, message, sendCallback);
                    }
                }
            }

            // Asynchronous sends leave sendRes null; their outcome is reported by the callback.
            if (sendRes != null && !sendRes.getSendStatus().equals(SendStatus.SEND_OK)) {
                if (this.getSendFailureChannel() != null) {
                    this.getSendFailureChannel().send(message);
                } else {
                    // Caught by the handler below, which logs it and wraps it once more.
                    throw new MessagingException(message, new MQClientException("message hasn't been sent", null));
                }
            }
        }
        catch (Exception e) {
            log.error("RocketMQ Message hasn't been sent. Caused by {}", e.getMessage(), e);
            if (this.getSendFailureChannel() != null) {
                this.getSendFailureChannel().send(this.errorMessageStrategy.buildErrorMessage(new MessagingException(message, e), null));
            }
            throw new MessagingException(message, e);
        }
    }

    /** Returns the destination, followed by {@code ":" + tags} when the tags header is non-empty. */
    private String buildTopicWithTags(Message<?> message) {
        Object tagsHeader = message.getHeaders().get(TAGS_HEADER);
        String tags = tagsHeader == null ? "" : tagsHeader.toString();
        return StringUtils.hasLength(tags) ? this.destination + ":" + tags : this.destination;
    }

    /** Reads the delay level from the message headers; returns 0 when absent or not parsable. */
    private int resolveDelayLevel(Message<?> message) {
        Object delayLevelObj = message.getHeaders().getOrDefault(DELAY_HEADER, 0);
        if (delayLevelObj instanceof Number) {
            return ((Number) delayLevelObj).intValue();
        }
        if (delayLevelObj instanceof String) {
            try {
                return Integer.parseInt((String) delayLevelObj);
            }
            catch (NumberFormatException ex) {
                // Still best effort, but no longer silent: an invalid value must not block the send.
                log.warn("Ignoring invalid value '{}' of header '{}', using delay level 0", delayLevelObj, DELAY_HEADER);
            }
        }
        return 0;
    }

    /** Returns the send timeout configured on the producer of the template. */
    private long sendTimeout() {
        return this.rocketMQTemplate.getProducer().getSendMsgTimeout();
    }

    /** Creates the callback that logs the outcome of an asynchronous send and reports failures. */
    private SendCallback newSendCallback(final String topicWithTags, final Message<?> sentMessage) {
        return new SendCallback() {

            @Override
            public void onSuccess(SendResult sendResult) {
                log.debug("async send to topic {} {}", topicWithTags, sendResult);
            }

            @Override
            public void onException(Throwable e) {
                log.error("RocketMQ Message hasn't been sent. Caused by {}", e.getMessage(), e);
                if (RocketMQMessageHandler.this.getSendFailureChannel() != null) {
                    RocketMQMessageHandler.this.getSendFailureChannel().send(RocketMQMessageHandler.this.errorMessageStrategy.buildErrorMessage(new MessagingException(sentMessage, e), null));
                }
            }
        };
    }

    /**
     * @param sendFailureChannel channel that receives failed sends; {@code null} disables reporting
     */
    public void setSendFailureChannel(MessageChannel sendFailureChannel) {
        this.sendFailureChannel = sendFailureChannel;
    }

    /**
     * @param errorMessageStrategy strategy used to build the error messages sent to the
     *        send-failure channel; must not be {@code null}
     */
    public void setErrorMessageStrategy(ErrorMessageStrategy errorMessageStrategy) {
        Assert.notNull(errorMessageStrategy, "'errorMessageStrategy' cannot be null");
        this.errorMessageStrategy = errorMessageStrategy;
    }

    /**
     * @return the channel that receives failed sends, or {@code null} if none is set
     */
    public MessageChannel getSendFailureChannel() {
        return this.sendFailureChannel;
    }

    /**
     * @param sync {@code true} to use the synchronous send methods for non-transactional
     *        messages, {@code false} (the default) to send asynchronously
     */
    public void setSync(boolean sync) {
        this.sync = sync;
    }

    /**
     * @return the header mapper applied to every outgoing message, or {@code null} if none is set
     */
    public RocketMQHeaderMapper getHeaderMapper() {
        return this.headerMapper;
    }

    /**
     * @param headerMapper mapper applied to the headers of every outgoing message; it is
     *        dereferenced on every send, so it has to be set before messages are handled
     */
    public void setHeaderMapper(RocketMQHeaderMapper headerMapper) {
        this.headerMapper = headerMapper;
    }
}

