001/** 002 * Copyright (c) 2015-2022, Michael Yang 杨福海 (fuhai999@gmail.com). 003 * <p> 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * <p> 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * <p> 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package io.jboot.components.mq; 017 018import com.jfinal.kit.LogKit; 019import com.jfinal.log.Log; 020import io.jboot.Jboot; 021import io.jboot.components.serializer.JbootSerializer; 022import io.jboot.utils.NamedThreadPools; 023import io.jboot.utils.StrUtil; 024 025import java.util.*; 026import java.util.concurrent.ConcurrentHashMap; 027import java.util.concurrent.CopyOnWriteArrayList; 028import java.util.concurrent.ExecutorService; 029 030 031public abstract class JbootmqBase implements Jbootmq { 032 033 private static final Log LOG = Log.getLog(JbootmqBase.class); 034 035 protected final JbootmqConfig config; 036 037 private List<JbootmqMessageListener> globalListeners = new CopyOnWriteArrayList<>(); 038 private Map<String, List<JbootmqMessageListener>> channelListeners = new ConcurrentHashMap<>(); 039 040 protected Set<String> channels = new HashSet<>(); 041 protected Set<String> syncReceiveMessageChannels = new HashSet<>(); 042 protected JbootSerializer serializer; 043 044 private ExecutorService threadPool = NamedThreadPools.newFixedThreadPool("jbootmq"); 045 046 public JbootmqBase(JbootmqConfig config) { 047 this.config = config; 048 String channelString = config.getChannel(); 049 if (StrUtil.isBlank(channelString)) { 050 return; 051 } 052 053 this.channels.addAll(StrUtil.splitToSet(channelString, ",")); 054 055 if (StrUtil.isNotBlank(config.getSyncRecevieMessageChannel())) { 056 this.syncReceiveMessageChannels.addAll(StrUtil.splitToSet(config.getSyncRecevieMessageChannel(), ",")); 057 } 058 } 059 060 061 @Override 062 public void addMessageListener(JbootmqMessageListener listener) { 063 globalListeners.add(listener); 064 } 065 066 067 @Override 068 public void addMessageListener(JbootmqMessageListener listener, String forChannel) { 069 String[] forChannels = forChannel.split(","); 070 for (String channel : forChannels) { 071 if (StrUtil.isNotBlank(channel)) { 072 addChannelListener(channel.trim(), listener); 073 } 074 } 075 } 076 077 public final synchronized void addChannelListener(String channel, JbootmqMessageListener listener) { 078 List<JbootmqMessageListener> listeners = channelListeners.get(channel); 079 if (listeners == null) { 080 listeners = new CopyOnWriteArrayList<>(); 081 channelListeners.put(channel, listeners); 082 } 083 listeners.add(listener); 084 channels.add(channel); 085 } 086 087 088 @Override 089 public void removeListener(JbootmqMessageListener listener) { 090 globalListeners.remove(listener); 091 for (List<JbootmqMessageListener> listeners : channelListeners.values()) { 092 listeners.remove(listener); 093 } 094 } 095 096 @Override 097 public void removeAllListeners() { 098 globalListeners.clear(); 099 channelListeners.forEach((s, list) -> list.clear()); 100 channelListeners.clear(); 101 } 102 103 104 @Override 105 public Collection<JbootmqMessageListener> getGlobalListeners() { 106 return globalListeners; 107 } 108 109 110 @Override 111 public Collection<JbootmqMessageListener> getListenersByChannel(String channel) { 112 return channelListeners.get(channel); 113 } 114 115 public void notifyListeners(String channel, Object message, MessageContext context) { 116 117 boolean globalResult = notifyListeners(channel, message, context, globalListeners); 118 boolean channelResult = notifyListeners(channel, message, context, channelListeners.get(channel)); 119 120 if (!globalResult && !channelResult) { 121 LOG.warn("Jboot has received mq message, But it has no listener to process. channel:" + 122 channel + " message:" + message); 123 } 124 } 125 126 127 protected boolean notifyListeners(String channel, Object message, MessageContext context, Collection<JbootmqMessageListener> listeners) { 128 if (listeners == null || listeners.size() == 0) { 129 return false; 130 } 131 132 if (syncReceiveMessageChannels.contains(channel)) { 133 for (JbootmqMessageListener listener : listeners) { 134 try { 135 listener.onMessage(channel, message, context); 136 } catch (Throwable ex) { 137 LOG.warn("listener[" + listener.getClass().getName() + "] execute mq message is error. channel:" + 138 channel + " message:" + message); 139 } 140 } 141 } else { 142 for (JbootmqMessageListener listener : listeners) { 143 threadPool.execute(() -> { 144 listener.onMessage(channel, message, context); 145 }); 146 } 147 } 148 149 return true; 150 } 151 152 153 public JbootSerializer getSerializer() { 154 if (serializer == null) { 155 serializer = StrUtil.isNotBlank(config.getSerializer()) 156 ? Jboot.getSerializer(config.getSerializer()) 157 : Jboot.getSerializer(); 158 } 159 return serializer; 160 } 161 162 163 protected boolean isStarted = false; 164 165 @Override 166 public boolean startListening() { 167 if (isStarted) { 168 return true; 169 } 170 171 if (channels == null || channels.isEmpty()) { 172 LogKit.warn("Jboot MQ started fail. because it's channels is empty, please config channels. " + 173 "MQ name: {}, type:{}", config.getName(), config.getType()); 174 return false; 175 } 176 177 try { 178 isStarted = true; 179 onStartListening(); 180 } catch (Exception ex) { 181 LogKit.error("Jboot MQ start fail!", ex); 182 isStarted = false; 183 return false; 184 } 185 186 return true; 187 } 188 189 190 @Override 191 public boolean stopListening() { 192 if (!isStarted) { 193 return true; 194 } 195 196 try { 197 isStarted = false; 198 onStopListening(); 199 } catch (Exception ex) { 200 LogKit.error("Jboot MQ stop fail!", ex); 201 isStarted = true; 202 return false; 203 } 204 205 return true; 206 } 207 208 public boolean isStarted() { 209 return isStarted; 210 } 211 212 protected abstract void onStartListening(); 213 214 protected abstract void onStopListening(); 215 216 217 @Override 218 public JbootmqConfig getConfig() { 219 return config; 220 } 221 222 public void setSerializer(JbootSerializer serializer) { 223 this.serializer = serializer; 224 } 225 226 public ExecutorService getThreadPool() { 227 return threadPool; 228 } 229 230 public void setThreadPool(ExecutorService threadPool) { 231 this.threadPool = threadPool; 232 } 233}