/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.impl;

import apache.rocketmq.v1.AckMessageRequest;
import apache.rocketmq.v1.AckMessageResponse;
import apache.rocketmq.v1.EndTransactionRequest;
import apache.rocketmq.v1.EndTransactionResponse;
import apache.rocketmq.v1.ForwardMessageToDeadLetterQueueRequest;
import apache.rocketmq.v1.ForwardMessageToDeadLetterQueueResponse;
import apache.rocketmq.v1.HealthCheckRequest;
import apache.rocketmq.v1.HealthCheckResponse;
import apache.rocketmq.v1.HeartbeatRequest;
import apache.rocketmq.v1.HeartbeatResponse;
import apache.rocketmq.v1.NackMessageRequest;
import apache.rocketmq.v1.NackMessageResponse;
import apache.rocketmq.v1.NotifyClientTerminationRequest;
import apache.rocketmq.v1.NotifyClientTerminationResponse;
import apache.rocketmq.v1.PollCommandRequest;
import apache.rocketmq.v1.PollCommandResponse;
import apache.rocketmq.v1.PullMessageRequest;
import apache.rocketmq.v1.PullMessageResponse;
import apache.rocketmq.v1.QueryAssignmentRequest;
import apache.rocketmq.v1.QueryAssignmentResponse;
import apache.rocketmq.v1.QueryOffsetRequest;
import apache.rocketmq.v1.QueryOffsetResponse;
import apache.rocketmq.v1.QueryRouteRequest;
import apache.rocketmq.v1.QueryRouteResponse;
import apache.rocketmq.v1.ReceiveMessageRequest;
import apache.rocketmq.v1.ReceiveMessageResponse;
import apache.rocketmq.v1.ReportMessageConsumptionResultRequest;
import apache.rocketmq.v1.ReportMessageConsumptionResultResponse;
import apache.rocketmq.v1.ReportThreadStackTraceRequest;
import apache.rocketmq.v1.ReportThreadStackTraceResponse;
import apache.rocketmq.v1.SendMessageRequest;
import apache.rocketmq.v1.SendMessageResponse;
import com.aliyun.openservices.ons.shaded.com.google.common.util.concurrent.AbstractIdleService;
import com.aliyun.openservices.ons.shaded.com.google.common.util.concurrent.ListenableFuture;
import com.aliyun.openservices.ons.shaded.com.google.common.util.concurrent.SettableFuture;
import com.aliyun.openservices.ons.shaded.com.google.errorprone.annotations.concurrent.GuardedBy;
import com.aliyun.openservices.ons.shaded.io.grpc.Metadata;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.exception.ClientException;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.exception.ErrorCode;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.impl.Client;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.impl.ClientManager;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.remoting.RpcClient;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.remoting.RpcClientImpl;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.route.Endpoints;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.utility.ExecutorServices;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.utility.MetadataUtils;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.utility.ThreadFactoryImpl;
import com.aliyun.openservices.ons.shaded.org.slf4j.Logger;
import com.aliyun.openservices.ons.shaded.org.slf4j.LoggerFactory;
import com.aliyun.openservices.ons.shaded.org.slf4j.bridge.SLF4JBridgeHandler;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.net.ssl.SSLException;

/**
 * {@link ClientManager} implementation that runs as an {@link AbstractIdleService}.
 *
 * <p>The manager owns:
 * <ul>
 *   <li>a registry of {@link Client}s keyed by {@link Client#getId()};</li>
 *   <li>a table of {@link RpcClient}s keyed by {@link Endpoints}, created lazily on first use and
 *       guarded by a read/write lock;</li>
 *   <li>a scheduler that periodically triggers health checks, idle RPC client eviction, heartbeats
 *       and stats logging on the registered clients;</li>
 *   <li>a worker pool that is handed to every RPC call made through this manager.</li>
 * </ul>
 *
 * <p>Every RPC method returns a future; a failure to obtain the RPC client or to issue the call is
 * reported through a failed future rather than thrown to the caller.
 */
