001/**
002 * Copyright (c) 2015-2022, Michael Yang 杨福海 (fuhai999@gmail.com).
003 * <p>
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 * <p>
008 * http://www.apache.org/licenses/LICENSE-2.0
009 * <p>
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package io.jboot.support.redis.jedis;
017
018import com.jfinal.log.Log;
019import io.jboot.exception.JbootException;
020import io.jboot.support.redis.JbootRedisBase;
021import io.jboot.support.redis.JbootRedisConfig;
022import io.jboot.support.redis.RedisScanResult;
023import io.jboot.utils.QuietlyUtil;
024import io.jboot.utils.StrUtil;
025import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
026import redis.clients.jedis.*;
027import redis.clients.jedis.exceptions.JedisConnectionException;
028import redis.clients.jedis.params.ScanParams;
029import redis.clients.jedis.resps.ScanResult;
030
031import java.util.*;
032import java.util.Map.Entry;
033import java.util.stream.Collectors;
034
035/**
036 * 参考: com.jfinal.plugin.redis
037 * JbootRedis 命令文档: http://redisdoc.com/
038 */
039public class JbootJedisClusterImpl extends JbootRedisBase {
040
041    protected JedisCluster jedisCluster;
042    private int timeout = 2000;
043    private int maxAttempts = 5;
044
045    static final Log LOG = Log.getLog(JbootJedisClusterImpl.class);
046
047
048    public JbootJedisClusterImpl(JbootRedisConfig config) {
049
050        super(config);
051
052        Integer timeout = config.getTimeout();
053        String password = config.getPassword();
054        Integer maxAttempts = config.getMaxAttempts();
055
056        if (timeout != null) {
057            this.timeout = timeout;
058        }
059        if (maxAttempts == null) {
060            maxAttempts = this.maxAttempts;
061        }
062
063        GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
064
065        if (StrUtil.isNotBlank(config.getTestWhileIdle())) {
066            poolConfig.setTestWhileIdle(config.getTestWhileIdle());
067        }
068
069        if (StrUtil.isNotBlank(config.getTestOnBorrow())) {
070            poolConfig.setTestOnBorrow(config.getTestOnBorrow());
071        }
072
073        if (StrUtil.isNotBlank(config.getTestOnCreate())) {
074            poolConfig.setTestOnCreate(config.getTestOnCreate());
075        }
076
077        if (StrUtil.isNotBlank(config.getTestOnReturn())) {
078            poolConfig.setTestOnReturn(config.getTestOnReturn());
079        }
080
081        if (StrUtil.isNotBlank(config.getMinEvictableIdleTimeMillis())) {
082            poolConfig.setMinEvictableIdleTimeMillis(config.getMinEvictableIdleTimeMillis());
083        }
084
085        if (StrUtil.isNotBlank(config.getTimeBetweenEvictionRunsMillis())) {
086            poolConfig.setTimeBetweenEvictionRunsMillis(config.getTimeBetweenEvictionRunsMillis());
087        }
088
089        if (StrUtil.isNotBlank(config.getNumTestsPerEvictionRun())) {
090            poolConfig.setNumTestsPerEvictionRun(config.getNumTestsPerEvictionRun());
091        }
092
093        if (StrUtil.isNotBlank(config.getMaxTotal())) {
094            poolConfig.setMaxTotal(config.getMaxTotal());
095        }
096
097        if (StrUtil.isNotBlank(config.getMaxIdle())) {
098            poolConfig.setMaxIdle(config.getMaxIdle());
099        }
100
101        if (StrUtil.isNotBlank(config.getMinIdle())) {
102            poolConfig.setMinIdle(config.getMinIdle());
103        }
104
105        if (StrUtil.isNotBlank(config.getMaxWaitMillis())) {
106            poolConfig.setMaxWaitMillis(config.getMaxWaitMillis());
107        }
108        this.jedisCluster = newJedisCluster(config.getHostAndPorts(), timeout, maxAttempts, password, poolConfig);
109    }
110
111    public static JedisCluster newJedisCluster(Set<HostAndPort> haps, Integer timeout,
112                                               Integer maxAttempts, String password, GenericObjectPoolConfig poolConfig) {
113        JedisCluster jedisCluster;
114
115        if (timeout != null && maxAttempts != null && password != null && poolConfig != null) {
116            jedisCluster = new JedisCluster(haps, timeout, timeout, maxAttempts, password, poolConfig);
117        } else if (timeout != null && maxAttempts != null && poolConfig != null) {
118            jedisCluster = new JedisCluster(haps, timeout, maxAttempts, poolConfig);
119        } else if (timeout != null && maxAttempts != null) {
120            jedisCluster = new JedisCluster(haps, timeout, maxAttempts);
121        } else if (timeout != null && poolConfig != null) {
122            jedisCluster = new JedisCluster(haps, timeout, poolConfig);
123        } else if (timeout != null) {
124            jedisCluster = new JedisCluster(haps, timeout);
125        } else {
126            jedisCluster = new JedisCluster(haps);
127        }
128        return jedisCluster;
129    }
130
131    public JbootJedisClusterImpl(JedisCluster jedisCluster) {
132        super(null);
133        this.jedisCluster = jedisCluster;
134    }
135
136    /**
137     * 存放 key value 对到 redis
138     * 如果 key 已经持有其他值, SET 就覆写旧值,无视类型。
139     * 对于某个原本带有生存时间(TTL)的键来说, 当 SET 命令成功在这个键上执行时, 这个键原有的 TTL 将被清除。
140     */
141    @Override
142    public String set(Object key, Object value) {
143        return jedisCluster.set(keyToBytes(key), valueToBytes(value));
144    }
145
146    @Override
147    public Long setnx(Object key, Object value) {
148        return jedisCluster.setnx(keyToBytes(key), valueToBytes(value));
149    }
150
151    /**
152     * 存放 key value 对到 redis
153     * 如果 key 已经持有其他值, SET 就覆写旧值,无视类型。
154     * 此方法用了修改 incr 等的值
155     */
156    @Override
157    public String setWithoutSerialize(Object key, Object value) {
158        return jedisCluster.set(keyToBytes(key), value.toString().getBytes());
159    }
160
161
162    /**
163     * 存放 key value 对到 redis,并将 key 的生存时间设为 seconds (以秒为单位)。
164     * 如果 key 已经存在, SETEX 命令将覆写旧值。
165     */
166    @Override
167    public String setex(Object key, int seconds, Object value) {
168
169        return jedisCluster.setex(keyToBytes(key), seconds, valueToBytes(value));
170
171    }
172
173    /**
174     * 返回 key 所关联的 value 值
175     * 如果 key 不存在那么返回特殊值 nil 。
176     */
177    @Override
178    @SuppressWarnings("unchecked")
179    public <T> T get(Object key) {
180
181        return (T) valueFromBytes(jedisCluster.get(keyToBytes(key)));
182
183    }
184
185    @Override
186    public String getWithoutSerialize(Object key) {
187        byte[] bytes = jedisCluster.get(keyToBytes(key));
188        if (bytes == null || bytes.length == 0) {
189            return null;
190        }
191        return new String(jedisCluster.get(keyToBytes(key)));
192    }
193
194    /**
195     * 删除给定的一个 key
196     * 不存在的 key 会被忽略。
197     */
198    @Override
199    public Long del(Object key) {
200        return jedisCluster.del(keyToBytes(key));
201    }
202
203    /**
204     * 删除给定的多个 key
205     * 不存在的 key 会被忽略。
206     */
207    @Override
208    public Long del(Object... keys) {
209
210        return jedisCluster.del(keysToBytesArray(keys));
211
212    }
213
214    /**
215     * 查找所有符合给定模式 pattern 的 key 。
216     * KEYS * 匹配数据库中所有 key 。
217     * KEYS h?llo 匹配 hello , hallo 和 hxllo 等。
218     * KEYS h*llo 匹配 hllo 和 heeeeello 等。
219     * KEYS h[ae]llo 匹配 hello 和 hallo ,但不匹配 hillo 。
220     * 特殊符号用 \ 隔开
221     */
222    @Override
223    public Set<String> keys(String pattern) {
224        HashSet<String> keys = new HashSet<>();
225        Map<String, ConnectionPool> clusterNodes = jedisCluster.getClusterNodes();
226        for (String k : clusterNodes.keySet()) {
227            ConnectionPool cp = clusterNodes.get(k);
228            Connection conn = cp.getResource();
229            try {
230                keys.addAll(conn.executeCommand(new ClusterCommandObjects().keys(pattern)));
231            } catch (Exception e) {
232                LOG.error(e.toString(), e);
233            } finally {
234                conn.close();
235                cp.close(); //用完一定要close这个链接!!!
236            }
237        }
238        return keys;
239    }
240
241
242    /**
243     * 同时设置一个或多个 key-value 对。
244     * 如果某个给定 key 已经存在,那么 MSET 会用新值覆盖原来的旧值,如果这不是你所希望的效果,请考虑使用 MSETNX 命令:它只会在所有给定 key 都不存在的情况下进行设置操作。
245     * MSET 是一个原子性(atomic)操作,所有给定 key 都会在同一时间内被设置,某些给定 key 被更新而另一些给定 key 没有改变的情况,不可能发生。
246     * <pre>
247     * 例子:
248     * Cache cache = RedisKit.use();                    // 使用 JbootRedis 的 cache
249     * cache.mset("k1", "v1", "k2", "v2");              // 放入多个 key value 键值对
250     * List list = cache.mget("k1", "k2");              // 利用多个键值得到上面代码放入的值
251     * </pre>
252     */
253    @Override
254    public String mset(Object... keysValues) {
255        if (keysValues.length % 2 != 0)
256            throw new IllegalArgumentException("wrong number of arguments for met, keysValues length can not be odd");
257
258        byte[][] kv = new byte[keysValues.length][];
259        for (int i = 0; i < keysValues.length; i++) {
260            if (i % 2 == 0)
261                kv[i] = keyToBytes(keysValues[i]);
262            else
263                kv[i] = valueToBytes(keysValues[i]);
264        }
265        return jedisCluster.mset(kv);
266
267    }
268
269    /**
270     * 返回所有(一个或多个)给定 key 的值。
271     * 如果给定的 key 里面,有某个 key 不存在,那么这个 key 返回特殊值 nil 。因此,该命令永不失败。
272     */
273    @Override
274    @SuppressWarnings("rawtypes")
275    public List mget(Object... keys) {
276
277        byte[][] keysBytesArray = keysToBytesArray(keys);
278        List<byte[]> data = jedisCluster.mget(keysBytesArray);
279        return valueListFromBytesList(data);
280
281    }
282
283    /**
284     * 将 key 中储存的数字值减一。
285     * 如果 key 不存在,那么 key 的值会先被初始化为 0 ,然后再执行 DECR 操作。
286     * 如果值包含错误的类型,或字符串类型的值不能表示为数字,那么返回一个错误。
287     * 本操作的值限制在 64 位(bit)有符号数字表示之内。
288     * 关于递增(increment) / 递减(decrement)操作的更多信息,请参见 INCR 命令。
289     */
290    @Override
291    public Long decr(Object key) {
292
293        return jedisCluster.decr(keyToBytes(key));
294
295    }
296
297    /**
298     * 将 key 所储存的值减去减量 decrement 。
299     * 如果 key 不存在,那么 key 的值会先被初始化为 0 ,然后再执行 DECRBY 操作。
300     * 如果值包含错误的类型,或字符串类型的值不能表示为数字,那么返回一个错误。
301     * 本操作的值限制在 64 位(bit)有符号数字表示之内。
302     * 关于更多递增(increment) / 递减(decrement)操作的更多信息,请参见 INCR 命令。
303     */
304    @Override
305    public Long decrBy(Object key, long longValue) {
306
307        return jedisCluster.decrBy(keyToBytes(key), longValue);
308
309    }
310
311    /**
312     * 将 key 中储存的数字值增一。
313     * 如果 key 不存在,那么 key 的值会先被初始化为 0 ,然后再执行 INCR 操作。
314     * 如果值包含错误的类型,或字符串类型的值不能表示为数字,那么返回一个错误。
315     * 本操作的值限制在 64 位(bit)有符号数字表示之内。
316     */
317    @Override
318    public Long incr(Object key) {
319
320        return jedisCluster.incr(keyToBytes(key));
321
322    }
323
324    /**
325     * 将 key 所储存的值加上增量 increment 。
326     * 如果 key 不存在,那么 key 的值会先被初始化为 0 ,然后再执行 INCRBY 命令。
327     * 如果值包含错误的类型,或字符串类型的值不能表示为数字,那么返回一个错误。
328     * 本操作的值限制在 64 位(bit)有符号数字表示之内。
329     * 关于递增(increment) / 递减(decrement)操作的更多信息,参见 INCR 命令。
330     */
331    @Override
332    public Long incrBy(Object key, long longValue) {
333        return jedisCluster.incrBy(keyToBytes(key), longValue);
334
335    }
336
337    /**
338     * 检查给定 key 是否存在。
339     */
340    @Override
341    public boolean exists(Object key) {
342
343        return jedisCluster.exists(keyToBytes(key));
344
345    }
346
347    /**
348     * 从当前数据库中随机返回(不删除)一个 key 。
349     */
350    @Override
351    public String randomKey() {
352
353        throw new JbootException("not support randomKey commmand in redis cluster.");
354
355    }
356
357    /**
358     * 将 key 改名为 newkey 。
359     * 当 key 和 newkey 相同,或者 key 不存在时,返回一个错误。
360     * 当 newkey 已经存在时, RENAME 命令将覆盖旧值。
361     */
362    @Override
363    public String rename(Object oldkey, Object newkey) {
364
365        return jedisCluster.rename(keyToBytes(oldkey), keyToBytes(newkey));
366
367    }
368
369    /**
370     * 将当前数据库的 key 移动到给定的数据库 db 当中。
371     * 如果当前数据库(源数据库)和给定数据库(目标数据库)有相同名字的给定 key ,或者 key 不存在于当前数据库,那么 MOVE 没有任何效果。
372     * 因此,也可以利用这一特性,将 MOVE 当作锁(locking)原语(primitive)。
373     */
374    @Override
375    public Long move(Object key, int dbIndex) {
376
377//        return jedisCluster.move(keyToBytes(key), dbIndex);
378        throw new JbootException("not support move commmand in redis cluster.");
379
380    }
381
382    /**
383     * 将 key 原子性地从当前实例传送到目标实例的指定数据库上,一旦传送成功, key 保证会出现在目标实例上,而当前实例上的 key 会被删除。
384     */
385    @Override
386    public String migrate(String host, int port, Object key, int destinationDb, int timeout) {
387
388        throw new JbootException("not support migrate commmand in redis cluster.");
389
390    }
391
392    /**
393     * 切换到指定的数据库,数据库索引号 index 用数字值指定,以 0 作为起始索引值。
394     * 默认使用 0 号数据库。
395     * 注意:在 Jedis 对象被关闭时,数据库又会重新被设置为初始值,所以本方法 select(...)
396     * 正常工作需要使用如下方式之一:
397     * 1:使用 RedisInterceptor,在本线程内共享同一个 Jedis 对象
398     * 2:使用 JbootRedis.call(ICallback) 进行操作
399     * 3:自行获取 Jedis 对象进行操作
400     */
401    @Override
402    public String select(int databaseIndex) {
403
404//        return jedisCluster.select(databaseIndex);
405        throw new IllegalStateException("Redis Cluster does not support multiple databases like the stand alone version of Redis, " +
406                "there is just database 0, and SELECT is not allowed.");
407    }
408
409    /**
410     * 为给定 key 设置生存时间,当 key 过期时(生存时间为 0 ),它会被自动删除。
411     * 在 JbootRedis 中,带有生存时间的 key 被称为『易失的』(volatile)。
412     */
413    @Override
414    public Long expire(Object key, int seconds) {
415
416        return jedisCluster.expire(keyToBytes(key), seconds);
417
418    }
419
420    /**
421     * EXPIREAT 的作用和 EXPIRE 类似,都用于为 key 设置生存时间。不同在于 EXPIREAT 命令接受的时间参数是 UNIX 时间戳(unix timestamp)。
422     */
423    @Override
424    public Long expireAt(Object key, long unixTime) {
425
426        return jedisCluster.expireAt(keyToBytes(key), unixTime);
427
428    }
429
430    /**
431     * 这个命令和 EXPIRE 命令的作用类似,但是它以毫秒为单位设置 key 的生存时间,而不像 EXPIRE 命令那样,以秒为单位。
432     */
433    @Override
434    public Long pexpire(Object key, long milliseconds) {
435
436        return jedisCluster.pexpire(keyToBytes(key), milliseconds);
437
438    }
439
440    /**
441     * 这个命令和 EXPIREAT 命令类似,但它以毫秒为单位设置 key 的过期 unix 时间戳,而不是像 EXPIREAT 那样,以秒为单位。
442     */
443    @Override
444    public Long pexpireAt(Object key, long millisecondsTimestamp) {
445
446        return jedisCluster.pexpireAt(keyToBytes(key), millisecondsTimestamp);
447
448    }
449
450    /**
451     * 将给定 key 的值设为 value ,并返回 key 的旧值(old value)。
452     * 当 key 存在但不是字符串类型时,返回一个错误。
453     */
454    @Override
455    @SuppressWarnings("unchecked")
456    public <T> T getSet(Object key, Object value) {
457
458        return (T) valueFromBytes(jedisCluster.getSet(keyToBytes(key), valueToBytes(value)));
459
460    }
461
462    /**
463     * 移除给定 key 的生存时间,将这个 key 从『易失的』(带生存时间 key )转换成『持久的』(一个不带生存时间、永不过期的 key )。
464     */
465    @Override
466    public Long persist(Object key) {
467
468        return jedisCluster.persist(keyToBytes(key));
469
470    }
471
472    /**
473     * 返回 key 所储存的值的类型。
474     */
475    @Override
476    public String type(Object key) {
477
478        return jedisCluster.type(keyToBytes(key));
479
480    }
481
482    /**
483     * 以秒为单位,返回给定 key 的剩余生存时间(TTL, time to live)。
484     */
485    @Override
486    public Long ttl(Object key) {
487
488        return jedisCluster.ttl(keyToBytes(key));
489
490    }
491
492    /**
493     * 这个命令类似于 TTL 命令,但它以毫秒为单位返回 key 的剩余生存时间,而不是像 TTL 命令那样,以秒为单位。
494     */
495    @Override
496    public Long pttl(Object key) {
497
498        return jedisCluster.pttl(key.toString());
499
500    }
501
502    /**
503     * 对象被引用的数量
504     */
505    @Override
506    public Long objectRefcount(Object key) {
507
508//        return jedisCluster.objectRefcount(keyToBytes(key));
509        throw new JbootException("not support move objectRefcount in redis cluster.");
510    }
511
512    /**
513     * 对象没有被访问的空闲时间
514     */
515    @Override
516    public Long objectIdletime(Object key) {
517
518//        return jedisCluster.objectIdletime(keyToBytes(key));
519        throw new JbootException("not support move objectIdletime in redis cluster.");
520
521    }
522
523    /**
524     * 将哈希表 key 中的域 field 的值设为 value 。
525     * 如果 key 不存在,一个新的哈希表被创建并进行 HSET 操作。
526     * 如果域 field 已经存在于哈希表中,旧值将被覆盖。
527     */
528    @Override
529    public Long hset(Object key, Object field, Object value) {
530
531        return jedisCluster.hset(keyToBytes(key), valueToBytes(field), valueToBytes(value));
532
533    }
534
535    /**
536     * 同时将多个 field-value (域-值)对设置到哈希表 key 中。
537     * 此命令会覆盖哈希表中已存在的域。
538     * 如果 key 不存在,一个空哈希表被创建并执行 HMSET 操作。
539     */
540    @Override
541    public String hmset(Object key, Map<Object, Object> hash) {
542
543        Map<byte[], byte[]> para = new HashMap<byte[], byte[]>();
544        for (Entry<Object, Object> e : hash.entrySet())
545            para.put(valueToBytes(e.getKey()), valueToBytes(e.getValue()));
546        return jedisCluster.hmset(keyToBytes(key), para);
547
548    }
549
550    /**
551     * 返回哈希表 key 中给定域 field 的值。
552     */
553    @Override
554    @SuppressWarnings("unchecked")
555    public <T> T hget(Object key, Object field) {
556
557        return (T) valueFromBytes(jedisCluster.hget(keyToBytes(key), valueToBytes(field)));
558
559    }
560
561    /**
562     * 返回哈希表 key 中,一个或多个给定域的值。
563     * 如果给定的域不存在于哈希表,那么返回一个 nil 值。
564     * 因为不存在的 key 被当作一个空哈希表来处理,所以对一个不存在的 key 进行 HMGET 操作将返回一个只带有 nil 值的表。
565     */
566    @Override
567    @SuppressWarnings("rawtypes")
568    public List hmget(Object key, Object... fields) {
569
570        List<byte[]> data = jedisCluster.hmget(keyToBytes(key), valuesToBytesArray(fields));
571        return valueListFromBytesList(data);
572
573    }
574
575    /**
576     * 删除哈希表 key 中的一个或多个指定域,不存在的域将被忽略。
577     */
578    @Override
579    public Long hdel(Object key, Object... fields) {
580
581        return jedisCluster.hdel(keyToBytes(key), valuesToBytesArray(fields));
582
583    }
584
585    /**
586     * 查看哈希表 key 中,给定域 field 是否存在。
587     */
588    @Override
589    public boolean hexists(Object key, Object field) {
590
591        return jedisCluster.hexists(keyToBytes(key), valueToBytes(field));
592
593    }
594
595    /**
596     * 返回哈希表 key 中,所有的域和值。
597     * 在返回值里,紧跟每个域名(field name)之后是域的值(value),所以返回值的长度是哈希表大小的两倍。
598     */
599    @Override
600    @SuppressWarnings("rawtypes")
601    public Map hgetAll(Object key) {
602
603        Map<byte[], byte[]> data = jedisCluster.hgetAll(keyToBytes(key));
604        Map<Object, Object> result = new HashMap<Object, Object>();
605        for (Entry<byte[], byte[]> e : data.entrySet())
606            result.put(valueFromBytes(e.getKey()), valueFromBytes(e.getValue()));
607        return result;
608
609    }
610
611    /**
612     * 返回哈希表 key 中所有域的值。
613     */
614    @Override
615    @SuppressWarnings("rawtypes")
616    public List hvals(Object key) {
617
618        Collection<byte[]> data = jedisCluster.hvals(keyToBytes(key));
619        return valueListFromBytesList(data);
620
621    }
622
623    /**
624     * 返回哈希表 key 中的所有域。
625     * 底层实现此方法取名为 hfields 更为合适,在此仅为与底层保持一致
626     */
627    @Override
628    public Set<Object> hkeys(Object key) {
629
630        Set<byte[]> fieldSet = jedisCluster.hkeys(keyToBytes(key));
631        Set<Object> result = new HashSet<Object>();
632        fieldSetFromBytesSet(fieldSet, result);
633        return result;
634
635    }
636
637    /**
638     * 返回哈希表 key 中域的数量。
639     */
640    @Override
641    public Long hlen(Object key) {
642
643        return jedisCluster.hlen(keyToBytes(key));
644
645    }
646
647    /**
648     * 为哈希表 key 中的域 field 的值加上增量 increment 。
649     * 增量也可以为负数,相当于对给定域进行减法操作。
650     * 如果 key 不存在,一个新的哈希表被创建并执行 HINCRBY 命令。
651     * 如果域 field 不存在,那么在执行命令前,域的值被初始化为 0 。
652     * 对一个储存字符串值的域 field 执行 HINCRBY 命令将造成一个错误。
653     * 本操作的值被限制在 64 位(bit)有符号数字表示之内。
654     */
655    @Override
656    public Long hincrBy(Object key, Object field, long value) {
657
658        return jedisCluster.hincrBy(keyToBytes(key), valueToBytes(field), value);
659
660    }
661
662    /**
663     * 为哈希表 key 中的域 field 加上浮点数增量 increment 。
664     * 如果哈希表中没有域 field ,那么 HINCRBYFLOAT 会先将域 field 的值设为 0 ,然后再执行加法操作。
665     * 如果键 key 不存在,那么 HINCRBYFLOAT 会先创建一个哈希表,再创建域 field ,最后再执行加法操作。
666     * 当以下任意一个条件发生时,返回一个错误:
667     * 1:域 field 的值不是字符串类型(因为 redis 中的数字和浮点数都以字符串的形式保存,所以它们都属于字符串类型)
668     * 2:域 field 当前的值或给定的增量 increment 不能解释(parse)为双精度浮点数(double precision floating point number)
669     * HINCRBYFLOAT 命令的详细功能和 INCRBYFLOAT 命令类似,请查看 INCRBYFLOAT 命令获取更多相关信息。
670     */
671    @Override
672    public Double hincrByFloat(Object key, Object field, double value) {
673
674        return jedisCluster.hincrByFloat(keyToBytes(key), valueToBytes(field), value);
675
676    }
677
678    /**
679     * 返回列表 key 中,下标为 index 的元素。
680     * 下标(index)参数 start 和 stop 都以 0 为底,也就是说,以 0 表示列表的第一个元素,以 1 表示列表的第二个元素,以此类推。
681     * 你也可以使用负数下标,以 -1 表示列表的最后一个元素, -2 表示列表的倒数第二个元素,以此类推。
682     * 如果 key 不是列表类型,返回一个错误。
683     */
684    @Override
685    @SuppressWarnings("unchecked")
686
687    /**
688     * 返回列表 key 中,下标为 index 的元素。
689     * 下标(index)参数 start 和 stop 都以 0 为底,也就是说,以 0 表示列表的第一个元素,
690     * 以 1 表示列表的第二个元素,以此类推。
691     * 你也可以使用负数下标,以 -1 表示列表的最后一个元素, -2 表示列表的倒数第二个元素,以此类推。
692     * 如果 key 不是列表类型,返回一个错误。
693     */
694    public <T> T lindex(Object key, long index) {
695
696        return (T) valueFromBytes(jedisCluster.lindex(keyToBytes(key), index));
697
698    }
699
700
701    /**
702     * 返回列表 key 的长度。
703     * 如果 key 不存在,则 key 被解释为一个空列表,返回 0 .
704     * 如果 key 不是列表类型,返回一个错误。
705     */
706    @Override
707    public Long llen(Object key) {
708
709        return jedisCluster.llen(keyToBytes(key));
710
711    }
712
713    /**
714     * 移除并返回列表 key 的头元素。
715     */
716    @Override
717    @SuppressWarnings("unchecked")
718    public <T> T lpop(Object key) {
719
720        return (T) valueFromBytes(jedisCluster.lpop(keyToBytes(key)));
721
722    }
723
724    /**
725     * 将一个或多个值 value 插入到列表 key 的表头
726     * 如果有多个 value 值,那么各个 value 值按从左到右的顺序依次插入到表头: 比如说,
727     * 对空列表 mylist 执行命令 LPUSH mylist a b c ,列表的值将是 c b a ,
728     * 这等同于原子性地执行 LPUSH mylist a 、 LPUSH mylist b 和 LPUSH mylist c 三个命令。
729     * 如果 key 不存在,一个空列表会被创建并执行 LPUSH 操作。
730     * 当 key 存在但不是列表类型时,返回一个错误。
731     */
732    @Override
733    public Long lpush(Object key, Object... values) {
734
735        return jedisCluster.lpush(keyToBytes(key), valuesToBytesArray(values));
736
737    }
738
739    /**
740     * 将列表 key 下标为 index 的元素的值设置为 value 。
741     * 当 index 参数超出范围,或对一个空列表( key 不存在)进行 LSET 时,返回一个错误。
742     * 关于列表下标的更多信息,请参考 LINDEX 命令。
743     */
744    @Override
745    public String lset(Object key, long index, Object value) {
746
747        return jedisCluster.lset(keyToBytes(key), index, valueToBytes(value));
748
749    }
750
751    /**
752     * 根据参数 count 的值,移除列表中与参数 value 相等的元素。
753     * count 的值可以是以下几种:
754     * count 大于 0 : 从表头开始向表尾搜索,移除与 value 相等的元素,数量为 count 。
755     * count 小于 0 : 从表尾开始向表头搜索,移除与 value 相等的元素,数量为 count 的绝对值。
756     * count 等于 0 : 移除表中所有与 value 相等的值。
757     */
758    @Override
759    public Long lrem(Object key, long count, Object value) {
760
761        return jedisCluster.lrem(keyToBytes(key), count, valueToBytes(value));
762
763    }
764
765    /**
766     * 返回列表 key 中指定区间内的元素,区间以偏移量 start 和 stop 指定。
767     * 下标(index)参数 start 和 stop 都以 0 为底,也就是说,以 0 表示列表的第一个元素,以 1 表示列表的第二个元素,以此类推。
768     * 你也可以使用负数下标,以 -1 表示列表的最后一个元素, -2 表示列表的倒数第二个元素,以此类推。
769     * <pre>
770     * 例子:
771     * 获取 list 中所有数据:cache.lrange(listKey, 0, -1);
772     * 获取 list 中下标 1 到 3 的数据: cache.lrange(listKey, 1, 3);
773     * </pre>
774     */
775    @Override
776    @SuppressWarnings("rawtypes")
777    public List lrange(Object key, long start, long end) {
778
779        List<byte[]> data = jedisCluster.lrange(keyToBytes(key), start, end);
780        if (data != null) {
781            return valueListFromBytesList(data);
782        } else {
783            return new ArrayList<byte[]>(0);
784        }
785
786    }
787
788    /**
789     * 对一个列表进行修剪(trim),就是说,让列表只保留指定区间内的元素,不在指定区间之内的元素都将被删除。
790     * 举个例子,执行命令 LTRIM list 0 2 ,表示只保留列表 list 的前三个元素,其余元素全部删除。
791     * 下标(index)参数 start 和 stop 都以 0 为底,也就是说,以 0 表示列表的第一个元素,以 1 表示列表的第二个元素,以此类推。
792     * 你也可以使用负数下标,以 -1 表示列表的最后一个元素, -2 表示列表的倒数第二个元素,以此类推。
793     * 当 key 不是列表类型时,返回一个错误。
794     */
795    @Override
796    public String ltrim(Object key, long start, long end) {
797
798        return jedisCluster.ltrim(keyToBytes(key), start, end);
799
800    }
801
802    /**
803     * 移除并返回列表 key 的尾元素。
804     */
805    @Override
806    @SuppressWarnings("unchecked")
807    public <T> T rpop(Object key) {
808
809        return (T) valueFromBytes(jedisCluster.rpop(keyToBytes(key)));
810
811    }
812
813    /**
814     * 命令 RPOPLPUSH 在一个原子时间内,执行以下两个动作:
815     * 将列表 source 中的最后一个元素(尾元素)弹出,并返回给客户端。
816     * 将 source 弹出的元素插入到列表 destination ,作为 destination 列表的的头元素。
817     */
818    @Override
819    @SuppressWarnings("unchecked")
820    public <T> T rpoplpush(Object srcKey, Object dstKey) {
821
822        return (T) valueFromBytes(jedisCluster.rpoplpush(keyToBytes(srcKey), keyToBytes(dstKey)));
823
824    }
825
826    /**
827     * 将一个或多个值 value 插入到列表 key 的表尾(最右边)。
828     * 如果有多个 value 值,那么各个 value 值按从左到右的顺序依次插入到表尾:比如
829     * 对一个空列表 mylist 执行 RPUSH mylist a b c ,得出的结果列表为 a b c ,
830     * 等同于执行命令 RPUSH mylist a 、 RPUSH mylist b 、 RPUSH mylist c 。
831     * 如果 key 不存在,一个空列表会被创建并执行 RPUSH 操作。
832     * 当 key 存在但不是列表类型时,返回一个错误。
833     */
834    @Override
835    public Long rpush(Object key, Object... values) {
836
837        return jedisCluster.rpush(keyToBytes(key), valuesToBytesArray(values));
838
839    }
840
841    /**
842     * BLPOP 是列表的阻塞式(blocking)弹出原语。
843     * 它是 LPOP 命令的阻塞版本,当给定列表内没有任何元素可供弹出的时候,连接将被 BLPOP 命令阻塞,直到等待超时或发现可弹出元素为止。
844     * 当给定多个 key 参数时,按参数 key 的先后顺序依次检查各个列表,弹出第一个非空列表的头元素。
845     */
846    @Override
847    @SuppressWarnings("rawtypes")
848    public List blpop(Object... keys) {
849//        String[] keysStrings = new String[keys.length];
850//        for (int i = 0; i < keys.length; i++) {
851//            keysStrings[i] = keys[i].toString();
852//        }
853
854        List<byte[]> data = jedisCluster.blpop(timeout, keysToBytesArray(keys));
855        return keyValueListFromBytesList(data);
856
857    }
858
859    /**
860     * BLPOP 是列表的阻塞式(blocking)弹出原语。
861     * 它是 LPOP 命令的阻塞版本,当给定列表内没有任何元素可供弹出的时候,连接将被 BLPOP 命令阻塞,直到等待超时或发现可弹出元素为止。
862     * 当给定多个 key 参数时,按参数 key 的先后顺序依次检查各个列表,弹出第一个非空列表的头元素。
863     */
864    @Override
865    @SuppressWarnings("rawtypes")
866    public List blpop(Integer timeout, Object... keys) {
867
868        List<byte[]> data = jedisCluster.blpop(timeout, keysToBytesArray(keys));
869        return keyValueListFromBytesList(data);
870
871    }
872
873    /**
874     * BRPOP 是列表的阻塞式(blocking)弹出原语。
875     * 它是 RPOP 命令的阻塞版本,当给定列表内没有任何元素可供弹出的时候,连接将被 BRPOP 命令阻塞,直到等待超时或发现可弹出元素为止。
876     * 当给定多个 key 参数时,按参数 key 的先后顺序依次检查各个列表,弹出第一个非空列表的尾部元素。
877     * 关于阻塞操作的更多信息,请查看 BLPOP 命令, BRPOP 除了弹出元素的位置和 BLPOP 不同之外,其他表现一致。
878     */
879    @Override
880    @SuppressWarnings("rawtypes")
881    public List brpop(Object... keys) {
882
883        List<byte[]> data = jedisCluster.brpop(timeout, keysToBytesArray(keys));
884        return keyValueListFromBytesList(data);
885
886    }
887
888    /**
889     * BRPOP 是列表的阻塞式(blocking)弹出原语。
890     * 它是 RPOP 命令的阻塞版本,当给定列表内没有任何元素可供弹出的时候,连接将被 BRPOP 命令阻塞,直到等待超时或发现可弹出元素为止。
891     * 当给定多个 key 参数时,按参数 key 的先后顺序依次检查各个列表,弹出第一个非空列表的尾部元素。
892     * 关于阻塞操作的更多信息,请查看 BLPOP 命令, BRPOP 除了弹出元素的位置和 BLPOP 不同之外,其他表现一致。
893     */
894    @Override
895    @SuppressWarnings("rawtypes")
896    public List brpop(Integer timeout, Object... keys) {
897
898        List<byte[]> data = jedisCluster.brpop(timeout, keysToBytesArray(keys));
899        return keyValueListFromBytesList(data);
900
901    }
902
903    /**
904     * 使用客户端向 JbootRedis 服务器发送一个 PING ,如果服务器运作正常的话,会返回一个 PONG 。
905     * 通常用于测试与服务器的连接是否仍然生效,或者用于测量延迟值。
906     */
907    @Override
908    public String ping() {
909//        jedisCluster.getClusterNodes().get("aa").getResource().ping
910//        return jedisCluster..ping();
911
912        Map<String, ConnectionPool> nodes = jedisCluster.getClusterNodes();
913        if (nodes != null) {
914            for (ConnectionPool pool : nodes.values()) {
915                try (Connection node = pool.getResource()) {
916                    if (node.ping()) {
917                        return "PONG";
918                    }
919                }
920            }
921        }
922        return null;
923    }
924
925    /**
926     * 将一个或多个 member 元素加入到集合 key 当中,已经存在于集合的 member 元素将被忽略。
927     * 假如 key 不存在,则创建一个只包含 member 元素作成员的集合。
928     * 当 key 不是集合类型时,返回一个错误。
929     */
930    @Override
931    public Long sadd(Object key, Object... members) {
932
933        return jedisCluster.sadd(keyToBytes(key), valuesToBytesArray(members));
934
935    }
936
937    /**
938     * 返回集合 key 的基数(集合中元素的数量)。
939     */
940    @Override
941    public Long scard(Object key) {
942
943        return jedisCluster.scard(keyToBytes(key));
944
945    }
946
947    /**
948     * 移除并返回集合中的一个随机元素。
949     * 如果只想获取一个随机元素,但不想该元素从集合中被移除的话,可以使用 SRANDMEMBER 命令。
950     */
951    @Override
952    @SuppressWarnings("unchecked")
953    public <T> T spop(Object key) {
954
955        return (T) valueFromBytes(jedisCluster.spop(keyToBytes(key)));
956
957    }
958
959    /**
960     * 返回集合 key 中的所有成员。
961     * 不存在的 key 被视为空集合。
962     */
963    @Override
964    @SuppressWarnings("rawtypes")
965    public Set smembers(Object key) {
966
967        Set<byte[]> data = jedisCluster.smembers(keyToBytes(key));
968        Set<Object> result = new HashSet<Object>();
969        valueSetFromBytesSet(data, result);
970        return result;
971
972    }
973
974    /**
975     * 判断 member 元素是否集合 key 的成员。
976     */
977    @Override
978    public boolean sismember(Object key, Object member) {
979
980        return jedisCluster.sismember(keyToBytes(key), valueToBytes(member));
981
982    }
983
984    /**
985     * 返回多个集合的交集,多个集合由 keys 指定
986     */
987    @Override
988    @SuppressWarnings("rawtypes")
989    public Set sinter(Object... keys) {
990
991        Set<byte[]> data = jedisCluster.sinter(keysToBytesArray(keys));
992        Set<Object> result = new HashSet<Object>();
993        valueSetFromBytesSet(data, result);
994        return result;
995
996    }
997
998    /**
999     * 返回集合中的一个随机元素。
1000     */
1001    @Override
1002    @SuppressWarnings("unchecked")
1003    public <T> T srandmember(Object key) {
1004
1005        return (T) valueFromBytes(jedisCluster.srandmember(keyToBytes(key)));
1006
1007    }
1008
1009    /**
1010     * 返回集合中的 count 个随机元素。
1011     * 从 JbootRedis 2.6 版本开始, SRANDMEMBER 命令接受可选的 count 参数:
1012     * 如果 count 为正数,且小于集合基数,那么命令返回一个包含 count 个元素的数组,数组中的元素各不相同。
1013     * 如果 count 大于等于集合基数,那么返回整个集合。
1014     * 如果 count 为负数,那么命令返回一个数组,数组中的元素可能会重复出现多次,而数组的长度为 count 的绝对值。
1015     * 该操作和 SPOP 相似,但 SPOP 将随机元素从集合中移除并返回,而 SRANDMEMBER 则仅仅返回随机元素,而不对集合进行任何改动。
1016     */
1017    @Override
1018    @SuppressWarnings("rawtypes")
1019    public List srandmember(Object key, int count) {
1020
1021        List<byte[]> data = jedisCluster.srandmember(keyToBytes(key), count);
1022        return valueListFromBytesList(data);
1023
1024    }
1025
1026    /**
1027     * 移除集合 key 中的一个或多个 member 元素,不存在的 member 元素会被忽略。
1028     */
1029    @Override
1030    public Long srem(Object key, Object... members) {
1031
1032        return jedisCluster.srem(keyToBytes(key), valuesToBytesArray(members));
1033
1034    }
1035
1036    /**
1037     * 返回多个集合的并集,多个集合由 keys 指定
1038     * 不存在的 key 被视为空集。
1039     */
1040    @Override
1041    @SuppressWarnings("rawtypes")
1042    public Set sunion(Object... keys) {
1043
1044        Set<byte[]> data = jedisCluster.sunion(keysToBytesArray(keys));
1045        Set<Object> result = new HashSet<Object>();
1046        valueSetFromBytesSet(data, result);
1047        return result;
1048
1049    }
1050
1051    /**
1052     * 返回一个集合的全部成员,该集合是所有给定集合之间的差集。
1053     * 不存在的 key 被视为空集。
1054     */
1055    @Override
1056    @SuppressWarnings("rawtypes")
1057    public Set sdiff(Object... keys) {
1058
1059        Set<byte[]> data = jedisCluster.sdiff(keysToBytesArray(keys));
1060        Set<Object> result = new HashSet<Object>();
1061        valueSetFromBytesSet(data, result);
1062        return result;
1063
1064    }
1065
1066    /**
1067     * 将一个或多个 member 元素及其 score 值加入到有序集 key 当中。
1068     * 如果某个 member 已经是有序集的成员,那么更新这个 member 的 score 值,
1069     * 并通过重新插入这个 member 元素,来保证该 member 在正确的位置上。
1070     */
1071    @Override
1072    public Long zadd(Object key, double score, Object member) {
1073
1074        return jedisCluster.zadd(keyToBytes(key), score, valueToBytes(member));
1075
1076    }
1077
1078    @Override
1079    public Long zadd(Object key, Map<Object, Double> scoreMembers) {
1080
1081        Map<byte[], Double> para = new HashMap<byte[], Double>();
1082        for (Entry<Object, Double> e : scoreMembers.entrySet())
1083            para.put(valueToBytes(e.getKey()), e.getValue());    // valueToBytes is important
1084        return jedisCluster.zadd(keyToBytes(key), para);
1085
1086    }
1087
1088    /**
1089     * 返回有序集 key 的基数。
1090     */
1091    @Override
1092    public Long zcard(Object key) {
1093
1094        return jedisCluster.zcard(keyToBytes(key));
1095
1096    }
1097
1098    /**
1099     * 返回有序集 key 中, score 值在 min 和 max 之间(默认包括 score 值等于 min 或 max )的成员的数量。
1100     * 关于参数 min 和 max 的详细使用方法,请参考 ZRANGEBYSCORE 命令。
1101     */
1102    @Override
1103    public Long zcount(Object key, double min, double max) {
1104
1105        return jedisCluster.zcount(keyToBytes(key), min, max);
1106
1107    }
1108
1109    /**
1110     * 为有序集 key 的成员 member 的 score 值加上增量 increment 。
1111     */
1112    @Override
1113    public Double zincrby(Object key, double score, Object member) {
1114
1115        return jedisCluster.zincrby(keyToBytes(key), score, valueToBytes(member));
1116
1117    }
1118
1119    /**
1120     * 返回有序集 key 中,指定区间内的成员。
1121     * 其中成员的位置按 score 值递增(从小到大)来排序。
1122     * 具有相同 score 值的成员按字典序(lexicographical order )来排列。
1123     * 如果你需要成员按 score 值递减(从大到小)来排列,请使用 ZREVRANGE 命令。
1124     */
1125    @Override
1126    @SuppressWarnings("rawtypes")
1127    public List zrange(Object key, long start, long end) {
1128
1129        List<byte[]> data = jedisCluster.zrange(keyToBytes(key), start, end);
1130        List<Object> result = data.stream().map(d->valueFromBytes(d)).collect(Collectors.toList());    // 有序集合必须 LinkedHashSet
1131        return result;
1132
1133    }
1134
1135    /**
1136     * 返回有序集 key 中,指定区间内的成员。
1137     * 其中成员的位置按 score 值递减(从大到小)来排列。
1138     * 具有相同 score 值的成员按字典序的逆序(reverse lexicographical order)排列。
1139     * 除了成员按 score 值递减的次序排列这一点外, ZREVRANGE 命令的其他方面和 ZRANGE 命令一样。
1140     */
1141    @Override
1142    @SuppressWarnings("rawtypes")
1143    public List zrevrange(Object key, long start, long end) {
1144
1145        List<byte[]> data = jedisCluster.zrevrange(keyToBytes(key), start, end);
1146        List<Object> result = data.stream().map(d->valueFromBytes(d)).collect(Collectors.toList());    // 有序集合必须 LinkedHashSet
1147        return result;
1148
1149    }
1150
1151    /**
1152     * 返回有序集 key 中,所有 score 值介于 min 和 max 之间(包括等于 min 或 max )的成员。
1153     * 有序集成员按 score 值递增(从小到大)次序排列。
1154     */
1155    @Override
1156    @SuppressWarnings("rawtypes")
1157    public List zrangeByScore(Object key, double min, double max) {
1158
1159        List<byte[]> data = jedisCluster.zrangeByScore(keyToBytes(key), min, max);
1160        List<Object> result = data.stream().map(d->valueFromBytes(d)).collect(Collectors.toList());    // 有序集合必须 LinkedHashSet
1161        return result;
1162
1163    }
1164
1165    /**
1166     * 返回有序集 key 中成员 member 的排名。其中有序集成员按 score 值递增(从小到大)顺序排列。
1167     * 排名以 0 为底,也就是说, score 值最小的成员排名为 0 。
1168     * 使用 ZREVRANK 命令可以获得成员按 score 值递减(从大到小)排列的排名。
1169     */
1170    @Override
1171    public Long zrank(Object key, Object member) {
1172
1173        return jedisCluster.zrank(keyToBytes(key), valueToBytes(member));
1174
1175    }
1176
1177    /**
1178     * 返回有序集 key 中成员 member 的排名。其中有序集成员按 score 值递减(从大到小)排序。
1179     * 排名以 0 为底,也就是说, score 值最大的成员排名为 0 。
1180     * 使用 ZRANK 命令可以获得成员按 score 值递增(从小到大)排列的排名。
1181     */
1182    @Override
1183    public Long zrevrank(Object key, Object member) {
1184
1185        return jedisCluster.zrevrank(keyToBytes(key), valueToBytes(member));
1186
1187    }
1188
1189    /**
1190     * 移除有序集 key 中的一个或多个成员,不存在的成员将被忽略。
1191     * 当 key 存在但不是有序集类型时,返回一个错误。
1192     */
1193    @Override
1194    public Long zrem(Object key, Object... members) {
1195
1196        return jedisCluster.zrem(keyToBytes(key), valuesToBytesArray(members));
1197
1198    }
1199
1200    /**
1201     * 返回有序集 key 中,成员 member 的 score 值。
1202     * 如果 member 元素不是有序集 key 的成员,或 key 不存在,返回 nil 。
1203     */
1204    @Override
1205    public Double zscore(Object key, Object member) {
1206
1207        return jedisCluster.zscore(keyToBytes(key), valueToBytes(member));
1208
1209    }
1210
1211    /**
1212     * 发布
1213     *
1214     * @param channel
1215     * @param message
1216     */
1217    @Override
1218    public void publish(String channel, String message) {
1219
1220        jedisCluster.publish(channel, message);
1221
1222    }
1223
1224    /**
1225     * 发布
1226     *
1227     * @param channel
1228     * @param message
1229     */
1230    @Override
1231    public void publish(byte[] channel, byte[] message) {
1232        jedisCluster.publish(channel, message);
1233    }
1234
1235
1236    /**
1237     * 订阅
1238     *
1239     * @param listener
1240     * @param channels
1241     */
1242    @Override
1243    public void subscribe(JedisPubSub listener, final String... channels) {
1244        /**
1245         * https://github.com/xetorthio/jedis/wiki/AdvancedUsage
1246         * Note that subscribe is a blocking operation because it will poll JbootRedis for responses on the thread that calls subscribe.
1247         * A single JedisPubSub instance can be used to subscribe to multiple channels.
1248         * You can call subscribe or psubscribe on an existing JedisPubSub instance to change your subscriptions.
1249         */
1250        new Thread("jboot-redisCluster-subscribe-JedisPubSub") {
1251            @Override
1252            public void run() {
1253                while (true) {
1254                    //订阅线程断开连接,需要进行重连
1255                    try {
1256                        jedisCluster.subscribe(listener, channels);
1257                        LOG.warn("Disconnect to redis channel in subscribe JedisPubSub!");
1258                        break;
1259                    } catch (JedisConnectionException e) {
1260                        LOG.error("failed connect to redis, reconnect it.", e);
1261                        QuietlyUtil.sleepQuietly(1000);
1262                    }
1263                }
1264            }
1265        }.start();
1266    }
1267
1268    /**
1269     * 订阅
1270     *
1271     * @param binaryListener
1272     * @param channels
1273     */
1274    @Override
1275    public void subscribe(BinaryJedisPubSub binaryListener, final byte[]... channels) {
1276        /**
1277         * https://github.com/xetorthio/jedis/wiki/AdvancedUsage
1278         * Note that subscribe is a blocking operation because it will poll JbootRedis for responses on the thread that calls subscribe.
1279         * A single JedisPubSub instance can be used to subscribe to multiple channels.
1280         * You can call subscribe or psubscribe on an existing JedisPubSub instance to change your subscriptions.
1281         */
1282        new Thread("jboot-redisCluster-subscribe-BinaryJedisPubSub") {
1283            @Override
1284            public void run() {
1285                while (!isClose()) {
1286                    //订阅线程断开连接,需要进行重连
1287                    try {
1288                        jedisCluster.subscribe(binaryListener, channels);
1289                        LOG.warn("Disconnect to redis channel in subscribe BinaryJedisPubSub!");
1290                        break;
1291                    } catch (Throwable e) {
1292                        LOG.error("failed connect to redis, reconnect it.", e);
1293                        QuietlyUtil.sleepQuietly(1000);
1294                    }
1295                }
1296            }
1297        }.start();
1298    }
1299
1300
1301    @Override
1302    public RedisScanResult<String> scan(String pattern, String cursor, int scanCount) {
1303        ScanParams params = new ScanParams();
1304        params.match(pattern).count(scanCount);
1305        ScanResult<String> scanResult = jedisCluster.scan(cursor, params);
1306        return new RedisScanResult<>(scanResult.getCursor(), scanResult.getResult());
1307    }
1308
1309    @Override
1310    public Object eval(String script, int keyCount, String... params) {
1311        return jedisCluster.eval(script, keyCount, params);
1312    }
1313
1314    public JedisCluster getJedisCluster() {
1315        return jedisCluster;
1316    }
1317
1318}
1319
1320
1321
1322
1323
1324