package com.tongtech.client.request;

import com.tongtech.client.factory.ThreadFactoryImpl;
import com.tongtech.client.remoting.InvokePullCallback;
import com.tongtech.client.remoting.common.RemotingHelper;
import com.tongtech.client.remoting.netty.NettyRemotingAbstract;
import com.tongtech.client.remoting.netty.NettyRequestProcessor;
import com.tongtech.client.remoting.protocol.RemotingCommand;
import com.tongtech.client.request.common.PullRequestResult;
import com.tongtech.client.request.common.RequestMessage;
import com.tongtech.client.request.inner.RequestConsumer;
import com.tongtech.client.request.listener.MessageListenerImpl;
import com.tongtech.client.request.listener.RequestContext;
import com.tongtech.netty.channel.ChannelHandlerContext;
import com.tongtech.slf4j.Logger;
import com.tongtech.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/* loaded from: input_file:com/tongtech/client/request/PullRequestMessageServiceProcessor.class */
public class PullRequestMessageServiceProcessor implements NettyRequestProcessor {
    private static Logger log = LoggerFactory.getLogger((Class<?>) PullRequestMessageServiceProcessor.class);
    private final RequestConsumer requestConsumer;
    private final MessageListenerImpl messageListener;
    private final ThreadPoolExecutor consumeExecutor;
    public static final int REQUEST_MESSAGE_SIZE = 10000;
    private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
    private final BlockingQueue<RequestMessage> requestMessageTables = new LinkedBlockingQueue(10000);
    private final BlockingQueue<Runnable> consumeRequestQueue = new LinkedBlockingQueue();
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("RequestConsumeMessageScheduledThread_"));

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/tongtech/client/request/PullRequestMessageServiceProcessor$ConsumeRequest.class */
    public class ConsumeRequest implements Runnable {
        private final PullRequestResult pullResult;

        public ConsumeRequest(PullRequestResult pullRequestResult) {
            this.pullResult = pullRequestResult;
        }

        @Override // java.lang.Runnable
        public void run() {
            MessageListenerImpl messageListenerImpl = PullRequestMessageServiceProcessor.this.messageListener;
            RequestContext requestContext = new RequestContext(PullRequestMessageServiceProcessor.this.requestConsumer.getBrokerInfo());
            PullRequestMessageServiceProcessor.log.info("RequestID:[{}],RequestMessageBodyLength:[{}],Broker:[{}:{}]----> [{}]", this.pullResult.getMsgFoundList().get(0).getRequestorID(), Integer.valueOf(this.pullResult.getMsgFoundList().get(0).getBody().length), PullRequestMessageServiceProcessor.this.requestConsumer.getBrokerInfo().getIpaddr6(), Integer.valueOf(PullRequestMessageServiceProcessor.this.requestConsumer.getBrokerInfo().getPort()), this.pullResult.getPullStatus().name());
            try {
                messageListenerImpl.getMessage(this.pullResult, requestContext);
            } catch (Throwable th) {
                PullRequestMessageServiceProcessor.log.warn("request consumeMessage exception: {} TLQServerId: {} Msgs: {} BrokerInfo: {}", RemotingHelper.exceptionSimpleDesc(th), PullRequestMessageServiceProcessor.this.requestConsumer.getConsumerId(), this.pullResult, PullRequestMessageServiceProcessor.this.requestConsumer.getBrokerInfo());
            }
        }
    }

    public PullRequestMessageServiceProcessor(RequestConsumer requestConsumer, MessageListenerImpl messageListenerImpl) {
        this.requestConsumer = requestConsumer;
        this.messageListener = messageListenerImpl;
        this.consumeExecutor = new ThreadPoolExecutor(this.requestConsumer.getConsumeThreadMin(), this.requestConsumer.getConsumeThreadMax(), 60000L, TimeUnit.MILLISECONDS, this.consumeRequestQueue, new ThreadFactoryImpl("RequestConsumeMessageThread_"));
    }

    public void submitConsumeRequest(PullRequestResult pullRequestResult) {
        ConsumeRequest consumeRequest = new ConsumeRequest(pullRequestResult);
        try {
            this.consumeExecutor.submit(consumeRequest);
        } catch (RejectedExecutionException e) {
            submitConsumeRequestLater(consumeRequest);
        }
    }

    private void submitConsumeRequestLater(final ConsumeRequest consumeRequest) {
        this.scheduledExecutorService.schedule(new Runnable() { // from class: com.tongtech.client.request.PullRequestMessageServiceProcessor.1
            @Override // java.lang.Runnable
            public void run() {
                PullRequestMessageServiceProcessor.this.consumeExecutor.submit(consumeRequest);
            }
        }, NettyRemotingAbstract.LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
    }

    @Override // com.tongtech.client.remoting.netty.NettyRequestProcessor
    public RemotingCommand processRequest(ChannelHandlerContext channelHandlerContext, RemotingCommand remotingCommand) throws Exception {
        return null;
    }

    public void takeAsyncMessages(int i, InvokePullCallback invokePullCallback, long j) {
        invokePullCallback.operationComplete(takeSyncMessages(i, j));
    }

    /* JADX WARN: Finally extract failed */
    public List<RequestMessage> takeSyncMessages(int i, long j) {
        ArrayList arrayList = new ArrayList(i);
        try {
            this.lockTreeMap.writeLock().lockInterruptibly();
            try {
                if (!this.requestMessageTables.isEmpty()) {
                    for (int i2 = 0; i2 < i; i2++) {
                        RequestMessage poll = this.requestMessageTables.poll(j, TimeUnit.MILLISECONDS);
                        if (poll != null) {
                            arrayList.add(poll);
                        }
                    }
                }
                this.lockTreeMap.writeLock().unlock();
            } catch (Throwable th) {
                this.lockTreeMap.writeLock().unlock();
                throw th;
            }
        } catch (InterruptedException e) {
            log.error("take Request Messages exception", (Throwable) e);
        }
        return arrayList;
    }

    public void shutdown() {
        this.consumeExecutor.shutdown();
        this.scheduledExecutorService.shutdown();
    }
}