public class ClientManagerImpl extends AbstractIdleService implements ClientManager {
    /** An RPC client idle for longer than this many seconds is shut down and evicted. */
    public static final long RPC_CLIENT_MAX_IDLE_SECONDS = 1800L;
    /** Delay, in seconds, between two rounds of health checks. */
    public static final long HEALTH_CHECK_PERIOD_SECONDS = 15L;
    /** Delay, in seconds, between two sweeps for idle RPC clients. */
    public static final long IDLE_RPC_CLIENT_PERIOD_SECONDS = 60L;
    /** Delay, in seconds, between two rounds of heartbeats. */
    public static final long HEART_BEAT_PERIOD_SECONDS = 10L;
    /** Delay, in seconds, between two rounds of stats logging. */
    public static final long LOG_STATS_PERIOD_SECONDS = 60L;

    private static final Logger log = LoggerFactory.getLogger(ClientManagerImpl.class);

    private final String id;
    @GuardedBy(value="rpcClientTableLock")
    private final Map<Endpoints, RpcClient> rpcClientTable;
    private final ReadWriteLock rpcClientTableLock;
    private final ConcurrentMap<String, Client> clientTable;
    private final ScheduledExecutorService scheduler;
    private final ExecutorService asyncWorker;

    /**
     * Creates a manager with empty client and RPC client tables.
     *
     * <p>Both the scheduler and the async worker pool are sized to the number of available
     * processors; the worker pool uses an unbounded queue.
     *
     * @param id identifier of this manager, used in log messages only
     */
    public ClientManagerImpl(String id) {
        this.id = id;
        this.rpcClientTable = new HashMap<Endpoints, RpcClient>();
        this.rpcClientTableLock = new ReentrantReadWriteLock();
        this.clientTable = new ConcurrentHashMap<String, Client>();
        final int processors = Runtime.getRuntime().availableProcessors();
        this.scheduler = new ScheduledThreadPoolExecutor(processors, new ThreadFactoryImpl("ClientScheduler"));
        this.asyncWorker = new ThreadPoolExecutor(processors, processors, 60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(), new ThreadFactoryImpl("ClientAsyncWorker"));
    }

    /**
     * Registers a client under its id, replacing any client previously registered with that id.
     *
     * @param client client to register
     */
    @Override
    public void registerClient(Client client) {
        this.clientTable.put(client.getId(), client);
    }

    /**
     * Removes the client registered under the given client's id, if any.
     *
     * @param client client to unregister
     */
    @Override
    public void unregisterClient(Client client) {
        this.clientTable.remove(client.getId());
    }

    /**
     * @return {@code true} if no client is currently registered
     */
    @Override
    public boolean isEmpty() {
        return this.clientTable.isEmpty();
    }

    /**
     * Schedules the four periodic maintenance tasks.
     *
     * <p>Each task catches and logs every {@link Throwable}: an exception escaping a task scheduled
     * with {@code scheduleWithFixedDelay} would suppress all of its subsequent executions.
     */
    @Override
    protected void startUp() {
        log.info("Begin to start the client manager, clientManagerId={}", this.id);
        this.scheduler.scheduleWithFixedDelay(() -> {
            try {
                this.doHealthCheck();
            } catch (Throwable t) {
                log.error("Exception raised while health check.", t);
            }
        }, 5L, HEALTH_CHECK_PERIOD_SECONDS, TimeUnit.SECONDS);
        this.scheduler.scheduleWithFixedDelay(() -> {
            try {
                this.clearIdleRpcClients();
            } catch (Throwable t) {
                log.error("Exception raised while clear idle rpc clients.", t);
            }
        }, 5L, IDLE_RPC_CLIENT_PERIOD_SECONDS, TimeUnit.SECONDS);
        this.scheduler.scheduleWithFixedDelay(() -> {
            try {
                this.doHeartbeat();
            } catch (Throwable t) {
                log.error("Exception raised while heartbeat.", t);
            }
        }, 1L, HEART_BEAT_PERIOD_SECONDS, TimeUnit.SECONDS);
        this.scheduler.scheduleWithFixedDelay(() -> {
            try {
                this.doLogStats();
            } catch (Throwable t) {
                log.error("Exception raised while log stats", t);
            }
        }, 1L, LOG_STATS_PERIOD_SECONDS, TimeUnit.SECONDS);
        log.info("The client manager starts successfully, clientManagerId={}", this.id);
    }

