001/** 002 * Copyright (c) 2015-2022, Michael Yang 杨福海 (fuhai999@gmail.com). 003 * <p> 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * <p> 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * <p> 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package io.jboot.components.cache.caredis; 017 018import com.github.benmanes.caffeine.cache.Cache; 019import com.github.benmanes.caffeine.cache.Caffeine; 020import com.jfinal.plugin.ehcache.IDataLoader; 021import io.jboot.Jboot; 022import io.jboot.components.cache.JbootCacheBase; 023import io.jboot.components.cache.JbootCacheConfig; 024import io.jboot.components.cache.caffeine.CaffeineCacheImpl; 025import io.jboot.components.cache.redis.JbootRedisCacheImpl; 026import io.jboot.components.serializer.JbootSerializer; 027import io.jboot.support.redis.JbootRedis; 028import io.jboot.utils.StrUtil; 029import redis.clients.jedis.BinaryJedisPubSub; 030 031import java.util.ArrayList; 032import java.util.List; 033import java.util.concurrent.TimeUnit; 034 035/** 036 * 基于 caffeine和redis做的二级缓存 037 * 优点是:减少高并发下redis的io瓶颈 038 */ 039public class JbootCaredisCacheImpl extends JbootCacheBase { 040 041 public static final String DEFAULT_NOTIFY_CHANNEL = "jboot_caredis_channel"; 042 043 private CaffeineCacheImpl caffeineCacheImpl; 044 private JbootRedisCacheImpl redisCacheImpl; 045 private JbootRedis redis; 046 private JbootSerializer serializer; 047 048 private String channel = DEFAULT_NOTIFY_CHANNEL; 049 private String clientId; 050 051 private Cache<String, List> keysCache = Caffeine.newBuilder() 052 .expireAfterAccess(10, TimeUnit.MINUTES) 053 .expireAfterWrite(10, TimeUnit.MINUTES) 054 .build(); 055 056 057 public JbootCaredisCacheImpl(JbootCacheConfig config) { 058 super(config); 059 this.caffeineCacheImpl = new CaffeineCacheImpl(config); 060 this.redisCacheImpl = new JbootRedisCacheImpl(config); 061 this.clientId = StrUtil.uuid(); 062 this.serializer = Jboot.getSerializer(); 063 064 //在某些场景下,多个应用使用同一个 redis 实例,此时可以通过配置 cacheSyncMqChannel 来解决缓存冲突的问题 065 if (StrUtil.isNotBlank(config.getCacheSyncMqChannel())){ 066 this.channel = config.getCacheSyncMqChannel(); 067 } 068 069 this.redis = redisCacheImpl.getRedis(); 070 this.redis.subscribe(new BinaryJedisPubSub() { 071 @Override 072 public void onMessage(byte[] channel, byte[] message) { 073 JbootCaredisCacheImpl.this.onMessage((String) serializer.deserialize(channel), serializer.deserialize(message)); 074 } 075 }, serializer.serialize(channel)); 076 } 077 078 079 @Override 080 public <T> T get(String cacheName, Object key) { 081 T value = caffeineCacheImpl.get(cacheName, key); 082 if (value == null && !config.isUseFirstLevelOnly()) { 083 value = redisCacheImpl.get(cacheName, key); 084 if (value != null) { 085 Integer ttl = redisCacheImpl.getTtl(cacheName, key); 086 if (ttl != null && ttl > 0) { 087 caffeineCacheImpl.put(cacheName, key, value, ttl); 088 } else { 089 caffeineCacheImpl.put(cacheName, key, value); 090 } 091 } 092 } 093 return value; 094 } 095 096 @Override 097 public void put(String cacheName, Object key, Object value) { 098 try { 099 caffeineCacheImpl.put(cacheName, key, value); 100 if (!config.isUseFirstLevelOnly()) { 101 redisCacheImpl.put(cacheName, key, value); 102 } 103 } finally { 104 publishMessage(JbootCaredisMessage.ACTION_PUT, cacheName, key); 105 } 106 } 107 108 109 @Override 110 public void put(String cacheName, Object key, Object value, int liveSeconds) { 111 if (liveSeconds <= 0) { 112 put(cacheName, key, value); 113 return; 114 } 115 try { 116 caffeineCacheImpl.put(cacheName, key, value, liveSeconds); 117 if (!config.isUseFirstLevelOnly()) { 118 redisCacheImpl.put(cacheName, key, value, liveSeconds); 119 } 120 } finally { 121 publishMessage(JbootCaredisMessage.ACTION_PUT, cacheName, key); 122 } 123 } 124 125 @Override 126 public void remove(String cacheName, Object key) { 127 try { 128 caffeineCacheImpl.remove(cacheName, key); 129 if (!config.isUseFirstLevelOnly()) { 130 redisCacheImpl.remove(cacheName, key); 131 } 132 } finally { 133 publishMessage(JbootCaredisMessage.ACTION_REMOVE, cacheName, key); 134 } 135 } 136 137 @Override 138 public void removeAll(String cacheName) { 139 try { 140 caffeineCacheImpl.removeAll(cacheName); 141 if (!config.isUseFirstLevelOnly()) { 142 redisCacheImpl.removeAll(cacheName); 143 } 144 } finally { 145 publishMessage(JbootCaredisMessage.ACTION_REMOVE_ALL, cacheName, null); 146 } 147 } 148 149 @Override 150 public <T> T get(String cacheName, Object key, IDataLoader dataLoader) { 151 T value = get(cacheName, key); 152 if (value != null) { 153 return value; 154 } 155 156 value = (T) dataLoader.load(); 157 if (value != null) { 158 put(cacheName, key, value); 159 } 160 return value; 161 } 162 163 @Override 164 public <T> T get(String cacheName, Object key, IDataLoader dataLoader, int liveSeconds) { 165 if (liveSeconds <= 0) { 166 return get(cacheName, key, dataLoader); 167 } 168 169 T value = get(cacheName, key); 170 if (value != null) { 171 return value; 172 } 173 174 value = (T) dataLoader.load(); 175 if (value != null) { 176 put(cacheName, key, value, liveSeconds); 177 } 178 return value; 179 } 180 181 @Override 182 public Integer getTtl(String cacheName, Object key) { 183 Integer ttl = caffeineCacheImpl.getTtl(cacheName, key); 184 if (ttl == null && !config.isUseFirstLevelOnly()) { 185 ttl = redisCacheImpl.getTtl(cacheName, key); 186 } 187 return ttl; 188 } 189 190 191 @Override 192 public void setTtl(String cacheName, Object key, int seconds) { 193 try { 194 caffeineCacheImpl.setTtl(cacheName, key, seconds); 195 196 if (!config.isUseFirstLevelOnly()) { 197 redisCacheImpl.setTtl(cacheName, key, seconds); 198 } 199 } finally { 200 publishMessage(JbootCaredisMessage.ACTION_REMOVE, cacheName, key); 201 } 202 } 203 204 205 @Override 206 public void refresh(String cacheName, Object key) { 207 publishMessage(JbootCaredisMessage.ACTION_REMOVE, cacheName, key); 208 } 209 210 211 @Override 212 public void refresh(String cacheName) { 213 publishMessage(JbootCaredisMessage.ACTION_REMOVE_ALL, cacheName, null); 214 } 215 216 217 @Override 218 public List getNames() { 219 return config.isUseFirstLevelOnly() ? null : redisCacheImpl.getNames(); 220 } 221 222 223 @Override 224 public List getKeys(String cacheName) { 225 List list = keysCache.getIfPresent(cacheName); 226 if (list != null) { 227 return list; 228 } 229 230 if (!config.isUseFirstLevelOnly()) { 231 list = redisCacheImpl.getKeys(cacheName); 232 if (list == null) { 233 list = new ArrayList(); 234 } 235 keysCache.put(cacheName, list); 236 } 237 238 return list; 239 } 240 241 242 private void publishMessage(int action, String cacheName, Object key) { 243 clearKeysCache(cacheName); 244 JbootCaredisMessage message = new JbootCaredisMessage(clientId, action, cacheName, key); 245 redis.publish(serializer.serialize(channel), serializer.serialize(message)); 246 } 247 248 private void clearKeysCache(String cacheName) { 249 keysCache.invalidate(cacheName); 250 } 251 252 public void onMessage(String channel, Object obj) { 253 254 JbootCaredisMessage message = (JbootCaredisMessage) obj; 255 256 //不处理自己发送的消息 257 if (clientId.equals(message.getClientId())) { 258 return; 259 } 260 261 clearKeysCache(message.getCacheName()); 262 263 switch (message.getAction()) { 264 case JbootCaredisMessage.ACTION_PUT: 265 case JbootCaredisMessage.ACTION_REMOVE: 266 caffeineCacheImpl.remove(message.getCacheName(), message.getKey()); 267 break; 268 case JbootCaredisMessage.ACTION_REMOVE_ALL: 269 caffeineCacheImpl.removeAll(message.getCacheName()); 270 break; 271 } 272 } 273 274 275 public CaffeineCacheImpl getCaffeineCacheImpl() { 276 return caffeineCacheImpl; 277 } 278 279 public JbootRedisCacheImpl getRedisCacheImpl() { 280 return redisCacheImpl; 281 } 282 283}