/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.impl.consumer;

import com.aliyun.openservices.ons.shaded.com.google.common.base.Stopwatch;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.consumer.ConsumeContext;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.consumer.ConsumeStatus;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.consumer.listener.MessageListener;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.MessageExt;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.MessageHookPoint;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.MessageHookPointStatus;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.MessageInterceptor;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.MessageInterceptorContext;
import com.aliyun.openservices.ons.shaded.org.slf4j.Logger;
import com.aliyun.openservices.ons.shaded.org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.Callable;

public class ConsumeTask
implements Callable<ConsumeStatus> {
    private static final Logger log = LoggerFactory.getLogger(ConsumeTask.class);
    private final MessageInterceptor interceptor;
    private final MessageListener messageListener;
    private final List<MessageExt> messageExtList;

    public ConsumeTask(MessageInterceptor interceptor, MessageListener messageListener, List<MessageExt> messageExtList) {
        this.interceptor = interceptor;
        this.messageListener = messageListener;
        this.messageExtList = messageExtList;
    }

    @Override
    public ConsumeStatus call() {
        ConsumeStatus status;
        for (MessageExt messageExt : this.messageExtList) {
            int attempt = messageExt.getDeliveryAttempt();
            MessageInterceptorContext context = MessageInterceptorContext.builder().setTopic(messageExt.getTopic()).setAttempt(attempt).build();
            this.interceptor.intercept(MessageHookPoint.PRE_MESSAGE_CONSUMPTION, messageExt, context);
        }
        ConsumeContext consumeContext = new ConsumeContext();
        Stopwatch stopwatch = Stopwatch.createStarted();
        try {
            status = this.messageListener.consume(this.messageExtList, consumeContext);
        }
        catch (Throwable t) {
            status = ConsumeStatus.ERROR;
            log.error("Message listener raised an exception while consuming messages.", t);
        }
        if (null == status) {
            log.error("Message listener returns NPE for consume status");
            status = ConsumeStatus.ERROR;
        }
        long duration = stopwatch.elapsed(MessageInterceptor.DEFAULT_TIME_UNIT);
        MessageHookPointStatus hookPointStatus = ConsumeStatus.OK.equals((Object)status) ? MessageHookPointStatus.OK : MessageHookPointStatus.ERROR;
        int batchSize = this.messageExtList.size();
        for (MessageExt messageExt : this.messageExtList) {
            int attempt = messageExt.getDeliveryAttempt();
            MessageInterceptorContext context = MessageInterceptorContext.builder().setTopic(messageExt.getTopic()).setAttempt(attempt).setDuration(duration).setBatchSize(batchSize).setStatus(hookPointStatus).build();
            this.interceptor.intercept(MessageHookPoint.POST_MESSAGE_CONSUMPTION, messageExt, context);
        }
        return status;
    }
}

