001/** 002 * Copyright (c) 2015-2022, Michael Yang 杨福海 (fuhai999@gmail.com). 003 * <p> 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * <p> 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * <p> 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package io.jboot.components.schedule; 017 018import com.jfinal.kit.LogKit; 019import com.jfinal.log.Log; 020import io.jboot.Jboot; 021import io.jboot.support.redis.JbootRedis; 022import io.jboot.utils.ClassUtil; 023import io.jboot.utils.QuietlyUtil; 024import io.jboot.utils.StrUtil; 025 026/** 027 * @author Michael Yang 杨福海 (fuhai999@gmail.com) 028 * @version V1.0 029 * @Title: 分布式任务 030 * @Description: 在分布式应用中,处理分布式应用,基于redis。 031 * <p> 032 * 特点: 033 * 1、简单,无需依赖数据库。 034 * 2、高可用,不存在单点故障 035 * 3、一致性,在集群环境中,只有一个任务在执行。 036 * 4、Failover,支持故障转移 037 */ 038public class JbootDistributedRunnable implements Runnable { 039 040 private static final Log LOG = Log.getLog(JbootDistributedRunnable.class); 041 042 private JbootRedis redis; 043 044 // 锁的过期时间,单位毫秒,默认 1 分钟, 045 // 因此,配置的定时任务,每次执行任务的时间间隔,必须大于 1 分钟以上 046 private int expire = 60 * 1000; 047 private String key; 048 private Runnable runnable; 049 050 051 public JbootDistributedRunnable(Runnable runnable, String key, int expireSeconds) { 052 this.runnable = runnable; 053 054 if (StrUtil.isNotBlank(key)) { 055 this.key = key; 056 } else { 057 this.key = "jboot-distributed-key:" + ClassUtil.getUsefulClass(runnable.getClass()).getName(); 058 } 059 060 if (expireSeconds > 0) { 061 this.expire = expireSeconds * 1000; 062 } 063 064 this.redis = Jboot.getRedis(); 065 066 if (redis == null) { 067 LOG.warn("Redis is null, Can not use @EnableDistributedRunnable in your class: " 068 + ClassUtil.getUsefulClass(runnable.getClass()).getName() 069 + ", Please config redis info in jboot.properties"); 070 } 071 072 } 073 074 075 @Override 076 public void run() { 077 if (redis == null) { 078 // 当未配置 redis 的时候,默认使用本地任务的方式进行执行 079 runnable.run(); 080 return; 081 } 082 083 084 Long setTimeMillis = System.currentTimeMillis(); 085 086 //要设置的值 087 String setValue = setTimeMillis + ":" + StrUtil.uuid(); 088 089 090 boolean locked = false; 091 092 for (int i = 0; i < 5; i++) { 093 094 //setnx: 只在键 key 不存在的情况下, 将键 key 的值设置为 value, 若键 key 已经存在, 则 SETNX 命令不做任何动作。 095 //result: 设置成功,返回 1,设置失败,返回 0 096 Long result = redis.setnx(key, setValue); 097 098 //error 一般不会出现这种情况,除非是网络异常等原因 099 if (result == null) { 100 quietlySleep(); 101 continue; 102 } 103 104 105 //setnx success 设置成功 106 if (result == 1) { 107 String value = redis.get(key); 108 109 //在分布式的场景下,可能自己设置成功后,又被其他节点删除重新设置的情况 110 //需要判断是否是当前节点(或者线程)设置的 111 if (setValue.equals(value)) { 112 locked = true; 113 break; 114 } 115 // 可能被其他节点删除,或重置了 116 else { 117 quietlySleep(); 118 continue; 119 } 120 } 121 122 //setnx fail,可能已经被其他 server 优先设置了,也有可能是 自己的server 在上一次任务里设置了 123 else if (result == 0) { 124 125 String value = null; 126 127 try { 128 value = redis.get(key); 129 } catch (Exception ex) { 130 LogKit.logNothing(ex); 131 } 132 133 134 //获取不到,一般不会出现这种情况,除非是网络异常等原因 135 //或者是使用了已经存在的 key,但是此 key 已经有其他序列化方式的值导致异常 136 if (value == null) { 137 reset(); 138 quietlySleep(); 139 continue; 140 } 141 142 String[] split = value.split(":"); 143 144 //被其他节点,或者手动操作 redis 的方式给设置了这个key值 145 if (split.length != 2) { 146 reset(); 147 continue; 148 } 149 150 //获取设置的时间 151 long savedTimeMillis = 0; 152 153 try { 154 savedTimeMillis = Long.parseLong(split[0]); 155 } catch (NumberFormatException ex) { 156 LogKit.logNothing(ex); 157 } 158 159 //redis 存储的内容有问题,可能是被手动设置 redis 的方式设置了这个 key 值 160 if (savedTimeMillis == 0) { 161 reset(); 162 continue; 163 } 164 165 if ((System.currentTimeMillis() - savedTimeMillis) > expire) { 166 //若设置锁的时间以及过期了 167 //说明是上一次任务配置的,此时需要删除这个过期的 key,然后重新去抢 168 reset(); 169 } 170 // 若锁没有过期,休息后重新去抢,因为抢走的线程可能会重新释放 171 else { 172 quietlySleep(); 173 } 174 } 175 176 } 177 178 //抢了5次都抢不到,证明已经被别的应用抢走了 179 if (!locked) { 180 return; 181 } 182 183 184 try { 185 runnable.run(); 186 } 187 188 // 如果 run() 执行异常,让别的分布式应用APP去执行 189 // 但如果 run() 执行的时间很长(超过30秒),而且失败了,那么别的分布式应用可能也抢不到了,只能等待下次任务 190 // 作用:故障转移 191 catch (Throwable ex) { 192 LOG.error(ex.toString(), ex); 193 reset(); 194 } 195 } 196 197 198 /** 199 * 重置分布式的 key 200 */ 201 private void reset() { 202 try { 203 redis.del(key); 204 } catch (Exception ex) { 205 LogKit.logNothing(ex); 206 } 207 } 208 209 210 public void quietlySleep() { 211 int millis = 2000; 212 if (this.expire <= 2000) { 213 millis = 100; 214 } else if (this.expire <= 5000) { 215 millis = 500; 216 } else if (this.expire <= 300000) { 217 millis = 1000; 218 } 219 220 QuietlyUtil.sleepQuietly(millis); 221 } 222 223 224}