/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.impl.consumer;

import apache.rocketmq.v1.Message;
import apache.rocketmq.v1.NotifyClientTerminationRequest;
import apache.rocketmq.v1.PullMessageRequest;
import apache.rocketmq.v1.PullMessageResponse;
import apache.rocketmq.v1.QueryOffsetRequest;
import apache.rocketmq.v1.QueryOffsetResponse;
import apache.rocketmq.v1.ReceiveMessageRequest;
import apache.rocketmq.v1.ReceiveMessageResponse;
import com.aliyun.openservices.ons.shaded.com.google.common.util.concurrent.Futures;
import com.aliyun.openservices.ons.shaded.com.google.common.util.concurrent.ListenableFuture;
import com.aliyun.openservices.ons.shaded.com.google.common.util.concurrent.MoreExecutors;
import com.aliyun.openservices.ons.shaded.com.google.common.util.concurrent.SettableFuture;
import com.aliyun.openservices.ons.shaded.com.google.protobuf.util.Durations;
import com.aliyun.openservices.ons.shaded.com.google.protobuf.util.Timestamps;
import com.aliyun.openservices.ons.shaded.com.google.rpc.Code;
import com.aliyun.openservices.ons.shaded.com.google.rpc.Status;
import com.aliyun.openservices.ons.shaded.io.grpc.Metadata;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.consumer.PullMessageResult;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.consumer.PullStatus;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.consumer.ReceiveMessageResult;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.consumer.ReceiveStatus;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.exception.ClientException;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.exception.ErrorCode;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.impl.ClientImpl;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.MessageExt;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.MessageImpl;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.MessageImplAccessor;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.route.Endpoints;
import com.aliyun.openservices.ons.shaded.org.slf4j.Logger;
import com.aliyun.openservices.ons.shaded.org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public abstract class ConsumerImpl
extends ClientImpl {
    private static final Logger log = LoggerFactory.getLogger(ConsumerImpl.class);

    public ConsumerImpl(String group) throws ClientException {
        super(group);
    }

    @Override
    public NotifyClientTerminationRequest wrapNotifyClientTerminationRequest() {
        return NotifyClientTerminationRequest.newBuilder().setClientId(this.id).setConsumerGroup(this.getPbGroup()).build();
    }

    protected ListenableFuture<Long> queryOffset(QueryOffsetRequest request, Endpoints endpoints) {
        SettableFuture<Long> future0 = SettableFuture.create();
        try {
            Metadata metadata = this.sign();
            ListenableFuture<QueryOffsetResponse> future = this.clientManager.queryOffset(endpoints, metadata, request, this.ioTimeoutMillis, TimeUnit.MILLISECONDS);
            return Futures.transformAsync(future, response -> {
                Status status = response.getCommon().getStatus();
                Code code = Code.forNumber(status.getCode());
                if (!Code.OK.equals(code)) {
                    log.error("Failed to query offset, clientId={}, endpoints={}, code={}, status message=[{}]", this.id, endpoints, code, status.getMessage());
                    throw new ClientException(ErrorCode.SEEK_OFFSET_FAILURE, status.getMessage());
                }
                long offset = response.getOffset();
                future0.set(offset);
                return future0;
            }, MoreExecutors.directExecutor());
        }
        catch (Throwable t) {
            future0.setException(t);
            return future0;
        }
    }

    public ListenableFuture<PullMessageResult> pullMessage(PullMessageRequest request, Endpoints endpoints, long timeoutMillis) {
        SettableFuture<PullMessageResult> future0 = SettableFuture.create();
        try {
            Metadata metadata = this.sign();
            ListenableFuture<PullMessageResponse> future = this.clientManager.pullMessage(endpoints, metadata, request, timeoutMillis, TimeUnit.MILLISECONDS);
            return Futures.transform(future, response -> this.processPullMessageResponse(endpoints, (PullMessageResponse)response), MoreExecutors.directExecutor());
        }
        catch (Throwable t) {
            future0.setException(t);
            return future0;
        }
    }

    public PullMessageResult processPullMessageResponse(Endpoints endpoints, PullMessageResponse response) {
        PullStatus pullStatus;
        Status status = response.getCommon().getStatus();
        Code code = Code.forNumber(status.getCode());
        switch (null != code ? code : Code.UNKNOWN) {
            case OK: {
                pullStatus = PullStatus.OK;
                break;
            }
            case RESOURCE_EXHAUSTED: {
                pullStatus = PullStatus.RESOURCE_EXHAUSTED;
                log.warn("Too many request in server, clientId={}, endpoints={}, status message=[{}]", this.id, endpoints, status.getMessage());
                break;
            }
            case DEADLINE_EXCEEDED: {
                pullStatus = PullStatus.DEADLINE_EXCEEDED;
                log.warn("Gateway timeout, clientId={}, endpoints={}, status message=[{}]", endpoints, this.id, status.getMessage());
                break;
            }
            case NOT_FOUND: {
                pullStatus = PullStatus.NOT_FOUND;
                log.warn("Target partition does not exist, clientId={}, endpoints={}, status message=[{}]", this.id, endpoints, status.getMessage());
                break;
            }
            case OUT_OF_RANGE: {
                pullStatus = PullStatus.OUT_OF_RANGE;
                log.warn("Pulled offset is out of range, clientId={}, endpoints={}, status message=[{}]", this.id, endpoints, status.getMessage());
                break;
            }
            default: {
                pullStatus = PullStatus.INTERNAL;
                log.warn("Pull response indicated server-side error, clientId={}, endpoints={}, code={}, status message=[{}]", this.id, endpoints, code, status.getMessage());
            }
        }
        ArrayList<MessageExt> msgFoundList = new ArrayList<MessageExt>();
        if (PullStatus.OK.equals((Object)pullStatus)) {
            List<Message> messageList = response.getMessagesList();
            for (Message message : messageList) {
                MessageImpl messageImpl = MessageImplAccessor.wrapMessageImpl(message);
                msgFoundList.add(new MessageExt(messageImpl));
            }
        }
        return new PullMessageResult(pullStatus, response.getNextOffset(), response.getMinOffset(), response.getMaxOffset(), msgFoundList);
    }

    protected ListenableFuture<ReceiveMessageResult> receiveMessage(ReceiveMessageRequest request, Endpoints endpoints, long timeoutMillis) {
        SettableFuture<ReceiveMessageResult> future0 = SettableFuture.create();
        try {
            Metadata metadata = this.sign();
            ListenableFuture<ReceiveMessageResponse> future = this.clientManager.receiveMessage(endpoints, metadata, request, timeoutMillis, TimeUnit.MILLISECONDS);
            return Futures.transform(future, response -> this.processReceiveMessageResponse(endpoints, (ReceiveMessageResponse)response), MoreExecutors.directExecutor());
        }
        catch (Throwable t) {
            future0.setException(t);
            return future0;
        }
    }

    public ReceiveMessageResult processReceiveMessageResponse(Endpoints endpoints, ReceiveMessageResponse response) {
        ReceiveStatus receiveStatus;
        Status status = response.getCommon().getStatus();
        Code code = Code.forNumber(status.getCode());
        switch (null != code ? code : Code.UNKNOWN) {
            case OK: {
                receiveStatus = ReceiveStatus.OK;
                break;
            }
            case RESOURCE_EXHAUSTED: {
                receiveStatus = ReceiveStatus.RESOURCE_EXHAUSTED;
                log.warn("Too many request in server, clientId={}, endpoints={}, status message=[{}]", this.id, endpoints, status.getMessage());
                break;
            }
            case DEADLINE_EXCEEDED: {
                receiveStatus = ReceiveStatus.DEADLINE_EXCEEDED;
                log.warn("Gateway timeout, clientId={}, endpoints={}, status message=[{}]", this.id, endpoints, status.getMessage());
                break;
            }
            default: {
                receiveStatus = ReceiveStatus.INTERNAL;
                log.warn("Receive response indicated server-side error, clientId={}, endpoints={}, code={}, status message=[{}]", this.id, endpoints, code, status.getMessage());
            }
        }
        ArrayList<MessageExt> msgFoundList = new ArrayList<MessageExt>();
        if (ReceiveStatus.OK.equals((Object)receiveStatus)) {
            List<Message> messageList = response.getMessagesList();
            for (Message message : messageList) {
                MessageImpl messageImpl = MessageImplAccessor.wrapMessageImpl(message);
                messageImpl.getSystemAttribute().setEndpoints(endpoints);
                msgFoundList.add(new MessageExt(messageImpl));
            }
        }
        return new ReceiveMessageResult(endpoints, receiveStatus, Timestamps.toMillis(response.getDeliveryTimestamp()), Durations.toMillis(response.getInvisibleDuration()), msgFoundList);
    }

    @Override
    public void doHealthCheck() {
    }
}

