package com.alicloud.openservices.tablestore.tunnel.worker;

import com.alicloud.openservices.tablestore.ClientException;
import com.alicloud.openservices.tablestore.TableStoreException;
import com.alicloud.openservices.tablestore.TunnelClientInterface;
import com.alicloud.openservices.tablestore.core.ErrorCode;
import com.alicloud.openservices.tablestore.core.utils.Preconditions;
import com.alicloud.openservices.tablestore.model.tunnel.internal.Channel;
import com.alicloud.openservices.tablestore.model.tunnel.internal.ConnectTunnelRequest;
import com.alicloud.openservices.tablestore.model.tunnel.internal.HeartbeatRequest;
import com.alicloud.openservices.tablestore.model.tunnel.internal.HeartbeatResponse;
import com.alicloud.openservices.tablestore.model.tunnel.internal.ShutdownTunnelRequest;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alicloud/openservices/tablestore/tunnel/worker/TunnelWorker.class */
public class TunnelWorker implements ITunnelWorker {
    private static final Logger LOG = LoggerFactory.getLogger(TunnelWorker.class);
    private static final int CORE_POOL_SIZE = 2;
    private static final int WORKER_RANDOM_RETRY_MILLIS = 10000;
    private String tunnelId;
    private String clientId;
    private TunnelWorkerConfig workerConfig;
    private TunnelClientInterface client;
    private TunnelStateMachine stateMachine;
    private IChannelDialer channelDialer;
    private AtomicReference<TunnelWorkerStatus> workerStatus = new AtomicReference<>();
    private Date lastHeartbeatTime;
    private ScheduledExecutorService heartbeatExecutor;

    /* loaded from: input_file:com/alicloud/openservices/tablestore/tunnel/worker/TunnelWorker$Heartbeat.class */
    private class Heartbeat implements Runnable {
        private Heartbeat() {
        }

        @Override // java.lang.Runnable
        public void run() {
            if (((TunnelWorkerStatus) TunnelWorker.this.workerStatus.get()).equals(TunnelWorkerStatus.WORKER_ENDED)) {
                TunnelWorker.this.workerStatus.set(TunnelWorkerStatus.WORKER_READY);
                TunnelWorker.this.connect();
            }
            try {
                Date date = new Date();
                if (TunnelWorker.this.lastHeartbeatTime.compareTo(date) != 0 && date.getTime() - TunnelWorker.this.lastHeartbeatTime.getTime() > TimeUnit.SECONDS.toMillis(TunnelWorker.this.workerConfig.getHeartbeatTimeoutInSec())) {
                    TunnelWorker.LOG.error("Tunnel client heartbeat timeout, lastHeartbeatTime: {}.", TunnelWorker.this.lastHeartbeatTime);
                    throw new TableStoreException("tunnel client heartbeat timeout", ErrorCode.RESOURCE_GONE);
                }
                TunnelWorker.LOG.info("Begin batch get channels.");
                HeartbeatResponse heartbeat = TunnelWorker.this.client.heartbeat(new HeartbeatRequest(TunnelWorker.this.tunnelId, TunnelWorker.this.clientId, TunnelWorker.this.stateMachine.batchGetChannels()));
                TunnelWorker.this.lastHeartbeatTime = new Date();
                List<Channel> channels = heartbeat.getChannels();
                TunnelWorker.LOG.info("Begin batch update channels, num: {}, detail: {}.", Integer.valueOf(channels.size()), TunnelWorker.this.channelsToString(channels));
                TunnelWorker.this.stateMachine.batchUpdateChannels(channels);
            } catch (TableStoreException e) {
                TunnelWorker.LOG.warn("Heartbeat error, TableStore Exception: {}.", e.toString());
                if (!TunnelWorker.this.isTunnelInvalid(e.getErrorCode())) {
                    TunnelWorker.this.shutdown(false);
                } else {
                    TunnelWorker.LOG.error("Tunnel is expired or invalid, tunnel worker will be halted.");
                    TunnelWorker.this.shutdown(true);
                }
            } catch (Throwable th) {
                TunnelWorker.LOG.warn("Heartbeat error, Throwable: {}", th.toString());
                TunnelWorker.this.shutdown(false);
            }
        }
    }

    public TunnelWorker(String str, TunnelClientInterface tunnelClientInterface, TunnelWorkerConfig tunnelWorkerConfig) {
        Preconditions.checkArgument((str == null || str.isEmpty()) ? false : true, "The tunnel id should not be null or empty.");
        Preconditions.checkNotNull(tunnelClientInterface, "Tunnel client cannot be null.");
        Preconditions.checkNotNull(tunnelWorkerConfig, "Tunnel worker workerConfig cannot be null.");
        Preconditions.checkNotNull(tunnelWorkerConfig.getChannelProcessor(), "Channel Processor cannot be null.");
        init(str, tunnelClientInterface, tunnelWorkerConfig);
    }

