package com.aliyun.openservices.aliyun.log.producer;

import com.aliyun.openservices.aliyun.log.producer.errors.MaxBatchCountExceedException;
import com.aliyun.openservices.aliyun.log.producer.errors.ProducerException;
import com.aliyun.openservices.aliyun.log.producer.internals.BatchHandler;
import com.aliyun.openservices.aliyun.log.producer.internals.IOThreadPool;
import com.aliyun.openservices.aliyun.log.producer.internals.LogAccumulator;
import com.aliyun.openservices.aliyun.log.producer.internals.Mover;
import com.aliyun.openservices.aliyun.log.producer.internals.RetryQueue;
import com.aliyun.openservices.aliyun.log.producer.internals.Utils;
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.LogItem;
import com.aliyun.openservices.log.http.client.ClientConfiguration;
import com.aliyun.openservices.log.http.comm.ServiceClient;
import com.aliyun.openservices.log.http.comm.TimeoutServiceClient;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/aliyun/openservices/aliyun/log/producer/LogProducer.class */
/**
 * {@link Producer} implementation that wires together a {@link LogAccumulator}, a {@link Mover}
 * thread, an {@link IOThreadPool}, a timeout thread pool and two {@link BatchHandler} threads
 * (one fed by the success queue, one by the failure queue).
 *
 * <p>Each instance receives a process-unique id from a static counter; that id is used to derive
 * the thread-name prefix and the producer hash. Clients are kept per project in a concurrent map
 * and are registered through {@link #putProjectConfig(ProjectConfig)}.
 */
public class LogProducer implements Producer {
    private static final Logger LOGGER = LoggerFactory.getLogger(LogProducer.class);
    private static final AtomicInteger INSTANCE_ID_GENERATOR = new AtomicInteger(0);
    private static final String LOG_PRODUCER_PREFIX = "aliyun-log-producer-";
    private static final String MOVER_SUFFIX = "-mover";
    private static final String SUCCESS_BATCH_HANDLER_SUFFIX = "-success-batch-handler";
    private static final String FAILURE_BATCH_HANDLER_SUFFIX = "-failure-batch-handler";
    private static final String TIMEOUT_THREAD_SUFFIX_FORMAT = "-timeout-thread-%d";
    private final ProducerConfig producerConfig;
    private final ServiceClient serviceClient;
    private final Semaphore memoryController;
    private final IOThreadPool ioThreadPool;
    private final ThreadPoolExecutor timeoutThreadPool;
    private final LogAccumulator accumulator;
    private final Mover mover;
    private final BatchHandler successBatchHandler;
    private final BatchHandler failureBatchHandler;
    private final ShardHashAdjuster adjuster;
    private final Map<String, Client> clientPool = new ConcurrentHashMap<String, Client>();
    private final AtomicInteger batchCount = new AtomicInteger(0);
    private final int instanceId = INSTANCE_ID_GENERATOR.getAndIncrement();
    private final String name = LOG_PRODUCER_PREFIX + this.instanceId;
    private final String producerHash = Utils.generateProducerHash(this.instanceId);
    private final RetryQueue retryQueue = new RetryQueue();

