package com.taotao.cloud.core.monitor;

import com.taotao.cloud.common.exception.BaseException;
import com.taotao.cloud.common.utils.LogUtil;
import com.taotao.cloud.core.model.AsyncThreadPoolExecutor;
import com.taotao.cloud.core.model.AsyncThreadPoolTaskExecutor;
import com.taotao.cloud.core.model.Callable;
import com.taotao.cloud.core.model.Collector;
import com.taotao.cloud.core.model.ProcessExitEvent;
import com.taotao.cloud.core.model.Ref;
import com.taotao.cloud.core.properties.AsyncThreadPoolProperties;
import com.taotao.cloud.core.properties.MonitorThreadPoolProperties;
import java.lang.Thread;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/* loaded from: input_file:com/taotao/cloud/core/monitor/MonitorThreadPool.class */
public class MonitorThreadPool {
    private final ThreadPoolExecutor monitorThreadPoolExecutor;
    private final ThreadPoolTaskExecutor coreThreadPoolExecutor;
    private boolean monitorCheckHealth = true;
    private boolean coreCheckHealth = true;
    private MonitorSystem monitorSystem;
    private String monitorThreadName;
    private Collector collector;

    /* loaded from: input_file:com/taotao/cloud/core/monitor/MonitorThreadPool$MonitorThreadPoolFactory.class */
    /**
     * Thread factory for the monitor pool.
     *
     * <p>Every factory instance takes the next value of a process-wide pool counter and
     * names its threads {@code <prefix>-pool-<n>-thread-<m>}. Threads are created in the
     * thread group of the thread that constructed the factory, are daemon threads, run at
     * normal priority and have their uncaught-exception handler wrapped in a
     * {@link MonitorThreadPoolUncaughtExceptionHandler}.
     */
    public static class MonitorThreadPoolFactory implements ThreadFactory {
        /** Shared across all factories; supplies the {@code <n>} part of the name prefix. */
        private static final AtomicInteger poolNumber = new AtomicInteger(1);

        /** Thread group of the thread that instantiated this factory. */
        private final ThreadGroup group = Thread.currentThread().getThreadGroup();
        /** Per-factory counter; supplies the {@code <m>} part of each thread name. */
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        /**
         * @param str base name; {@code "-pool-<n>"} is appended to form the thread name prefix
         */
        public MonitorThreadPoolFactory(String str) {
            this.namePrefix = str + "-pool-" + poolNumber.getAndIncrement();
        }

        /**
         * Creates a daemon, normal-priority thread for {@code runnable}.
         *
         * @param runnable the task the new thread will execute
         * @return the configured, not yet started thread
         */
        @Override // java.util.concurrent.ThreadFactory
        public Thread newThread(Runnable runnable) {
            String threadName = namePrefix + "-thread-" + threadNumber.getAndIncrement();
            Thread worker = new Thread(group, runnable, threadName, 0L);

            // Wrap whatever handler the thread currently resolves to, unless it is already wrapped.
            Thread.UncaughtExceptionHandler current = worker.getUncaughtExceptionHandler();
            boolean alreadyWrapped = current instanceof MonitorThreadPoolUncaughtExceptionHandler;
            if (!alreadyWrapped) {
                worker.setUncaughtExceptionHandler(new MonitorThreadPoolUncaughtExceptionHandler(current));
            }

            worker.setDaemon(true);
            if (worker.getPriority() != Thread.NORM_PRIORITY) {
                worker.setPriority(Thread.NORM_PRIORITY);
            }
            return worker;
        }

        /** @return the thread group new threads are created in */
        public ThreadGroup getGroup() {
            return group;
        }

        /** @return the live counter used to number this factory's threads */
        public AtomicInteger getThreadNumber() {
            return threadNumber;
        }

        /** @return the prefix shared by all thread names produced by this factory */
        public String getNamePrefix() {
            return namePrefix;
        }
    }

    /* loaded from: input_file:com/taotao/cloud/core/monitor/MonitorThreadPool$MonitorThreadPoolUncaughtExceptionHandler.class */
    /**
     * Uncaught-exception handler that logs the failure and then forwards it to the
     * handler it replaced, if there was one.
     */
    public static class MonitorThreadPoolUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
        /** Handler that was in effect before this one was installed; may be {@code null}. */
        private final Thread.UncaughtExceptionHandler lastUncaughtExceptionHandler;