    private void init(String str, TunnelClientInterface tunnelClientInterface, TunnelWorkerConfig tunnelWorkerConfig) {
        LOG.info("Initial tunnel worker, tunnelId: {}", str);
        this.tunnelId = str;
        this.client = tunnelClientInterface;
        this.workerConfig = tunnelWorkerConfig;
        this.workerStatus.set(TunnelWorkerStatus.WORKER_READY);
        this.lastHeartbeatTime = new Date();
        this.heartbeatExecutor = Executors.newScheduledThreadPool(2, new ThreadFactory() { // from class: com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorker.1
            private final AtomicInteger counter = new AtomicInteger(0);

            @Override // java.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable) {
                return new Thread(runnable, "tunnel-heartbeat-scheduled-" + this.counter.getAndIncrement());
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void connect() {
        if (!this.workerStatus.compareAndSet(TunnelWorkerStatus.WORKER_READY, TunnelWorkerStatus.WORKER_STARTED)) {
            throw new ClientException(String.format("Tunnel worker has already been %s status", this.workerStatus));
        }
        while (true) {
            try {
                this.clientId = this.client.connectTunnel(new ConnectTunnelRequest(this.tunnelId, new TunnelClientConfig(this.workerConfig.getHeartbeatTimeoutInSec(), this.workerConfig.getClientTag()))).getClientId();
                this.channelDialer = new ChannelDialer(this.client, this.workerConfig);
                this.stateMachine = new TunnelStateMachine(this.tunnelId, this.clientId, this.channelDialer, new ChannelProcessFactory(this.workerConfig), this.client);
                LOG.info("Connect tunnel success, clientId: {}, tunnelId: {}", this.clientId, this.tunnelId);
                return;
            } catch (TableStoreException e) {
                LOG.warn("Connect tunnel failed, tunnel id {}, error detail {}", this.tunnelId, e.toString());
                if (isTunnelInvalid(e.getErrorCode())) {
                    LOG.error("Tunnel is expired or invalid, tunnel worker will be halted.");
                    this.workerStatus.set(TunnelWorkerStatus.WORKER_HALT);
                    throw e;
                }
                try {
                    Thread.sleep(new Random(System.currentTimeMillis()).nextInt(WORKER_RANDOM_RETRY_MILLIS) + 1);
                } catch (Exception e2) {
                    LOG.warn("Reconnect worker error, error detail: {}", e2.toString());
                }
            } catch (Exception e3) {
                LOG.warn("Connect tunnel failed, tunnel id {}, error detail {}", this.tunnelId, e3.toString());
                Thread.sleep(new Random(System.currentTimeMillis()).nextInt(WORKER_RANDOM_RETRY_MILLIS) + 1);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean isTunnelInvalid(String str) {
        return str.equals(ErrorCode.TUNNEL_EXPIRED) || str.equals(ErrorCode.INVALID_PARAMETER);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public String channelsToString(List<Channel> list) {
        StringBuilder sb = new StringBuilder();
        sb.append("[");
        Iterator<Channel> it = list.iterator();
        while (it.hasNext()) {
            sb.append(it.next());
        }
        sb.append("]");
        return sb.toString();
    }

    @Override // com.alicloud.openservices.tablestore.tunnel.worker.ITunnelWorker
    public void connectAndWorking() throws Exception {
        connect();
        this.heartbeatExecutor.scheduleAtFixedRate(new Heartbeat(), 0L, this.workerConfig.getHeartbeatIntervalInSec(), TimeUnit.SECONDS);
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { // from class: com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorker.2
            @Override // java.lang.Runnable
            public void run() {
                TunnelWorker.LOG.warn("Unexpected shutdown, do resources clear.");
                TunnelWorker.this.shutdown();
            }
        }));
    }

    @Override // com.alicloud.openservices.tablestore.tunnel.worker.ITunnelWorker
    public void shutdown() {
        shutdown(true);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void shutdown(boolean z) {
        if (this.workerStatus.get().equals(TunnelWorkerStatus.WORKER_ENDED) || this.workerStatus.get().equals(TunnelWorkerStatus.WORKER_HALT)) {
            LOG.info("Tunnel worker has already been {} status, skip shutdown logic.", this.workerStatus);
            return;
        }
        if (this.channelDialer != null) {
            LOG.info("Shutdown channel dialer");
            this.channelDialer.shutdown();
        }
        if (this.stateMachine != null) {
            LOG.info("Shutdown tunnel state machine.");
            this.stateMachine.close();
        }
        if (z && this.heartbeatExecutor != null) {
            LOG.info("Shutdown heartbeat executor.");
            this.heartbeatExecutor.shutdown();
        }
        try {
            LOG.info("Shutdown tunnel, tunnelId: {}, clientId: {}", this.tunnelId, this.clientId);
            this.client.shutdownTunnel(new ShutdownTunnelRequest(this.tunnelId, this.clientId));
        } catch (Exception e) {
            LOG.warn("Shutdown tunnel failed, tunnelId: {}, clientId: {}", this.tunnelId, this.clientId);
        }
        LOG.info("Tunnel worker is ended.");
        if (z) {
            this.workerStatus.set(TunnelWorkerStatus.WORKER_HALT);
        } else {
            this.workerStatus.set(TunnelWorkerStatus.WORKER_ENDED);
        }
    }
}
