/*
 * Decompiled with CFR 0.152.
 */
package com.pangubpm.common.executor;

import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.pangubpm.common.exception.BaseException;
import com.pangubpm.common.executor.AsyncTask;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DuplicateKeyException;

public class CommonExecutor {
    private static final Logger LOGGER = LoggerFactory.getLogger(CommonExecutor.class);
    private static final ThreadPoolExecutor BASE_EXECUTOR;

    public static ThreadPoolExecutor getCommonExecutor() {
        return BASE_EXECUTOR;
    }

    public static ThreadPoolExecutor buildThreadFirstExecutor(String poolName) {
        int coreSize = CommonExecutor.getCpuProcessors();
        int maxSize = coreSize * 8;
        return CommonExecutor.buildThreadFirstExecutor(coreSize, maxSize, 5L, TimeUnit.MINUTES, 65536, poolName);
    }

    public static ThreadPoolExecutor buildThreadFirstExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, int workQueueSize, String poolName) {
        LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(workQueueSize){
            private static final long serialVersionUID = 5075561696269543041L;

            @Override
            public boolean offer(@Nonnull Runnable o) {
                return false;
            }
        };
        RejectedExecutionHandler rejectedExecutionHandler = (runnable, executor) -> {
            try {
                queue.put(runnable);
            }
            catch (InterruptedException e) {
                LOGGER.warn("{} Queue offer interrupted. ", (Object)poolName, (Object)e);
                Thread.currentThread().interrupt();
            }
        };
        ThreadPoolExecutor executor2 = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, (BlockingQueue<Runnable>)queue, new ThreadFactoryBuilder().setNameFormat(poolName + "-%d").setUncaughtExceptionHandler((thread, throwable) -> LOGGER.error("{} catching the uncaught exception, ThreadName: [{}]", new Object[]{poolName, thread.toString(), throwable})).build(), rejectedExecutionHandler);
        executor2.allowCoreThreadTimeOut(true);
        CommonExecutor.displayThreadPoolStatus(executor2, poolName);
        CommonExecutor.hookShutdownThreadPool(executor2, poolName);
        ExecutorManager.registerThreadPoolExecutor(poolName, executor2);
        return executor2;
    }

    public static <T> List<T> batchExecuteAsync(List<AsyncTask<T>> tasks, @Nonnull String taskName) {
        return CommonExecutor.batchExecuteAsync(tasks, BASE_EXECUTOR, taskName);
    }

    public static <T> List<T> batchExecuteAsync(@Nonnull List<AsyncTask<T>> tasks, @Nonnull ThreadPoolExecutor executor, @Nonnull String taskName) {
        if (CollectionUtils.isEmpty(tasks)) {
            return Collections.emptyList();
        }
        int size = tasks.size();
        List callables = tasks.stream().map(t -> () -> {
            try {
                Object r = t.doExecute();
                LOGGER.debug("[>>Executor<<] Async task execute success. ThreadName: [{}], BatchTaskName: [{}], SubTaskName: [{}]", new Object[]{Thread.currentThread().getName(), taskName, t.taskName()});
                return r;
            }
            catch (Throwable e) {
                LOGGER.warn("[>>Executor<<] Async task execute error. ThreadName: [{}], BatchTaskName: [{}], SubTaskName: [{}], exception: {}", new Object[]{Thread.currentThread().getName(), taskName, t.taskName(), e.getMessage()});
                throw e;
            }
        }).collect(Collectors.toList());
        ExecutorCompletionService cs = new ExecutorCompletionService(executor, new LinkedBlockingQueue(size));
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(size);
        LOGGER.info("[>>Executor<<] Start async tasks, BatchTaskName: [{}], TaskSize: [{}]", (Object)taskName, (Object)size);
        for (Callable task : callables) {
            futures.add(cs.submit(task));
        }
        ArrayList resultList = new ArrayList(size);
        for (int i = 0; i < size; ++i) {
            try {
                Future future = cs.poll(6L, TimeUnit.MINUTES);
                if (future == null) {
                    CommonExecutor.cancelTask(futures);
                    LOGGER.error("[>>Executor<<] Async task [{}] - [{}] execute timeout, then cancel other tasks.", (Object)taskName, (Object)i);
                    throw new RuntimeException(taskName + "execute timeout, then cancel other tasks.");
                }
                Object result = future.get();
                resultList.add(result);
                LOGGER.debug("[>>Executor<<] Async task [{}] - [{}] execute success, result: {}", new Object[]{taskName, i, result});
                continue;
            }
            catch (ExecutionException e) {
                LOGGER.warn("[>>Executor<<] Async task [{}] - [{}] execute error, then cancel other tasks.", new Object[]{taskName, i, e});
                CommonExecutor.cancelTask(futures);
                Throwable throwable = e.getCause();
                if (throwable instanceof BaseException) {
                    throw (BaseException)throwable;
                }
                if (throwable instanceof DuplicateKeyException) {
                    throw (DuplicateKeyException)throwable;
                }
                throw new BaseException("error.executorError", e.getCause().getMessage());
            }
            catch (InterruptedException e) {
                CommonExecutor.cancelTask(futures);
                Thread.currentThread().interrupt();
                LOGGER.error("[>>Executor<<] Async task [{}] - [{}] were interrupted.", (Object)taskName, (Object)i);
                throw new BaseException(taskName + "execute timeout, then cancel other tasks.");
            }
        }
        LOGGER.info("[>>Executor<<] Finish async tasks , BatchTaskName: [{}], TaskSize: [{}]", (Object)taskName, (Object)size);
        return resultList;
    }

    public static void displayThreadPoolStatus(ThreadPoolExecutor threadPool, String threadPoolName) {
        CommonExecutor.displayThreadPoolStatus(threadPool, threadPoolName, RandomUtils.nextInt((int)60, (int)600), TimeUnit.SECONDS);
    }

    public static void displayThreadPoolStatus(ThreadPoolExecutor threadPool, String threadPoolName, long period, TimeUnit unit) {
        Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
            String payload = "[>>ExecutorStatus<<] ThreadPool Name: [{}], Pool Status: [shutdown={}, Terminated={}], Pool Thread Size: {}, Largest Pool Size: {}, Active Thread Count: {}, Task Count: {}, Tasks Completed: {}, Tasks in Queue: {}";
            Object[] params = new Object[]{threadPoolName, threadPool.isShutdown(), threadPool.isTerminated(), threadPool.getPoolSize(), threadPool.getLargestPoolSize(), threadPool.getActiveCount(), threadPool.getTaskCount(), threadPool.getCompletedTaskCount(), threadPool.getQueue().size()};
            if (threadPool.getQueue().remainingCapacity() < 64) {
                LOGGER.warn(payload, params);
            } else {
                LOGGER.info(payload, params);
            }
        }, 0L, period, unit);
    }

    public static void hookShutdownThreadPool(ExecutorService threadPool, String threadPoolName) {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            LOGGER.info("[>>ExecutorShutdown<<] Start to shutdown the thead pool: [{}]", (Object)threadPoolName);
            threadPool.shutdown();
            try {
                if (!threadPool.awaitTermination(60L, TimeUnit.SECONDS)) {
                    threadPool.shutdownNow();
                    LOGGER.warn("[>>ExecutorShutdown<<] Interrupt the worker, which may cause some task inconsistent. Please check the biz logs.");
                    if (!threadPool.awaitTermination(60L, TimeUnit.SECONDS)) {
                        LOGGER.error("[>>ExecutorShutdown<<] Thread pool can't be shutdown even with interrupting worker threads, which may cause some task inconsistent. Please check the biz logs.");
                    }
                }
            }
            catch (InterruptedException ie) {
                threadPool.shutdownNow();
                LOGGER.error("[>>ExecutorShutdown<<] The current server thread is interrupted when it is trying to stop the worker threads. This may leave an inconsistent state. Please check the biz logs.");
                Thread.currentThread().interrupt();
            }
            LOGGER.info("[>>ExecutorShutdown<<] Finally shutdown the thead pool: [{}]", (Object)threadPoolName);
        }));
    }

    public static int getCpuProcessors() {
        return Runtime.getRuntime() != null && Runtime.getRuntime().availableProcessors() > 0 ? Runtime.getRuntime().availableProcessors() : 8;
    }

    private static <T> void cancelTask(List<Future<T>> futures) {
        for (Future<T> future : futures) {
            if (future.isDone()) continue;
            future.cancel(true);
        }
    }

    static {
        String executorName = "BaseExecutor";
        BASE_EXECUTOR = CommonExecutor.buildThreadFirstExecutor(executorName);
        ExecutorManager.registerThreadPoolExecutor(executorName, BASE_EXECUTOR);
    }

    public static class ExecutorManager {
        private static final ConcurrentHashMap<String, ThreadPoolExecutor> EXECUTORS = new ConcurrentHashMap(8);

        public static void registerThreadPoolExecutor(String threadPoolName, ThreadPoolExecutor executor) {
            EXECUTORS.put(threadPoolName, executor);
        }

        public static ThreadPoolExecutor getThreadPoolExecutor(String threadPoolName) {
            return EXECUTORS.get(threadPoolName);
        }

        public static Map<String, ThreadPoolExecutor> getAllThreadPoolExecutor() {
            return ImmutableMap.copyOf(EXECUTORS);
        }

        public static void removeThreadPoolExecutor(String threadPoolName) {
            EXECUTORS.remove(threadPoolName);
        }
    }
}

