001/**
002 * Copyright (c) 2015-2022, Michael Yang 杨福海 (fuhai999@gmail.com).
003 * <p>
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 * <p>
008 * http://www.apache.org/licenses/LICENSE-2.0
009 * <p>
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package io.jboot.components.mq.redismq;
017
018import com.jfinal.log.Log;
019import io.jboot.Jboot;
020import io.jboot.components.mq.Jbootmq;
021import io.jboot.components.mq.JbootmqBase;
022import io.jboot.components.mq.JbootmqConfig;
023import io.jboot.components.mq.JbootmqMessageListener;
024import io.jboot.exception.JbootIllegalConfigException;
025import io.jboot.support.redis.JbootRedis;
026import io.jboot.support.redis.JbootRedisManager;
027import io.jboot.utils.ConfigUtil;
028import io.jboot.utils.StrUtil;
029import redis.clients.jedis.BinaryJedisPubSub;
030
031import java.util.HashMap;
032import java.util.Map;
033
034
035public class JbootRedismqImpl extends JbootmqBase implements Jbootmq, Runnable {
036
037    private static final Log LOG = Log.getLog(JbootRedismqImpl.class);
038
039    private JbootRedis redis;
040    private Thread dequeueThread;
041    private BinaryJedisPubSub jedisPubSub;
042    private long interval = 100L;
043
044    private Integer database = 0;
045
046    public JbootRedismqImpl(JbootmqConfig config) {
047        super(config);
048
049        JbootRedismqConfig redisConfig = null;
050        String typeName = config.getTypeName();
051        if (StrUtil.isNotBlank(typeName)) {
052            Map<String, JbootRedismqConfig> configModels = ConfigUtil.getConfigModels(JbootRedismqConfig.class);
053            if (!configModels.containsKey(typeName)) {
054                throw new JbootIllegalConfigException("Please config \"jboot.mq.redis." + typeName + ".host\" in your jboot.properties.");
055            }
056            redisConfig = configModels.get(typeName);
057        } else {
058            redisConfig = Jboot.config(JbootRedismqConfig.class);
059        }
060        
061        database = redisConfig.getDatabase();
062        
063        if (redisConfig.isConfigOk()) {
064            redis = JbootRedisManager.me().getRedis(redisConfig);
065        } else {
066            redis = Jboot.getRedis();
067        }
068
069        if (redis == null) {
070            throw new JbootIllegalConfigException("can not use redis mq (redis mq is default), " +
071                    "please config jboot.redis.host=your-host , or use other mq component. ");
072        }
073    }
074
075    private Map<String, String> outterChannelMap = new HashMap<>();
076    
077    @Override
078    protected void onStartListening() {
079        String[] channels = this.channels.toArray(new String[]{});
080        jedisPubSub = new BinaryJedisPubSub() {
081            @Override
082            public void onMessage(byte[] channel, byte[] message) {
083                String thisChannel = redis.bytesToKey(channel);
084                String realChannel = outterChannelMap.get(thisChannel);
085                if (realChannel == null) {
086                    LOG.warn("Jboot has recevied mq message, But it has no listener to process. channel:" + thisChannel);
087                }
088                                notifyListeners(realChannel, getSerializer().deserialize(message)
089                        , new RedismqMessageContext(JbootRedismqImpl.this));
090            }
091        };
092
093        for (int i = 0; i< channels.length; i++) {
094                outterChannelMap.put(channels[i] + "_" + database, channels[i]);
095                channels[i] =  channels[i] + "_" + database;
096        }
097        redis.subscribe(jedisPubSub, redis.keysToBytesArray(channels));
098
099        dequeueThread = new Thread(this, "redis-dequeue-thread");
100        dequeueThread.start();
101    }
102
103    @Override
104    protected void onStopListening() {
105        if (jedisPubSub != null) {
106            jedisPubSub.unsubscribe();
107        }
108        dequeueThread.interrupt();
109    }
110
111
112    @Override
113    public void enqueue(Object message, String toChannel) {
114        redis.lpush(toChannel + "_" + database, message);
115    }
116
117
118    @Override
119    public void publish(Object message, String toChannel) {
120        redis.publish(redis.keyToBytes(toChannel + "_" + database), getSerializer().serialize(message));
121    }
122
123    @Override
124    public void run() {
125        while (isStarted) {
126            try {
127                doExecuteDequeue();
128                Thread.sleep(interval);
129            } catch (Exception ex) {
130                LOG.error(ex.toString(), ex);
131            }
132        }
133    }
134
135    public void doExecuteDequeue() {
136        for (String channel : this.channels) {
137            Object data = redis.lpop(channel + "_" + database);
138            if (data != null) {
139                notifyListeners(channel, data, new RedismqMessageContext(JbootRedismqImpl.this));
140            }
141        }
142    }
143
144    public long getInterval() {
145        return interval;
146    }
147
148    public void setInterval(long interval) {
149        this.interval = interval;
150    }
151}