        /**
         * @param uncaughtExceptionHandler the previous handler to delegate to, or {@code null}
         */
        public MonitorThreadPoolUncaughtExceptionHandler(Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
            this.lastUncaughtExceptionHandler = uncaughtExceptionHandler;
        }

        /**
         * Logs {@code th} when it is non-null, then delegates to the previous handler.
         *
         * @param thread the thread that terminated
         * @param th     the throwable that terminated it
         */
        @Override // java.lang.Thread.UncaughtExceptionHandler
        public void uncaughtException(Thread thread, Throwable th) {
            if (th != null) {
                LogUtil.error(th, "[警告] [taotao-cloud-monitor-executor] 未捕获错误", new Object[0]);
            }
            Thread.UncaughtExceptionHandler previous = lastUncaughtExceptionHandler;
            if (previous == null) {
                return;
            }
            previous.uncaughtException(thread, th);
        }
    }

    /**
     * Creates the monitor executor, adopts the supplied core executor, wires both into a
     * {@link MonitorSystem} and registers a process-exit callback that shuts both pools down.
     *
     * <p>The monitor executor hands tasks directly to threads (a {@link SynchronousQueue},
     * so nothing is ever queued) and falls back to running the task on the submitting
     * thread when it is saturated ({@link ThreadPoolExecutor.CallerRunsPolicy}).
     *
     * @param collector                   metrics collector handed to the {@link MonitorSystem}
     * @param monitorThreadPoolProperties sizing, keep-alive (seconds) and thread name prefix of the monitor pool
     * @param asyncThreadPoolProperties   supplies the core pool's thread name prefix
     * @param asyncThreadPoolTaskExecutor the core executor; the {@code core*} methods of this class tolerate {@code null}
     */
    public MonitorThreadPool(Collector collector, MonitorThreadPoolProperties monitorThreadPoolProperties, AsyncThreadPoolProperties asyncThreadPoolProperties, AsyncThreadPoolTaskExecutor asyncThreadPoolTaskExecutor) {
        this.collector = collector;
        this.monitorThreadName = monitorThreadPoolProperties.getThreadNamePrefix();
        AsyncThreadPoolExecutor asyncThreadPoolExecutor = new AsyncThreadPoolExecutor(
                monitorThreadPoolProperties.getCorePoolSize(),
                monitorThreadPoolProperties.getMaximumPoolSize(),
                monitorThreadPoolProperties.getKeepAliveTime(),
                TimeUnit.SECONDS,
                new SynchronousQueue<>(),
                new MonitorThreadPoolFactory(this.monitorThreadName));
        asyncThreadPoolExecutor.setNamePrefix(this.monitorThreadName);
        this.monitorThreadPoolExecutor = asyncThreadPoolExecutor;
        this.monitorThreadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        this.coreThreadPoolExecutor = asyncThreadPoolTaskExecutor;
        this.monitorSystem = new MonitorSystem(this.collector, this.monitorThreadName, asyncThreadPoolProperties.getThreadNamePrefix(), asyncThreadPoolExecutor, asyncThreadPoolTaskExecutor);
        // NOTE(review): the meaning of the Integer.MAX_VALUE / false arguments is defined by
        // ProcessExitEvent.register (presumably ordering and an async flag) - confirm there.
        ProcessExitEvent.register(() -> {
            // Shut the pools down independently so a failure in one does not leave the other running.
            try {
                monitorShutdown();
            } catch (Exception e) {
                LogUtil.error(e, "关闭SystemThreadPool时出错", new Object[0]);
            }
            try {
                coreShutdown();
            } catch (Exception e) {
                LogUtil.error(e, "关闭SystemThreadPool时出错", new Object[0]);
            }
        }, Integer.MAX_VALUE, false);
    }

    /**
     * Logs a warning when the monitor pool is saturated and health checking is enabled.
     *
     * <p>The pool counts as saturated once it has grown to its maximum size and either
     * tasks are waiting in its queue or every thread is busy. The second condition is
     * required because this class builds the monitor executor on a
     * {@link java.util.concurrent.SynchronousQueue}, whose {@code size()} is always zero;
     * a queue-length check alone could never fire for it.
     */
    private void monitorThreadPoolCheckHealth() {
        if (!this.monitorCheckHealth) {
            return;
        }
        ThreadPoolExecutor executor = this.monitorThreadPoolExecutor;
        int maximumPoolSize = executor.getMaximumPoolSize();
        if (maximumPoolSize > executor.getPoolSize()) {
            // The pool can still grow, so it is not saturated yet.
            return;
        }
        int activeCount = executor.getActiveCount();
        boolean tasksWaiting = !executor.getQueue().isEmpty();
        boolean allThreadsBusy = activeCount >= maximumPoolSize;
        if (!tasksWaiting && !allThreadsBusy) {
            return;
        }
        LogUtil.warn("监控线程池已满 任务开始出现排队 请修改配置 [taotao.cloud.core.threadpool.monitor.maximumPoolSize] 当前活动线程数: {}", new Object[]{Integer.valueOf(activeCount)});
    }

    /**
     * Logs a warning when health checking is enabled, the core pool has reached its
     * maximum size and its work queue is non-empty.
     *
     * <p>Callers must have verified that the core executor is non-null.
     */
    private void coreThreadPoolCheckHealth() {
        if (!coreCheckHealth) {
            return;
        }
        boolean canStillGrow = coreThreadPoolExecutor.getMaxPoolSize() > coreThreadPoolExecutor.getPoolSize();
        if (canStillGrow) {
            return;
        }
        int queued = coreThreadPoolExecutor.getThreadPoolExecutor().getQueue().size();
        if (queued <= 0) {
            return;
        }
        LogUtil.warn("核心线程池已满 任务开始出现排队 请修改配置 [taotao.cloud.core.threadpool.async.threadPoolMaxSiz] 当前活动线程数: {}", new Object[]{Integer.valueOf(coreThreadPoolExecutor.getActiveCount())});
    }

    /**
     * Submits a value-returning task to the monitor pool, wrapped in the monitor hook.
     *
     * @param str      name passed to the monitor hook for this submission
     * @param callable the task to run
     * @param <T>      result type of the task
     * @return the future produced by the monitor executor, as returned through the hook
     */
    @SuppressWarnings("unchecked")
    public <T> Future<T> monitorSubmit(String str, Callable<T> callable) {
        monitorThreadPoolCheckHealth();
        return (Future<T>) monitorSystem.monitorHook().run(str, () -> {
            return monitorThreadPoolExecutor.submit(callable);
        });
    }

    /**
     * Submits a value-returning task to the core pool, wrapped in the core hook.
     *
     * @param str      name passed to the core hook for this submission
     * @param callable the task to run
     * @param <T>      result type of the task
     * @return the future produced by the core executor, or {@code null} (after logging a
     *         warning) when no core executor was supplied
     */
    @SuppressWarnings("unchecked")
    public <T> Future<T> coreSubmit(String str, Callable<T> callable) {
        if (coreThreadPoolExecutor == null) {
            LogUtil.warn("核心线程池未初始化", new Object[0]);
            return null;
        }
        coreThreadPoolCheckHealth();
        return (Future<T>) monitorSystem.coreHook().run(str, () -> {
            return coreThreadPoolExecutor.submit(callable);
        });
    }

    /**
     * Submits a fire-and-forget task to the monitor pool, wrapped in the monitor hook.
     * The resulting future is not exposed to the caller.
     *
     * @param str      name passed to the monitor hook for this submission
     * @param runnable the task to run
     */
    public void monitorSubmit(String str, Runnable runnable) {
        monitorThreadPoolCheckHealth();
        monitorSystem.monitorHook().run(str, () -> {
            return monitorThreadPoolExecutor.submit(runnable);
        });
    }

    /**
     * Submits a task without a result to the core pool, wrapped in the core hook.
     *
     * @param str      name passed to the core hook for this submission
     * @param runnable the task to run
     * @return the future produced by the core executor, or {@code null} (after logging a
     *         warning) when no core executor was supplied
     */
    public Future<?> coreSubmit(String str, Runnable runnable) {
        if (coreThreadPoolExecutor == null) {
            LogUtil.warn("核心线程池未初始化", new Object[0]);
            return null;
        }
        coreThreadPoolCheckHealth();
        return (Future<?>) monitorSystem.coreHook().run(str, () -> {
            return coreThreadPoolExecutor.submit(runnable);
        });
    }

    /**
     * @return {@code true} once the monitor executor has been shut down
     */
    public boolean monitorIsShutdown() {
        return monitorThreadPoolExecutor.isShutdown();
    }

    /**
     * Reports whether the core pool is shut down.
     *
     * @return the shutdown state of the underlying executor; {@code true} (after logging a
     *         warning) when no core executor was supplied
     */
    public boolean coreIsShutdown() {
        if (coreThreadPoolExecutor == null) {
            LogUtil.warn("核心线程池未初始化", new Object[0]);
            return true;
        }
        return coreThreadPoolExecutor.getThreadPoolExecutor().isShutdown();
    }

    /**
     * Initiates an orderly shutdown of the monitor executor.
     */
    public void monitorShutdown() {
        monitorThreadPoolExecutor.shutdown();
    }

    /**
     * Shuts the core executor down; does nothing when no core executor was supplied.
     */
    public void coreShutdown() {
        if (coreThreadPoolExecutor == null) {
            return;
        }
        coreThreadPoolExecutor.shutdown();
    }

    /**
     * Applies {@code action1} to every non-null element of {@code list} on the monitor
     * pool, in consecutive batches of at most {@code i} elements. Each batch is submitted
     * as a whole and the next batch starts only after the current one has finished.
     *
     * <p>The list is copied up front, so later changes to {@code list} are not observed.
     * {@code null} elements are skipped. An empty list is a no-op regardless of {@code i}.
     *
     * <p>If the waiting thread is interrupted, the interruption is logged, the method
     * keeps waiting for the current batch as before, and the thread's interrupt status is
     * restored before the hook callback returns.
     *
     * @param str     name passed to the monitor hook for this run
     * @param i       maximum number of elements processed concurrently; must be positive
     *                when {@code list} is non-empty
     * @param list    elements to process
     * @param action1 operation applied to each element
     * @param <T>     element type
     * @throws IllegalArgumentException if {@code list} is non-empty and {@code i <= 0}
     *         (raised inside the hook callback)
     * @throws BaseException if any task fails; the cause is the exception from {@code Future.get()}
     */
    public <T> void monitorParallelFor(String str, int i, List<T> list, Callable.Action1<T> action1) {
        monitorThreadPoolCheckHealth();
        this.monitorSystem.monitorHook().run(str, () -> {
            if (list.isEmpty()) {
                return 1;
            }
            if (i <= 0) {
                // A batch size of zero would never drain the queue and spin forever.
                throw new IllegalArgumentException("parallelFor parallelism must be positive, got " + i);
            }
            int batchSize = Math.min(i, list.size());
            LinkedList<T> pending = new LinkedList<>(list);
            boolean interrupted = false;
            try {
                while (!pending.isEmpty()) {
                    // poll() returning null ends the batch early; that is how null elements get skipped.
                    List<T> batch = new ArrayList<>(batchSize);
                    T next;
                    while (batch.size() < batchSize && (next = pending.poll()) != null) {
                        batch.add(next);
                    }

                    CountDownLatch latch = new CountDownLatch(batch.size());
                    List<Future<?>> futures = new ArrayList<>(batch.size());
                    for (T element : batch) {
                        futures.add(this.monitorThreadPoolExecutor.submit(() -> {
                            try {
                                action1.invoke(element);
                            } finally {
                                latch.countDown();
                            }
                        }));
                    }

                    // Wait for the whole batch before surfacing any failure.
                    try {
                        latch.await();
                    } catch (InterruptedException e) {
                        interrupted = true;
                        LogUtil.error(e, "parallelFor 任务计数异常", new Object[0]);
                    }
                    for (Future<?> future : futures) {
                        try {
                            future.get();
                        } catch (InterruptedException e) {
                            interrupted = true;
                            throw new BaseException("parallelFor并行执行出错", e);
                        } catch (Exception e) {
                            throw new BaseException("parallelFor并行执行出错", e);
                        }
                    }
                }
            } finally {
                if (interrupted) {
                    // Do not swallow the interruption: hand it back to the caller.
                    Thread.currentThread().interrupt();
                }
            }
            return 1;
        });
    }

    /**
     * Applies {@code action1} to every non-null element of {@code collection} using up to
     * {@code i} worker tasks on the monitor pool. Workers pull elements from a shared
     * queue until it is empty or until any worker has failed.
     *
     * <p>The collection is copied up front and {@code null} elements are dropped from the
     * copy. If nothing remains, the method is a no-op regardless of {@code i}.
     *
     * <p>The first failure (any {@link Throwable}) is recorded before the worker signals
     * completion, so the waiting thread always observes it; every worker signals
     * completion no matter how it ends. If the waiting thread is interrupted, the
     * interruption is logged, waiting stops, and the interrupt status is restored.
     *
     * @param str        name passed to the monitor hook for this run
     * @param i          maximum number of concurrent workers; must be positive when there
     *                   is at least one non-null element
     * @param collection elements to process
     * @param action1    operation applied to each element
     * @param <T>        element type
     * @throws IllegalArgumentException if there is work to do and {@code i <= 0}
     *         (raised inside the hook callback)
     * @throws BaseException if a worker failed; the cause is the first recorded failure
     */
    public <T> void monitorParallelFor2(String str, int i, Collection<T> collection, Callable.Action1<T> action1) {
        monitorThreadPoolCheckHealth();
        this.monitorSystem.monitorHook().run(str, () -> {
            LinkedList<T> pending = new LinkedList<>(collection);
            // With nulls removed, poll() returning null can only mean "queue exhausted".
            pending.removeIf(Objects::isNull);
            if (pending.isEmpty()) {
                return 1;
            }
            if (i <= 0) {
                // Zero workers would silently skip every element.
                throw new IllegalArgumentException("parallelFor parallelism must be positive, got " + i);
            }

            int workers = Math.min(i, pending.size());
            CountDownLatch latch = new CountDownLatch(workers);
            Object lock = new Object();
            AtomicReference<Throwable> failure = new AtomicReference<>();
            for (int w = 0; w < workers; w++) {
                this.monitorThreadPoolExecutor.submit(() -> {
                    try {
                        // Stop pulling work as soon as any worker has reported a failure.
                        while (failure.get() == null) {
                            T item;
                            synchronized (lock) {
                                item = pending.poll();
                            }
                            if (item == null) {
                                break;
                            }
                            action1.invoke(item);
                        }
                    } catch (Throwable t) {
                        // Keep only the first failure; it must be stored before countDown().
                        failure.compareAndSet(null, t);
                    } finally {
                        latch.countDown();
                    }
                });
            }

            try {
                latch.await();
            } catch (InterruptedException e) {
                LogUtil.error(e, "parallelFor 任务计数异常", new Object[0]);
                Thread.currentThread().interrupt();
            }
            Throwable cause = failure.get();
            if (cause != null) {
                throw new BaseException("parallelFor 并行执行出错", cause);
            }
            return 1;
        });
    }

    /**
     * @return the executor backing the monitor pool
     */
    public ThreadPoolExecutor getMonitorThreadPoolExecutor() {
        return monitorThreadPoolExecutor;
    }

    /**
     * @return the executor backing the core pool; may be {@code null} if none was supplied
     */
    public ThreadPoolTaskExecutor getCoreThreadPoolExecutor() {
        return coreThreadPoolExecutor;
    }

    /**
     * @return whether the monitor pool saturation check runs before each submission
     */
    public boolean isMonitorCheckHealth() {
        return monitorCheckHealth;
    }

    /**
     * Enables or disables the monitor pool saturation check.
     *
     * @param z {@code true} to enable the check
     */
    public void setMonitorCheckHealth(boolean z) {
        monitorCheckHealth = z;
    }

    /**
     * @return whether the core pool saturation check runs before each submission
     */
    public boolean isCoreCheckHealth() {
        return coreCheckHealth;
    }

    /**
     * Enables or disables the core pool saturation check.
     *
     * @param z {@code true} to enable the check
     */
    public void setCoreCheckHealth(boolean z) {
        coreCheckHealth = z;
    }

    /**
     * @return the monitor system whose hooks wrap submissions to both pools
     */
    public MonitorSystem getMonitorSystem() {
        return monitorSystem;
    }

    /**
     * Replaces the monitor system whose hooks wrap submissions to both pools.
     *
     * @param monitorSystem the new monitor system; the submit and parallel-for methods
     *                      dereference it without a null check
     */
    public void setMonitorSystem(MonitorSystem monitorSystem) {
        this.monitorSystem = monitorSystem;
    }

    /**
     * @return the stored monitor thread name prefix
     */
    public String getMonitorThreadName() {
        return monitorThreadName;
    }

    /**
     * Updates the stored monitor thread name prefix. Only the field is changed; the
     * already constructed executor and thread factory are not touched here.
     *
     * @param str the new prefix
     */
    public void setMonitorThreadName(String str) {
        monitorThreadName = str;
    }

    /**
     * @return the collector held by this pool
     */
    public Collector getCollector() {
        return collector;
    }

    /**
     * Replaces the collector held by this pool. Only the field is changed; the
     * {@link MonitorSystem} built in the constructor keeps the collector it was given.
     *
     * @param collector the new collector
     */
    public void setCollector(Collector collector) {
        this.collector = collector;
    }
}
