001/**
002 * Copyright (c) 2015-2022, Michael Yang 杨福海 (fuhai999@gmail.com).
003 * <p>
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 * <p>
008 * http://www.apache.org/licenses/LICENSE-2.0
009 * <p>
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package io.jboot.components.schedule;
017
018import com.jfinal.kit.LogKit;
019import com.jfinal.log.Log;
020import io.jboot.Jboot;
021import io.jboot.support.redis.JbootRedis;
022import io.jboot.utils.ClassUtil;
023import io.jboot.utils.QuietlyUtil;
024import io.jboot.utils.StrUtil;
025
026/**
027 * @author Michael Yang 杨福海 (fuhai999@gmail.com)
028 * @version V1.0
029 * @Title: 分布式任务
030 * @Description: 在分布式应用中,处理分布式应用,基于redis。
031 * <p>
032 * 特点:
033 * 1、简单,无需依赖数据库。
034 * 2、高可用,不存在单点故障
035 * 3、一致性,在集群环境中,只有一个任务在执行。
036 * 4、Failover,支持故障转移
037 */
038public class JbootDistributedRunnable implements Runnable {
039
040    private static final Log LOG = Log.getLog(JbootDistributedRunnable.class);
041
042    private JbootRedis redis;
043
044    // 锁的过期时间,单位毫秒,默认 1 分钟,
045    // 因此,配置的定时任务,每次执行任务的时间间隔,必须大于 1 分钟以上
046    private int expire = 60 * 1000;
047    private String key;
048    private Runnable runnable;
049
050
051    public JbootDistributedRunnable(Runnable runnable, String key, int expireSeconds) {
052        this.runnable = runnable;
053
054        if (StrUtil.isNotBlank(key)) {
055            this.key = key;
056        } else {
057            this.key = "jboot-distributed-key:" + ClassUtil.getUsefulClass(runnable.getClass()).getName();
058        }
059
060        if (expireSeconds > 0) {
061            this.expire = expireSeconds * 1000;
062        }
063
064        this.redis = Jboot.getRedis();
065
066        if (redis == null) {
067            LOG.warn("Redis is null, Can not use @EnableDistributedRunnable in your class: "
068                    + ClassUtil.getUsefulClass(runnable.getClass()).getName()
069                    + ", Please config redis info in jboot.properties");
070        }
071
072    }
073
074
075    @Override
076    public void run() {
077        if (redis == null) {
078            // 当未配置 redis 的时候,默认使用本地任务的方式进行执行
079            runnable.run();
080            return;
081        }
082
083
084        Long setTimeMillis = System.currentTimeMillis();
085
086        //要设置的值
087        String setValue = setTimeMillis + ":" + StrUtil.uuid();
088
089
090        boolean locked = false;
091
092        for (int i = 0; i < 5; i++) {
093
094            //setnx: 只在键 key 不存在的情况下, 将键 key 的值设置为 value, 若键 key 已经存在, 则 SETNX 命令不做任何动作。
095            //result: 设置成功,返回 1,设置失败,返回 0
096            Long result = redis.setnx(key, setValue);
097
098            //error 一般不会出现这种情况,除非是网络异常等原因
099            if (result == null) {
100                quietlySleep();
101                continue;
102            }
103
104
105            //setnx success 设置成功
106            if (result == 1) {
107                String value = redis.get(key);
108
109                //在分布式的场景下,可能自己设置成功后,又被其他节点删除重新设置的情况
110                //需要判断是否是当前节点(或者线程)设置的
111                if (setValue.equals(value)) {
112                    locked = true;
113                    break;
114                }
115                // 可能被其他节点删除,或重置了
116                else {
117                    quietlySleep();
118                    continue;
119                }
120            }
121
122            //setnx fail,可能已经被其他 server 优先设置了,也有可能是 自己的server 在上一次任务里设置了
123            else if (result == 0) {
124
125                String value = null;
126
127                try {
128                    value = redis.get(key);
129                } catch (Exception ex) {
130                    LogKit.logNothing(ex);
131                }
132
133
134                //获取不到,一般不会出现这种情况,除非是网络异常等原因
135                //或者是使用了已经存在的 key,但是此 key 已经有其他序列化方式的值导致异常
136                if (value == null) {
137                    reset();
138                    quietlySleep();
139                    continue;
140                }
141
142                String[] split = value.split(":");
143
144                //被其他节点,或者手动操作 redis 的方式给设置了这个key值
145                if (split.length != 2) {
146                    reset();
147                    continue;
148                }
149
150                //获取设置的时间
151                long savedTimeMillis = 0;
152
153                try {
154                    savedTimeMillis = Long.parseLong(split[0]);
155                } catch (NumberFormatException ex) {
156                    LogKit.logNothing(ex);
157                }
158
159                //redis 存储的内容有问题,可能是被手动设置 redis 的方式设置了这个 key 值
160                if (savedTimeMillis == 0) {
161                    reset();
162                    continue;
163                }
164
165                if ((System.currentTimeMillis() - savedTimeMillis) > expire) {
166                    //若设置锁的时间以及过期了
167                    //说明是上一次任务配置的,此时需要删除这个过期的 key,然后重新去抢
168                    reset();
169                }
170                // 若锁没有过期,休息后重新去抢,因为抢走的线程可能会重新释放
171                else {
172                    quietlySleep();
173                }
174            }
175
176        }
177
178        //抢了5次都抢不到,证明已经被别的应用抢走了
179        if (!locked) {
180            return;
181        }
182
183
184        try {
185            runnable.run();
186        }
187
188        // 如果 run() 执行异常,让别的分布式应用APP去执行
189        // 但如果 run() 执行的时间很长(超过30秒),而且失败了,那么别的分布式应用可能也抢不到了,只能等待下次任务
190        // 作用:故障转移
191        catch (Throwable ex) {
192            LOG.error(ex.toString(), ex);
193            reset();
194        }
195    }
196
197
198    /**
199     * 重置分布式的 key
200     */
201    private void reset() {
202        try {
203            redis.del(key);
204        } catch (Exception ex) {
205            LogKit.logNothing(ex);
206        }
207    }
208
209
210    public void quietlySleep() {
211        int millis = 2000;
212        if (this.expire <= 2000) {
213            millis = 100;
214        } else if (this.expire <= 5000) {
215            millis = 500;
216        } else if (this.expire <= 300000) {
217            millis = 1000;
218        }
219
220        QuietlyUtil.sleepQuietly(millis);
221    }
222
223
224}