/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.impl.consumer;

import apache.rocketmq.v1.AckMessageResponse;
import apache.rocketmq.v1.Broker;
import apache.rocketmq.v1.ConsumePolicy;
import apache.rocketmq.v1.FilterExpression;
import apache.rocketmq.v1.FilterType;
import apache.rocketmq.v1.ForwardMessageToDeadLetterQueueResponse;
import apache.rocketmq.v1.Partition;
import apache.rocketmq.v1.PullMessageRequest;
import apache.rocketmq.v1.QueryOffsetPolicy;
import apache.rocketmq.v1.QueryOffsetRequest;
import apache.rocketmq.v1.ReceiveMessageRequest;
import apache.rocketmq.v1.Resource;
import com.aliyun.openservices.ons.shaded.com.google.common.annotations.VisibleForTesting;
import com.aliyun.openservices.ons.shaded.com.google.common.base.Optional;
import com.aliyun.openservices.ons.shaded.com.google.common.base.Stopwatch;
import com.aliyun.openservices.ons.shaded.com.google.common.util.concurrent.FutureCallback;
import com.aliyun.openservices.ons.shaded.com.google.common.util.concurrent.Futures;
import com.aliyun.openservices.ons.shaded.com.google.common.util.concurrent.ListenableFuture;
import com.aliyun.openservices.ons.shaded.com.google.common.util.concurrent.MoreExecutors;
import com.aliyun.openservices.ons.shaded.com.google.common.util.concurrent.RateLimiter;
import com.aliyun.openservices.ons.shaded.com.google.common.util.concurrent.SettableFuture;
import com.aliyun.openservices.ons.shaded.com.google.errorprone.annotations.concurrent.GuardedBy;
import com.aliyun.openservices.ons.shaded.com.google.protobuf.Duration;
import com.aliyun.openservices.ons.shaded.com.google.protobuf.util.Durations;
import com.aliyun.openservices.ons.shaded.com.google.protobuf.util.Timestamps;
import com.aliyun.openservices.ons.shaded.com.google.rpc.Code;
import com.aliyun.openservices.ons.shaded.com.google.rpc.Status;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.consumer.ConsumeStatus;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.consumer.MessageModel;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.consumer.PullMessageResult;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.consumer.PullStatus;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.consumer.ReceiveMessageResult;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.consumer.ReceiveStatus;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.consumer.filter.ExpressionType;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.consumer.filter.FilterExpression;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.consumer.listener.MessageListenerType;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.impl.consumer.ConsumeService;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.impl.consumer.NextOffsetRecord;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.impl.consumer.ProcessQueue;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.impl.consumer.PushConsumerImpl;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.MessageExt;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.MessageHookPoint;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.MessageHookPointStatus;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.MessageImpl;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.MessageImplAccessor;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.MessageInterceptor;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.MessageInterceptorContext;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.MessageQueue;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.protocol.SystemAttribute;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.route.Endpoints;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.utility.SimpleFuture;
import com.aliyun.openservices.ons.shaded.org.slf4j.Logger;
import com.aliyun.openservices.ons.shaded.org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ProcessQueueImpl
implements ProcessQueue {
    // NOTE(review): presumably the long-polling timeout of a receive request; the use site is outside this view.
    public static final long RECEIVE_LONG_POLLING_TIMEOUT_MILLIS = 30000L;
    // Delay before a postponed receive attempt is retried (receiveMessageLater schedules with the same value).
    public static final long RECEIVE_LATER_DELAY_MILLIS = 1000L;
    // Timeout handed to the consumer's pull call (pullMessageImmediately passes the same value).
    public static final long PULL_LONG_POLLING_TIMEOUT_MILLIS = 30000L;
    // Delay before a postponed pull attempt is retried (pullMessageLater schedules with the same value).
    public static final long PULL_LATER_DELAY_MILLIS = 1000L;
    // Idle threshold checked by expired(): twice the larger of the two long-polling timeouts, i.e. 60000 ms.
    // The literals mirror the two *_LONG_POLLING_TIMEOUT_MILLIS constants above.
    public static final long MAX_IDLE_MILLIS = 2L * Math.max(30000L, 30000L);
    // NOTE(review): presumably the retry delay for acknowledging a fifo message; use site not visible here.
    public static final long ACK_FIFO_MESSAGE_DELAY_MILLIS = 100L;
    // NOTE(review): presumably the retry delay for forwarding a fifo message to the DLQ; use site not visible here.
    public static final long FORWARD_FIFO_MESSAGE_TO_DLQ_DELAY_MILLIS = 100L;
    private static final Logger log = LoggerFactory.getLogger(ProcessQueueImpl.class);
    // Set by drop(); receiveMessage() and pullMessage(long) stop fetching once it is true.
    private volatile boolean dropped;
    // The message queue this process queue fetches from.
    private final MessageQueue mq;
    // Filter expression supplied at construction time.
    private final FilterExpression filterExpression;
    // Owning consumer; source of configuration, scheduler, RPC calls and offset storage.
    private final PushConsumerImpl consumerImpl;
    // Messages cached by cacheMessages() that have not been taken for consumption yet.
    @GuardedBy(value="pendingMessagesLock")
    private final List<MessageExt> pendingMessages;
    private final ReadWriteLock pendingMessagesLock;
    // Messages handed out by tryTakeMessages()/tryTakeFifoMessage() and not erased yet.
    @GuardedBy(value="inflightMessagesLock")
    private final List<MessageExt> inflightMessages;
    private final ReadWriteLock inflightMessagesLock;
    // Tracks the queue offsets of cached messages; only fed when the consumer records offsets.
    private final NextOffsetRecord nextOffsetRecord;
    // Running total of body lengths: increased when a message is cached, decreased when it is erased from in-flight.
    private final AtomicLong cachedMessagesBytes;
    // FIFO gate: claimed in tryTakeFifoMessage(), released when the fifo message is erased.
    private final AtomicBoolean fifoConsumptionOccupied;
    // System.nanoTime() of the latest pull request issued in this view; read by expired().
    private volatile long activityNanoTime = System.nanoTime();
    // System.nanoTime() of the latest isCacheFull() call that returned true; Long.MIN_VALUE until that first happens.
    private volatile long cacheFullNanoTime = Long.MIN_VALUE;

    /**
     * Creates an empty, non-dropped process queue for one message queue.
     *
     * @param consumerImpl     the push consumer that owns this process queue
     * @param mq               the message queue served by this process queue
     * @param filterExpression the filter expression kept for this queue
     */
    public ProcessQueueImpl(PushConsumerImpl consumerImpl, MessageQueue mq, FilterExpression filterExpression) {
        // Collaborators handed in by the caller.
        this.mq = mq;
        this.consumerImpl = consumerImpl;
        this.filterExpression = filterExpression;

        // Message caches, each paired with its own read/write lock.
        this.pendingMessagesLock = new ReentrantReadWriteLock();
        this.pendingMessages = new ArrayList<>();
        this.inflightMessagesLock = new ReentrantReadWriteLock();
        this.inflightMessages = new ArrayList<>();

        // Bookkeeping state starts out empty / released.
        this.nextOffsetRecord = new NextOffsetRecord();
        this.cachedMessagesBytes = new AtomicLong(0L);
        this.fifoConsumptionOccupied = new AtomicBoolean(false);
        this.dropped = false;
    }

    /**
     * Marks this process queue as dropped. The flag is checked before each
     * receive/pull attempt, so no further fetch is started afterwards.
     */
    @Override
    public void drop() {
        this.dropped = true;
    }

    /**
     * Tries to claim the single FIFO consumption slot.
     *
     * @return {@code true} if the slot was free and is now held by the caller,
     *         {@code false} if it was already occupied
     */
    private boolean fifoConsumptionInbound() {
        // The previous value tells whether somebody else already held the slot.
        final boolean alreadyOccupied = this.fifoConsumptionOccupied.getAndSet(true);
        return !alreadyOccupied;
    }

    /**
     * Releases the FIFO consumption slot; a no-op if it is already free.
     */
    private void fifoConsumptionOutbound() {
        this.fifoConsumptionOccupied.set(false);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Adds fetched messages to the pending cache.
     *
     * <p>Intact messages are appended to the pending list, their body size is added to the cached-bytes
     * counter and their queue offset is remembered. Corrupted messages are never cached: in broadcasting
     * mode they are only logged, with a concurrent listener they are nacked, and with an orderly listener
     * they are forwarded to the dead letter queue.
     *
     * <p>When the consumer records offsets, the offsets of the cached messages are handed to the
     * next-offset record after the lock has been released.
     *
     * @param messageExtList messages to cache
     */
    @VisibleForTesting
    public void cacheMessages(List<MessageExt> messageExtList) {
        final ArrayList<Long> cachedOffsets = new ArrayList<Long>();
        final MessageListenerType listenerType = this.consumerImpl.getMessageListener().getListenerType();
        final MessageModel messageModel = this.consumerImpl.getMessageModel();
        final String namespace = this.consumerImpl.getNamespace();
        this.pendingMessagesLock.writeLock().lock();
        try {
            for (MessageExt message : messageExtList) {
                final boolean corrupted = MessageImplAccessor.getMessageImpl(message).isCorrupted();
                if (!corrupted) {
                    // Regular path: cache the message and account for it.
                    this.pendingMessages.add(message);
                    this.cachedMessagesBytes.addAndGet(message.getBody().length);
                    cachedOffsets.add(message.getQueueOffset());
                    continue;
                }
                if (MessageModel.BROADCASTING.equals(messageModel)) {
                    log.error("Message is corrupted, ignore it in broadcasting mode, namespace={}, mq={}, messageId={}, clientId={}", namespace, this.mq, message.getMsgId(), this.consumerImpl.getId());
                    continue;
                }
                if (MessageListenerType.CONCURRENTLY.equals(listenerType)) {
                    log.error("Message is corrupted, nack it for concurrently consumption, namespace={}, mq={}, messageId={}, clientId={}", namespace, this.mq, message.getMsgId(), this.consumerImpl.getId());
                    this.consumerImpl.nackMessage(message);
                }
                if (MessageListenerType.ORDERLY.equals(listenerType)) {
                    log.error("Message is corrupted, forward it to DLQ for fifo consumption, namespace={}, mq={}, messageId={}, clientId={}", namespace, this.mq, message.getMsgId(), this.consumerImpl.getId());
                    this.forwardToDeadLetterQueue(message);
                }
            }
        } finally {
            this.pendingMessagesLock.writeLock().unlock();
        }
        if (this.consumerImpl.isOffsetRecorded()) {
            this.nextOffsetRecord.add(cachedOffsets);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Moves up to {@code batchMaxSize} messages from the head of the pending cache into the in-flight
     * cache and returns them.
     *
     * <p>If the consumer has a rate limiter for this queue's topic, one permit is acquired per message
     * and taking stops as soon as a permit is refused. Without a rate limiter the whole head range is
     * moved in one step.
     *
     * @param batchMaxSize maximum number of messages to take; zero or a negative value yields an empty list
     * @return the messages taken, in cache order; possibly empty, never {@code null}
     */
    @Override
    public List<MessageExt> tryTakeMessages(int batchMaxSize) {
        // Lock order (pending, then in-flight) matches the other methods that take both locks.
        this.pendingMessagesLock.writeLock().lock();
        this.inflightMessagesLock.writeLock().lock();
        try {
            final List<MessageExt> taken = new ArrayList<>();
            final RateLimiter rateLimiter = this.consumerImpl.rateLimiter(this.mq.getTopic());
            if (null == rateLimiter) {
                // Clamp to zero so that a non-positive batch size behaves like the throttled path below.
                final int actualSize = Math.max(0, Math.min(this.pendingMessages.size(), batchMaxSize));
                final List<MessageExt> head = this.pendingMessages.subList(0, actualSize);
                taken.addAll(head);
                this.inflightMessages.addAll(head);
                // Clearing the view removes exactly the taken range. removeAll() was quadratic and would
                // also have dropped later messages that are equal to a taken one.
                head.clear();
                return taken;
            }
            while (!this.pendingMessages.isEmpty() && taken.size() < batchMaxSize && rateLimiter.tryAcquire()) {
                // Always take the head; removing by index avoids a linear equals() search.
                final MessageExt messageExt = this.pendingMessages.remove(0);
                taken.add(messageExt);
                this.inflightMessages.add(messageExt);
            }
            return taken;
        } finally {
            this.inflightMessagesLock.writeLock().unlock();
            this.pendingMessagesLock.writeLock().unlock();
        }
    }

    /**
     * Erases a single message by delegating to {@link #eraseMessages(List)} with a one-element list.
     *
     * @param messageExt the message to erase
     */
    private void eraseMessage(MessageExt messageExt) {
        final List<MessageExt> single = new ArrayList<>(1);
        single.add(messageExt);
        this.eraseMessages(single);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Takes the first pending message for FIFO consumption.
     *
     * <p>Nothing is taken when the pending cache is empty, when the FIFO slot is already occupied, or
     * when the topic's rate limiter (if any) refuses a permit. In the last case the slot that was just
     * claimed is released again. On success the message moves from pending to in-flight and the FIFO
     * slot stays occupied.
     *
     * @return the head message, or absent if nothing could be taken
     */
    @Override
    public Optional<MessageExt> tryTakeFifoMessage() {
        this.pendingMessagesLock.writeLock().lock();
        this.inflightMessagesLock.writeLock().lock();
        try {
            if (this.pendingMessages.isEmpty()) {
                return Optional.absent();
            }
            if (!this.fifoConsumptionInbound()) {
                log.debug("Fifo consumption task are not finished, namespace={}, mq={}, clientId={}", this.consumerImpl.getNamespace(), this.mq, this.consumerImpl.getId());
                return Optional.absent();
            }
            final RateLimiter rateLimiter = this.consumerImpl.rateLimiter(this.mq.getTopic());
            if (null != rateLimiter && !rateLimiter.tryAcquire()) {
                // Throttled: give the slot back so a later attempt can claim it.
                this.fifoConsumptionOutbound();
                return Optional.absent();
            }
            final MessageExt head = this.pendingMessages.get(0);
            this.pendingMessages.remove(head);
            this.inflightMessages.add(head);
            return Optional.of(head);
        } finally {
            this.inflightMessagesLock.writeLock().unlock();
            this.pendingMessagesLock.writeLock().unlock();
        }
    }

    /**
     * Logs a snapshot of this queue: pending count, in-flight count and cached bytes.
     * Both read locks are held while the snapshot is taken and logged.
     */
    @Override
    public void doStats() {
        this.pendingMessagesLock.readLock().lock();
        this.inflightMessagesLock.readLock().lock();
        try {
            final String clientId = this.consumerImpl.getId();
            final String namespace = this.consumerImpl.getNamespace();
            final int pendingCount = this.pendingMessages.size();
            final int inflightCount = this.inflightMessages.size();
            final long cachedBytes = this.cachedMessagesBytes.get();
            log.info("clientId={}, namespace={}, mq={}, pendingMessageQuantity={}, inflightMessageQuantity={}, cachedMessagesBytes={}", clientId, namespace, this.mq, pendingCount, inflightCount, cachedBytes);
        } finally {
            this.inflightMessagesLock.readLock().unlock();
            this.pendingMessagesLock.readLock().unlock();
        }
    }

    /**
     * Finishes the consumption of one FIFO message.
     *
     * <ul>
     *   <li>Broadcasting mode: the message is erased and the FIFO slot released immediately.</li>
     *   <li>{@code ERROR} with attempts left: the delivery attempt is incremented and the message is
     *       consumed again after the configured suspend time; this method is re-entered with the
     *       new result.</li>
     *   <li>Otherwise: the message is acknowledged (on {@code OK}) or forwarded to the dead letter
     *       queue; once that future completes, the message is erased, the FIFO slot is released and
     *       the consume service is signalled.</li>
     * </ul>
     *
     * @param messageExt the FIFO message whose consumption finished
     * @param status     the consumption result
     */
    @Override
    public void eraseFifoMessage(final MessageExt messageExt, ConsumeStatus status) {
        this.statsConsumptionStatus(status);
        if (MessageModel.BROADCASTING.equals(this.consumerImpl.getMessageModel())) {
            this.eraseMessage(messageExt);
            this.fifoConsumptionOutbound();
            return;
        }
        final int maxAttempts = this.consumerImpl.getMaxDeliveryAttempts();
        final int attempt = messageExt.getDeliveryAttempt();
        final ConsumeService consumeService = this.consumerImpl.getConsumeService();
        final String namespace = this.consumerImpl.getNamespace();

        final boolean retryable = ConsumeStatus.ERROR.equals(status) && attempt < maxAttempts;
        if (retryable) {
            // Bump the attempt counter on the message itself before redelivering it.
            MessageImplAccessor.getMessageImpl(messageExt).getSystemAttribute().setDeliveryAttempt(1 + attempt);
            final long suspendMillis = this.consumerImpl.getFifoConsumptionSuspendTimeMillis();
            log.debug("Prepare to redeliver the fifo message because of consumption failure, maxAttempt={}, attempt={}, namespace={}, mq={}, messageId={}, suspendTime={}ms, clientId={}", maxAttempts, messageExt.getDeliveryAttempt(), namespace, this.mq, messageExt.getMsgId(), suspendMillis, this.consumerImpl.getId());
            final ListenableFuture<ConsumeStatus> redelivery = consumeService.consume(messageExt, suspendMillis, TimeUnit.MILLISECONDS);
            Futures.addCallback(redelivery, new FutureCallback<ConsumeStatus>() {

                @Override
                public void onSuccess(ConsumeStatus consumeStatus) {
                    ProcessQueueImpl.this.eraseFifoMessage(messageExt, consumeStatus);
                }

                @Override
                public void onFailure(Throwable t) {
                    // NOTE(review): this path only logs; the message is not erased and the FIFO slot
                    // is not released here.
                    log.error("[Bug] Exception raised while fifo message redelivery, namespace={}, mq={}, messageId={}, attempt={}, maxAttempts={}, clientId={}", namespace, ProcessQueueImpl.this.mq, messageExt.getMsgId(), messageExt.getDeliveryAttempt(), maxAttempts, ProcessQueueImpl.this.consumerImpl.getId(), t);
                }
            }, MoreExecutors.directExecutor());
            return;
        }

        final SimpleFuture future;
        if (ConsumeStatus.OK.equals(status)) {
            future = this.ackFifoMessage(messageExt);
        } else {
            log.info("Failed to consume fifo message finally, run out of attempt times, maxAttempts={}, attempt={}, namespace={}, mq={}, messageId={}, clientId={}", maxAttempts, attempt, namespace, this.mq, messageExt.getMsgId(), this.consumerImpl.getId());
            future = this.forwardToDeadLetterQueue(messageExt);
        }
        // Clean up only after the ack / DLQ forwarding has completed.
        future.addListener(() -> {
            this.eraseMessage(messageExt);
            this.fifoConsumptionOutbound();
            consumeService.signal();
        }, this.consumerImpl.getConsumptionExecutor());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Removes the given messages from the in-flight cache and updates the bookkeeping.
     *
     * <p>The cached-bytes counter is reduced only for messages that were actually in flight. When the
     * consumer records offsets, the offsets of all given messages are removed from the next-offset
     * record and, if that record yields a next offset, it is written to the consumer.
     *
     * @param messageExtList the messages to erase
     */
    private void eraseMessages(List<MessageExt> messageExtList) {
        final ArrayList<Long> erasedOffsets = new ArrayList<Long>();
        this.inflightMessagesLock.writeLock().lock();
        try {
            for (MessageExt message : messageExtList) {
                final boolean wasInflight = this.inflightMessages.remove(message);
                if (wasInflight) {
                    this.cachedMessagesBytes.addAndGet(-message.getBody().length);
                }
                // The offset is collected whether or not the message was in flight.
                erasedOffsets.add(message.getQueueOffset());
            }
        } finally {
            this.inflightMessagesLock.writeLock().unlock();
        }
        if (this.consumerImpl.isOffsetRecorded()) {
            this.nextOffsetRecord.remove(erasedOffsets);
            final Optional<Long> nextOffset = this.nextOffsetRecord.next();
            if (nextOffset.isPresent()) {
                final Long offset = nextOffset.get();
                this.consumerImpl.updateOffset(this.mq, offset);
            }
        }
    }

    /**
     * Erases a consumed batch and, in clustering mode, reports the outcome per message.
     *
     * <p>On {@code OK} every message is acknowledged. On any other status every message is nacked;
     * messages that have used up their delivery attempts are additionally logged as finally failed.
     * In other message models nothing is reported.
     *
     * @param messageExtList the consumed messages
     * @param status         the consumption result shared by the whole batch
     */
    @Override
    public void eraseMessages(List<MessageExt> messageExtList, ConsumeStatus status) {
        this.statsConsumptionStatus(messageExtList.size(), status);
        this.eraseMessages(messageExtList);
        if (!MessageModel.CLUSTERING.equals(this.consumerImpl.getMessageModel())) {
            return;
        }
        final boolean consumedOk = ConsumeStatus.OK.equals(status);
        for (MessageExt message : messageExtList) {
            if (consumedOk) {
                this.ackMessage(message);
                continue;
            }
            final int maxAttempts = this.consumerImpl.getMaxDeliveryAttempts();
            final int attempt = message.getDeliveryAttempt();
            if (attempt >= maxAttempts) {
                log.error("Failed to consume message finally, run out of attempt times, maxAttempts={}, attempt={}, namespace={}, mq={}, messageId={}, clientId={}", maxAttempts, attempt, this.consumerImpl.getNamespace(), this.mq, message.getMsgId(), this.consumerImpl.getId());
            }
            this.consumerImpl.nackMessage(message);
        }
    }

    /**
     * Handles the outcome of a receive request.
     *
     * <p>On {@code OK} any messages found are cached, counted and the consume service is signalled;
     * the next receive is then started right away. Every other status is logged as an error and the
     * next receive is postponed.
     *
     * @param result the receive result to process
     */
    private void onReceiveMessageResult(ReceiveMessageResult result) {
        final ReceiveStatus status = result.getReceiveStatus();
        final List<MessageExt> found = result.getMessagesFound();
        final Endpoints endpoints = result.getEndpoints();
        switch (status) {
            case OK: {
                final boolean hasMessages = !found.isEmpty();
                if (hasMessages) {
                    this.cacheMessages(found);
                    this.consumerImpl.getReceivedMessagesQuantity().getAndAdd(found.size());
                    this.consumerImpl.getConsumeService().signal();
                }
                log.debug("Receive message with OK, namespace={}, mq={}, endpoints={}, messages found count={}, clientId={}", this.consumerImpl.getNamespace(), this.mq, endpoints, found.size(), this.consumerImpl.getId());
                this.receiveMessage();
                break;
            }
            default: {
                log.error("Receive message with status={}, namespace={}, mq={}, endpoints={}, messages found count={}, clientId={}", new Object[]{status, this.consumerImpl.getNamespace(), this.mq, endpoints, found.size(), this.consumerImpl.getId()});
                this.receiveMessageLater();
                break;
            }
        }
    }

    /**
     * Handles the outcome of a pull request.
     *
     * <p>On {@code OK} any messages found are cached, counted and the consume service is signalled;
     * the next pull then starts immediately from the result's next begin offset. Every other status
     * is logged as an error and the next pull from that offset is postponed.
     *
     * @param result the pull result to process
     */
    private void onPullMessageResult(PullMessageResult result) {
        final PullStatus status = result.getPullStatus();
        final List<MessageExt> found = result.getMessagesFound();
        switch (status) {
            case OK: {
                final boolean hasMessages = !found.isEmpty();
                if (hasMessages) {
                    this.cacheMessages(found);
                    this.consumerImpl.getPulledMessagesQuantity().getAndAdd(found.size());
                    this.consumerImpl.getConsumeService().signal();
                }
                log.debug("Pull message with OK, namespace={}, mq={}, messages found count={}, clientId={}", this.consumerImpl.getNamespace(), this.mq, found.size(), this.consumerImpl.getId());
                this.pullMessage(result.getNextBeginOffset());
                break;
            }
            default: {
                log.error("Pull message with status={}, namespace={}, mq={}, messages found count={}, clientId={}", new Object[]{status, this.consumerImpl.getNamespace(), this.mq, found.size(), this.consumerImpl.getId()});
                this.pullMessageLater(result.getNextBeginOffset());
                break;
            }
        }
    }

    /**
     * Starts the next receive unless the queue was dropped; if the cache is full the receive is
     * postponed instead of being issued immediately.
     */
    @VisibleForTesting
    public void receiveMessage() {
        if (this.dropped) {
            log.debug("Process queue has been dropped, no longer receive message, namespace={}, mq={}, clientId={}", this.consumerImpl.getNamespace(), this.mq, this.consumerImpl.getId());
        } else if (this.isCacheFull()) {
            log.warn("Process queue cache is full, would receive message later, namespace={}, mq={}, clientId={}", this.consumerImpl.getNamespace(), this.mq, this.consumerImpl.getId());
            this.receiveMessageLater();
        } else {
            this.receiveMessageImmediately();
        }
    }

    /**
     * Schedules {@link #receiveMessage()} to run after {@link #RECEIVE_LATER_DELAY_MILLIS}.
     *
     * <p>If scheduling fails because the scheduler is shut down, the failure is ignored. Any other
     * scheduling failure is logged and scheduling is attempted again.
     */
    public void receiveMessageLater() {
        final ScheduledExecutorService scheduler = this.consumerImpl.getScheduler();
        final Runnable task = this::receiveMessage;
        try {
            scheduler.schedule(task, RECEIVE_LATER_DELAY_MILLIS, TimeUnit.MILLISECONDS);
        } catch (Throwable t) {
            if (!scheduler.isShutdown()) {
                log.error("[Bug] Failed to schedule receive message request, namespace={}, mq={}, clientId={}", this.consumerImpl.getNamespace(), this.mq, this.consumerImpl.getId(), t);
                this.receiveMessageLater();
            }
        }
    }

    /**
     * Checks whether this queue's cache has reached one of the consumer's per-queue thresholds.
     *
     * <p>The message count is checked first, then the cached bytes. Whenever a threshold is reached a
     * warning is logged and the cache-full timestamp is refreshed.
     *
     * @return {@code true} if either threshold is reached, {@code false} otherwise
     */
    public boolean isCacheFull() {
        final long cachedQuantity = this.cachedMessagesQuantity();
        final int quantityThreshold = this.consumerImpl.cachedMessagesQuantityThresholdPerQueue();
        if (cachedQuantity >= (long) quantityThreshold) {
            log.warn("Process queue total cached messages quantity exceeds the threshold, threshold={}, actual={}, namespace={}, mq={}, clientId={}", quantityThreshold, cachedQuantity, this.consumerImpl.getNamespace(), this.mq, this.consumerImpl.getId());
            this.cacheFullNanoTime = System.nanoTime();
            return true;
        }
        final int bytesThreshold = this.consumerImpl.cachedMessagesBytesThresholdPerQueue();
        final long cachedBytes = this.cachedMessageBytes();
        if (cachedBytes >= (long) bytesThreshold) {
            log.warn("Process queue total cached messages memory exceeds the threshold, threshold={} bytes, actual={} bytes, namespace={}, mq={}, clientId={}", bytesThreshold, cachedBytes, this.consumerImpl.getNamespace(), this.mq, this.consumerImpl.getId());
            this.cacheFullNanoTime = System.nanoTime();
            return true;
        }
        return false;
    }

    /**
     * Tells whether this process queue has been idle for too long.
     *
     * <p>The queue counts as expired when no fetch activity was recorded for at least
     * {@link #MAX_IDLE_MILLIS} and the cache was not reported full within that same period. A queue
     * whose cache has never been full only needs to satisfy the first condition.
     *
     * @return {@code true} if the queue is idle beyond the threshold, {@code false} otherwise
     */
    @Override
    public boolean expired() {
        final long idleMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - this.activityNanoTime);
        if (idleMillis < MAX_IDLE_MILLIS) {
            return false;
        }
        // Read the volatile field once so the sentinel check and the subtraction see the same value.
        final long lastCacheFullNanoTime = this.cacheFullNanoTime;
        // Long.MIN_VALUE is the "never full" sentinel. Subtracting it from nanoTime() overflows and,
        // for a non-negative nanoTime(), yields a negative duration that would always look "recent".
        final long cacheFullIdleMillis = Long.MIN_VALUE == lastCacheFullNanoTime
                ? Long.MAX_VALUE
                : TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lastCacheFullNanoTime);
        if (cacheFullIdleMillis < MAX_IDLE_MILLIS) {
            return false;
        }
        // When the cache has never been full, the logged cache-full idle time is Long.MAX_VALUE.
        log.warn("Process queue is idle, reception idle time={}ms, cache full idle time={}ms, namespace={}, mq={}, clientId={}", idleMillis, cacheFullIdleMillis, this.consumerImpl.getNamespace(), this.mq, this.consumerImpl.getId());
        return true;
    }

    /**
     * Asks the broker of this queue for the offset to start pulling from.
     *
     * <p>The query policy follows the consumer's consume-from-where setting: a time point (with the
     * configured timestamp), the beginning of the queue, or, for every other setting, the end.
     *
     * @return a future holding the queried offset
     */
    private ListenableFuture<Long> queryOffset() {
        final QueryOffsetRequest.Builder requestBuilder = QueryOffsetRequest.newBuilder();
        switch (this.consumerImpl.getConsumeFromWhere()) {
            case CONSUME_FROM_TIMESTAMP: {
                requestBuilder.setPolicy(QueryOffsetPolicy.TIME_POINT);
                requestBuilder.setTimePoint(Timestamps.fromMillis(this.consumerImpl.getConsumeFromTimeMillis()));
                break;
            }
            case CONSUME_FROM_FIRST_OFFSET: {
                requestBuilder.setPolicy(QueryOffsetPolicy.BEGINNING);
                break;
            }
            default: {
                requestBuilder.setPolicy(QueryOffsetPolicy.END);
                break;
            }
        }
        requestBuilder.setPartition(this.getPbPartition());
        final QueryOffsetRequest request = requestBuilder.build();
        final Endpoints brokerEndpoints = this.mq.getPartition().getBroker().getEndpoints();
        return this.consumerImpl.queryOffset(request, brokerEndpoints);
    }

    /**
     * Starts pulling without a known offset.
     *
     * <p>When the consumer records offsets, the stored offset is tried first; any failure on that path
     * drops this process queue. If no stored offset is available, the offset is queried remotely. On
     * success it is stored (when offsets are recorded) and pulling starts from it; on failure the
     * process queue is dropped.
     */
    private void pullMessageImmediately() {
        if (this.consumerImpl.isOffsetRecorded()) {
            try {
                final Optional<Long> storedOffset = this.consumerImpl.readOffset(this.mq);
                if (storedOffset.isPresent()) {
                    this.pullMessage(storedOffset.get());
                    return;
                }
            } catch (Throwable t) {
                log.error("Exception raised while reading offset from offset store, drop message queue, namespace={}, mq={}, clientId={}", this.consumerImpl.getNamespace(), this.mq, this.consumerImpl.getId(), t);
                this.consumerImpl.dropProcessQueue(this.mq);
                return;
            }
        }
        log.info("Offset not found, try to query offset from remote, namespace={}, mq={}, clientId={}", this.consumerImpl.getNamespace(), this.mq, this.consumerImpl.getId());
        ListenableFuture<Long> offsetFuture;
        try {
            offsetFuture = this.queryOffset();
        } catch (Throwable t) {
            // Turn a synchronous failure into a failed future so both cases share the callback below.
            final SettableFuture<Long> failed = SettableFuture.create();
            failed.setException(t);
            offsetFuture = failed;
        }
        final Endpoints endpoints = this.mq.getPartition().getBroker().getEndpoints();
        Futures.addCallback(offsetFuture, new FutureCallback<Long>() {

            @Override
            public void onSuccess(Long offset) {
                log.info("Query offset successfully, namespace={}, mq={}, endpoints={}, offset={}, clientId={}", ProcessQueueImpl.this.consumerImpl.getNamespace(), ProcessQueueImpl.this.mq, endpoints, offset, ProcessQueueImpl.this.consumerImpl.getId());
                if (ProcessQueueImpl.this.consumerImpl.isOffsetRecorded()) {
                    ProcessQueueImpl.this.consumerImpl.updateOffset(ProcessQueueImpl.this.mq, offset);
                }
                ProcessQueueImpl.this.pullMessage(offset);
            }

            @Override
            public void onFailure(Throwable t) {
                log.error("Exception raised while querying offset to pull, drop message queue, namespace={}, mq={}, endpoints={}, clientId={}", ProcessQueueImpl.this.consumerImpl.getNamespace(), ProcessQueueImpl.this.mq, endpoints, ProcessQueueImpl.this.consumerImpl.getId(), t);
                ProcessQueueImpl.this.consumerImpl.dropProcessQueue(ProcessQueueImpl.this.mq);
            }
        }, MoreExecutors.directExecutor());
    }

    /**
     * Issues one pull request starting at {@code offset}, provided the consumer is running.
     *
     * <p>The activity timestamp is refreshed and the pre-pull hook is invoked before the request is
     * sent. When the request completes, the post-pull hook runs (once per message found, or once
     * without a message when nothing was found) and the result is handed to
     * {@link #onPullMessageResult(PullMessageResult)}. Any failure, whether thrown while sending,
     * reported by the future, or raised while handling the result, postpones the pull from the same
     * offset.
     *
     * @param offset the queue offset to pull from
     */
    private void pullMessageImmediately(final long offset) {
        if (!this.consumerImpl.isRunning()) {
            log.info("Stop to pull message because consumer is not running, namespace={}, mq={}, clientId={}", this.consumerImpl.getNamespace(), this.mq, this.consumerImpl.getId());
            return;
        }
        try {
            final Endpoints endpoints = this.mq.getPartition().getBroker().getEndpoints();
            final PullMessageRequest pullRequest = this.wrapPullMessageRequest(offset);
            this.activityNanoTime = System.nanoTime();
            final MessageInterceptorContext preContext = MessageInterceptorContext.builder().setBatchSize(pullRequest.getBatchSize()).setTopic(this.mq.getTopic()).build();
            this.consumerImpl.intercept(MessageHookPoint.PRE_PULL, preContext);
            final Stopwatch stopwatch = Stopwatch.createStarted();
            final ListenableFuture<PullMessageResult> pullFuture = this.consumerImpl.pullMessage(pullRequest, endpoints, PULL_LONG_POLLING_TIMEOUT_MILLIS);
            Futures.addCallback(pullFuture, new FutureCallback<PullMessageResult>() {

                @Override
                public void onSuccess(PullMessageResult pullMessageResult) {
                    final long elapsed = stopwatch.elapsed(MessageInterceptor.DEFAULT_TIME_UNIT);
                    final List<MessageExt> found = pullMessageResult.getMessagesFound();
                    final MessageHookPointStatus hookStatus = PullStatus.OK.equals(pullMessageResult.getPullStatus()) ? MessageHookPointStatus.OK : MessageHookPointStatus.ERROR;
                    final MessageInterceptorContext postContext = preContext.toBuilder().setDuration(elapsed).setStatus(hookStatus).build();
                    if (found.isEmpty()) {
                        // No message to attach: fire the hook once for the request itself.
                        ProcessQueueImpl.this.consumerImpl.intercept(MessageHookPoint.POST_PULL, postContext);
                    }
                    for (MessageExt message : found) {
                        ProcessQueueImpl.this.consumerImpl.intercept(MessageHookPoint.POST_PULL, message, postContext);
                    }
                    try {
                        ProcessQueueImpl.this.onPullMessageResult(pullMessageResult);
                    } catch (Throwable t) {
                        log.error("[Bug] Exception raised while handling pull result, would pull later, namespace={} mq={}, endpoints={}, clientId={}", ProcessQueueImpl.this.consumerImpl.getNamespace(), ProcessQueueImpl.this.mq, endpoints, ProcessQueueImpl.this.consumerImpl.getId(), t);
                        ProcessQueueImpl.this.pullMessageLater(offset);
                    }
                }

                @Override
                public void onFailure(Throwable t) {
                    final long elapsed = stopwatch.elapsed(MessageInterceptor.DEFAULT_TIME_UNIT);
                    final MessageInterceptorContext failureContext = preContext.toBuilder().setDuration(elapsed).setStatus(MessageHookPointStatus.ERROR).setThrowable(t).build();
                    ProcessQueueImpl.this.consumerImpl.intercept(MessageHookPoint.POST_PULL, failureContext);
                    log.error("Exception raised while pull message, would pull later, namespace={}, mq={}, endpoints={}, clientId={}", ProcessQueueImpl.this.consumerImpl.getNamespace(), ProcessQueueImpl.this.mq, endpoints, ProcessQueueImpl.this.consumerImpl.getId(), t);
                    ProcessQueueImpl.this.pullMessageLater(offset);
                }
            }, MoreExecutors.directExecutor());
            this.consumerImpl.getPullTimes().getAndIncrement();
        } catch (Throwable t) {
            log.error("Exception raised while pull message, would pull message later, namespace={}, mq={}, clientId={}", this.consumerImpl.getNamespace(), this.mq, this.consumerImpl.getId(), t);
            this.pullMessageLater(offset);
        }
    }

    /**
     * Schedules a pull from {@code offset} to run after {@link #PULL_LATER_DELAY_MILLIS}.
     *
     * <p>If scheduling fails because the scheduler is shut down, the failure is ignored. Any other
     * scheduling failure is logged and scheduling is attempted again.
     *
     * @param offset the queue offset the postponed pull starts from
     */
    public void pullMessageLater(long offset) {
        final ScheduledExecutorService scheduler = this.consumerImpl.getScheduler();
        final Runnable task = () -> this.pullMessage(offset);
        try {
            scheduler.schedule(task, PULL_LATER_DELAY_MILLIS, TimeUnit.MILLISECONDS);
        } catch (Throwable t) {
            if (!scheduler.isShutdown()) {
                log.error("[Bug] Failed to schedule pull message request, namespace={}, mq={}, clientId={}", this.consumerImpl.getNamespace(), this.mq, this.consumerImpl.getId(), t);
                this.pullMessageLater(offset);
            }
        }
    }

    /**
     * Decides how to proceed with a pull at the given offset.
     *
     * <ul>
     *   <li>dropped process queue: log and stop pulling;</li>
     *   <li>full cache: log and defer the pull via {@code pullMessageLater};</li>
     *   <li>otherwise: pull right away via {@code pullMessageImmediately}.</li>
     * </ul>
     *
     * @param offset offset to pull from.
     */
    private void pullMessage(long offset) {
        final String clientId = this.consumerImpl.getId();
        if (this.dropped) {
            log.info(
                "Process queue has been dropped, no longer pull message, namespace={}, mq={}, clientId={}",
                this.consumerImpl.getNamespace(),
                this.mq,
                clientId);
        } else if (this.isCacheFull()) {
            log.warn(
                "Process queue cache is full, would pull message later, namespace={}, mq={}, clientId={}",
                this.consumerImpl.getNamespace(),
                this.mq,
                clientId);
            this.pullMessageLater(offset);
        } else {
            this.pullMessageImmediately(offset);
        }
    }

    /**
     * Computes the batch size to request from the server.
     *
     * <p>The remaining cache room (per-queue quantity threshold minus currently cached messages) is floored at 1
     * and then capped by the consumer's per-queue maximum await batch size.
     *
     * @return the batch size to put into a receive/pull request.
     */
    private int getMaxAwaitBatchSize() {
        final int threshold = this.consumerImpl.cachedMessagesQuantityThresholdPerQueue();
        final int cached = this.cachedMessagesQuantity();
        // Never ask for less than one message, even when the cache is at or over its threshold.
        final int remaining = Math.max(threshold - cached, 1);
        final int upperBound = this.consumerImpl.getMaxAwaitBatchSizePerQueue();
        return Math.min(remaining, upperBound);
    }

    /**
     * Starts fetching messages right away: through the receive path when the consumer's message model is
     * {@code CLUSTERING}, through the pull path for any other model.
     */
    @Override
    public void fetchMessageImmediately() {
        final boolean clustering = MessageModel.CLUSTERING.equals((Object)this.consumerImpl.getMessageModel());
        if (clustering) {
            this.receiveMessageImmediately();
        } else {
            this.pullMessageImmediately();
        }
    }

    /**
     * Builds the protobuf receive request for this process queue.
     *
     * <p>The request carries the consumer group, client id, partition, batch size, invisible duration (taken from
     * the consumption timeout), await time, a consume policy derived from the consumer's "consume from where"
     * setting, the filter expression and the FIFO flag (set when the listener type is {@code ORDERLY}).
     *
     * @return the fully populated receive request.
     */
    ReceiveMessageRequest wrapReceiveMessageRequest() {
        final int batchSize = this.getMaxAwaitBatchSize();
        final Duration invisibleDuration = Durations.fromMillis(this.consumerImpl.getConsumptionTimeoutMillis());
        final Duration awaitTime = Durations.fromMillis(this.consumerImpl.getMaxAwaitTimeMillisPerQueue());

        final ReceiveMessageRequest.Builder request = ReceiveMessageRequest.newBuilder();
        request.setGroup(this.consumerImpl.getPbGroup());
        request.setClientId(this.consumerImpl.getId());
        request.setPartition(this.getPbPartition());
        request.setBatchSize(batchSize);
        request.setInvisibleDuration(invisibleDuration);
        request.setAwaitTime(awaitTime);

        // Translate the client-side start position into the protocol's consume policy; RESUME is the fallback.
        final ConsumePolicy policy;
        switch (this.consumerImpl.getConsumeFromWhere()) {
            case CONSUME_FROM_FIRST_OFFSET:
                policy = ConsumePolicy.PLAYBACK;
                break;
            case CONSUME_FROM_TIMESTAMP:
                policy = ConsumePolicy.TARGET_TIMESTAMP;
                break;
            case CONSUME_FROM_MAX_OFFSET:
                policy = ConsumePolicy.DISCARD;
                break;
            default:
                policy = ConsumePolicy.RESUME;
                break;
        }
        request.setConsumePolicy(policy);

        request.setFilterExpression(this.getPbFilterExpression());
        final boolean fifo = MessageListenerType.ORDERLY.equals((Object)this.consumerImpl.getMessageListener().getListenerType());
        request.setFifoFlag(fifo);
        return request.build();
    }

    /**
     * Builds the protobuf pull request for this process queue.
     *
     * @param offset offset to start pulling from.
     * @return a request carrying group, partition, offset, batch size, await time, filter expression and client id.
     */
    PullMessageRequest wrapPullMessageRequest(long offset) {
        final int batchSize = this.getMaxAwaitBatchSize();
        final long awaitMillis = this.consumerImpl.getMaxAwaitTimeMillisPerQueue();

        final PullMessageRequest.Builder request = PullMessageRequest.newBuilder();
        request.setGroup(this.consumerImpl.getPbGroup());
        request.setPartition(this.getPbPartition());
        request.setOffset(offset);
        request.setBatchSize(batchSize);
        request.setAwaitTime(Durations.fromMillis(awaitMillis));
        request.setFilterExpression(this.getPbFilterExpression());
        request.setClientId(this.consumerImpl.getId());
        return request.build();
    }

    /**
     * Issues one asynchronous receive request for this process queue and wires up the handling of its outcome.
     *
     * <p>Nothing is sent when the consumer is not running. Otherwise the method builds the request, refreshes
     * {@code activityNanoTime}, runs the {@code PRE_RECEIVE} interceptor, sends the request and registers a
     * callback. Every failure path (exception while sending, failed future, exception while handling the result)
     * logs the error and calls {@code receiveMessageLater()}.
     */
    private void receiveMessageImmediately() {
        if (!this.consumerImpl.isRunning()) {
            log.info("Stop to receive message because consumer is not running, namespace={}, mq={}, clientId={}", this.consumerImpl.getNamespace(), this.mq, this.consumerImpl.getId());
            return;
        }
        try {
            final Endpoints endpoints = this.mq.getPartition().getBroker().getEndpoints();
            ReceiveMessageRequest request = this.wrapReceiveMessageRequest();
            this.activityNanoTime = System.nanoTime();
            final MessageInterceptorContext preContext = MessageInterceptorContext.builder().setBatchSize(request.getBatchSize()).setTopic(this.mq.getTopic()).build();
            this.consumerImpl.intercept(MessageHookPoint.PRE_RECEIVE, preContext);
            // The stopwatch measures the time between sending the request and the callback firing.
            final Stopwatch stopwatch = Stopwatch.createStarted();
            // NOTE(review): 30000L is presumably the request timeout in milliseconds - confirm against receiveMessage.
            ListenableFuture<ReceiveMessageResult> future = this.consumerImpl.receiveMessage(request, endpoints, 30000L);
            Futures.addCallback(future, new FutureCallback<ReceiveMessageResult>(){

                @Override
                public void onSuccess(ReceiveMessageResult receiveMessageResult) {
                    long elapsed = stopwatch.elapsed(MessageInterceptor.DEFAULT_TIME_UNIT);
                    List<MessageExt> messagesFound = receiveMessageResult.getMessagesFound();
                    MessageHookPointStatus status = ReceiveStatus.OK.equals((Object)receiveMessageResult.getReceiveStatus()) ? MessageHookPointStatus.OK : MessageHookPointStatus.ERROR;
                    MessageInterceptorContext postContext = preContext.toBuilder().setDuration(elapsed).setStatus(status).build();
                    // With no messages the POST_RECEIVE hook still fires once, without a message.
                    if (messagesFound.isEmpty()) {
                        ProcessQueueImpl.this.consumerImpl.intercept(MessageHookPoint.POST_RECEIVE, postContext);
                    }
                    // Otherwise the POST_RECEIVE hook fires once per received message.
                    for (MessageExt messageExt : messagesFound) {
                        ProcessQueueImpl.this.consumerImpl.intercept(MessageHookPoint.POST_RECEIVE, messageExt, postContext);
                    }
                    try {
                        ProcessQueueImpl.this.onReceiveMessageResult(receiveMessageResult);
                    }
                    catch (Throwable t) {
                        log.error("[Bug] Exception raised while handling receive result, would receive later, namespace={}, mq={}, endpoints={}, clientId={}", ProcessQueueImpl.this.consumerImpl.getNamespace(), ProcessQueueImpl.this.mq, endpoints, ProcessQueueImpl.this.consumerImpl.getId(), t);
                        ProcessQueueImpl.this.receiveMessageLater();
                    }
                }

                @Override
                public void onFailure(Throwable t) {
                    long elapsed = stopwatch.elapsed(MessageInterceptor.DEFAULT_TIME_UNIT);
                    // Report the failure to the POST_RECEIVE hook with ERROR status and the cause attached.
                    MessageInterceptorContext context = preContext.toBuilder().setDuration(elapsed).setStatus(MessageHookPointStatus.ERROR).setThrowable(t).build();
                    ProcessQueueImpl.this.consumerImpl.intercept(MessageHookPoint.POST_RECEIVE, context);
                    log.error("Exception raised while message reception, would receive later, namespace={}, mq={}, endpoints={}, clientId={}", ProcessQueueImpl.this.consumerImpl.getNamespace(), ProcessQueueImpl.this.mq, endpoints, ProcessQueueImpl.this.consumerImpl.getId(), t);
                    ProcessQueueImpl.this.receiveMessageLater();
                }
            }, MoreExecutors.directExecutor());
            // Counted only after the request was sent and the callback registered without throwing.
            this.consumerImpl.getReceptionTimes().getAndIncrement();
        }
        catch (Throwable t) {
            log.error("Exception raised while message reception, would receive later, namespace={}, mq={}, clientId={}", this.consumerImpl.getNamespace(), this.mq, this.consumerImpl.getId(), t);
            this.receiveMessageLater();
        }
    }

    /**
     * Starts acknowledging a FIFO message, beginning with attempt number 1.
     *
     * @param messageExt message to acknowledge.
     * @return the future handed to the attempt chain; it is marked as done once an ack attempt succeeds.
     */
    private SimpleFuture ackFifoMessage(MessageExt messageExt) {
        final SimpleFuture completion = new SimpleFuture();
        final int firstAttempt = 1;
        this.ackFifoMessage(messageExt, firstAttempt, completion);
        return completion;
    }

    /**
     * Performs a single ack attempt for a FIFO message and handles the outcome asynchronously.
     *
     * <p>A response with status code {@code OK} marks {@code future0} as done. Any other status code, or a failed
     * call, is logged and followed by {@code ackFifoMessageLater} with the attempt number incremented by one.
     *
     * @param messageExt message to acknowledge.
     * @param attempt    number of this attempt; values above 1 are reported as re-acks on success.
     * @param future0    future to mark as done once the ack succeeds.
     */
    private void ackFifoMessage(final MessageExt messageExt, final int attempt, final SimpleFuture future0) {
        final Endpoints endpoints = messageExt.getEndpoints();
        final String namespace = this.consumerImpl.getNamespace();
        final String clientId = this.consumerImpl.getId();
        final ListenableFuture<AckMessageResponse> ackFuture = this.consumerImpl.ackMessage(messageExt, attempt);
        final FutureCallback<AckMessageResponse> callback = new FutureCallback<AckMessageResponse>(){

            @Override
            public void onSuccess(AckMessageResponse response) {
                final Status status = response.getCommon().getStatus();
                final Code code = Code.forNumber(status.getCode());
                if (Code.OK.equals(code)) {
                    // Only successes that needed more than one attempt are worth an info line.
                    if (attempt > 1) {
                        log.info(
                            "Re-ack fifo message successfully, clientId={}, attempt={}, messageId={}, namespace={}, mq={}, endpoints={}",
                            clientId,
                            attempt,
                            messageExt.getMsgId(),
                            namespace,
                            ProcessQueueImpl.this.mq,
                            endpoints);
                    }
                    future0.markAsDone();
                    return;
                }
                log.error(
                    "Failed to ack fifo message, would attempt to re-ack later, clientId={}, attempt={}, messageId={}, namespace={}, mq={}, code={}, endpoints={}, status message=[{}]",
                    clientId,
                    attempt,
                    messageExt.getMsgId(),
                    namespace,
                    ProcessQueueImpl.this.mq,
                    code,
                    endpoints,
                    status.getMessage());
                ProcessQueueImpl.this.ackFifoMessageLater(messageExt, attempt + 1, future0);
            }

            @Override
            public void onFailure(Throwable t) {
                log.error(
                    "Exception raised while ack fifo message, clientId={}, would attempt to re-ack later, attempt={}, messageId={}, namespace={}, mq={}, endpoints={}",
                    clientId,
                    attempt,
                    messageExt.getMsgId(),
                    namespace,
                    ProcessQueueImpl.this.mq,
                    endpoints,
                    t);
                ProcessQueueImpl.this.ackFifoMessageLater(messageExt, attempt + 1, future0);
            }
        };
        Futures.addCallback(ackFuture, callback, MoreExecutors.directExecutor());
    }

    /**
     * Schedules another FIFO ack attempt on the consumer's scheduler with a delay of 100 milliseconds.
     *
     * <p>Nothing is scheduled when the process queue has been dropped; in that case {@code future0} is left
     * untouched. If scheduling throws and the scheduler reports that it is shut down, the request is abandoned
     * without logging. Otherwise the failure is logged together with its cause and scheduling is attempted again
     * with the attempt number incremented by one.
     *
     * @param messageExt message to acknowledge.
     * @param attempt    attempt number passed to the deferred ack.
     * @param future0    future passed through to the deferred ack.
     */
    private void ackFifoMessageLater(MessageExt messageExt, int attempt, SimpleFuture future0) {
        String msgId = messageExt.getMsgId();
        String clientId = this.consumerImpl.getId();
        if (this.dropped) {
            log.info("Process queue was dropped, give up to ack message, namespace={}, mq={}, messageId={}, clientId={}", this.consumerImpl.getNamespace(), this.mq, msgId, clientId);
            return;
        }
        ScheduledExecutorService scheduler = this.consumerImpl.getScheduler();
        try {
            scheduler.schedule(() -> this.ackFifoMessage(messageExt, attempt, future0), 100L, TimeUnit.MILLISECONDS);
        }
        catch (Throwable t) {
            if (scheduler.isShutdown()) {
                return;
            }
            // The throwable goes last, after the placeholder arguments, so the logger records its stack trace;
            // without it the cause of this scheduling failure would be lost.
            log.error("[Bug] Failed to schedule ack fifo message request, namespace={}, mq={}, msgId={}, clientId={}", this.consumerImpl.getNamespace(), this.mq, msgId, clientId, t);
            this.ackFifoMessageLater(messageExt, 1 + attempt, future0);
        }
    }

    /**
     * Starts forwarding a message to the dead letter queue, beginning with attempt number 1.
     *
     * @param messageExt message to forward.
     * @return the future handed to the attempt chain; it is marked as done once a forward attempt succeeds.
     */
    private SimpleFuture forwardToDeadLetterQueue(MessageExt messageExt) {
        final SimpleFuture completion = new SimpleFuture();
        final int firstAttempt = 1;
        this.forwardToDeadLetterQueue(messageExt, firstAttempt, completion);
        return completion;
    }

    /**
     * Performs a single attempt to forward a message to the dead letter queue and handles the outcome
     * asynchronously.
     *
     * <p>A response with status code {@code OK} marks {@code future0} as done. Any other status code, or a failed
     * call, is logged and followed by {@code forwardToDeadLetterQueueLater} with the attempt number incremented
     * by one.
     *
     * @param messageExt message to forward.
     * @param attempt    number of this attempt; values above 1 are reported as re-forwards on success.
     * @param future0    future to mark as done once the forward succeeds.
     */
    private void forwardToDeadLetterQueue(final MessageExt messageExt, final int attempt, final SimpleFuture future0) {
        final ListenableFuture<ForwardMessageToDeadLetterQueueResponse> forwardFuture = this.consumerImpl.forwardMessageToDeadLetterQueue(messageExt, attempt);
        final String namespace = this.consumerImpl.getNamespace();
        final String clientId = this.consumerImpl.getId();
        final FutureCallback<ForwardMessageToDeadLetterQueueResponse> callback = new FutureCallback<ForwardMessageToDeadLetterQueueResponse>(){

            @Override
            public void onSuccess(ForwardMessageToDeadLetterQueueResponse response) {
                final Status status = response.getCommon().getStatus();
                final Code code = Code.forNumber(status.getCode());
                if (Code.OK.equals(code)) {
                    // Only successes that needed more than one attempt are worth an info line.
                    if (attempt > 1) {
                        log.info(
                            "Re-forward message to DLQ successfully, clientId={}, attempt={}, messageId={}, namespace={}, mq={}",
                            clientId,
                            attempt,
                            messageExt.getMsgId(),
                            namespace,
                            ProcessQueueImpl.this.mq);
                    }
                    future0.markAsDone();
                    return;
                }
                log.error(
                    "Failed to forward message to DLQ, would attempt to re-forward later, clientId={}, messageId={}, attempt={}, namespace={}, mq={}, code={}, status message=[{}]",
                    clientId,
                    messageExt.getMsgId(),
                    attempt,
                    namespace,
                    ProcessQueueImpl.this.mq,
                    code,
                    status.getMessage());
                ProcessQueueImpl.this.forwardToDeadLetterQueueLater(messageExt, attempt + 1, future0);
            }

            @Override
            public void onFailure(Throwable t) {
                log.error(
                    "Exception raised while forward message to DLQ, would attempt to re-forward later, clientId={}, attempt={}, messageId={}, namespace={}, mq={}",
                    clientId,
                    attempt,
                    messageExt.getMsgId(),
                    namespace,
                    ProcessQueueImpl.this.mq,
                    t);
                ProcessQueueImpl.this.forwardToDeadLetterQueueLater(messageExt, attempt + 1, future0);
            }
        };
        Futures.addCallback(forwardFuture, callback, MoreExecutors.directExecutor());
    }

    /**
     * Schedules another dead-letter-queue forward attempt on the consumer's scheduler with a delay of
     * 100 milliseconds.
     *
     * <p>Nothing is scheduled when the process queue has been dropped; in that case {@code future0} is left
     * untouched. If scheduling throws and the scheduler reports that it is shut down, the request is abandoned
     * without logging. Otherwise the failure is logged together with its cause and scheduling is attempted again
     * with the attempt number incremented by one.
     *
     * @param messageExt message to forward.
     * @param attempt    attempt number passed to the deferred forward.
     * @param future0    future passed through to the deferred forward.
     */
    private void forwardToDeadLetterQueueLater(MessageExt messageExt, int attempt, SimpleFuture future0) {
        String msgId = messageExt.getMsgId();
        String clientId = this.consumerImpl.getId();
        if (this.dropped) {
            log.info("Process queue was dropped, give up to forward message to DLQ, namespace={}, mq={}, messageId={}, clientId={}", this.consumerImpl.getNamespace(), this.mq, msgId, clientId);
            return;
        }
        ScheduledExecutorService scheduler = this.consumerImpl.getScheduler();
        try {
            scheduler.schedule(() -> this.forwardToDeadLetterQueue(messageExt, attempt, future0), 100L, TimeUnit.MILLISECONDS);
        }
        catch (Throwable t) {
            if (scheduler.isShutdown()) {
                return;
            }
            // The throwable goes last, after the placeholder arguments, so the logger records its stack trace;
            // without it the cause of this scheduling failure would be lost.
            log.error("[Bug] Failed to schedule DLQ message request, namespace={}, mq={}, msgId={}, clientId={}", this.consumerImpl.getNamespace(), this.mq, msgId, clientId, t);
            this.forwardToDeadLetterQueueLater(messageExt, 1 + attempt, future0);
        }
    }

    /**
     * Acknowledges a message asynchronously and logs the outcome; no retry is made from this method.
     *
     * <p>A response with status code {@code OK} is logged at trace level, any other status code and any failed
     * call at error level.
     *
     * @param messageExt message to acknowledge.
     */
    public void ackMessage(final MessageExt messageExt) {
        final ListenableFuture<AckMessageResponse> ackFuture = this.consumerImpl.ackMessage(messageExt);
        final String namespace = this.consumerImpl.getNamespace();
        final String clientId = this.consumerImpl.getId();
        final FutureCallback<AckMessageResponse> callback = new FutureCallback<AckMessageResponse>(){

            @Override
            public void onSuccess(AckMessageResponse response) {
                final Status status = response.getCommon().getStatus();
                final Code code = Code.forNumber(status.getCode());
                if (Code.OK.equals(code)) {
                    log.trace(
                        "Ack message successfully, clientId={}, messageId={}, namespace={}, mq={}",
                        clientId,
                        messageExt.getMsgId(),
                        namespace,
                        ProcessQueueImpl.this.mq);
                    return;
                }
                log.error(
                    "Failed to ack message, clientId={}, messageId={}, namespace={}, mq={}, code={}, status message=[{}]",
                    clientId,
                    messageExt.getMsgId(),
                    namespace,
                    ProcessQueueImpl.this.mq,
                    code,
                    status.getMessage());
            }

            @Override
            public void onFailure(Throwable t) {
                log.error(
                    "Exception raised while ack message, clientId={}, messageId={}, namespace={}, mq={}",
                    clientId,
                    messageExt.getMsgId(),
                    namespace,
                    ProcessQueueImpl.this.mq,
                    t);
            }
        };
        Futures.addCallback(ackFuture, callback, MoreExecutors.directExecutor());
    }

    /**
     * Converts this queue's filter expression into its protobuf form.
     *
     * @return a protobuf filter expression with the same expression text; its type is {@code SQL} for
     *         {@code SQL92} expressions and {@code TAG} for every other expression type.
     */
    private apache.rocketmq.v1.FilterExpression getPbFilterExpression() {
        final FilterType pbType;
        switch (this.filterExpression.getExpressionType()) {
            case SQL92:
                pbType = FilterType.SQL;
                break;
            default:
                pbType = FilterType.TAG;
                break;
        }
        final FilterExpression.Builder pbExpression = apache.rocketmq.v1.FilterExpression.newBuilder();
        pbExpression.setExpression(this.filterExpression.getExpression());
        pbExpression.setType(pbType);
        return pbExpression.build();
    }

    /**
     * Builds the protobuf partition for this queue from the consumer's namespace and the message queue's topic,
     * broker name and queue id.
     *
     * @return the protobuf partition describing {@code mq}.
     */
    private Partition getPbPartition() {
        final Resource.Builder topicBuilder = Resource.newBuilder();
        topicBuilder.setResourceNamespace(this.consumerImpl.getNamespace());
        topicBuilder.setName(this.mq.getTopic());
        final Resource topicResource = topicBuilder.build();

        final Broker.Builder brokerBuilder = Broker.newBuilder();
        brokerBuilder.setName(this.mq.getBrokerName());
        final Broker pbBroker = brokerBuilder.build();

        final Partition.Builder partition = Partition.newBuilder();
        partition.setTopic(topicResource);
        partition.setId(this.mq.getQueueId());
        partition.setBroker(pbBroker);
        return partition.build();
    }

    /**
     * Counts the messages currently held by this process queue.
     *
     * <p>Both read locks are held while the two sizes are read, so the sum reflects a single consistent view of
     * the pending and in-flight collections.
     *
     * @return number of pending messages plus number of in-flight messages.
     */
    public int cachedMessagesQuantity() {
        this.pendingMessagesLock.readLock().lock();
        this.inflightMessagesLock.readLock().lock();
        try {
            final int pendingCount = this.pendingMessages.size();
            final int inflightCount = this.inflightMessages.size();
            return pendingCount + inflightCount;
        }
        finally {
            // Release in the reverse order of acquisition.
            this.inflightMessagesLock.readLock().unlock();
            this.pendingMessagesLock.readLock().unlock();
        }
    }

    /**
     * Counts the in-flight messages while holding the in-flight read lock.
     *
     * @return number of entries in {@code inflightMessages}.
     */
    public int inflightMessagesQuantity() {
        this.inflightMessagesLock.readLock().lock();
        try {
            return this.inflightMessages.size();
        }
        finally {
            this.inflightMessagesLock.readLock().unlock();
        }
    }

    /**
     * Returns the current value of the {@code cachedMessagesBytes} counter.
     *
     * @return the value read from {@code cachedMessagesBytes}.
     */
    public long cachedMessageBytes() {
        return this.cachedMessagesBytes.get();
    }

    /**
     * Records the consumption outcome of a single message.
     *
     * @param status outcome to record.
     */
    private void statsConsumptionStatus(ConsumeStatus status) {
        final int singleMessage = 1;
        this.statsConsumptionStatus(singleMessage, status);
    }

    /**
     * Adds a number of messages to the consumer's consumption counters.
     *
     * @param messageQuantity number of messages to add.
     * @param status          {@code OK} adds to the consumer's OK counter; any other status adds to its error
     *                        counter.
     */
    private void statsConsumptionStatus(int messageQuantity, ConsumeStatus status) {
        final boolean consumedOk = ConsumeStatus.OK.equals((Object)status);
        if (!consumedOk) {
            this.consumerImpl.getConsumptionErrorQuantity().addAndGet(messageQuantity);
        } else {
            this.consumerImpl.getConsumptionOkQuantity().addAndGet(messageQuantity);
        }
    }

    /**
     * Returns the message queue this process queue is bound to.
     *
     * @return the {@code mq} field.
     */
    @Override
    public MessageQueue getMessageQueue() {
        return this.mq;
    }
}

