001/**
002 * Copyright (c) 2015-2022, Michael Yang 杨福海 (fuhai999@gmail.com).
003 * <p>
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 * <p>
008 * http://www.apache.org/licenses/LICENSE-2.0
009 * <p>
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package io.jboot.components.cache.ehredis;
017
018import com.github.benmanes.caffeine.cache.Caffeine;
019import com.github.benmanes.caffeine.cache.LoadingCache;
020import com.jfinal.plugin.ehcache.IDataLoader;
021import io.jboot.Jboot;
022import io.jboot.components.cache.JbootCacheBase;
023import io.jboot.components.cache.JbootCacheConfig;
024import io.jboot.components.cache.ehcache.JbootEhcacheImpl;
025import io.jboot.components.cache.redis.JbootRedisCacheImpl;
026import io.jboot.components.serializer.JbootSerializer;
027import io.jboot.support.redis.JbootRedis;
028import io.jboot.utils.StrUtil;
029import net.sf.ehcache.CacheException;
030import net.sf.ehcache.Ehcache;
031import net.sf.ehcache.Element;
032import net.sf.ehcache.event.CacheEventListener;
033import redis.clients.jedis.BinaryJedisPubSub;
034
035import java.util.ArrayList;
036import java.util.List;
037import java.util.concurrent.TimeUnit;
038
039/**
040 * 基于 ehcache和redis做的二级缓存
041 * 优点是:减少高并发下redis的io瓶颈
042 */
043public class JbootEhredisCacheImpl extends JbootCacheBase implements CacheEventListener {
044
045    public static final String DEFAULT_NOTIFY_CHANNEL = "jboot_ehredis_channel";
046
047    private JbootEhcacheImpl ehcacheImpl;
048    private JbootRedisCacheImpl redisCacheImpl;
049    private JbootRedis redis;
050    private JbootSerializer serializer;
051
052    private String channel = DEFAULT_NOTIFY_CHANNEL;
053    private String clientId;
054
055    private LoadingCache<String, List> keysCache = Caffeine.newBuilder()
056            .expireAfterAccess(10, TimeUnit.MINUTES)
057            .expireAfterWrite(10, TimeUnit.MINUTES)
058            .build(key -> null);
059
060
061    public JbootEhredisCacheImpl(JbootCacheConfig config) {
062        super(config);
063        this.ehcacheImpl = new JbootEhcacheImpl(config);
064        this.ehcacheImpl.setCacheEventListener(this);
065
066        this.redisCacheImpl = new JbootRedisCacheImpl(config);
067        this.clientId = StrUtil.uuid();
068        this.serializer = Jboot.getSerializer();
069
070        if (StrUtil.isNotBlank(config.getCacheSyncMqChannel())){
071            this.channel = config.getCacheSyncMqChannel();
072        }
073
074        this.redis = redisCacheImpl.getRedis();
075        this.redis.subscribe(new BinaryJedisPubSub() {
076            @Override
077            public void onMessage(byte[] channel, byte[] message) {
078                JbootEhredisCacheImpl.this.onMessage((String) serializer.deserialize(channel), serializer.deserialize(message));
079            }
080        }, serializer.serialize(channel));
081    }
082
083
084    @Override
085    public <T> T get(String cacheName, Object key) {
086        T value = ehcacheImpl.get(cacheName, key);
087        if (value == null && !config.isUseFirstLevelOnly()) {
088            value = redisCacheImpl.get(cacheName, key);
089            if (value != null) {
090                Integer ttl = redisCacheImpl.getTtl(cacheName, key);
091                if (ttl != null && ttl > 0) {
092                    ehcacheImpl.put(cacheName, key, value, ttl);
093                } else {
094                    ehcacheImpl.put(cacheName, key, value);
095                }
096            }
097        }
098        return value;
099    }
100
101    @Override
102    public void put(String cacheName, Object key, Object value) {
103        try {
104            ehcacheImpl.put(cacheName, key, value);
105            if (!config.isUseFirstLevelOnly()) {
106                redisCacheImpl.put(cacheName, key, value);
107            }
108        } finally {
109            publishMessage(JbootEhredisMessage.ACTION_PUT, cacheName, key);
110        }
111    }
112
113
114    @Override
115    public void put(String cacheName, Object key, Object value, int liveSeconds) {
116        if (liveSeconds <= 0) {
117            put(cacheName, key, value);
118            return;
119        }
120        try {
121            ehcacheImpl.put(cacheName, key, value, liveSeconds);
122
123            if (!config.isUseFirstLevelOnly()) {
124                redisCacheImpl.put(cacheName, key, value, liveSeconds);
125            }
126        } finally {
127            publishMessage(JbootEhredisMessage.ACTION_PUT, cacheName, key);
128        }
129    }
130
131    @Override
132    public void remove(String cacheName, Object key) {
133        try {
134            ehcacheImpl.remove(cacheName, key);
135
136            if (!config.isUseFirstLevelOnly()) {
137                redisCacheImpl.remove(cacheName, key);
138            }
139        } finally {
140            publishMessage(JbootEhredisMessage.ACTION_REMOVE, cacheName, key);
141        }
142    }
143
144    @Override
145    public void removeAll(String cacheName) {
146        try {
147            ehcacheImpl.removeAll(cacheName);
148
149            if (!config.isUseFirstLevelOnly()) {
150                redisCacheImpl.removeAll(cacheName);
151            }
152        } finally {
153            publishMessage(JbootEhredisMessage.ACTION_REMOVE_ALL, cacheName, null);
154        }
155    }
156
157    @Override
158    public <T> T get(String cacheName, Object key, IDataLoader dataLoader) {
159        T value = get(cacheName, key);
160        if (value != null) {
161            return value;
162        }
163
164        value = (T) dataLoader.load();
165        if (value != null) {
166            put(cacheName, key, value);
167        }
168        return value;
169    }
170
171    @Override
172    public <T> T get(String cacheName, Object key, IDataLoader dataLoader, int liveSeconds) {
173        if (liveSeconds <= 0) {
174            return get(cacheName, key, dataLoader);
175        }
176
177        T value = get(cacheName, key);
178        if (value != null) {
179            return value;
180        }
181
182        value = (T) dataLoader.load();
183        if (value != null) {
184            put(cacheName, key, value, liveSeconds);
185        }
186        return value;
187    }
188
189    @Override
190    public Integer getTtl(String cacheName, Object key) {
191        Integer ttl = ehcacheImpl.getTtl(cacheName, key);
192        if (ttl == null && !config.isUseFirstLevelOnly()) {
193            ttl = redisCacheImpl.getTtl(cacheName, key);
194        }
195        return ttl;
196    }
197
198
199    @Override
200    public void setTtl(String cacheName, Object key, int seconds) {
201        try {
202            ehcacheImpl.setTtl(cacheName, key, seconds);
203
204            if (!config.isUseFirstLevelOnly()) {
205                redisCacheImpl.setTtl(cacheName, key, seconds);
206            }
207        } finally {
208            publishMessage(JbootEhredisMessage.ACTION_REMOVE, cacheName, key);
209        }
210    }
211
212
213    private void publishMessage(int action, String cacheName, Object key) {
214        clearKeysCache(cacheName);
215        JbootEhredisMessage message = new JbootEhredisMessage(clientId, action, cacheName, key);
216        redis.publish(serializer.serialize(channel), serializer.serialize(message));
217    }
218
219    private void clearKeysCache(String cacheName) {
220        keysCache.invalidate(cacheName);
221    }
222
223
224    @Override
225    public void refresh(String cacheName, Object key) {
226        publishMessage(JbootEhredisMessage.ACTION_REMOVE, cacheName, key);
227    }
228
229
230    @Override
231    public void refresh(String cacheName) {
232        publishMessage(JbootEhredisMessage.ACTION_REMOVE_ALL, cacheName, null);
233    }
234
235    @Override
236    public List getNames() {
237        return config.isUseFirstLevelOnly() ? null : redisCacheImpl.getNames();
238    }
239
240    @Override
241    public List getKeys(String cacheName) {
242        List list = keysCache.getIfPresent(cacheName);
243        if (list == null && !config.isUseFirstLevelOnly()) {
244            list = redisCacheImpl.getKeys(cacheName);
245            if (list == null) {
246                list = new ArrayList();
247            }
248            keysCache.put(cacheName, list);
249        }
250        return list;
251    }
252
253
254    public void onMessage(String channel, Object obj) {
255
256        JbootEhredisMessage message = (JbootEhredisMessage) obj;
257
258        //不处理自己发送的消息
259        if (clientId.equals(message.getClientId())) {
260            return;
261        }
262
263        clearKeysCache(message.getCacheName());
264
265        switch (message.getAction()) {
266            case JbootEhredisMessage.ACTION_PUT:
267            case JbootEhredisMessage.ACTION_REMOVE:
268                ehcacheImpl.remove(message.getCacheName(), message.getKey());
269                break;
270            case JbootEhredisMessage.ACTION_REMOVE_ALL:
271                ehcacheImpl.removeAll(message.getCacheName());
272                break;
273        }
274    }
275
276    public JbootEhcacheImpl getEhcacheImpl() {
277        return ehcacheImpl;
278    }
279
280    public JbootRedisCacheImpl getRedisCacheImpl() {
281        return redisCacheImpl;
282    }
283
284
285    @Override
286    public void notifyElementRemoved(Ehcache cache, Element element) throws CacheException {
287    }
288
289    @Override
290    public void notifyElementPut(Ehcache cache, Element element) throws CacheException {
291    }
292
293    @Override
294    public void notifyElementUpdated(Ehcache cache, Element element) throws CacheException {
295    }
296
297    @Override
298    public void notifyElementExpired(Ehcache cache, Element element) {
299        clearKeysCache(cache.getName());
300    }
301
302    @Override
303    public void notifyElementEvicted(Ehcache cache, Element element) {
304    }
305
306    @Override
307    public void notifyRemoveAll(Ehcache cache) {
308    }
309
310    @Override
311    public Object clone() throws CloneNotSupportedException {
312        throw new CloneNotSupportedException();
313    }
314
315    @Override
316    public void dispose() {
317    }
318}