001/**
002 * Copyright (c) 2015-2022, Michael Yang 杨福海 (fuhai999@gmail.com).
003 * <p>
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 * <p>
008 * http://www.apache.org/licenses/LICENSE-2.0
009 * <p>
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package io.jboot.components.schedule;
017
018import com.jfinal.kit.PathKit;
019import com.jfinal.kit.Prop;
020import com.jfinal.log.Log;
021import io.jboot.Jboot;
022import io.jboot.components.schedule.annotation.Cron;
023import io.jboot.components.schedule.annotation.EnableDistributedRunnable;
024import io.jboot.components.schedule.annotation.FixedDelay;
025import io.jboot.components.schedule.annotation.FixedRate;
026import io.jboot.utils.AnnotationUtil;
027import io.jboot.utils.ClassScanner;
028import io.jboot.utils.ClassUtil;
029import io.jboot.utils.NamedThreadFactory;
030
031import java.io.File;
032import java.util.List;
033import java.util.Map;
034import java.util.concurrent.ConcurrentHashMap;
035import java.util.concurrent.ScheduledFuture;
036import java.util.concurrent.ScheduledThreadPoolExecutor;
037import java.util.concurrent.TimeUnit;
038
039
040public class JbootScheduleManager {
041
042    private static final Log LOG = Log.getLog(JbootScheduleManager.class);
043
044    private static JbootScheduleManager manager = new JbootScheduleManager();
045    private JbootCron4jPlugin cron4jPlugin;
046    private ScheduledThreadPoolExecutor fixedScheduler;
047    private JbooScheduleConfig config;
048
049    private Map<Class<?>, Runnable> scheduleRunnableCache = new ConcurrentHashMap<>();
050
051    // add by lixin 08.08, 用于 remove fixedScheduler 
052    private Map<Class<?>, ScheduledFuture<?>> scheduleFutureCache = new ConcurrentHashMap<>();
053
054    public JbootScheduleManager() {
055        config = Jboot.config(JbooScheduleConfig.class);
056        fixedScheduler = new ScheduledThreadPoolExecutor(config.getPoolSize(), new NamedThreadFactory("jboot-scheduler"));
057
058        File cron4jProperties = new File(PathKit.getRootClassPath(), config.getCron4jFile());
059        cron4jPlugin = cron4jProperties.exists()
060                ? new JbootCron4jPlugin(new Prop(config.getCron4jFile()))
061                : new JbootCron4jPlugin();
062    }
063
064
065    public static final JbootScheduleManager me() {
066        return manager;
067    }
068
069    public void init() {
070        initSchedules();
071//        initCron4jPlugin();
072        cron4jPlugin.start();
073    }
074
075    public void stop() {
076        fixedScheduler.shutdownNow();
077        cron4jPlugin.stop();
078    }
079
080    private void initSchedules() {
081        List<Class<Runnable>> runnableClass = ClassScanner.scanSubClass(Runnable.class, true);
082        runnableClass.forEach(this::addSchedule);
083    }
084
085
086    public void addSchedule(Class<? extends Runnable> runnableClass) {
087        FixedDelay fixedDelayJob = runnableClass.getAnnotation(FixedDelay.class);
088        if (fixedDelayJob != null) {
089            Runnable runnable = ClassUtil.newInstance(runnableClass);
090
091            EnableDistributedRunnable enableDistributedRunnable = runnableClass.getAnnotation(EnableDistributedRunnable.class);
092            Runnable executeRunnable = enableDistributedRunnable == null
093                    ? runnable
094                    : new JbootDistributedRunnable(runnable, AnnotationUtil.get(enableDistributedRunnable.redisKey()), enableDistributedRunnable.expireSeconds());
095
096            // ScheduledThreadPoolExecutor 线程池在遇到未捕获的异常时会终止调度
097            // JbootSafeRunnable 可以捕捉业务代码异常,防止线程池意外终止调度
098            executeRunnable = new JbootSafeRunnable(executeRunnable);
099            try {
100                scheduleRunnableCache.put(runnableClass, executeRunnable);
101                // modified by lixin 08.08, 用于remove fixedScheduler
102                ScheduledFuture<?> sf = fixedScheduler.scheduleWithFixedDelay(executeRunnable, fixedDelayJob.initialDelay(), fixedDelayJob.period(), TimeUnit.SECONDS);
103                scheduleFutureCache.put(runnableClass, sf);
104            } catch (Exception e) {
105                LOG.error(e.toString(), e);
106            }
107        }
108
109        FixedRate fixedRateJob = runnableClass.getAnnotation(FixedRate.class);
110        if (fixedRateJob != null) {
111            Runnable runnable = ClassUtil.newInstance(runnableClass);
112
113            EnableDistributedRunnable enableDistributedRunnable = runnableClass.getAnnotation(EnableDistributedRunnable.class);
114            Runnable executeRunnable = enableDistributedRunnable == null
115                    ? runnable
116                    : new JbootDistributedRunnable(runnable, AnnotationUtil.get(enableDistributedRunnable.redisKey()), enableDistributedRunnable.expireSeconds());
117
118
119            executeRunnable = new JbootSafeRunnable(executeRunnable);
120            try {
121                scheduleRunnableCache.put(runnableClass, executeRunnable);
122                // modified by lixin 08.08, 用于 remove fixedScheduler
123                ScheduledFuture<?> future = fixedScheduler.scheduleAtFixedRate(executeRunnable, fixedRateJob.initialDelay(), fixedRateJob.period(), TimeUnit.SECONDS);
124                scheduleFutureCache.put(runnableClass, future);
125            } catch (Exception e) {
126                LOG.error(e.toString(), e);
127            }
128        }
129
130
131        Cron cron = runnableClass.getAnnotation(Cron.class);
132        if (cron != null) {
133            String value = AnnotationUtil.get(cron.value());
134            Runnable runnable = ClassUtil.newInstance(runnableClass);
135
136            EnableDistributedRunnable enableDistributedRunnable = runnableClass.getAnnotation(EnableDistributedRunnable.class);
137            Runnable executeRunnable = enableDistributedRunnable == null
138                    ? runnable
139                    : new JbootDistributedRunnable(runnable, AnnotationUtil.get(enableDistributedRunnable.redisKey()), enableDistributedRunnable.expireSeconds());
140
141            scheduleRunnableCache.put(runnableClass, executeRunnable);
142            cron4jPlugin.addTask(value, executeRunnable, cron.daemon());
143        }
144    }
145
146
147    //不支持 cron4jPlugin 的remove
148    public void removeSchedule(Class<? extends Runnable> removeClass) {
149        Runnable runnable = scheduleRunnableCache.get(removeClass);
150        if (runnable != null) {
151            fixedScheduler.remove(runnable);
152            scheduleRunnableCache.remove(removeClass);
153        }
154
155        //add by lixin 08.08, 用于 remove fixedScheduler
156        ScheduledFuture<?> sf = scheduleFutureCache.remove(removeClass);
157        if (sf != null) {
158            sf.cancel(true);
159        }
160    }
161
162    public Map<Class<?>, Runnable> getScheduleRunnableCache() {
163        return scheduleRunnableCache;
164    }
165
166    public JbootCron4jPlugin getCron4jPlugin() {
167        return cron4jPlugin;
168    }
169
170    public ScheduledThreadPoolExecutor getFixedScheduler() {
171        return fixedScheduler;
172    }
173
174    public JbooScheduleConfig getConfig() {
175        return config;
176    }
177
178    public Map<Class<?>, ScheduledFuture<?>> getScheduleFutureCache() {
179        return scheduleFutureCache;
180    }
181}