package com.alibaba.cloud.stream.binder.rocketmq.integration.inbound.pull;

import com.alibaba.cloud.stream.binder.rocketmq.integration.inbound.RocketMQConsumerFactory;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.Instrumentation;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQMessageConverterSupport;
import com.alibaba.cloud.stream.binder.rocketmq.utils.RocketMQUtils;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.context.Lifecycle;
import org.springframework.integration.endpoint.AbstractMessageSource;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;

/* loaded from: input_file:com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/RocketMQMessageSource.class */
public class RocketMQMessageSource extends AbstractMessageSource<Object> implements DisposableBean, Lifecycle {
    private static final Logger log = LoggerFactory.getLogger(RocketMQMessageSource.class);
    private DefaultLitePullConsumer consumer;
    private volatile boolean running;
    private final String topic;
    private final MessageSelector messageSelector;
    private final ExtendedConsumerProperties<RocketMQConsumerProperties> extendedConsumerProperties;
    private final Map<String, Collection<MessageQueue>> messageQueuesForTopic = new ConcurrentHashMap();
    private volatile Iterator<MessageExt> messageExtIterator = null;

    public RocketMQMessageSource(String str, ExtendedConsumerProperties<RocketMQConsumerProperties> extendedConsumerProperties) {
        this.topic = str;
        this.messageSelector = RocketMQUtils.getMessageSelector(((RocketMQConsumerProperties) extendedConsumerProperties.getExtension()).getSubscription());
        this.extendedConsumerProperties = extendedConsumerProperties;
    }

    public synchronized void start() {
        Instrumentation instrumentation = new Instrumentation(this.topic, this);
        try {
        } catch (MQClientException e) {
            instrumentation.markStartFailed(e);
            log.error("DefaultMQPullConsumer startup error: " + e.getMessage(), e);
        } finally {
            InstrumentationManager.addHealthInstrumentation(instrumentation);
        }
        if (isRunning()) {
            throw new IllegalStateException("pull consumer already running. " + toString());
        }
        this.consumer = RocketMQConsumerFactory.initPullConsumer(this.topic, this.extendedConsumerProperties);
        this.consumer.subscribe(this.topic, this.messageSelector);
        this.consumer.setAutoCommit(false);
        DefaultLitePullConsumer defaultLitePullConsumer = this.consumer;
        String str = this.topic;
        Map<String, Collection<MessageQueue>> map = this.messageQueuesForTopic;
        Objects.requireNonNull(map);
        defaultLitePullConsumer.registerTopicMessageQueueChangeListener(str, (v1, v2) -> {
            r2.put(v1, v2);
        });
        this.consumer.start();
        this.messageQueuesForTopic.put(this.topic, this.consumer.fetchMessageQueues(this.topic));
        instrumentation.markStartedSuccessfully();
        this.running = true;
    }

    private MessageQueue acquireCurrentMessageQueue(String str, int i, String str2) {
        Collection<MessageQueue> collection = this.messageQueuesForTopic.get(str);
        if (CollectionUtils.isEmpty(collection)) {
            return null;
        }
        for (MessageQueue messageQueue : collection) {
            if (messageQueue.getQueueId() == i && ObjectUtils.nullSafeEquals(str2, messageQueue.getBrokerName())) {
                return messageQueue;
            }
        }
        return null;
    }

    public synchronized void stop() {
        if (!isRunning() || null == this.consumer) {
            return;
        }
        this.consumer.unsubscribe(this.topic);
        this.consumer.shutdown();
        this.running = false;
    }

    public synchronized boolean isRunning() {
        return this.running;
    }

    protected synchronized Object doReceive() {
        if (this.messageExtIterator == null) {
            List poll = this.consumer.poll();
            if (CollectionUtils.isEmpty(poll)) {
                return null;
            }
            this.messageExtIterator = poll.iterator();
        }
        MessageExt next = this.messageExtIterator.next();
        if (!this.messageExtIterator.hasNext()) {
            this.messageExtIterator = null;
        }
        if (null == next) {
            return null;
        }
        MessageQueue acquireCurrentMessageQueue = acquireCurrentMessageQueue(next.getTopic(), next.getQueueId(), next.getBrokerName());
        if (acquireCurrentMessageQueue == null) {
            throw new IllegalArgumentException("The message queue is not in assigned list");
        }
        return MessageBuilder.fromMessage(RocketMQMessageConverterSupport.convertMessage2Spring(next)).setHeader("acknowledgmentCallback", new RocketMQAckCallback(this.consumer, acquireCurrentMessageQueue, next)).build();
    }

    public String getComponentType() {
        return "rocketmq:message-source";
    }
}