    /**
     * Builds all internal components from the given configuration and starts the mover and the
     * two batch handler threads.
     *
     * @param producerConfig configuration read for the memory budget, the IO thread count and the
     *     shard-hash bucket count; also handed to the accumulator and the mover
     */
    public LogProducer(ProducerConfig producerConfig) {
        this.producerConfig = producerConfig;
        this.memoryController = new Semaphore(producerConfig.getTotalSizeInBytes());
        // Built before any thread is started so that a failure here cannot leave the mover or
        // the batch handlers running with nobody able to close them.
        this.adjuster = new ShardHashAdjuster(producerConfig.getBuckets());
        // The batch element type lives in the internals package and is not imported in this
        // file, so the two queues are left raw rather than guessing at a type argument.
        LinkedBlockingQueue successQueue = new LinkedBlockingQueue();
        LinkedBlockingQueue failureQueue = new LinkedBlockingQueue();
        int ioThreadCount = producerConfig.getIoThreadCount();
        this.ioThreadPool = new IOThreadPool(ioThreadCount, this.name);
        this.timeoutThreadPool = new ThreadPoolExecutor(
                ioThreadCount,
                ioThreadCount,
                0L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(),
                new ThreadFactoryBuilder()
                        .setDaemon(true)
                        .setNameFormat(this.name + TIMEOUT_THREAD_SUFFIX_FORMAT)
                        .build());
        this.serviceClient = new TimeoutServiceClient(new ClientConfiguration(), this.timeoutThreadPool);
        this.accumulator = new LogAccumulator(this.producerHash, producerConfig, this.clientPool, this.memoryController, this.retryQueue, successQueue, failureQueue, this.ioThreadPool, this.batchCount);
        this.mover = new Mover(this.name + MOVER_SUFFIX, producerConfig, this.clientPool, this.accumulator, this.retryQueue, successQueue, failureQueue, this.ioThreadPool, this.batchCount);
        this.successBatchHandler = new BatchHandler(this.name + SUCCESS_BATCH_HANDLER_SUFFIX, successQueue, this.batchCount, this.memoryController);
        this.failureBatchHandler = new BatchHandler(this.name + FAILURE_BATCH_HANDLER_SUFFIX, failureQueue, this.batchCount, this.memoryController);
        this.mover.start();
        this.successBatchHandler.start();
        this.failureBatchHandler.start();
    }

    /** Sends one log item with empty third/fourth string arguments, no shard hash and no callback. */
    @Override // com.aliyun.openservices.aliyun.log.producer.Producer
    public ListenableFuture<Result> send(String str, String str2, LogItem logItem) throws InterruptedException, ProducerException {
        return send(str, str2, "", "", null, logItem, null);
    }

    /** Sends a list of log items with empty third/fourth string arguments, no shard hash and no callback. */
    @Override // com.aliyun.openservices.aliyun.log.producer.Producer
    public ListenableFuture<Result> send(String str, String str2, List<LogItem> list) throws InterruptedException, ProducerException {
        return send(str, str2, "", "", null, list, null);
    }

    /** Sends one log item with no shard hash and no callback. */
    @Override // com.aliyun.openservices.aliyun.log.producer.Producer
    public ListenableFuture<Result> send(String str, String str2, String str3, String str4, LogItem logItem) throws InterruptedException, ProducerException {
        return send(str, str2, str3, str4, null, logItem, null);
    }

    /** Sends a list of log items with no shard hash and no callback. */
    @Override // com.aliyun.openservices.aliyun.log.producer.Producer
    public ListenableFuture<Result> send(String str, String str2, String str3, String str4, List<LogItem> list) throws InterruptedException, ProducerException {
        return send(str, str2, str3, str4, null, list, null);
    }

    /** Sends one log item with the given shard hash and no callback. */
    @Override // com.aliyun.openservices.aliyun.log.producer.Producer
    public ListenableFuture<Result> send(String str, String str2, String str3, String str4, String str5, LogItem logItem) throws InterruptedException, ProducerException {
        return send(str, str2, str3, str4, str5, logItem, null);
    }

    /** Sends a list of log items with the given shard hash and no callback. */
    @Override // com.aliyun.openservices.aliyun.log.producer.Producer
    public ListenableFuture<Result> send(String str, String str2, String str3, String str4, String str5, List<LogItem> list) throws InterruptedException, ProducerException {
        return send(str, str2, str3, str4, str5, list, null);
    }

    /** Sends one log item with empty third/fourth string arguments, no shard hash, and a callback. */
    @Override // com.aliyun.openservices.aliyun.log.producer.Producer
    public ListenableFuture<Result> send(String str, String str2, LogItem logItem, Callback callback) throws InterruptedException, ProducerException {
        return send(str, str2, "", "", null, logItem, callback);
    }

    /** Sends a list of log items with empty third/fourth string arguments, no shard hash, and a callback. */
    @Override // com.aliyun.openservices.aliyun.log.producer.Producer
    public ListenableFuture<Result> send(String str, String str2, List<LogItem> list, Callback callback) throws InterruptedException, ProducerException {
        return send(str, str2, "", "", null, list, callback);
    }

    /** Sends one log item with no shard hash, and a callback. */
    @Override // com.aliyun.openservices.aliyun.log.producer.Producer
    public ListenableFuture<Result> send(String str, String str2, String str3, String str4, LogItem logItem, Callback callback) throws InterruptedException, ProducerException {
        return send(str, str2, str3, str4, null, logItem, callback);
    }

