001/** 002 * Copyright (c) 2015-2022, Michael Yang 杨福海 (fuhai999@gmail.com). 003 * <p> 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * <p> 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * <p> 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package io.jboot.components.cache.ehredis; 017 018import com.github.benmanes.caffeine.cache.Caffeine; 019import com.github.benmanes.caffeine.cache.LoadingCache; 020import com.jfinal.plugin.ehcache.IDataLoader; 021import io.jboot.Jboot; 022import io.jboot.components.cache.JbootCacheBase; 023import io.jboot.components.cache.JbootCacheConfig; 024import io.jboot.components.cache.ehcache.JbootEhcacheImpl; 025import io.jboot.components.cache.redis.JbootRedisCacheImpl; 026import io.jboot.components.serializer.JbootSerializer; 027import io.jboot.support.redis.JbootRedis; 028import io.jboot.utils.StrUtil; 029import net.sf.ehcache.CacheException; 030import net.sf.ehcache.Ehcache; 031import net.sf.ehcache.Element; 032import net.sf.ehcache.event.CacheEventListener; 033import redis.clients.jedis.BinaryJedisPubSub; 034 035import java.util.ArrayList; 036import java.util.List; 037import java.util.concurrent.TimeUnit; 038 039/** 040 * 基于 ehcache和redis做的二级缓存 041 * 优点是:减少高并发下redis的io瓶颈 042 */ 043public class JbootEhredisCacheImpl extends JbootCacheBase implements CacheEventListener { 044 045 public static final String DEFAULT_NOTIFY_CHANNEL = "jboot_ehredis_channel"; 046 047 private JbootEhcacheImpl ehcacheImpl; 048 private JbootRedisCacheImpl redisCacheImpl; 049 private JbootRedis redis; 050 private JbootSerializer serializer; 051 052 private String channel = DEFAULT_NOTIFY_CHANNEL; 053 private String clientId; 054 055 private LoadingCache<String, List> keysCache = Caffeine.newBuilder() 056 .expireAfterAccess(10, TimeUnit.MINUTES) 057 .expireAfterWrite(10, TimeUnit.MINUTES) 058 .build(key -> null); 059 060 061 public JbootEhredisCacheImpl(JbootCacheConfig config) { 062 super(config); 063 this.ehcacheImpl = new JbootEhcacheImpl(config); 064 this.ehcacheImpl.setCacheEventListener(this); 065 066 this.redisCacheImpl = new JbootRedisCacheImpl(config); 067 this.clientId = StrUtil.uuid(); 068 this.serializer = Jboot.getSerializer(); 069 070 if (StrUtil.isNotBlank(config.getCacheSyncMqChannel())){ 071 this.channel = config.getCacheSyncMqChannel(); 072 } 073 074 this.redis = redisCacheImpl.getRedis(); 075 this.redis.subscribe(new BinaryJedisPubSub() { 076 @Override 077 public void onMessage(byte[] channel, byte[] message) { 078 JbootEhredisCacheImpl.this.onMessage((String) serializer.deserialize(channel), serializer.deserialize(message)); 079 } 080 }, serializer.serialize(channel)); 081 } 082 083 084 @Override 085 public <T> T get(String cacheName, Object key) { 086 T value = ehcacheImpl.get(cacheName, key); 087 if (value == null && !config.isUseFirstLevelOnly()) { 088 value = redisCacheImpl.get(cacheName, key); 089 if (value != null) { 090 Integer ttl = redisCacheImpl.getTtl(cacheName, key); 091 if (ttl != null && ttl > 0) { 092 ehcacheImpl.put(cacheName, key, value, ttl); 093 } else { 094 ehcacheImpl.put(cacheName, key, value); 095 } 096 } 097 } 098 return value; 099 } 100 101 @Override 102 public void put(String cacheName, Object key, Object value) { 103 try { 104 ehcacheImpl.put(cacheName, key, value); 105 if (!config.isUseFirstLevelOnly()) { 106 redisCacheImpl.put(cacheName, key, value); 107 } 108 } finally { 109 publishMessage(JbootEhredisMessage.ACTION_PUT, cacheName, key); 110 } 111 } 112 113 114 @Override 115 public void put(String cacheName, Object key, Object value, int liveSeconds) { 116 if (liveSeconds <= 0) { 117 put(cacheName, key, value); 118 return; 119 } 120 try { 121 ehcacheImpl.put(cacheName, key, value, liveSeconds); 122 123 if (!config.isUseFirstLevelOnly()) { 124 redisCacheImpl.put(cacheName, key, value, liveSeconds); 125 } 126 } finally { 127 publishMessage(JbootEhredisMessage.ACTION_PUT, cacheName, key); 128 } 129 } 130 131 @Override 132 public void remove(String cacheName, Object key) { 133 try { 134 ehcacheImpl.remove(cacheName, key); 135 136 if (!config.isUseFirstLevelOnly()) { 137 redisCacheImpl.remove(cacheName, key); 138 } 139 } finally { 140 publishMessage(JbootEhredisMessage.ACTION_REMOVE, cacheName, key); 141 } 142 } 143 144 @Override 145 public void removeAll(String cacheName) { 146 try { 147 ehcacheImpl.removeAll(cacheName); 148 149 if (!config.isUseFirstLevelOnly()) { 150 redisCacheImpl.removeAll(cacheName); 151 } 152 } finally { 153 publishMessage(JbootEhredisMessage.ACTION_REMOVE_ALL, cacheName, null); 154 } 155 } 156 157 @Override 158 public <T> T get(String cacheName, Object key, IDataLoader dataLoader) { 159 T value = get(cacheName, key); 160 if (value != null) { 161 return value; 162 } 163 164 value = (T) dataLoader.load(); 165 if (value != null) { 166 put(cacheName, key, value); 167 } 168 return value; 169 } 170 171 @Override 172 public <T> T get(String cacheName, Object key, IDataLoader dataLoader, int liveSeconds) { 173 if (liveSeconds <= 0) { 174 return get(cacheName, key, dataLoader); 175 } 176 177 T value = get(cacheName, key); 178 if (value != null) { 179 return value; 180 } 181 182 value = (T) dataLoader.load(); 183 if (value != null) { 184 put(cacheName, key, value, liveSeconds); 185 } 186 return value; 187 } 188 189 @Override 190 public Integer getTtl(String cacheName, Object key) { 191 Integer ttl = ehcacheImpl.getTtl(cacheName, key); 192 if (ttl == null && !config.isUseFirstLevelOnly()) { 193 ttl = redisCacheImpl.getTtl(cacheName, key); 194 } 195 return ttl; 196 } 197 198 199 @Override 200 public void setTtl(String cacheName, Object key, int seconds) { 201 try { 202 ehcacheImpl.setTtl(cacheName, key, seconds); 203 204 if (!config.isUseFirstLevelOnly()) { 205 redisCacheImpl.setTtl(cacheName, key, seconds); 206 } 207 } finally { 208 publishMessage(JbootEhredisMessage.ACTION_REMOVE, cacheName, key); 209 } 210 } 211 212 213 private void publishMessage(int action, String cacheName, Object key) { 214 clearKeysCache(cacheName); 215 JbootEhredisMessage message = new JbootEhredisMessage(clientId, action, cacheName, key); 216 redis.publish(serializer.serialize(channel), serializer.serialize(message)); 217 } 218 219 private void clearKeysCache(String cacheName) { 220 keysCache.invalidate(cacheName); 221 } 222 223 224 @Override 225 public void refresh(String cacheName, Object key) { 226 publishMessage(JbootEhredisMessage.ACTION_REMOVE, cacheName, key); 227 } 228 229 230 @Override 231 public void refresh(String cacheName) { 232 publishMessage(JbootEhredisMessage.ACTION_REMOVE_ALL, cacheName, null); 233 } 234 235 @Override 236 public List getNames() { 237 return config.isUseFirstLevelOnly() ? null : redisCacheImpl.getNames(); 238 } 239 240 @Override 241 public List getKeys(String cacheName) { 242 List list = keysCache.getIfPresent(cacheName); 243 if (list == null && !config.isUseFirstLevelOnly()) { 244 list = redisCacheImpl.getKeys(cacheName); 245 if (list == null) { 246 list = new ArrayList(); 247 } 248 keysCache.put(cacheName, list); 249 } 250 return list; 251 } 252 253 254 public void onMessage(String channel, Object obj) { 255 256 JbootEhredisMessage message = (JbootEhredisMessage) obj; 257 258 //不处理自己发送的消息 259 if (clientId.equals(message.getClientId())) { 260 return; 261 } 262 263 clearKeysCache(message.getCacheName()); 264 265 switch (message.getAction()) { 266 case JbootEhredisMessage.ACTION_PUT: 267 case JbootEhredisMessage.ACTION_REMOVE: 268 ehcacheImpl.remove(message.getCacheName(), message.getKey()); 269 break; 270 case JbootEhredisMessage.ACTION_REMOVE_ALL: 271 ehcacheImpl.removeAll(message.getCacheName()); 272 break; 273 } 274 } 275 276 public JbootEhcacheImpl getEhcacheImpl() { 277 return ehcacheImpl; 278 } 279 280 public JbootRedisCacheImpl getRedisCacheImpl() { 281 return redisCacheImpl; 282 } 283 284 285 @Override 286 public void notifyElementRemoved(Ehcache cache, Element element) throws CacheException { 287 } 288 289 @Override 290 public void notifyElementPut(Ehcache cache, Element element) throws CacheException { 291 } 292 293 @Override 294 public void notifyElementUpdated(Ehcache cache, Element element) throws CacheException { 295 } 296 297 @Override 298 public void notifyElementExpired(Ehcache cache, Element element) { 299 clearKeysCache(cache.getName()); 300 } 301 302 @Override 303 public void notifyElementEvicted(Ehcache cache, Element element) { 304 } 305 306 @Override 307 public void notifyRemoveAll(Ehcache cache) { 308 } 309 310 @Override 311 public Object clone() throws CloneNotSupportedException { 312 throw new CloneNotSupportedException(); 313 } 314 315 @Override 316 public void dispose() { 317 } 318}