001/** 002 * Copyright (c) 2015-2022, Michael Yang 杨福海 (fuhai999@gmail.com). 003 * <p> 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * <p> 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * <p> 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package io.jboot.components.mq.redismq; 017 018import com.jfinal.log.Log; 019import io.jboot.Jboot; 020import io.jboot.components.mq.Jbootmq; 021import io.jboot.components.mq.JbootmqBase; 022import io.jboot.components.mq.JbootmqConfig; 023import io.jboot.components.mq.JbootmqMessageListener; 024import io.jboot.exception.JbootIllegalConfigException; 025import io.jboot.support.redis.JbootRedis; 026import io.jboot.support.redis.JbootRedisManager; 027import io.jboot.utils.ConfigUtil; 028import io.jboot.utils.StrUtil; 029import redis.clients.jedis.BinaryJedisPubSub; 030 031import java.util.HashMap; 032import java.util.Map; 033 034 035public class JbootRedismqImpl extends JbootmqBase implements Jbootmq, Runnable { 036 037 private static final Log LOG = Log.getLog(JbootRedismqImpl.class); 038 039 private JbootRedis redis; 040 private Thread dequeueThread; 041 private BinaryJedisPubSub jedisPubSub; 042 private long interval = 100L; 043 044 private Integer database = 0; 045 046 public JbootRedismqImpl(JbootmqConfig config) { 047 super(config); 048 049 JbootRedismqConfig redisConfig = null; 050 String typeName = config.getTypeName(); 051 if (StrUtil.isNotBlank(typeName)) { 052 Map<String, JbootRedismqConfig> configModels = ConfigUtil.getConfigModels(JbootRedismqConfig.class); 053 if (!configModels.containsKey(typeName)) { 054 throw new JbootIllegalConfigException("Please config \"jboot.mq.redis." + typeName + ".host\" in your jboot.properties."); 055 } 056 redisConfig = configModels.get(typeName); 057 } else { 058 redisConfig = Jboot.config(JbootRedismqConfig.class); 059 } 060 061 database = redisConfig.getDatabase(); 062 063 if (redisConfig.isConfigOk()) { 064 redis = JbootRedisManager.me().getRedis(redisConfig); 065 } else { 066 redis = Jboot.getRedis(); 067 } 068 069 if (redis == null) { 070 throw new JbootIllegalConfigException("can not use redis mq (redis mq is default), " + 071 "please config jboot.redis.host=your-host , or use other mq component. "); 072 } 073 } 074 075 private Map<String, String> outterChannelMap = new HashMap<>(); 076 077 @Override 078 protected void onStartListening() { 079 String[] channels = this.channels.toArray(new String[]{}); 080 jedisPubSub = new BinaryJedisPubSub() { 081 @Override 082 public void onMessage(byte[] channel, byte[] message) { 083 String thisChannel = redis.bytesToKey(channel); 084 String realChannel = outterChannelMap.get(thisChannel); 085 if (realChannel == null) { 086 LOG.warn("Jboot has recevied mq message, But it has no listener to process. channel:" + thisChannel); 087 } 088 notifyListeners(realChannel, getSerializer().deserialize(message) 089 , new RedismqMessageContext(JbootRedismqImpl.this)); 090 } 091 }; 092 093 for (int i = 0; i< channels.length; i++) { 094 outterChannelMap.put(channels[i] + "_" + database, channels[i]); 095 channels[i] = channels[i] + "_" + database; 096 } 097 redis.subscribe(jedisPubSub, redis.keysToBytesArray(channels)); 098 099 dequeueThread = new Thread(this, "redis-dequeue-thread"); 100 dequeueThread.start(); 101 } 102 103 @Override 104 protected void onStopListening() { 105 if (jedisPubSub != null) { 106 jedisPubSub.unsubscribe(); 107 } 108 dequeueThread.interrupt(); 109 } 110 111 112 @Override 113 public void enqueue(Object message, String toChannel) { 114 redis.lpush(toChannel + "_" + database, message); 115 } 116 117 118 @Override 119 public void publish(Object message, String toChannel) { 120 redis.publish(redis.keyToBytes(toChannel + "_" + database), getSerializer().serialize(message)); 121 } 122 123 @Override 124 public void run() { 125 while (isStarted) { 126 try { 127 doExecuteDequeue(); 128 Thread.sleep(interval); 129 } catch (Exception ex) { 130 LOG.error(ex.toString(), ex); 131 } 132 } 133 } 134 135 public void doExecuteDequeue() { 136 for (String channel : this.channels) { 137 Object data = redis.lpop(channel + "_" + database); 138 if (data != null) { 139 notifyListeners(channel, data, new RedismqMessageContext(JbootRedismqImpl.this)); 140 } 141 } 142 } 143 144 public long getInterval() { 145 return interval; 146 } 147 148 public void setInterval(long interval) { 149 this.interval = interval; 150 } 151}