    /** Sends a list of log items with no shard hash, and a callback. */
    @Override // com.aliyun.openservices.aliyun.log.producer.Producer
    public ListenableFuture<Result> send(String str, String str2, String str3, String str4, List<LogItem> list, Callback callback) throws InterruptedException, ProducerException {
        return send(str, str2, str3, str4, null, list, callback);
    }

    /**
     * Wraps a single log item in a one-element list and delegates to the list-based overload.
     *
     * @param logItem the item to send; rejected by {@code Utils.assertArgumentNotNull} when null
     */
    @Override // com.aliyun.openservices.aliyun.log.producer.Producer
    public ListenableFuture<Result> send(String str, String str2, String str3, String str4, String str5, LogItem logItem, Callback callback) throws InterruptedException, ProducerException {
        Utils.assertArgumentNotNull(logItem, "logItem");
        List<LogItem> singleItem = new ArrayList<LogItem>(1);
        singleItem.add(logItem);
        return send(str, str2, str3, str4, str5, singleItem, callback);
    }

    /**
     * Validates the arguments and appends the log items to the accumulator. Every other
     * {@code send} overload ends up here.
     *
     * @param str the project; must be neither null nor empty
     * @param str2 the log store; must be neither null nor empty
     * @param str3 replaced by the empty string when null (presumably the topic; confirm against
     *     the {@code Producer} interface)
     * @param str4 passed through unchanged (presumably the source; confirm against the
     *     {@code Producer} interface)
     * @param str5 the shard hash; when non-null and shard-hash adjustment is enabled in the
     *     configuration it is rewritten by the {@link ShardHashAdjuster}
     * @param list the log items; must be non-null, non-empty and hold at most
     *     {@code ProducerConfig.MAX_BATCH_COUNT} entries
     * @param callback passed to the accumulator; may be null
     * @return the future returned by the accumulator
     * @throws IllegalArgumentException if {@code list} is empty
     * @throws MaxBatchCountExceedException if {@code list} holds too many entries
     */
    @Override // com.aliyun.openservices.aliyun.log.producer.Producer
    public ListenableFuture<Result> send(String str, String str2, String str3, String str4, String str5, List<LogItem> list, Callback callback) throws InterruptedException, ProducerException {
        Utils.assertArgumentNotNullOrEmpty(str, "project");
        Utils.assertArgumentNotNullOrEmpty(str2, "logStore");
        if (str3 == null) {
            str3 = "";
        }
        Utils.assertArgumentNotNull(list, "logItems");
        if (list.isEmpty()) {
            throw new IllegalArgumentException("logItems cannot be empty");
        }
        int size = list.size();
        // Compare against the same constant the message reports so the two cannot drift apart.
        if (size > ProducerConfig.MAX_BATCH_COUNT) {
            throw new MaxBatchCountExceedException("the log list size is " + size + " which exceeds the MAX_BATCH_COUNT " + ProducerConfig.MAX_BATCH_COUNT);
        }
        if (str5 != null && this.producerConfig.isAdjustShardHash()) {
            str5 = this.adjuster.adjust(str5);
        }
        return this.accumulator.append(str, str2, str3, str4, str5, list, callback);
    }

    /** Closes the producer with a timeout of {@code Long.MAX_VALUE} milliseconds. */
    @Override // com.aliyun.openservices.aliyun.log.producer.Producer
    public void close() throws InterruptedException, ProducerException {
        close(Long.MAX_VALUE);
    }

