001/** 002 * Copyright (c) 2015-2022, Michael Yang 杨福海 (fuhai999@gmail.com). 003 * <p> 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * <p> 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * <p> 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package io.jboot.components.schedule; 017 018import com.jfinal.kit.PathKit; 019import com.jfinal.kit.Prop; 020import com.jfinal.log.Log; 021import io.jboot.Jboot; 022import io.jboot.components.schedule.annotation.Cron; 023import io.jboot.components.schedule.annotation.EnableDistributedRunnable; 024import io.jboot.components.schedule.annotation.FixedDelay; 025import io.jboot.components.schedule.annotation.FixedRate; 026import io.jboot.utils.AnnotationUtil; 027import io.jboot.utils.ClassScanner; 028import io.jboot.utils.ClassUtil; 029import io.jboot.utils.NamedThreadFactory; 030 031import java.io.File; 032import java.util.List; 033import java.util.Map; 034import java.util.concurrent.ConcurrentHashMap; 035import java.util.concurrent.ScheduledFuture; 036import java.util.concurrent.ScheduledThreadPoolExecutor; 037import java.util.concurrent.TimeUnit; 038 039 040public class JbootScheduleManager { 041 042 private static final Log LOG = Log.getLog(JbootScheduleManager.class); 043 044 private static JbootScheduleManager manager = new JbootScheduleManager(); 045 private JbootCron4jPlugin cron4jPlugin; 046 private ScheduledThreadPoolExecutor fixedScheduler; 047 private JbooScheduleConfig config; 048 049 private Map<Class<?>, Runnable> scheduleRunnableCache = new ConcurrentHashMap<>(); 050 051 // add by lixin 08.08, 用于 remove fixedScheduler 052 private Map<Class<?>, ScheduledFuture<?>> scheduleFutureCache = new ConcurrentHashMap<>(); 053 054 public JbootScheduleManager() { 055 config = Jboot.config(JbooScheduleConfig.class); 056 fixedScheduler = new ScheduledThreadPoolExecutor(config.getPoolSize(), new NamedThreadFactory("jboot-scheduler")); 057 058 File cron4jProperties = new File(PathKit.getRootClassPath(), config.getCron4jFile()); 059 cron4jPlugin = cron4jProperties.exists() 060 ? new JbootCron4jPlugin(new Prop(config.getCron4jFile())) 061 : new JbootCron4jPlugin(); 062 } 063 064 065 public static final JbootScheduleManager me() { 066 return manager; 067 } 068 069 public void init() { 070 initSchedules(); 071// initCron4jPlugin(); 072 cron4jPlugin.start(); 073 } 074 075 public void stop() { 076 fixedScheduler.shutdownNow(); 077 cron4jPlugin.stop(); 078 } 079 080 private void initSchedules() { 081 List<Class<Runnable>> runnableClass = ClassScanner.scanSubClass(Runnable.class, true); 082 runnableClass.forEach(this::addSchedule); 083 } 084 085 086 public void addSchedule(Class<? extends Runnable> runnableClass) { 087 FixedDelay fixedDelayJob = runnableClass.getAnnotation(FixedDelay.class); 088 if (fixedDelayJob != null) { 089 Runnable runnable = ClassUtil.newInstance(runnableClass); 090 091 EnableDistributedRunnable enableDistributedRunnable = runnableClass.getAnnotation(EnableDistributedRunnable.class); 092 Runnable executeRunnable = enableDistributedRunnable == null 093 ? runnable 094 : new JbootDistributedRunnable(runnable, AnnotationUtil.get(enableDistributedRunnable.redisKey()), enableDistributedRunnable.expireSeconds()); 095 096 // ScheduledThreadPoolExecutor 线程池在遇到未捕获的异常时会终止调度 097 // JbootSafeRunnable 可以捕捉业务代码异常,防止线程池意外终止调度 098 executeRunnable = new JbootSafeRunnable(executeRunnable); 099 try { 100 scheduleRunnableCache.put(runnableClass, executeRunnable); 101 // modified by lixin 08.08, 用于remove fixedScheduler 102 ScheduledFuture<?> sf = fixedScheduler.scheduleWithFixedDelay(executeRunnable, fixedDelayJob.initialDelay(), fixedDelayJob.period(), TimeUnit.SECONDS); 103 scheduleFutureCache.put(runnableClass, sf); 104 } catch (Exception e) { 105 LOG.error(e.toString(), e); 106 } 107 } 108 109 FixedRate fixedRateJob = runnableClass.getAnnotation(FixedRate.class); 110 if (fixedRateJob != null) { 111 Runnable runnable = ClassUtil.newInstance(runnableClass); 112 113 EnableDistributedRunnable enableDistributedRunnable = runnableClass.getAnnotation(EnableDistributedRunnable.class); 114 Runnable executeRunnable = enableDistributedRunnable == null 115 ? runnable 116 : new JbootDistributedRunnable(runnable, AnnotationUtil.get(enableDistributedRunnable.redisKey()), enableDistributedRunnable.expireSeconds()); 117 118 119 executeRunnable = new JbootSafeRunnable(executeRunnable); 120 try { 121 scheduleRunnableCache.put(runnableClass, executeRunnable); 122 // modified by lixin 08.08, 用于 remove fixedScheduler 123 ScheduledFuture<?> future = fixedScheduler.scheduleAtFixedRate(executeRunnable, fixedRateJob.initialDelay(), fixedRateJob.period(), TimeUnit.SECONDS); 124 scheduleFutureCache.put(runnableClass, future); 125 } catch (Exception e) { 126 LOG.error(e.toString(), e); 127 } 128 } 129 130 131 Cron cron = runnableClass.getAnnotation(Cron.class); 132 if (cron != null) { 133 String value = AnnotationUtil.get(cron.value()); 134 Runnable runnable = ClassUtil.newInstance(runnableClass); 135 136 EnableDistributedRunnable enableDistributedRunnable = runnableClass.getAnnotation(EnableDistributedRunnable.class); 137 Runnable executeRunnable = enableDistributedRunnable == null 138 ? runnable 139 : new JbootDistributedRunnable(runnable, AnnotationUtil.get(enableDistributedRunnable.redisKey()), enableDistributedRunnable.expireSeconds()); 140 141 scheduleRunnableCache.put(runnableClass, executeRunnable); 142 cron4jPlugin.addTask(value, executeRunnable, cron.daemon()); 143 } 144 } 145 146 147 //不支持 cron4jPlugin 的remove 148 public void removeSchedule(Class<? extends Runnable> removeClass) { 149 Runnable runnable = scheduleRunnableCache.get(removeClass); 150 if (runnable != null) { 151 fixedScheduler.remove(runnable); 152 scheduleRunnableCache.remove(removeClass); 153 } 154 155 //add by lixin 08.08, 用于 remove fixedScheduler 156 ScheduledFuture<?> sf = scheduleFutureCache.remove(removeClass); 157 if (sf != null) { 158 sf.cancel(true); 159 } 160 } 161 162 public Map<Class<?>, Runnable> getScheduleRunnableCache() { 163 return scheduleRunnableCache; 164 } 165 166 public JbootCron4jPlugin getCron4jPlugin() { 167 return cron4jPlugin; 168 } 169 170 public ScheduledThreadPoolExecutor getFixedScheduler() { 171 return fixedScheduler; 172 } 173 174 public JbooScheduleConfig getConfig() { 175 return config; 176 } 177 178 public Map<Class<?>, ScheduledFuture<?>> getScheduleFutureCache() { 179 return scheduleFutureCache; 180 } 181}