001/** 002 * Copyright (c) 2015-2022, Michael Yang 杨福海 (fuhai999@gmail.com). 003 * <p> 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * <p> 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * <p> 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package io.jboot.support.redis.jedis; 017 018import com.jfinal.log.Log; 019import io.jboot.exception.JbootException; 020import io.jboot.support.redis.JbootRedisBase; 021import io.jboot.support.redis.JbootRedisConfig; 022import io.jboot.support.redis.RedisScanResult; 023import io.jboot.utils.QuietlyUtil; 024import io.jboot.utils.StrUtil; 025import org.apache.commons.pool2.impl.GenericObjectPoolConfig; 026import redis.clients.jedis.*; 027import redis.clients.jedis.exceptions.JedisConnectionException; 028import redis.clients.jedis.params.ScanParams; 029import redis.clients.jedis.resps.ScanResult; 030 031import java.util.*; 032import java.util.Map.Entry; 033import java.util.stream.Collectors; 034 035/** 036 * 参考: com.jfinal.plugin.redis 037 * JbootRedis 命令文档: http://redisdoc.com/ 038 */ 039public class JbootJedisClusterImpl extends JbootRedisBase { 040 041 protected JedisCluster jedisCluster; 042 private int timeout = 2000; 043 private int maxAttempts = 5; 044 045 static final Log LOG = Log.getLog(JbootJedisClusterImpl.class); 046 047 048 public JbootJedisClusterImpl(JbootRedisConfig config) { 049 050 super(config); 051 052 Integer timeout = config.getTimeout(); 053 String password = config.getPassword(); 054 Integer maxAttempts = config.getMaxAttempts(); 055 056 if (timeout != null) { 057 this.timeout = timeout; 058 } 059 if (maxAttempts == null) { 060 maxAttempts = this.maxAttempts; 061 } 062 063 GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig(); 064 065 if (StrUtil.isNotBlank(config.getTestWhileIdle())) { 066 poolConfig.setTestWhileIdle(config.getTestWhileIdle()); 067 } 068 069 if (StrUtil.isNotBlank(config.getTestOnBorrow())) { 070 poolConfig.setTestOnBorrow(config.getTestOnBorrow()); 071 } 072 073 if (StrUtil.isNotBlank(config.getTestOnCreate())) { 074 poolConfig.setTestOnCreate(config.getTestOnCreate()); 075 } 076 077 if (StrUtil.isNotBlank(config.getTestOnReturn())) { 078 poolConfig.setTestOnReturn(config.getTestOnReturn()); 079 } 080 081 if (StrUtil.isNotBlank(config.getMinEvictableIdleTimeMillis())) { 082 poolConfig.setMinEvictableIdleTimeMillis(config.getMinEvictableIdleTimeMillis()); 083 } 084 085 if (StrUtil.isNotBlank(config.getTimeBetweenEvictionRunsMillis())) { 086 poolConfig.setTimeBetweenEvictionRunsMillis(config.getTimeBetweenEvictionRunsMillis()); 087 } 088 089 if (StrUtil.isNotBlank(config.getNumTestsPerEvictionRun())) { 090 poolConfig.setNumTestsPerEvictionRun(config.getNumTestsPerEvictionRun()); 091 } 092 093 if (StrUtil.isNotBlank(config.getMaxTotal())) { 094 poolConfig.setMaxTotal(config.getMaxTotal()); 095 } 096 097 if (StrUtil.isNotBlank(config.getMaxIdle())) { 098 poolConfig.setMaxIdle(config.getMaxIdle()); 099 } 100 101 if (StrUtil.isNotBlank(config.getMinIdle())) { 102 poolConfig.setMinIdle(config.getMinIdle()); 103 } 104 105 if (StrUtil.isNotBlank(config.getMaxWaitMillis())) { 106 poolConfig.setMaxWaitMillis(config.getMaxWaitMillis()); 107 } 108 this.jedisCluster = newJedisCluster(config.getHostAndPorts(), timeout, maxAttempts, password, poolConfig); 109 } 110 111 public static JedisCluster newJedisCluster(Set<HostAndPort> haps, Integer timeout, 112 Integer maxAttempts, String password, GenericObjectPoolConfig poolConfig) { 113 JedisCluster jedisCluster; 114 115 if (timeout != null && maxAttempts != null && password != null && poolConfig != null) { 116 jedisCluster = new JedisCluster(haps, timeout, timeout, maxAttempts, password, poolConfig); 117 } else if (timeout != null && maxAttempts != null && poolConfig != null) { 118 jedisCluster = new JedisCluster(haps, timeout, maxAttempts, poolConfig); 119 } else if (timeout != null && maxAttempts != null) { 120 jedisCluster = new JedisCluster(haps, timeout, maxAttempts); 121 } else if (timeout != null && poolConfig != null) { 122 jedisCluster = new JedisCluster(haps, timeout, poolConfig); 123 } else if (timeout != null) { 124 jedisCluster = new JedisCluster(haps, timeout); 125 } else { 126 jedisCluster = new JedisCluster(haps); 127 } 128 return jedisCluster; 129 } 130 131 public JbootJedisClusterImpl(JedisCluster jedisCluster) { 132 super(null); 133 this.jedisCluster = jedisCluster; 134 } 135 136 /** 137 * 存放 key value 对到 redis 138 * 如果 key 已经持有其他值, SET 就覆写旧值,无视类型。 139 * 对于某个原本带有生存时间(TTL)的键来说, 当 SET 命令成功在这个键上执行时, 这个键原有的 TTL 将被清除。 140 */ 141 @Override 142 public String set(Object key, Object value) { 143 return jedisCluster.set(keyToBytes(key), valueToBytes(value)); 144 } 145 146 @Override 147 public Long setnx(Object key, Object value) { 148 return jedisCluster.setnx(keyToBytes(key), valueToBytes(value)); 149 } 150 151 /** 152 * 存放 key value 对到 redis 153 * 如果 key 已经持有其他值, SET 就覆写旧值,无视类型。 154 * 此方法用了修改 incr 等的值 155 */ 156 @Override 157 public String setWithoutSerialize(Object key, Object value) { 158 return jedisCluster.set(keyToBytes(key), value.toString().getBytes()); 159 } 160 161 162 /** 163 * 存放 key value 对到 redis,并将 key 的生存时间设为 seconds (以秒为单位)。 164 * 如果 key 已经存在, SETEX 命令将覆写旧值。 165 */ 166 @Override 167 public String setex(Object key, int seconds, Object value) { 168 169 return jedisCluster.setex(keyToBytes(key), seconds, valueToBytes(value)); 170 171 } 172 173 /** 174 * 返回 key 所关联的 value 值 175 * 如果 key 不存在那么返回特殊值 nil 。 176 */ 177 @Override 178 @SuppressWarnings("unchecked") 179 public <T> T get(Object key) { 180 181 return (T) valueFromBytes(jedisCluster.get(keyToBytes(key))); 182 183 } 184 185 @Override 186 public String getWithoutSerialize(Object key) { 187 byte[] bytes = jedisCluster.get(keyToBytes(key)); 188 if (bytes == null || bytes.length == 0) { 189 return null; 190 } 191 return new String(jedisCluster.get(keyToBytes(key))); 192 } 193 194 /** 195 * 删除给定的一个 key 196 * 不存在的 key 会被忽略。 197 */ 198 @Override 199 public Long del(Object key) { 200 return jedisCluster.del(keyToBytes(key)); 201 } 202 203 /** 204 * 删除给定的多个 key 205 * 不存在的 key 会被忽略。 206 */ 207 @Override 208 public Long del(Object... keys) { 209 210 return jedisCluster.del(keysToBytesArray(keys)); 211 212 } 213 214 /** 215 * 查找所有符合给定模式 pattern 的 key 。 216 * KEYS * 匹配数据库中所有 key 。 217 * KEYS h?llo 匹配 hello , hallo 和 hxllo 等。 218 * KEYS h*llo 匹配 hllo 和 heeeeello 等。 219 * KEYS h[ae]llo 匹配 hello 和 hallo ,但不匹配 hillo 。 220 * 特殊符号用 \ 隔开 221 */ 222 @Override 223 public Set<String> keys(String pattern) { 224 HashSet<String> keys = new HashSet<>(); 225 Map<String, ConnectionPool> clusterNodes = jedisCluster.getClusterNodes(); 226 for (String k : clusterNodes.keySet()) { 227 ConnectionPool cp = clusterNodes.get(k); 228 Connection conn = cp.getResource(); 229 try { 230 keys.addAll(conn.executeCommand(new ClusterCommandObjects().keys(pattern))); 231 } catch (Exception e) { 232 LOG.error(e.toString(), e); 233 } finally { 234 conn.close(); 235 cp.close(); //用完一定要close这个链接!!! 236 } 237 } 238 return keys; 239 } 240 241 242 /** 243 * 同时设置一个或多个 key-value 对。 244 * 如果某个给定 key 已经存在,那么 MSET 会用新值覆盖原来的旧值,如果这不是你所希望的效果,请考虑使用 MSETNX 命令:它只会在所有给定 key 都不存在的情况下进行设置操作。 245 * MSET 是一个原子性(atomic)操作,所有给定 key 都会在同一时间内被设置,某些给定 key 被更新而另一些给定 key 没有改变的情况,不可能发生。 246 * <pre> 247 * 例子: 248 * Cache cache = RedisKit.use(); // 使用 JbootRedis 的 cache 249 * cache.mset("k1", "v1", "k2", "v2"); // 放入多个 key value 键值对 250 * List list = cache.mget("k1", "k2"); // 利用多个键值得到上面代码放入的值 251 * </pre> 252 */ 253 @Override 254 public String mset(Object... keysValues) { 255 if (keysValues.length % 2 != 0) 256 throw new IllegalArgumentException("wrong number of arguments for met, keysValues length can not be odd"); 257 258 byte[][] kv = new byte[keysValues.length][]; 259 for (int i = 0; i < keysValues.length; i++) { 260 if (i % 2 == 0) 261 kv[i] = keyToBytes(keysValues[i]); 262 else 263 kv[i] = valueToBytes(keysValues[i]); 264 } 265 return jedisCluster.mset(kv); 266 267 } 268 269 /** 270 * 返回所有(一个或多个)给定 key 的值。 271 * 如果给定的 key 里面,有某个 key 不存在,那么这个 key 返回特殊值 nil 。因此,该命令永不失败。 272 */ 273 @Override 274 @SuppressWarnings("rawtypes") 275 public List mget(Object... keys) { 276 277 byte[][] keysBytesArray = keysToBytesArray(keys); 278 List<byte[]> data = jedisCluster.mget(keysBytesArray); 279 return valueListFromBytesList(data); 280 281 } 282 283 /** 284 * 将 key 中储存的数字值减一。 285 * 如果 key 不存在,那么 key 的值会先被初始化为 0 ,然后再执行 DECR 操作。 286 * 如果值包含错误的类型,或字符串类型的值不能表示为数字,那么返回一个错误。 287 * 本操作的值限制在 64 位(bit)有符号数字表示之内。 288 * 关于递增(increment) / 递减(decrement)操作的更多信息,请参见 INCR 命令。 289 */ 290 @Override 291 public Long decr(Object key) { 292 293 return jedisCluster.decr(keyToBytes(key)); 294 295 } 296 297 /** 298 * 将 key 所储存的值减去减量 decrement 。 299 * 如果 key 不存在,那么 key 的值会先被初始化为 0 ,然后再执行 DECRBY 操作。 300 * 如果值包含错误的类型,或字符串类型的值不能表示为数字,那么返回一个错误。 301 * 本操作的值限制在 64 位(bit)有符号数字表示之内。 302 * 关于更多递增(increment) / 递减(decrement)操作的更多信息,请参见 INCR 命令。 303 */ 304 @Override 305 public Long decrBy(Object key, long longValue) { 306 307 return jedisCluster.decrBy(keyToBytes(key), longValue); 308 309 } 310 311 /** 312 * 将 key 中储存的数字值增一。 313 * 如果 key 不存在,那么 key 的值会先被初始化为 0 ,然后再执行 INCR 操作。 314 * 如果值包含错误的类型,或字符串类型的值不能表示为数字,那么返回一个错误。 315 * 本操作的值限制在 64 位(bit)有符号数字表示之内。 316 */ 317 @Override 318 public Long incr(Object key) { 319 320 return jedisCluster.incr(keyToBytes(key)); 321 322 } 323 324 /** 325 * 将 key 所储存的值加上增量 increment 。 326 * 如果 key 不存在,那么 key 的值会先被初始化为 0 ,然后再执行 INCRBY 命令。 327 * 如果值包含错误的类型,或字符串类型的值不能表示为数字,那么返回一个错误。 328 * 本操作的值限制在 64 位(bit)有符号数字表示之内。 329 * 关于递增(increment) / 递减(decrement)操作的更多信息,参见 INCR 命令。 330 */ 331 @Override 332 public Long incrBy(Object key, long longValue) { 333 return jedisCluster.incrBy(keyToBytes(key), longValue); 334 335 } 336 337 /** 338 * 检查给定 key 是否存在。 339 */ 340 @Override 341 public boolean exists(Object key) { 342 343 return jedisCluster.exists(keyToBytes(key)); 344 345 } 346 347 /** 348 * 从当前数据库中随机返回(不删除)一个 key 。 349 */ 350 @Override 351 public String randomKey() { 352 353 throw new JbootException("not support randomKey commmand in redis cluster."); 354 355 } 356 357 /** 358 * 将 key 改名为 newkey 。 359 * 当 key 和 newkey 相同,或者 key 不存在时,返回一个错误。 360 * 当 newkey 已经存在时, RENAME 命令将覆盖旧值。 361 */ 362 @Override 363 public String rename(Object oldkey, Object newkey) { 364 365 return jedisCluster.rename(keyToBytes(oldkey), keyToBytes(newkey)); 366 367 } 368 369 /** 370 * 将当前数据库的 key 移动到给定的数据库 db 当中。 371 * 如果当前数据库(源数据库)和给定数据库(目标数据库)有相同名字的给定 key ,或者 key 不存在于当前数据库,那么 MOVE 没有任何效果。 372 * 因此,也可以利用这一特性,将 MOVE 当作锁(locking)原语(primitive)。 373 */ 374 @Override 375 public Long move(Object key, int dbIndex) { 376 377// return jedisCluster.move(keyToBytes(key), dbIndex); 378 throw new JbootException("not support move commmand in redis cluster."); 379 380 } 381 382 /** 383 * 将 key 原子性地从当前实例传送到目标实例的指定数据库上,一旦传送成功, key 保证会出现在目标实例上,而当前实例上的 key 会被删除。 384 */ 385 @Override 386 public String migrate(String host, int port, Object key, int destinationDb, int timeout) { 387 388 throw new JbootException("not support migrate commmand in redis cluster."); 389 390 } 391 392 /** 393 * 切换到指定的数据库,数据库索引号 index 用数字值指定,以 0 作为起始索引值。 394 * 默认使用 0 号数据库。 395 * 注意:在 Jedis 对象被关闭时,数据库又会重新被设置为初始值,所以本方法 select(...) 396 * 正常工作需要使用如下方式之一: 397 * 1:使用 RedisInterceptor,在本线程内共享同一个 Jedis 对象 398 * 2:使用 JbootRedis.call(ICallback) 进行操作 399 * 3:自行获取 Jedis 对象进行操作 400 */ 401 @Override 402 public String select(int databaseIndex) { 403 404// return jedisCluster.select(databaseIndex); 405 throw new IllegalStateException("Redis Cluster does not support multiple databases like the stand alone version of Redis, " + 406 "there is just database 0, and SELECT is not allowed."); 407 } 408 409 /** 410 * 为给定 key 设置生存时间,当 key 过期时(生存时间为 0 ),它会被自动删除。 411 * 在 JbootRedis 中,带有生存时间的 key 被称为『易失的』(volatile)。 412 */ 413 @Override 414 public Long expire(Object key, int seconds) { 415 416 return jedisCluster.expire(keyToBytes(key), seconds); 417 418 } 419 420 /** 421 * EXPIREAT 的作用和 EXPIRE 类似,都用于为 key 设置生存时间。不同在于 EXPIREAT 命令接受的时间参数是 UNIX 时间戳(unix timestamp)。 422 */ 423 @Override 424 public Long expireAt(Object key, long unixTime) { 425 426 return jedisCluster.expireAt(keyToBytes(key), unixTime); 427 428 } 429 430 /** 431 * 这个命令和 EXPIRE 命令的作用类似,但是它以毫秒为单位设置 key 的生存时间,而不像 EXPIRE 命令那样,以秒为单位。 432 */ 433 @Override 434 public Long pexpire(Object key, long milliseconds) { 435 436 return jedisCluster.pexpire(keyToBytes(key), milliseconds); 437 438 } 439 440 /** 441 * 这个命令和 EXPIREAT 命令类似,但它以毫秒为单位设置 key 的过期 unix 时间戳,而不是像 EXPIREAT 那样,以秒为单位。 442 */ 443 @Override 444 public Long pexpireAt(Object key, long millisecondsTimestamp) { 445 446 return jedisCluster.pexpireAt(keyToBytes(key), millisecondsTimestamp); 447 448 } 449 450 /** 451 * 将给定 key 的值设为 value ,并返回 key 的旧值(old value)。 452 * 当 key 存在但不是字符串类型时,返回一个错误。 453 */ 454 @Override 455 @SuppressWarnings("unchecked") 456 public <T> T getSet(Object key, Object value) { 457 458 return (T) valueFromBytes(jedisCluster.getSet(keyToBytes(key), valueToBytes(value))); 459 460 } 461 462 /** 463 * 移除给定 key 的生存时间,将这个 key 从『易失的』(带生存时间 key )转换成『持久的』(一个不带生存时间、永不过期的 key )。 464 */ 465 @Override 466 public Long persist(Object key) { 467 468 return jedisCluster.persist(keyToBytes(key)); 469 470 } 471 472 /** 473 * 返回 key 所储存的值的类型。 474 */ 475 @Override 476 public String type(Object key) { 477 478 return jedisCluster.type(keyToBytes(key)); 479 480 } 481 482 /** 483 * 以秒为单位,返回给定 key 的剩余生存时间(TTL, time to live)。 484 */ 485 @Override 486 public Long ttl(Object key) { 487 488 return jedisCluster.ttl(keyToBytes(key)); 489 490 } 491 492 /** 493 * 这个命令类似于 TTL 命令,但它以毫秒为单位返回 key 的剩余生存时间,而不是像 TTL 命令那样,以秒为单位。 494 */ 495 @Override 496 public Long pttl(Object key) { 497 498 return jedisCluster.pttl(key.toString()); 499 500 } 501 502 /** 503 * 对象被引用的数量 504 */ 505 @Override 506 public Long objectRefcount(Object key) { 507 508// return jedisCluster.objectRefcount(keyToBytes(key)); 509 throw new JbootException("not support move objectRefcount in redis cluster."); 510 } 511 512 /** 513 * 对象没有被访问的空闲时间 514 */ 515 @Override 516 public Long objectIdletime(Object key) { 517 518// return jedisCluster.objectIdletime(keyToBytes(key)); 519 throw new JbootException("not support move objectIdletime in redis cluster."); 520 521 } 522 523 /** 524 * 将哈希表 key 中的域 field 的值设为 value 。 525 * 如果 key 不存在,一个新的哈希表被创建并进行 HSET 操作。 526 * 如果域 field 已经存在于哈希表中,旧值将被覆盖。 527 */ 528 @Override 529 public Long hset(Object key, Object field, Object value) { 530 531 return jedisCluster.hset(keyToBytes(key), valueToBytes(field), valueToBytes(value)); 532 533 } 534 535 /** 536 * 同时将多个 field-value (域-值)对设置到哈希表 key 中。 537 * 此命令会覆盖哈希表中已存在的域。 538 * 如果 key 不存在,一个空哈希表被创建并执行 HMSET 操作。 539 */ 540 @Override 541 public String hmset(Object key, Map<Object, Object> hash) { 542 543 Map<byte[], byte[]> para = new HashMap<byte[], byte[]>(); 544 for (Entry<Object, Object> e : hash.entrySet()) 545 para.put(valueToBytes(e.getKey()), valueToBytes(e.getValue())); 546 return jedisCluster.hmset(keyToBytes(key), para); 547 548 } 549 550 /** 551 * 返回哈希表 key 中给定域 field 的值。 552 */ 553 @Override 554 @SuppressWarnings("unchecked") 555 public <T> T hget(Object key, Object field) { 556 557 return (T) valueFromBytes(jedisCluster.hget(keyToBytes(key), valueToBytes(field))); 558 559 } 560 561 /** 562 * 返回哈希表 key 中,一个或多个给定域的值。 563 * 如果给定的域不存在于哈希表,那么返回一个 nil 值。 564 * 因为不存在的 key 被当作一个空哈希表来处理,所以对一个不存在的 key 进行 HMGET 操作将返回一个只带有 nil 值的表。 565 */ 566 @Override 567 @SuppressWarnings("rawtypes") 568 public List hmget(Object key, Object... fields) { 569 570 List<byte[]> data = jedisCluster.hmget(keyToBytes(key), valuesToBytesArray(fields)); 571 return valueListFromBytesList(data); 572 573 } 574 575 /** 576 * 删除哈希表 key 中的一个或多个指定域,不存在的域将被忽略。 577 */ 578 @Override 579 public Long hdel(Object key, Object... fields) { 580 581 return jedisCluster.hdel(keyToBytes(key), valuesToBytesArray(fields)); 582 583 } 584 585 /** 586 * 查看哈希表 key 中,给定域 field 是否存在。 587 */ 588 @Override 589 public boolean hexists(Object key, Object field) { 590 591 return jedisCluster.hexists(keyToBytes(key), valueToBytes(field)); 592 593 } 594 595 /** 596 * 返回哈希表 key 中,所有的域和值。 597 * 在返回值里,紧跟每个域名(field name)之后是域的值(value),所以返回值的长度是哈希表大小的两倍。 598 */ 599 @Override 600 @SuppressWarnings("rawtypes") 601 public Map hgetAll(Object key) { 602 603 Map<byte[], byte[]> data = jedisCluster.hgetAll(keyToBytes(key)); 604 Map<Object, Object> result = new HashMap<Object, Object>(); 605 for (Entry<byte[], byte[]> e : data.entrySet()) 606 result.put(valueFromBytes(e.getKey()), valueFromBytes(e.getValue())); 607 return result; 608 609 } 610 611 /** 612 * 返回哈希表 key 中所有域的值。 613 */ 614 @Override 615 @SuppressWarnings("rawtypes") 616 public List hvals(Object key) { 617 618 Collection<byte[]> data = jedisCluster.hvals(keyToBytes(key)); 619 return valueListFromBytesList(data); 620 621 } 622 623 /** 624 * 返回哈希表 key 中的所有域。 625 * 底层实现此方法取名为 hfields 更为合适,在此仅为与底层保持一致 626 */ 627 @Override 628 public Set<Object> hkeys(Object key) { 629 630 Set<byte[]> fieldSet = jedisCluster.hkeys(keyToBytes(key)); 631 Set<Object> result = new HashSet<Object>(); 632 fieldSetFromBytesSet(fieldSet, result); 633 return result; 634 635 } 636 637 /** 638 * 返回哈希表 key 中域的数量。 639 */ 640 @Override 641 public Long hlen(Object key) { 642 643 return jedisCluster.hlen(keyToBytes(key)); 644 645 } 646 647 /** 648 * 为哈希表 key 中的域 field 的值加上增量 increment 。 649 * 增量也可以为负数,相当于对给定域进行减法操作。 650 * 如果 key 不存在,一个新的哈希表被创建并执行 HINCRBY 命令。 651 * 如果域 field 不存在,那么在执行命令前,域的值被初始化为 0 。 652 * 对一个储存字符串值的域 field 执行 HINCRBY 命令将造成一个错误。 653 * 本操作的值被限制在 64 位(bit)有符号数字表示之内。 654 */ 655 @Override 656 public Long hincrBy(Object key, Object field, long value) { 657 658 return jedisCluster.hincrBy(keyToBytes(key), valueToBytes(field), value); 659 660 } 661 662 /** 663 * 为哈希表 key 中的域 field 加上浮点数增量 increment 。 664 * 如果哈希表中没有域 field ,那么 HINCRBYFLOAT 会先将域 field 的值设为 0 ,然后再执行加法操作。 665 * 如果键 key 不存在,那么 HINCRBYFLOAT 会先创建一个哈希表,再创建域 field ,最后再执行加法操作。 666 * 当以下任意一个条件发生时,返回一个错误: 667 * 1:域 field 的值不是字符串类型(因为 redis 中的数字和浮点数都以字符串的形式保存,所以它们都属于字符串类型) 668 * 2:域 field 当前的值或给定的增量 increment 不能解释(parse)为双精度浮点数(double precision floating point number) 669 * HINCRBYFLOAT 命令的详细功能和 INCRBYFLOAT 命令类似,请查看 INCRBYFLOAT 命令获取更多相关信息。 670 */ 671 @Override 672 public Double hincrByFloat(Object key, Object field, double value) { 673 674 return jedisCluster.hincrByFloat(keyToBytes(key), valueToBytes(field), value); 675 676 } 677 678 /** 679 * 返回列表 key 中,下标为 index 的元素。 680 * 下标(index)参数 start 和 stop 都以 0 为底,也就是说,以 0 表示列表的第一个元素,以 1 表示列表的第二个元素,以此类推。 681 * 你也可以使用负数下标,以 -1 表示列表的最后一个元素, -2 表示列表的倒数第二个元素,以此类推。 682 * 如果 key 不是列表类型,返回一个错误。 683 */ 684 @Override 685 @SuppressWarnings("unchecked") 686 687 /** 688 * 返回列表 key 中,下标为 index 的元素。 689 * 下标(index)参数 start 和 stop 都以 0 为底,也就是说,以 0 表示列表的第一个元素, 690 * 以 1 表示列表的第二个元素,以此类推。 691 * 你也可以使用负数下标,以 -1 表示列表的最后一个元素, -2 表示列表的倒数第二个元素,以此类推。 692 * 如果 key 不是列表类型,返回一个错误。 693 */ 694 public <T> T lindex(Object key, long index) { 695 696 return (T) valueFromBytes(jedisCluster.lindex(keyToBytes(key), index)); 697 698 } 699 700 701 /** 702 * 返回列表 key 的长度。 703 * 如果 key 不存在,则 key 被解释为一个空列表,返回 0 . 704 * 如果 key 不是列表类型,返回一个错误。 705 */ 706 @Override 707 public Long llen(Object key) { 708 709 return jedisCluster.llen(keyToBytes(key)); 710 711 } 712 713 /** 714 * 移除并返回列表 key 的头元素。 715 */ 716 @Override 717 @SuppressWarnings("unchecked") 718 public <T> T lpop(Object key) { 719 720 return (T) valueFromBytes(jedisCluster.lpop(keyToBytes(key))); 721 722 } 723 724 /** 725 * 将一个或多个值 value 插入到列表 key 的表头 726 * 如果有多个 value 值,那么各个 value 值按从左到右的顺序依次插入到表头: 比如说, 727 * 对空列表 mylist 执行命令 LPUSH mylist a b c ,列表的值将是 c b a , 728 * 这等同于原子性地执行 LPUSH mylist a 、 LPUSH mylist b 和 LPUSH mylist c 三个命令。 729 * 如果 key 不存在,一个空列表会被创建并执行 LPUSH 操作。 730 * 当 key 存在但不是列表类型时,返回一个错误。 731 */ 732 @Override 733 public Long lpush(Object key, Object... values) { 734 735 return jedisCluster.lpush(keyToBytes(key), valuesToBytesArray(values)); 736 737 } 738 739 /** 740 * 将列表 key 下标为 index 的元素的值设置为 value 。 741 * 当 index 参数超出范围,或对一个空列表( key 不存在)进行 LSET 时,返回一个错误。 742 * 关于列表下标的更多信息,请参考 LINDEX 命令。 743 */ 744 @Override 745 public String lset(Object key, long index, Object value) { 746 747 return jedisCluster.lset(keyToBytes(key), index, valueToBytes(value)); 748 749 } 750 751 /** 752 * 根据参数 count 的值,移除列表中与参数 value 相等的元素。 753 * count 的值可以是以下几种: 754 * count 大于 0 : 从表头开始向表尾搜索,移除与 value 相等的元素,数量为 count 。 755 * count 小于 0 : 从表尾开始向表头搜索,移除与 value 相等的元素,数量为 count 的绝对值。 756 * count 等于 0 : 移除表中所有与 value 相等的值。 757 */ 758 @Override 759 public Long lrem(Object key, long count, Object value) { 760 761 return jedisCluster.lrem(keyToBytes(key), count, valueToBytes(value)); 762 763 } 764 765 /** 766 * 返回列表 key 中指定区间内的元素,区间以偏移量 start 和 stop 指定。 767 * 下标(index)参数 start 和 stop 都以 0 为底,也就是说,以 0 表示列表的第一个元素,以 1 表示列表的第二个元素,以此类推。 768 * 你也可以使用负数下标,以 -1 表示列表的最后一个元素, -2 表示列表的倒数第二个元素,以此类推。 769 * <pre> 770 * 例子: 771 * 获取 list 中所有数据:cache.lrange(listKey, 0, -1); 772 * 获取 list 中下标 1 到 3 的数据: cache.lrange(listKey, 1, 3); 773 * </pre> 774 */ 775 @Override 776 @SuppressWarnings("rawtypes") 777 public List lrange(Object key, long start, long end) { 778 779 List<byte[]> data = jedisCluster.lrange(keyToBytes(key), start, end); 780 if (data != null) { 781 return valueListFromBytesList(data); 782 } else { 783 return new ArrayList<byte[]>(0); 784 } 785 786 } 787 788 /** 789 * 对一个列表进行修剪(trim),就是说,让列表只保留指定区间内的元素,不在指定区间之内的元素都将被删除。 790 * 举个例子,执行命令 LTRIM list 0 2 ,表示只保留列表 list 的前三个元素,其余元素全部删除。 791 * 下标(index)参数 start 和 stop 都以 0 为底,也就是说,以 0 表示列表的第一个元素,以 1 表示列表的第二个元素,以此类推。 792 * 你也可以使用负数下标,以 -1 表示列表的最后一个元素, -2 表示列表的倒数第二个元素,以此类推。 793 * 当 key 不是列表类型时,返回一个错误。 794 */ 795 @Override 796 public String ltrim(Object key, long start, long end) { 797 798 return jedisCluster.ltrim(keyToBytes(key), start, end); 799 800 } 801 802 /** 803 * 移除并返回列表 key 的尾元素。 804 */ 805 @Override 806 @SuppressWarnings("unchecked") 807 public <T> T rpop(Object key) { 808 809 return (T) valueFromBytes(jedisCluster.rpop(keyToBytes(key))); 810 811 } 812 813 /** 814 * 命令 RPOPLPUSH 在一个原子时间内,执行以下两个动作: 815 * 将列表 source 中的最后一个元素(尾元素)弹出,并返回给客户端。 816 * 将 source 弹出的元素插入到列表 destination ,作为 destination 列表的的头元素。 817 */ 818 @Override 819 @SuppressWarnings("unchecked") 820 public <T> T rpoplpush(Object srcKey, Object dstKey) { 821 822 return (T) valueFromBytes(jedisCluster.rpoplpush(keyToBytes(srcKey), keyToBytes(dstKey))); 823 824 } 825 826 /** 827 * 将一个或多个值 value 插入到列表 key 的表尾(最右边)。 828 * 如果有多个 value 值,那么各个 value 值按从左到右的顺序依次插入到表尾:比如 829 * 对一个空列表 mylist 执行 RPUSH mylist a b c ,得出的结果列表为 a b c , 830 * 等同于执行命令 RPUSH mylist a 、 RPUSH mylist b 、 RPUSH mylist c 。 831 * 如果 key 不存在,一个空列表会被创建并执行 RPUSH 操作。 832 * 当 key 存在但不是列表类型时,返回一个错误。 833 */ 834 @Override 835 public Long rpush(Object key, Object... values) { 836 837 return jedisCluster.rpush(keyToBytes(key), valuesToBytesArray(values)); 838 839 } 840 841 /** 842 * BLPOP 是列表的阻塞式(blocking)弹出原语。 843 * 它是 LPOP 命令的阻塞版本,当给定列表内没有任何元素可供弹出的时候,连接将被 BLPOP 命令阻塞,直到等待超时或发现可弹出元素为止。 844 * 当给定多个 key 参数时,按参数 key 的先后顺序依次检查各个列表,弹出第一个非空列表的头元素。 845 */ 846 @Override 847 @SuppressWarnings("rawtypes") 848 public List blpop(Object... keys) { 849// String[] keysStrings = new String[keys.length]; 850// for (int i = 0; i < keys.length; i++) { 851// keysStrings[i] = keys[i].toString(); 852// } 853 854 List<byte[]> data = jedisCluster.blpop(timeout, keysToBytesArray(keys)); 855 return keyValueListFromBytesList(data); 856 857 } 858 859 /** 860 * BLPOP 是列表的阻塞式(blocking)弹出原语。 861 * 它是 LPOP 命令的阻塞版本,当给定列表内没有任何元素可供弹出的时候,连接将被 BLPOP 命令阻塞,直到等待超时或发现可弹出元素为止。 862 * 当给定多个 key 参数时,按参数 key 的先后顺序依次检查各个列表,弹出第一个非空列表的头元素。 863 */ 864 @Override 865 @SuppressWarnings("rawtypes") 866 public List blpop(Integer timeout, Object... keys) { 867 868 List<byte[]> data = jedisCluster.blpop(timeout, keysToBytesArray(keys)); 869 return keyValueListFromBytesList(data); 870 871 } 872 873 /** 874 * BRPOP 是列表的阻塞式(blocking)弹出原语。 875 * 它是 RPOP 命令的阻塞版本,当给定列表内没有任何元素可供弹出的时候,连接将被 BRPOP 命令阻塞,直到等待超时或发现可弹出元素为止。 876 * 当给定多个 key 参数时,按参数 key 的先后顺序依次检查各个列表,弹出第一个非空列表的尾部元素。 877 * 关于阻塞操作的更多信息,请查看 BLPOP 命令, BRPOP 除了弹出元素的位置和 BLPOP 不同之外,其他表现一致。 878 */ 879 @Override 880 @SuppressWarnings("rawtypes") 881 public List brpop(Object... keys) { 882 883 List<byte[]> data = jedisCluster.brpop(timeout, keysToBytesArray(keys)); 884 return keyValueListFromBytesList(data); 885 886 } 887 888 /** 889 * BRPOP 是列表的阻塞式(blocking)弹出原语。 890 * 它是 RPOP 命令的阻塞版本,当给定列表内没有任何元素可供弹出的时候,连接将被 BRPOP 命令阻塞,直到等待超时或发现可弹出元素为止。 891 * 当给定多个 key 参数时,按参数 key 的先后顺序依次检查各个列表,弹出第一个非空列表的尾部元素。 892 * 关于阻塞操作的更多信息,请查看 BLPOP 命令, BRPOP 除了弹出元素的位置和 BLPOP 不同之外,其他表现一致。 893 */ 894 @Override 895 @SuppressWarnings("rawtypes") 896 public List brpop(Integer timeout, Object... keys) { 897 898 List<byte[]> data = jedisCluster.brpop(timeout, keysToBytesArray(keys)); 899 return keyValueListFromBytesList(data); 900 901 } 902 903 /** 904 * 使用客户端向 JbootRedis 服务器发送一个 PING ,如果服务器运作正常的话,会返回一个 PONG 。 905 * 通常用于测试与服务器的连接是否仍然生效,或者用于测量延迟值。 906 */ 907 @Override 908 public String ping() { 909// jedisCluster.getClusterNodes().get("aa").getResource().ping 910// return jedisCluster..ping(); 911 912 Map<String, ConnectionPool> nodes = jedisCluster.getClusterNodes(); 913 if (nodes != null) { 914 for (ConnectionPool pool : nodes.values()) { 915 try (Connection node = pool.getResource()) { 916 if (node.ping()) { 917 return "PONG"; 918 } 919 } 920 } 921 } 922 return null; 923 } 924 925 /** 926 * 将一个或多个 member 元素加入到集合 key 当中,已经存在于集合的 member 元素将被忽略。 927 * 假如 key 不存在,则创建一个只包含 member 元素作成员的集合。 928 * 当 key 不是集合类型时,返回一个错误。 929 */ 930 @Override 931 public Long sadd(Object key, Object... members) { 932 933 return jedisCluster.sadd(keyToBytes(key), valuesToBytesArray(members)); 934 935 } 936 937 /** 938 * 返回集合 key 的基数(集合中元素的数量)。 939 */ 940 @Override 941 public Long scard(Object key) { 942 943 return jedisCluster.scard(keyToBytes(key)); 944 945 } 946 947 /** 948 * 移除并返回集合中的一个随机元素。 949 * 如果只想获取一个随机元素,但不想该元素从集合中被移除的话,可以使用 SRANDMEMBER 命令。 950 */ 951 @Override 952 @SuppressWarnings("unchecked") 953 public <T> T spop(Object key) { 954 955 return (T) valueFromBytes(jedisCluster.spop(keyToBytes(key))); 956 957 } 958 959 /** 960 * 返回集合 key 中的所有成员。 961 * 不存在的 key 被视为空集合。 962 */ 963 @Override 964 @SuppressWarnings("rawtypes") 965 public Set smembers(Object key) { 966 967 Set<byte[]> data = jedisCluster.smembers(keyToBytes(key)); 968 Set<Object> result = new HashSet<Object>(); 969 valueSetFromBytesSet(data, result); 970 return result; 971 972 } 973 974 /** 975 * 判断 member 元素是否集合 key 的成员。 976 */ 977 @Override 978 public boolean sismember(Object key, Object member) { 979 980 return jedisCluster.sismember(keyToBytes(key), valueToBytes(member)); 981 982 } 983 984 /** 985 * 返回多个集合的交集,多个集合由 keys 指定 986 */ 987 @Override 988 @SuppressWarnings("rawtypes") 989 public Set sinter(Object... keys) { 990 991 Set<byte[]> data = jedisCluster.sinter(keysToBytesArray(keys)); 992 Set<Object> result = new HashSet<Object>(); 993 valueSetFromBytesSet(data, result); 994 return result; 995 996 } 997 998 /** 999 * 返回集合中的一个随机元素。 1000 */ 1001 @Override 1002 @SuppressWarnings("unchecked") 1003 public <T> T srandmember(Object key) { 1004 1005 return (T) valueFromBytes(jedisCluster.srandmember(keyToBytes(key))); 1006 1007 } 1008 1009 /** 1010 * 返回集合中的 count 个随机元素。 1011 * 从 JbootRedis 2.6 版本开始, SRANDMEMBER 命令接受可选的 count 参数: 1012 * 如果 count 为正数,且小于集合基数,那么命令返回一个包含 count 个元素的数组,数组中的元素各不相同。 1013 * 如果 count 大于等于集合基数,那么返回整个集合。 1014 * 如果 count 为负数,那么命令返回一个数组,数组中的元素可能会重复出现多次,而数组的长度为 count 的绝对值。 1015 * 该操作和 SPOP 相似,但 SPOP 将随机元素从集合中移除并返回,而 SRANDMEMBER 则仅仅返回随机元素,而不对集合进行任何改动。 1016 */ 1017 @Override 1018 @SuppressWarnings("rawtypes") 1019 public List srandmember(Object key, int count) { 1020 1021 List<byte[]> data = jedisCluster.srandmember(keyToBytes(key), count); 1022 return valueListFromBytesList(data); 1023 1024 } 1025 1026 /** 1027 * 移除集合 key 中的一个或多个 member 元素,不存在的 member 元素会被忽略。 1028 */ 1029 @Override 1030 public Long srem(Object key, Object... members) { 1031 1032 return jedisCluster.srem(keyToBytes(key), valuesToBytesArray(members)); 1033 1034 } 1035 1036 /** 1037 * 返回多个集合的并集,多个集合由 keys 指定 1038 * 不存在的 key 被视为空集。 1039 */ 1040 @Override 1041 @SuppressWarnings("rawtypes") 1042 public Set sunion(Object... keys) { 1043 1044 Set<byte[]> data = jedisCluster.sunion(keysToBytesArray(keys)); 1045 Set<Object> result = new HashSet<Object>(); 1046 valueSetFromBytesSet(data, result); 1047 return result; 1048 1049 } 1050 1051 /** 1052 * 返回一个集合的全部成员,该集合是所有给定集合之间的差集。 1053 * 不存在的 key 被视为空集。 1054 */ 1055 @Override 1056 @SuppressWarnings("rawtypes") 1057 public Set sdiff(Object... keys) { 1058 1059 Set<byte[]> data = jedisCluster.sdiff(keysToBytesArray(keys)); 1060 Set<Object> result = new HashSet<Object>(); 1061 valueSetFromBytesSet(data, result); 1062 return result; 1063 1064 } 1065 1066 /** 1067 * 将一个或多个 member 元素及其 score 值加入到有序集 key 当中。 1068 * 如果某个 member 已经是有序集的成员,那么更新这个 member 的 score 值, 1069 * 并通过重新插入这个 member 元素,来保证该 member 在正确的位置上。 1070 */ 1071 @Override 1072 public Long zadd(Object key, double score, Object member) { 1073 1074 return jedisCluster.zadd(keyToBytes(key), score, valueToBytes(member)); 1075 1076 } 1077 1078 @Override 1079 public Long zadd(Object key, Map<Object, Double> scoreMembers) { 1080 1081 Map<byte[], Double> para = new HashMap<byte[], Double>(); 1082 for (Entry<Object, Double> e : scoreMembers.entrySet()) 1083 para.put(valueToBytes(e.getKey()), e.getValue()); // valueToBytes is important 1084 return jedisCluster.zadd(keyToBytes(key), para); 1085 1086 } 1087 1088 /** 1089 * 返回有序集 key 的基数。 1090 */ 1091 @Override 1092 public Long zcard(Object key) { 1093 1094 return jedisCluster.zcard(keyToBytes(key)); 1095 1096 } 1097 1098 /** 1099 * 返回有序集 key 中, score 值在 min 和 max 之间(默认包括 score 值等于 min 或 max )的成员的数量。 1100 * 关于参数 min 和 max 的详细使用方法,请参考 ZRANGEBYSCORE 命令。 1101 */ 1102 @Override 1103 public Long zcount(Object key, double min, double max) { 1104 1105 return jedisCluster.zcount(keyToBytes(key), min, max); 1106 1107 } 1108 1109 /** 1110 * 为有序集 key 的成员 member 的 score 值加上增量 increment 。 1111 */ 1112 @Override 1113 public Double zincrby(Object key, double score, Object member) { 1114 1115 return jedisCluster.zincrby(keyToBytes(key), score, valueToBytes(member)); 1116 1117 } 1118 1119 /** 1120 * 返回有序集 key 中,指定区间内的成员。 1121 * 其中成员的位置按 score 值递增(从小到大)来排序。 1122 * 具有相同 score 值的成员按字典序(lexicographical order )来排列。 1123 * 如果你需要成员按 score 值递减(从大到小)来排列,请使用 ZREVRANGE 命令。 1124 */ 1125 @Override 1126 @SuppressWarnings("rawtypes") 1127 public List zrange(Object key, long start, long end) { 1128 1129 List<byte[]> data = jedisCluster.zrange(keyToBytes(key), start, end); 1130 List<Object> result = data.stream().map(d->valueFromBytes(d)).collect(Collectors.toList()); // 有序集合必须 LinkedHashSet 1131 return result; 1132 1133 } 1134 1135 /** 1136 * 返回有序集 key 中,指定区间内的成员。 1137 * 其中成员的位置按 score 值递减(从大到小)来排列。 1138 * 具有相同 score 值的成员按字典序的逆序(reverse lexicographical order)排列。 1139 * 除了成员按 score 值递减的次序排列这一点外, ZREVRANGE 命令的其他方面和 ZRANGE 命令一样。 1140 */ 1141 @Override 1142 @SuppressWarnings("rawtypes") 1143 public List zrevrange(Object key, long start, long end) { 1144 1145 List<byte[]> data = jedisCluster.zrevrange(keyToBytes(key), start, end); 1146 List<Object> result = data.stream().map(d->valueFromBytes(d)).collect(Collectors.toList()); // 有序集合必须 LinkedHashSet 1147 return result; 1148 1149 } 1150 1151 /** 1152 * 返回有序集 key 中,所有 score 值介于 min 和 max 之间(包括等于 min 或 max )的成员。 1153 * 有序集成员按 score 值递增(从小到大)次序排列。 1154 */ 1155 @Override 1156 @SuppressWarnings("rawtypes") 1157 public List zrangeByScore(Object key, double min, double max) { 1158 1159 List<byte[]> data = jedisCluster.zrangeByScore(keyToBytes(key), min, max); 1160 List<Object> result = data.stream().map(d->valueFromBytes(d)).collect(Collectors.toList()); // 有序集合必须 LinkedHashSet 1161 return result; 1162 1163 } 1164 1165 /** 1166 * 返回有序集 key 中成员 member 的排名。其中有序集成员按 score 值递增(从小到大)顺序排列。 1167 * 排名以 0 为底,也就是说, score 值最小的成员排名为 0 。 1168 * 使用 ZREVRANK 命令可以获得成员按 score 值递减(从大到小)排列的排名。 1169 */ 1170 @Override 1171 public Long zrank(Object key, Object member) { 1172 1173 return jedisCluster.zrank(keyToBytes(key), valueToBytes(member)); 1174 1175 } 1176 1177 /** 1178 * 返回有序集 key 中成员 member 的排名。其中有序集成员按 score 值递减(从大到小)排序。 1179 * 排名以 0 为底,也就是说, score 值最大的成员排名为 0 。 1180 * 使用 ZRANK 命令可以获得成员按 score 值递增(从小到大)排列的排名。 1181 */ 1182 @Override 1183 public Long zrevrank(Object key, Object member) { 1184 1185 return jedisCluster.zrevrank(keyToBytes(key), valueToBytes(member)); 1186 1187 } 1188 1189 /** 1190 * 移除有序集 key 中的一个或多个成员,不存在的成员将被忽略。 1191 * 当 key 存在但不是有序集类型时,返回一个错误。 1192 */ 1193 @Override 1194 public Long zrem(Object key, Object... members) { 1195 1196 return jedisCluster.zrem(keyToBytes(key), valuesToBytesArray(members)); 1197 1198 } 1199 1200 /** 1201 * 返回有序集 key 中,成员 member 的 score 值。 1202 * 如果 member 元素不是有序集 key 的成员,或 key 不存在,返回 nil 。 1203 */ 1204 @Override 1205 public Double zscore(Object key, Object member) { 1206 1207 return jedisCluster.zscore(keyToBytes(key), valueToBytes(member)); 1208 1209 } 1210 1211 /** 1212 * 发布 1213 * 1214 * @param channel 1215 * @param message 1216 */ 1217 @Override 1218 public void publish(String channel, String message) { 1219 1220 jedisCluster.publish(channel, message); 1221 1222 } 1223 1224 /** 1225 * 发布 1226 * 1227 * @param channel 1228 * @param message 1229 */ 1230 @Override 1231 public void publish(byte[] channel, byte[] message) { 1232 jedisCluster.publish(channel, message); 1233 } 1234 1235 1236 /** 1237 * 订阅 1238 * 1239 * @param listener 1240 * @param channels 1241 */ 1242 @Override 1243 public void subscribe(JedisPubSub listener, final String... channels) { 1244 /** 1245 * https://github.com/xetorthio/jedis/wiki/AdvancedUsage 1246 * Note that subscribe is a blocking operation because it will poll JbootRedis for responses on the thread that calls subscribe. 1247 * A single JedisPubSub instance can be used to subscribe to multiple channels. 1248 * You can call subscribe or psubscribe on an existing JedisPubSub instance to change your subscriptions. 1249 */ 1250 new Thread("jboot-redisCluster-subscribe-JedisPubSub") { 1251 @Override 1252 public void run() { 1253 while (true) { 1254 //订阅线程断开连接,需要进行重连 1255 try { 1256 jedisCluster.subscribe(listener, channels); 1257 LOG.warn("Disconnect to redis channel in subscribe JedisPubSub!"); 1258 break; 1259 } catch (JedisConnectionException e) { 1260 LOG.error("failed connect to redis, reconnect it.", e); 1261 QuietlyUtil.sleepQuietly(1000); 1262 } 1263 } 1264 } 1265 }.start(); 1266 } 1267 1268 /** 1269 * 订阅 1270 * 1271 * @param binaryListener 1272 * @param channels 1273 */ 1274 @Override 1275 public void subscribe(BinaryJedisPubSub binaryListener, final byte[]... channels) { 1276 /** 1277 * https://github.com/xetorthio/jedis/wiki/AdvancedUsage 1278 * Note that subscribe is a blocking operation because it will poll JbootRedis for responses on the thread that calls subscribe. 1279 * A single JedisPubSub instance can be used to subscribe to multiple channels. 1280 * You can call subscribe or psubscribe on an existing JedisPubSub instance to change your subscriptions. 1281 */ 1282 new Thread("jboot-redisCluster-subscribe-BinaryJedisPubSub") { 1283 @Override 1284 public void run() { 1285 while (!isClose()) { 1286 //订阅线程断开连接,需要进行重连 1287 try { 1288 jedisCluster.subscribe(binaryListener, channels); 1289 LOG.warn("Disconnect to redis channel in subscribe BinaryJedisPubSub!"); 1290 break; 1291 } catch (Throwable e) { 1292 LOG.error("failed connect to redis, reconnect it.", e); 1293 QuietlyUtil.sleepQuietly(1000); 1294 } 1295 } 1296 } 1297 }.start(); 1298 } 1299 1300 1301 @Override 1302 public RedisScanResult<String> scan(String pattern, String cursor, int scanCount) { 1303 ScanParams params = new ScanParams(); 1304 params.match(pattern).count(scanCount); 1305 ScanResult<String> scanResult = jedisCluster.scan(cursor, params); 1306 return new RedisScanResult<>(scanResult.getCursor(), scanResult.getResult()); 1307 } 1308 1309 @Override 1310 public Object eval(String script, int keyCount, String... params) { 1311 return jedisCluster.eval(script, keyCount, params); 1312 } 1313 1314 public JedisCluster getJedisCluster() { 1315 return jedisCluster; 1316 } 1317 1318} 1319 1320 1321 1322 1323 1324