    /**
     * Shuts the producer down in stages: mover, IO thread pool, timeout thread pool, success
     * batch handler, failure batch handler. The timeout is a shared budget: each stage that
     * finishes hands the remaining time to the next one.
     *
     * <p>A {@link ProducerException} from one stage does not stop the remaining stages from
     * running; the first such exception is rethrown once all stages have been attempted. A stage
     * that fails leaves the budget untouched for the following stage.
     *
     * @param j the overall timeout in milliseconds; must be greater than or equal to 0
     * @throws IllegalArgumentException if {@code j} is negative
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ProducerException the first failure reported by any stage
     */
    @Override // com.aliyun.openservices.aliyun.log.producer.Producer
    public void close(long j) throws InterruptedException, ProducerException {
        if (j < 0) {
            throw new IllegalArgumentException("timeoutMs must be greater than or equal to 0, got " + j);
        }
        ProducerException firstFailure = null;
        LOGGER.info("Closing the log producer, timeoutMs={}", j);
        try {
            j = closeMover(j);
        } catch (ProducerException e) {
            firstFailure = e;
        }
        LOGGER.debug("After close mover, timeoutMs={}", j);
        try {
            j = closeIOThreadPool(j);
        } catch (ProducerException e) {
            if (firstFailure == null) {
                firstFailure = e;
            }
        }
        LOGGER.debug("After close ioThreadPool, timeoutMs={}", j);
        try {
            j = closeTimeoutThreadPool(j);
        } catch (ProducerException e) {
            if (firstFailure == null) {
                firstFailure = e;
            }
        }
        LOGGER.debug("After close timeoutThreadPool, timeoutMs={}", j);
        try {
            j = closeSuccessBatchHandler(j);
        } catch (ProducerException e) {
            if (firstFailure == null) {
                firstFailure = e;
            }
        }
        LOGGER.debug("After close success batch handler, timeoutMs={}", j);
        try {
            j = closeFailureBatchHandler(j);
        } catch (ProducerException e) {
            if (firstFailure == null) {
                firstFailure = e;
            }
        }
        LOGGER.debug("After close failure batch handler, timeoutMs={}", j);
        if (firstFailure != null) {
            throw firstFailure;
        }
        LOGGER.info("The log producer has been closed");
    }

    /**
     * Returns how much of a millisecond budget is left, never less than zero.
     *
     * <p>Elapsed time is taken from {@code System.nanoTime()} so that adjustments of the system
     * clock cannot stretch or shrink the budget.
     *
     * @param timeoutMs the budget that was available at {@code startNanos}
     * @param startNanos the {@code System.nanoTime()} reading taken when the stage began
     */
    private static long remainingMs(long timeoutMs, long startNanos) {
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
        return Math.max(0L, timeoutMs - elapsedMs);
    }

    /**
     * Closes the accumulator, the retry queue and the mover, then waits for the mover to stop.
     *
     * @param j the time budget in milliseconds
     * @return the budget left after this stage
     * @throws ProducerException if the mover is still alive once the budget is used up
     */
    private long closeMover(long j) throws InterruptedException, ProducerException {
        long startNanos = System.nanoTime();
        this.accumulator.close();
        this.retryQueue.close();
        this.mover.close();
        // Assuming join follows java.lang.Thread semantics, a timeout of 0 means "wait forever",
        // so an exhausted budget must skip the join instead of passing 0 through.
        if (j > 0) {
            this.mover.join(j);
        }
        if (this.mover.isAlive()) {
            LOGGER.warn("The mover thread is still alive");
            throw new ProducerException("the mover thread is still alive");
        }
        return remainingMs(j, startNanos);
    }

    /**
     * Shuts the IO thread pool down and waits for it to terminate.
     *
     * @param j the time budget in milliseconds
     * @return the budget left after this stage
     * @throws ProducerException if the pool has not terminated within the budget
     */
    private long closeIOThreadPool(long j) throws InterruptedException, ProducerException {
        long startNanos = System.nanoTime();
        this.ioThreadPool.shutdown();
        if (!this.ioThreadPool.awaitTermination(j, TimeUnit.MILLISECONDS)) {
            LOGGER.warn("The ioThreadPool is not fully terminated");
            throw new ProducerException("the ioThreadPool is not fully terminated");
        }
        LOGGER.debug("The ioThreadPool is terminated");
        return remainingMs(j, startNanos);
    }

    /**
     * Shuts the timeout thread pool down and waits for it to terminate.
     *
     * @param j the time budget in milliseconds
     * @return the budget left after this stage
     * @throws ProducerException if the pool has not terminated within the budget
     */
    private long closeTimeoutThreadPool(long j) throws InterruptedException, ProducerException {
        long startNanos = System.nanoTime();
        this.timeoutThreadPool.shutdown();
        if (!this.timeoutThreadPool.awaitTermination(j, TimeUnit.MILLISECONDS)) {
            LOGGER.warn("The timeoutThreadPool is not fully terminated");
            throw new ProducerException("the timeoutThreadPool is not fully terminated");
        }
        LOGGER.debug("The timeoutThreadPool is terminated");
        return remainingMs(j, startNanos);
    }

