/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.cloud.stream.binder.rocketmq.integration;

import com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderUtils;
import com.alibaba.cloud.stream.binder.rocketmq.consuming.RocketMQMessageQueueChooser;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
import java.util.List;
import java.util.Set;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.MessageQueueListener;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.context.Lifecycle;
import org.springframework.integration.acks.AcknowledgmentCallback;
import org.springframework.integration.acks.AcknowledgmentCallbackFactory;
import org.springframework.integration.endpoint.AbstractMessageSource;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

public class RocketMQMessageSource
extends AbstractMessageSource<Object>
implements DisposableBean,
Lifecycle {
    private static final Logger log = LoggerFactory.getLogger(RocketMQMessageSource.class);
    private final RocketMQCallbackFactory ackCallbackFactory;
    private final RocketMQBinderConfigurationProperties rocketMQBinderConfigurationProperties;
    private final ExtendedConsumerProperties<RocketMQConsumerProperties> rocketMQConsumerProperties;
    private final String topic;
    private final String group;
    private final Object consumerMonitor = new Object();
    private DefaultMQPullConsumer consumer;
    private boolean running;
    private MessageSelector messageSelector;
    private RocketMQMessageQueueChooser messageQueueChooser = new RocketMQMessageQueueChooser();

    public RocketMQMessageSource(RocketMQBinderConfigurationProperties rocketMQBinderConfigurationProperties, ExtendedConsumerProperties<RocketMQConsumerProperties> rocketMQConsumerProperties, String topic, String group) {
        this(new RocketMQCallbackFactory(), rocketMQBinderConfigurationProperties, rocketMQConsumerProperties, topic, group);
    }

    public RocketMQMessageSource(RocketMQCallbackFactory ackCallbackFactory, RocketMQBinderConfigurationProperties rocketMQBinderConfigurationProperties, ExtendedConsumerProperties<RocketMQConsumerProperties> rocketMQConsumerProperties, String topic, String group) {
        this.ackCallbackFactory = ackCallbackFactory;
        this.rocketMQBinderConfigurationProperties = rocketMQBinderConfigurationProperties;
        this.rocketMQConsumerProperties = rocketMQConsumerProperties;
        this.topic = topic;
        this.group = group;
    }

    public synchronized void start() {
        if (this.isRunning()) {
            throw new IllegalStateException("pull consumer already running. " + ((Object)((Object)this)).toString());
        }
        try {
            this.consumer = new DefaultMQPullConsumer(this.group);
            this.consumer.setNamesrvAddr(RocketMQBinderUtils.getNameServerStr(this.rocketMQBinderConfigurationProperties.getNameServer()));
            this.consumer.setConsumerPullTimeoutMillis(((RocketMQConsumerProperties)this.rocketMQConsumerProperties.getExtension()).getPullTimeout());
            this.consumer.setMessageModel(MessageModel.CLUSTERING);
            String tags = ((RocketMQConsumerProperties)this.rocketMQConsumerProperties.getExtension()).getTags();
            String sql = ((RocketMQConsumerProperties)this.rocketMQConsumerProperties.getExtension()).getSql();
            if (!StringUtils.isEmpty((Object)tags) && !StringUtils.isEmpty((Object)sql)) {
                this.messageSelector = MessageSelector.byTag((String)tags);
            } else if (!StringUtils.isEmpty((Object)tags)) {
                this.messageSelector = MessageSelector.byTag((String)tags);
            } else if (!StringUtils.isEmpty((Object)sql)) {
                this.messageSelector = MessageSelector.bySql((String)sql);
            }
            this.consumer.registerMessageQueueListener(this.topic, new MessageQueueListener(){

                public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
                    log.info("messageQueueChanged, topic='{}', mqAll=`{}`, mqDivided=`{}`", new Object[]{topic, mqAll, mqDivided});
                    switch (RocketMQMessageSource.this.consumer.getMessageModel()) {
                        case BROADCASTING: {
                            RocketMQMessageSource.this.resetMessageQueues(mqAll);
                            break;
                        }
                        case CLUSTERING: {
                            RocketMQMessageSource.this.resetMessageQueues(mqDivided);
                            break;
                        }
                    }
                }
            });
            this.consumer.start();
        }
        catch (MQClientException e) {
            log.error("DefaultMQPullConsumer startup error: " + e.getMessage(), (Throwable)e);
        }
        this.setRunning(true);
    }

    public synchronized void stop() {
        if (this.isRunning()) {
            this.setRunning(false);
            this.consumer.shutdown();
        }
    }

    public synchronized boolean isRunning() {
        return this.running;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected synchronized Object doReceive() {
        if (this.messageQueueChooser.getMessageQueues() == null || this.messageQueueChooser.getMessageQueues().size() == 0) {
            return null;
        }
        try {
            for (int count = 0; count < this.messageQueueChooser.getMessageQueues().size(); ++count) {
                MessageQueue messageQueue;
                Object object = this.consumerMonitor;
                synchronized (object) {
                    messageQueue = this.messageQueueChooser.choose();
                    this.messageQueueChooser.increment();
                }
                long offset = this.consumer.fetchConsumeOffset(messageQueue, ((RocketMQConsumerProperties)this.rocketMQConsumerProperties.getExtension()).isFromStore());
                log.debug("topic='{}', group='{}', messageQueue='{}', offset now='{}'", new Object[]{this.topic, this.group, messageQueue, offset});
                PullResult pullResult = this.messageSelector != null ? this.consumer.pull(messageQueue, this.messageSelector, offset, 1) : this.consumer.pull(messageQueue, (String)null, offset, 1);
                if (pullResult.getPullStatus() == PullStatus.FOUND) {
                    List messageExtList = pullResult.getMsgFoundList();
                    Message message = RocketMQUtil.convertToSpringMessage((MessageExt)((MessageExt)messageExtList.get(0)));
                    AcknowledgmentCallback ackCallback = this.ackCallbackFactory.createCallback(new RocketMQAckInfo(messageQueue, pullResult, this.consumer, offset));
                    Message messageResult = MessageBuilder.fromMessage((Message)message).setHeader("acknowledgmentCallback", (Object)ackCallback).build();
                    return messageResult;
                }
                log.debug("messageQueue='{}' PullResult='{}' with topic `{}`", new Object[]{this.messageQueueChooser.getMessageQueues(), pullResult.getPullStatus(), this.topic});
            }
        }
        catch (Exception e) {
            log.error("Consumer pull error: " + e.getMessage(), (Throwable)e);
        }
        return null;
    }

    public String getComponentType() {
        return "rocketmq:message-source";
    }

    public synchronized void setRunning(boolean running) {
        this.running = running;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public synchronized void resetMessageQueues(Set<MessageQueue> queueSet) {
        log.info("resetMessageQueues, topic='{}', messageQueue=`{}`", (Object)this.topic, queueSet);
        Object object = this.consumerMonitor;
        synchronized (object) {
            this.messageQueueChooser.reset(queueSet);
        }
    }

    public class RocketMQAckInfo {
        private final MessageQueue messageQueue;
        private final PullResult pullResult;
        private final DefaultMQPullConsumer consumer;
        private final long oldOffset;

        public RocketMQAckInfo(MessageQueue messageQueue, PullResult pullResult, DefaultMQPullConsumer consumer, long oldOffset) {
            this.messageQueue = messageQueue;
            this.pullResult = pullResult;
            this.consumer = consumer;
            this.oldOffset = oldOffset;
        }

        public MessageQueue getMessageQueue() {
            return this.messageQueue;
        }

        public PullResult getPullResult() {
            return this.pullResult;
        }

        public DefaultMQPullConsumer getConsumer() {
            return this.consumer;
        }

        public RocketMQMessageQueueChooser getMessageQueueChooser() {
            return RocketMQMessageSource.this.messageQueueChooser;
        }

        public long getOldOffset() {
            return this.oldOffset;
        }

        public Object getConsumerMonitor() {
            return RocketMQMessageSource.this.consumerMonitor;
        }

        public String toString() {
            return "RocketMQAckInfo{messageQueue=" + this.messageQueue + ", pullResult=" + this.pullResult + ", consumer=" + this.consumer + ", oldOffset=" + this.oldOffset + '}';
        }
    }

    public static class RocketMQAckCallback
    implements AcknowledgmentCallback {
        private final RocketMQAckInfo ackInfo;
        private boolean acknowledged;
        private boolean autoAckEnabled = true;

        public RocketMQAckCallback(RocketMQAckInfo ackInfo) {
            this.ackInfo = ackInfo;
        }

        protected void setAcknowledged(boolean acknowledged) {
            this.acknowledged = acknowledged;
        }

        public boolean isAcknowledged() {
            return this.acknowledged;
        }

        public void noAutoAck() {
            this.autoAckEnabled = false;
        }

        public boolean isAutoAck() {
            return this.autoAckEnabled;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Enabled aggressive block sorting
         * Enabled unnecessary exception pruning
         * Enabled aggressive exception aggregation
         * Converted monitor instructions to comments
         * Lifted jumps to return sites
         */
        public void acknowledge(AcknowledgmentCallback.Status status) {
            Assert.notNull((Object)status, (String)"'status' cannot be null");
            if (this.acknowledged) {
                throw new IllegalStateException("Already acknowledged");
            }
            log.debug("acknowledge(" + status.name() + ") for " + this);
            Object object = this.ackInfo.getConsumerMonitor();
            // MONITORENTER : object
            try {
                switch (status) {
                    case ACCEPT: 
                    case REJECT: {
                        this.ackInfo.getConsumer().updateConsumeOffset(this.ackInfo.getMessageQueue(), this.ackInfo.getPullResult().getNextBeginOffset());
                        log.debug("messageQueue='{}' offset update to `{}`", (Object)this.ackInfo.getMessageQueue(), (Object)String.valueOf(this.ackInfo.getPullResult().getNextBeginOffset()));
                        return;
                    }
                    case REQUEUE: {
                        int oldIndex = this.ackInfo.getMessageQueueChooser().requeue();
                        this.ackInfo.getConsumer().updateConsumeOffset(this.ackInfo.getMessageQueue(), this.ackInfo.getOldOffset());
                        log.debug("messageQueue='{}' offset requeue to index:`{}`, oldOffset:'{}'", new Object[]{this.ackInfo.getMessageQueue(), oldIndex, this.ackInfo.getOldOffset()});
                        return;
                    }
                }
                return;
            }
            catch (MQClientException e) {
                log.error("acknowledge error: " + e.getErrorMessage(), (Throwable)e);
                return;
            }
            finally {
                this.acknowledged = true;
            }
        }

        public String toString() {
            return "RocketMQAckCallback{ackInfo=" + this.ackInfo + ", acknowledged=" + this.acknowledged + ", autoAckEnabled=" + this.autoAckEnabled + '}';
        }
    }

    public static class RocketMQCallbackFactory
    implements AcknowledgmentCallbackFactory<RocketMQAckInfo> {
        public AcknowledgmentCallback createCallback(RocketMQAckInfo info) {
            return new RocketMQAckCallback(info);
        }
    }
}

