001/**
002 * Copyright (c) 2015-2022, Michael Yang 杨福海 (fuhai999@gmail.com).
003 * <p>
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 * <p>
008 * http://www.apache.org/licenses/LICENSE-2.0
009 * <p>
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package io.jboot.components.mq;
017
018import com.jfinal.kit.LogKit;
019import com.jfinal.log.Log;
020import io.jboot.Jboot;
021import io.jboot.components.serializer.JbootSerializer;
022import io.jboot.utils.NamedThreadPools;
023import io.jboot.utils.StrUtil;
024
025import java.util.*;
026import java.util.concurrent.ConcurrentHashMap;
027import java.util.concurrent.CopyOnWriteArrayList;
028import java.util.concurrent.ExecutorService;
029
030
031public abstract class JbootmqBase implements Jbootmq {
032
033    private static final Log LOG = Log.getLog(JbootmqBase.class);
034
035    protected final JbootmqConfig config;
036
037    private List<JbootmqMessageListener> globalListeners = new CopyOnWriteArrayList<>();
038    private Map<String, List<JbootmqMessageListener>> channelListeners = new ConcurrentHashMap<>();
039
040    protected Set<String> channels = new HashSet<>();
041    protected Set<String> syncReceiveMessageChannels = new HashSet<>();
042    protected JbootSerializer serializer;
043
044    private ExecutorService threadPool = NamedThreadPools.newFixedThreadPool("jbootmq");
045
046    public JbootmqBase(JbootmqConfig config) {
047        this.config = config;
048        String channelString = config.getChannel();
049        if (StrUtil.isBlank(channelString)) {
050            return;
051        }
052
053        this.channels.addAll(StrUtil.splitToSet(channelString, ","));
054
055        if (StrUtil.isNotBlank(config.getSyncRecevieMessageChannel())) {
056            this.syncReceiveMessageChannels.addAll(StrUtil.splitToSet(config.getSyncRecevieMessageChannel(), ","));
057        }
058    }
059
060
061    @Override
062    public void addMessageListener(JbootmqMessageListener listener) {
063        globalListeners.add(listener);
064    }
065
066
067    @Override
068    public void addMessageListener(JbootmqMessageListener listener, String forChannel) {
069        String[] forChannels = forChannel.split(",");
070        for (String channel : forChannels) {
071            if (StrUtil.isNotBlank(channel)) {
072                addChannelListener(channel.trim(), listener);
073            }
074        }
075    }
076
077    public final synchronized void addChannelListener(String channel, JbootmqMessageListener listener) {
078        List<JbootmqMessageListener> listeners = channelListeners.get(channel);
079        if (listeners == null) {
080            listeners = new CopyOnWriteArrayList<>();
081            channelListeners.put(channel, listeners);
082        }
083        listeners.add(listener);
084        channels.add(channel);
085    }
086
087
088    @Override
089    public void removeListener(JbootmqMessageListener listener) {
090        globalListeners.remove(listener);
091        for (List<JbootmqMessageListener> listeners : channelListeners.values()) {
092            listeners.remove(listener);
093        }
094    }
095
096    @Override
097    public void removeAllListeners() {
098        globalListeners.clear();
099        channelListeners.forEach((s, list) -> list.clear());
100        channelListeners.clear();
101    }
102
103
104    @Override
105    public Collection<JbootmqMessageListener> getGlobalListeners() {
106        return globalListeners;
107    }
108
109
110    @Override
111    public Collection<JbootmqMessageListener> getListenersByChannel(String channel) {
112        return channelListeners.get(channel);
113    }
114
115    public void notifyListeners(String channel, Object message, MessageContext context) {
116
117        boolean globalResult = notifyListeners(channel, message, context, globalListeners);
118        boolean channelResult = notifyListeners(channel, message, context, channelListeners.get(channel));
119
120        if (!globalResult && !channelResult) {
121            LOG.warn("Jboot has received mq message, But it has no listener to process. channel:" +
122                    channel + "  message:" + message);
123        }
124    }
125
126
127    protected boolean notifyListeners(String channel, Object message, MessageContext context, Collection<JbootmqMessageListener> listeners) {
128        if (listeners == null || listeners.size() == 0) {
129            return false;
130        }
131
132        if (syncReceiveMessageChannels.contains(channel)) {
133            for (JbootmqMessageListener listener : listeners) {
134                try {
135                    listener.onMessage(channel, message, context);
136                } catch (Throwable ex) {
137                    LOG.warn("listener[" + listener.getClass().getName() + "] execute mq message is error. channel:" +
138                            channel + "  message:" + message);
139                }
140            }
141        } else {
142            for (JbootmqMessageListener listener : listeners) {
143                threadPool.execute(() -> {
144                    listener.onMessage(channel, message, context);
145                });
146            }
147        }
148
149        return true;
150    }
151
152
153    public JbootSerializer getSerializer() {
154        if (serializer == null) {
155            serializer = StrUtil.isNotBlank(config.getSerializer())
156                    ? Jboot.getSerializer(config.getSerializer())
157                    : Jboot.getSerializer();
158        }
159        return serializer;
160    }
161
162
163    protected boolean isStarted = false;
164
165    @Override
166    public boolean startListening() {
167        if (isStarted) {
168            return true;
169        }
170
171        if (channels == null || channels.isEmpty()) {
172            LogKit.warn("Jboot MQ started fail. because it's channels is empty, please config channels. " +
173                    "MQ name: {}, type:{}", config.getName(), config.getType());
174            return false;
175        }
176
177        try {
178            isStarted = true;
179            onStartListening();
180        } catch (Exception ex) {
181            LogKit.error("Jboot MQ start fail!", ex);
182            isStarted = false;
183            return false;
184        }
185
186        return true;
187    }
188
189
190    @Override
191    public boolean stopListening() {
192        if (!isStarted) {
193            return true;
194        }
195
196        try {
197            isStarted = false;
198            onStopListening();
199        } catch (Exception ex) {
200            LogKit.error("Jboot MQ stop fail!", ex);
201            isStarted = true;
202            return false;
203        }
204
205        return true;
206    }
207
208    public boolean isStarted() {
209        return isStarted;
210    }
211
212    protected abstract void onStartListening();
213
214    protected abstract void onStopListening();
215
216
217    @Override
218    public JbootmqConfig getConfig() {
219        return config;
220    }
221
222    public void setSerializer(JbootSerializer serializer) {
223        this.serializer = serializer;
224    }
225
226    public ExecutorService getThreadPool() {
227        return threadPool;
228    }
229
230    public void setThreadPool(ExecutorService threadPool) {
231        this.threadPool = threadPool;
232    }
233}