001/**
002 * Copyright (c) 2015-2022, Michael Yang 杨福海 (fuhai999@gmail.com).
003 * <p>
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 * <p>
008 * http://www.apache.org/licenses/LICENSE-2.0
009 * <p>
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package io.jboot.components.cache.caredis;
017
018import com.github.benmanes.caffeine.cache.Cache;
019import com.github.benmanes.caffeine.cache.Caffeine;
020import com.jfinal.plugin.ehcache.IDataLoader;
021import io.jboot.Jboot;
022import io.jboot.components.cache.JbootCacheBase;
023import io.jboot.components.cache.JbootCacheConfig;
024import io.jboot.components.cache.caffeine.CaffeineCacheImpl;
025import io.jboot.components.cache.redis.JbootRedisCacheImpl;
026import io.jboot.components.serializer.JbootSerializer;
027import io.jboot.support.redis.JbootRedis;
028import io.jboot.utils.StrUtil;
029import redis.clients.jedis.BinaryJedisPubSub;
030
031import java.util.ArrayList;
032import java.util.List;
033import java.util.concurrent.TimeUnit;
034
035/**
036 * 基于 caffeine和redis做的二级缓存
037 * 优点是:减少高并发下redis的io瓶颈
038 */
039public class JbootCaredisCacheImpl extends JbootCacheBase {
040
041    public static final String DEFAULT_NOTIFY_CHANNEL = "jboot_caredis_channel";
042
043    private CaffeineCacheImpl caffeineCacheImpl;
044    private JbootRedisCacheImpl redisCacheImpl;
045    private JbootRedis redis;
046    private JbootSerializer serializer;
047
048    private String channel = DEFAULT_NOTIFY_CHANNEL;
049    private String clientId;
050
051    private Cache<String, List> keysCache = Caffeine.newBuilder()
052            .expireAfterAccess(10, TimeUnit.MINUTES)
053            .expireAfterWrite(10, TimeUnit.MINUTES)
054            .build();
055
056
057    public JbootCaredisCacheImpl(JbootCacheConfig config) {
058        super(config);
059        this.caffeineCacheImpl = new CaffeineCacheImpl(config);
060        this.redisCacheImpl = new JbootRedisCacheImpl(config);
061        this.clientId = StrUtil.uuid();
062        this.serializer = Jboot.getSerializer();
063
064        //在某些场景下,多个应用使用同一个 redis 实例,此时可以通过配置 cacheSyncMqChannel 来解决缓存冲突的问题
065        if (StrUtil.isNotBlank(config.getCacheSyncMqChannel())){
066            this.channel = config.getCacheSyncMqChannel();
067        }
068
069        this.redis = redisCacheImpl.getRedis();
070        this.redis.subscribe(new BinaryJedisPubSub() {
071            @Override
072            public void onMessage(byte[] channel, byte[] message) {
073                JbootCaredisCacheImpl.this.onMessage((String) serializer.deserialize(channel), serializer.deserialize(message));
074            }
075        }, serializer.serialize(channel));
076    }
077
078
079    @Override
080    public <T> T get(String cacheName, Object key) {
081        T value = caffeineCacheImpl.get(cacheName, key);
082        if (value == null && !config.isUseFirstLevelOnly()) {
083            value = redisCacheImpl.get(cacheName, key);
084            if (value != null) {
085                Integer ttl = redisCacheImpl.getTtl(cacheName, key);
086                if (ttl != null && ttl > 0) {
087                    caffeineCacheImpl.put(cacheName, key, value, ttl);
088                } else {
089                    caffeineCacheImpl.put(cacheName, key, value);
090                }
091            }
092        }
093        return value;
094    }
095
096    @Override
097    public void put(String cacheName, Object key, Object value) {
098        try {
099            caffeineCacheImpl.put(cacheName, key, value);
100            if (!config.isUseFirstLevelOnly()) {
101                redisCacheImpl.put(cacheName, key, value);
102            }
103        } finally {
104            publishMessage(JbootCaredisMessage.ACTION_PUT, cacheName, key);
105        }
106    }
107
108
109    @Override
110    public void put(String cacheName, Object key, Object value, int liveSeconds) {
111        if (liveSeconds <= 0) {
112            put(cacheName, key, value);
113            return;
114        }
115        try {
116            caffeineCacheImpl.put(cacheName, key, value, liveSeconds);
117            if (!config.isUseFirstLevelOnly()) {
118                redisCacheImpl.put(cacheName, key, value, liveSeconds);
119            }
120        } finally {
121            publishMessage(JbootCaredisMessage.ACTION_PUT, cacheName, key);
122        }
123    }
124
125    @Override
126    public void remove(String cacheName, Object key) {
127        try {
128            caffeineCacheImpl.remove(cacheName, key);
129            if (!config.isUseFirstLevelOnly()) {
130                redisCacheImpl.remove(cacheName, key);
131            }
132        } finally {
133            publishMessage(JbootCaredisMessage.ACTION_REMOVE, cacheName, key);
134        }
135    }
136
137    @Override
138    public void removeAll(String cacheName) {
139        try {
140            caffeineCacheImpl.removeAll(cacheName);
141            if (!config.isUseFirstLevelOnly()) {
142                redisCacheImpl.removeAll(cacheName);
143            }
144        } finally {
145            publishMessage(JbootCaredisMessage.ACTION_REMOVE_ALL, cacheName, null);
146        }
147    }
148
149    @Override
150    public <T> T get(String cacheName, Object key, IDataLoader dataLoader) {
151        T value = get(cacheName, key);
152        if (value != null) {
153            return value;
154        }
155
156        value = (T) dataLoader.load();
157        if (value != null) {
158            put(cacheName, key, value);
159        }
160        return value;
161    }
162
163    @Override
164    public <T> T get(String cacheName, Object key, IDataLoader dataLoader, int liveSeconds) {
165        if (liveSeconds <= 0) {
166            return get(cacheName, key, dataLoader);
167        }
168
169        T value = get(cacheName, key);
170        if (value != null) {
171            return value;
172        }
173
174        value = (T) dataLoader.load();
175        if (value != null) {
176            put(cacheName, key, value, liveSeconds);
177        }
178        return value;
179    }
180
181    @Override
182    public Integer getTtl(String cacheName, Object key) {
183        Integer ttl = caffeineCacheImpl.getTtl(cacheName, key);
184        if (ttl == null && !config.isUseFirstLevelOnly()) {
185            ttl = redisCacheImpl.getTtl(cacheName, key);
186        }
187        return ttl;
188    }
189
190
191    @Override
192    public void setTtl(String cacheName, Object key, int seconds) {
193        try {
194            caffeineCacheImpl.setTtl(cacheName, key, seconds);
195
196            if (!config.isUseFirstLevelOnly()) {
197                redisCacheImpl.setTtl(cacheName, key, seconds);
198            }
199        } finally {
200            publishMessage(JbootCaredisMessage.ACTION_REMOVE, cacheName, key);
201        }
202    }
203
204
205    @Override
206    public void refresh(String cacheName, Object key) {
207        publishMessage(JbootCaredisMessage.ACTION_REMOVE, cacheName, key);
208    }
209
210
211    @Override
212    public void refresh(String cacheName) {
213        publishMessage(JbootCaredisMessage.ACTION_REMOVE_ALL, cacheName, null);
214    }
215
216
217    @Override
218    public List getNames() {
219        return config.isUseFirstLevelOnly() ? null : redisCacheImpl.getNames();
220    }
221
222
223    @Override
224    public List getKeys(String cacheName) {
225        List list = keysCache.getIfPresent(cacheName);
226        if (list != null) {
227            return list;
228        }
229
230        if (!config.isUseFirstLevelOnly()) {
231            list = redisCacheImpl.getKeys(cacheName);
232            if (list == null) {
233                list = new ArrayList();
234            }
235            keysCache.put(cacheName, list);
236        }
237
238        return list;
239    }
240
241
242    private void publishMessage(int action, String cacheName, Object key) {
243        clearKeysCache(cacheName);
244        JbootCaredisMessage message = new JbootCaredisMessage(clientId, action, cacheName, key);
245        redis.publish(serializer.serialize(channel), serializer.serialize(message));
246    }
247
248    private void clearKeysCache(String cacheName) {
249        keysCache.invalidate(cacheName);
250    }
251
252    public void onMessage(String channel, Object obj) {
253
254        JbootCaredisMessage message = (JbootCaredisMessage) obj;
255
256        //不处理自己发送的消息
257        if (clientId.equals(message.getClientId())) {
258            return;
259        }
260
261        clearKeysCache(message.getCacheName());
262
263        switch (message.getAction()) {
264            case JbootCaredisMessage.ACTION_PUT:
265            case JbootCaredisMessage.ACTION_REMOVE:
266                caffeineCacheImpl.remove(message.getCacheName(), message.getKey());
267                break;
268            case JbootCaredisMessage.ACTION_REMOVE_ALL:
269                caffeineCacheImpl.removeAll(message.getCacheName());
270                break;
271        }
272    }
273
274
275    public CaffeineCacheImpl getCaffeineCacheImpl() {
276        return caffeineCacheImpl;
277    }
278
279    public JbootRedisCacheImpl getRedisCacheImpl() {
280        return redisCacheImpl;
281    }
282
283}