    /**
     * Stops the scheduler, shuts down and evicts every RPC client, then stops the async worker pool.
     *
     * <p>Shutdown of the worker pool is initiated even when closing the RPC clients fails, so that a
     * failing RPC client cannot leave the pool running; the failure is still propagated.
     *
     * @throws Exception if waiting for an executor or shutting down an RPC client fails
     */
    @Override
    protected void shutDown() throws Exception {
        log.info("Begin to shutdown the client manager, clientManagerId={}", this.id);
        this.scheduler.shutdown();
        if (!ExecutorServices.awaitTerminated(this.scheduler)) {
            log.error("[Bug] Timeout to shutdown the client scheduler, clientManagerId={}", this.id);
        } else {
            log.info("Shutdown the client scheduler successfully, clientManagerId={}", this.id);
        }
        try {
            this.shutdownAllRpcClients();
            log.info("Shutdown all rpc client(s) successfully, clientManagerId={}", this.id);
        } finally {
            // Must run on the failure path too, otherwise the worker pool is never asked to stop.
            this.asyncWorker.shutdown();
        }
        if (!ExecutorServices.awaitTerminated(this.asyncWorker)) {
            log.error("[Bug] Timeout to shutdown the client async worker, clientManagerId={}", this.id);
        } else {
            log.info("Shutdown the client async worker successfully, clientManagerId={}", this.id);
        }
        log.info("Shutdown the client manager successfully, clientManagerId={}", this.id);
    }

    /**
     * Removes every RPC client from the table and shuts it down, under the write lock.
     *
     * <p>Each entry is removed before its client is shut down, so a client whose shutdown fails is
     * no longer reachable through the table.
     */
    private void shutdownAllRpcClients() throws InterruptedException {
        this.rpcClientTableLock.writeLock().lock();
        try {
            Iterator<Map.Entry<Endpoints, RpcClient>> it = this.rpcClientTable.entrySet().iterator();
            while (it.hasNext()) {
                RpcClient rpcClient = it.next().getValue();
                it.remove();
                rpcClient.shutdown();
            }
        } finally {
            this.rpcClientTableLock.writeLock().unlock();
        }
    }

    /** Asks every registered client to run its health check. */
    private void doHealthCheck() {
        log.info("Start to do health check for a new round, clientManagerId={}", this.id);
        for (Client client : this.clientTable.values()) {
            client.doHealthCheck();
        }
    }