    /**
     * Closes the success batch handler and waits for its thread to stop. The wait is skipped,
     * and the budget returned unchanged, when the caller is that handler thread itself, since a
     * thread cannot wait for its own termination.
     *
     * @param j the time budget in milliseconds
     * @return the budget left after this stage
     * @throws ProducerException if the handler is still alive once the budget is used up
     */
    private long closeSuccessBatchHandler(long j) throws InterruptedException, ProducerException {
        long startNanos = System.nanoTime();
        this.successBatchHandler.close();
        if (Thread.currentThread() == this.successBatchHandler) {
            LOGGER.warn("Skip join success batch handler since you have incorrectly invoked close from the producer call-back");
            return j;
        }
        // Thread.join(0) waits forever; with no budget left go straight to the liveness check.
        if (j > 0) {
            this.successBatchHandler.join(j);
        }
        if (this.successBatchHandler.isAlive()) {
            LOGGER.warn("The success batch handler thread is still alive");
            throw new ProducerException("the success batch handler thread is still alive");
        }
        return remainingMs(j, startNanos);
    }

    /**
     * Closes the failure batch handler and waits for its thread to stop. The wait is skipped,
     * and the budget returned unchanged, when the caller is either of the two batch handler
     * threads.
     *
     * @param j the time budget in milliseconds
     * @return the budget left after this stage
     * @throws ProducerException if the handler is still alive once the budget is used up
     */
    private long closeFailureBatchHandler(long j) throws InterruptedException, ProducerException {
        long startNanos = System.nanoTime();
        this.failureBatchHandler.close();
        Thread caller = Thread.currentThread();
        if (caller == this.successBatchHandler || caller == this.failureBatchHandler) {
            LOGGER.warn("Skip join failure batch handler since you have incorrectly invoked close from the producer call-back");
            return j;
        }
        // Thread.join(0) waits forever; with no budget left go straight to the liveness check.
        if (j > 0) {
            this.failureBatchHandler.join(j);
        }
        if (this.failureBatchHandler.isAlive()) {
            LOGGER.warn("The failure batch handler thread is still alive");
            throw new ProducerException("the failure batch handler thread is still alive");
        }
        return remainingMs(j, startNanos);
    }

    /** Returns the configuration this producer was constructed with. */
    @Override // com.aliyun.openservices.aliyun.log.producer.Producer
    public ProducerConfig getProducerConfig() {
        return this.producerConfig;
    }

    /** Returns the current value of the batch counter shared with the accumulator, mover and handlers. */
    @Override // com.aliyun.openservices.aliyun.log.producer.Producer
    public int getBatchCount() {
        return this.batchCount.get();
    }

    /** Returns the number of permits currently available in the memory-controlling semaphore. */
    @Override // com.aliyun.openservices.aliyun.log.producer.Producer
    public int availableMemoryInBytes() {
        return this.memoryController.availablePermits();
    }

    /**
     * Builds a client for the given project configuration and stores it under the project name,
     * replacing any client previously stored for that project.
     */
    @Override // com.aliyun.openservices.aliyun.log.producer.Producer
    public void putProjectConfig(ProjectConfig projectConfig) {
        this.clientPool.put(projectConfig.getProject(), buildClient(projectConfig));
    }

    /** Removes the client stored under the project name of the given configuration, if any. */
    @Override // com.aliyun.openservices.aliyun.log.producer.Producer
    public void removeProjectConfig(ProjectConfig projectConfig) {
        this.clientPool.remove(projectConfig.getProject());
    }

    /**
     * Creates a client from the endpoint and credentials provider of the given configuration,
     * using this producer's shared service client. The user agent is applied only when the
     * configuration supplies one.
     */
    private Client buildClient(ProjectConfig projectConfig) {
        Client client = new Client(projectConfig.getEndpoint(), projectConfig.getCredentialsProvider(), this.serviceClient, (String) null);
        String userAgent = projectConfig.getUserAgent();
        if (userAgent != null) {
            client.setUserAgent(userAgent);
        }
        return client;
    }
}