    /**
     * Shuts down and evicts every RPC client that has been idle for more than
     * {@link #RPC_CLIENT_MAX_IDLE_SECONDS}, under the write lock.
     */
    private void clearIdleRpcClients() throws InterruptedException {
        this.rpcClientTableLock.writeLock().lock();
        try {
            Iterator<Map.Entry<Endpoints, RpcClient>> it = this.rpcClientTable.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Endpoints, RpcClient> entry = it.next();
                Endpoints endpoints = entry.getKey();
                RpcClient client = entry.getValue();
                long idleSeconds = client.idleSeconds();
                if (idleSeconds <= RPC_CLIENT_MAX_IDLE_SECONDS) {
                    continue;
                }
                it.remove();
                client.shutdown();
                log.info("Rpc client has been idle for a long time, endpoints={}, idleSeconds={}, maxIdleSeconds={}", endpoints, idleSeconds, RPC_CLIENT_MAX_IDLE_SECONDS);
            }
        } finally {
            this.rpcClientTableLock.writeLock().unlock();
        }
    }

    /** Asks every registered client to send its heartbeat. */
    private void doHeartbeat() {
        for (Client client : this.clientTable.values()) {
            client.doHeartbeat();
        }
    }

    /** Logs the client versions, then asks every registered client to log its stats. */
    private void doLogStats() {
        log.info("Start to log stats for a new round, clientVersion={}, clientWrapperVersion={}, clientManagerId={}", MetadataUtils.getVersion(), MetadataUtils.getWrapperVersion(), this.id);
        for (Client client : this.clientTable.values()) {
            client.doStats();
        }
    }

    /**
     * Returns the RPC client for the given endpoints, creating and caching it on first use.
     *
     * <p>The lookup first runs under the read lock; on a miss the table is checked again under the
     * write lock before a new client is created, because another thread may have inserted one after
     * the read lock was released.
     *
     * @param endpoints target endpoints
     * @return the cached or newly created RPC client
     * @throws ClientException with {@link ErrorCode#SSL_FAILURE} if creating the client raises an
     *     {@link SSLException}
     */
    private RpcClient getRpcClient(Endpoints endpoints) throws ClientException {
        this.rpcClientTableLock.readLock().lock();
        try {
            RpcClient existing = this.rpcClientTable.get(endpoints);
            if (existing != null) {
                return existing;
            }
        } finally {
            this.rpcClientTableLock.readLock().unlock();
        }
        this.rpcClientTableLock.writeLock().lock();
        try {
            RpcClient existing = this.rpcClientTable.get(endpoints);
            if (existing != null) {
                return existing;
            }
            RpcClient created;
            try {
                created = new RpcClientImpl(endpoints);
            } catch (SSLException e) {
                // Pass the exception as the last argument so the TLS failure cause reaches the log.
                log.error("Failed to get rpc client, endpoints={}", endpoints, e);
                // NOTE(review): the cause is not attached to the ClientException because only the
                // (ErrorCode, String) constructor is visible from this file - confirm and chain it.
                throw new ClientException(ErrorCode.SSL_FAILURE, "Failed to get rpc client");
            }
            this.rpcClientTable.put(endpoints, created);
            return created;
        } finally {
            this.rpcClientTableLock.writeLock().unlock();
        }
    }

    /**
     * Builds a future that has already failed with the given throwable.
     *
     * @param t failure to report
     * @param <T> response type expected by the caller
     * @return a completed, failed future
     */
    private static <T> ListenableFuture<T> failedFuture(Throwable t) {
        SettableFuture<T> future = SettableFuture.create();
        future.setException(t);
        return future;
    }

    /**
     * Issues a query-route call to the given endpoints.
     *
     * @return the response future, or a failed future if the call could not be issued
     */
    @Override
    public ListenableFuture<QueryRouteResponse> queryRoute(Endpoints endpoints, Metadata metadata, QueryRouteRequest request, long duration, TimeUnit timeUnit) {
        try {
            return this.getRpcClient(endpoints).queryRoute(metadata, request, this.asyncWorker, duration, timeUnit);
        } catch (Throwable t) {
            return failedFuture(t);
        }
    }

    /**
     * Issues a heartbeat call to the given endpoints.
     *
     * @return the response future, or a failed future if the call could not be issued
     */
    @Override
    public ListenableFuture<HeartbeatResponse> heartbeat(Endpoints endpoints, Metadata metadata, HeartbeatRequest request, long duration, TimeUnit timeUnit) {
        try {
            return this.getRpcClient(endpoints).heartbeat(metadata, request, this.asyncWorker, duration, timeUnit);
        } catch (Throwable t) {
            return failedFuture(t);
        }
    }

    /**
     * Issues a health-check call to the given endpoints.
     *
     * @return the response future, or a failed future if the call could not be issued
     */
    @Override
    public ListenableFuture<HealthCheckResponse> healthCheck(Endpoints endpoints, Metadata metadata, HealthCheckRequest request, long duration, TimeUnit timeUnit) {
        try {
            return this.getRpcClient(endpoints).healthCheck(metadata, request, this.asyncWorker, duration, timeUnit);
        } catch (Throwable t) {
            return failedFuture(t);
        }
    }

    /**
     * Issues a send-message call to the given endpoints.
     *
     * @return the response future, or a failed future if the call could not be issued
     */
    @Override
    public ListenableFuture<SendMessageResponse> sendMessage(Endpoints endpoints, Metadata metadata, SendMessageRequest request, long duration, TimeUnit timeUnit) {
        try {
            return this.getRpcClient(endpoints).sendMessage(metadata, request, this.asyncWorker, duration, timeUnit);
        } catch (Throwable t) {
            return failedFuture(t);
        }
    }

    /**
     * Issues a query-assignment call to the given endpoints.
     *
     * @return the response future, or a failed future if the call could not be issued
     */
    @Override
    public ListenableFuture<QueryAssignmentResponse> queryAssignment(Endpoints endpoints, Metadata metadata, QueryAssignmentRequest request, long duration, TimeUnit timeUnit) {
        try {
            return this.getRpcClient(endpoints).queryAssignment(metadata, request, this.asyncWorker, duration, timeUnit);
        } catch (Throwable t) {
            return failedFuture(t);
        }
    }

    /**
     * Issues a receive-message call to the given endpoints.
     *
     * @return the response future, or a failed future if the call could not be issued
     */
    @Override
    public ListenableFuture<ReceiveMessageResponse> receiveMessage(Endpoints endpoints, Metadata metadata, ReceiveMessageRequest request, long duration, TimeUnit timeUnit) {
        try {
            return this.getRpcClient(endpoints).receiveMessage(metadata, request, this.asyncWorker, duration, timeUnit);
        } catch (Throwable t) {
            return failedFuture(t);
        }
    }

    /**
     * Issues an ack-message call to the given endpoints.
     *
     * @return the response future, or a failed future if the call could not be issued
     */
    @Override
    public ListenableFuture<AckMessageResponse> ackMessage(Endpoints endpoints, Metadata metadata, AckMessageRequest request, long duration, TimeUnit timeUnit) {
        try {
            return this.getRpcClient(endpoints).ackMessage(metadata, request, this.asyncWorker, duration, timeUnit);
        } catch (Throwable t) {
            return failedFuture(t);
        }
    }

    /**
     * Issues a nack-message call to the given endpoints.
     *
     * @return the response future, or a failed future if the call could not be issued
     */
    @Override
    public ListenableFuture<NackMessageResponse> nackMessage(Endpoints endpoints, Metadata metadata, NackMessageRequest request, long duration, TimeUnit timeUnit) {
        try {
            return this.getRpcClient(endpoints).nackMessage(metadata, request, this.asyncWorker, duration, timeUnit);
        } catch (Throwable t) {
            return failedFuture(t);
        }
    }

    /**
     * Issues a forward-message-to-dead-letter-queue call to the given endpoints.
     *
     * @return the response future, or a failed future if the call could not be issued
     */
    @Override
    public ListenableFuture<ForwardMessageToDeadLetterQueueResponse> forwardMessageToDeadLetterQueue(Endpoints endpoints, Metadata metadata, ForwardMessageToDeadLetterQueueRequest request, long duration, TimeUnit timeUnit) {
        try {
            return this.getRpcClient(endpoints).forwardMessageToDeadLetterQueue(metadata, request, this.asyncWorker, duration, timeUnit);
        } catch (Throwable t) {
            return failedFuture(t);
        }
    }

    /**
     * Issues an end-transaction call to the given endpoints.
     *
     * @return the response future, or a failed future if the call could not be issued
     */
    @Override
    public ListenableFuture<EndTransactionResponse> endTransaction(Endpoints endpoints, Metadata metadata, EndTransactionRequest request, long duration, TimeUnit timeUnit) {
        try {
            return this.getRpcClient(endpoints).endTransaction(metadata, request, this.asyncWorker, duration, timeUnit);
        } catch (Throwable t) {
            return failedFuture(t);
        }
    }

    /**
     * Issues a query-offset call to the given endpoints.
     *
     * @return the response future, or a failed future if the call could not be issued
     */
    @Override
    public ListenableFuture<QueryOffsetResponse> queryOffset(Endpoints endpoints, Metadata metadata, QueryOffsetRequest request, long duration, TimeUnit timeUnit) {
        try {
            return this.getRpcClient(endpoints).queryOffset(metadata, request, this.asyncWorker, duration, timeUnit);
        } catch (Throwable t) {
            return failedFuture(t);
        }
    }

    /**
     * Issues a pull-message call to the given endpoints.
     *
     * @return the response future, or a failed future if the call could not be issued
     */
    @Override
    public ListenableFuture<PullMessageResponse> pullMessage(Endpoints endpoints, Metadata metadata, PullMessageRequest request, long duration, TimeUnit timeUnit) {
        try {
            return this.getRpcClient(endpoints).pullMessage(metadata, request, this.asyncWorker, duration, timeUnit);
        } catch (Throwable t) {
            return failedFuture(t);
        }
    }

    /**
     * Issues a poll-command call to the given endpoints.
     *
     * @return the response future, or a failed future if the call could not be issued
     */
    @Override
    public ListenableFuture<PollCommandResponse> pollCommand(Endpoints endpoints, Metadata metadata, PollCommandRequest request, long duration, TimeUnit timeUnit) {
        try {
            return this.getRpcClient(endpoints).pollCommand(metadata, request, this.asyncWorker, duration, timeUnit);
        } catch (Throwable t) {
            return failedFuture(t);
        }
    }

    /**
     * Issues a report-thread-stack-trace call to the given endpoints.
     *
     * @return the response future, or a failed future if the call could not be issued
     */
    @Override
    public ListenableFuture<ReportThreadStackTraceResponse> reportThreadStackTrace(Endpoints endpoints, Metadata metadata, ReportThreadStackTraceRequest request, long duration, TimeUnit timeUnit) {
        try {
            return this.getRpcClient(endpoints).reportThreadStackTrace(metadata, request, this.asyncWorker, duration, timeUnit);
        } catch (Throwable t) {
            return failedFuture(t);
        }
    }

    /**
     * Issues a report-message-consumption-result call to the given endpoints.
     *
     * @return the response future, or a failed future if the call could not be issued
     */
    @Override
    public ListenableFuture<ReportMessageConsumptionResultResponse> reportMessageConsumption(Endpoints endpoints, Metadata metadata, ReportMessageConsumptionResultRequest request, long duration, TimeUnit timeUnit) {
        try {
            return this.getRpcClient(endpoints).reportMessageConsumptionResult(metadata, request, this.asyncWorker, duration, timeUnit);
        } catch (Throwable t) {
            return failedFuture(t);
        }
    }

    /**
     * Issues a notify-client-termination call to the given endpoints.
     *
     * @return the response future, or a failed future if the call could not be issued
     */
    @Override
    public ListenableFuture<NotifyClientTerminationResponse> notifyClientTermination(Endpoints endpoints, Metadata metadata, NotifyClientTerminationRequest request, long duration, TimeUnit timeUnit) {
        try {
            return this.getRpcClient(endpoints).notifyClientTermination(metadata, request, this.asyncWorker, duration, timeUnit);
        } catch (Throwable t) {
            return failedFuture(t);
        }
    }

    /**
     * @return the scheduler that runs this manager's periodic tasks
     */
    @Override
    public ScheduledExecutorService getScheduler() {
        return this.scheduler;
    }

    static {
        // Replace the root java.util.logging handlers with the SLF4J bridge handler.
        SLF4JBridgeHandler.removeHandlersForRootLogger();
        SLF4JBridgeHandler.install();
    }
}

