001/**
002 * Copyright (c) 2015-2022, Michael Yang 杨福海 (fuhai999@gmail.com).
003 * <p>
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 * <p>
008 * http://www.apache.org/licenses/LICENSE-2.0
009 * <p>
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package io.jboot.components.mq.rocketmq;
017
018import com.jfinal.log.Log;
019import io.jboot.Jboot;
020import io.jboot.components.mq.Jbootmq;
021import io.jboot.components.mq.JbootmqBase;
022import io.jboot.components.mq.JbootmqConfig;
023import io.jboot.exception.JbootIllegalConfigException;
024import io.jboot.utils.ConfigUtil;
025import io.jboot.utils.StrUtil;
026import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
027import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
028import org.apache.rocketmq.client.exception.MQClientException;
029import org.apache.rocketmq.client.producer.DefaultMQProducer;
030import org.apache.rocketmq.client.producer.MQProducer;
031import org.apache.rocketmq.client.producer.SendResult;
032import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
033import org.apache.rocketmq.common.message.Message;
034import org.apache.rocketmq.common.message.MessageExt;
035import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
036
037import java.util.Map;
038
039
040public class JbootRocketmqImpl extends JbootmqBase implements Jbootmq {
041
042    private static final Log LOG = Log.getLog(JbootRocketmqImpl.class);
043    private JbootRocketmqConfig rocketmqConfig;
044    private MQProducer mqProducer;
045    private DefaultMQPushConsumer queueConsumer;
046    private DefaultMQPushConsumer broadcastConsumer;
047
048    public JbootRocketmqImpl(JbootmqConfig config) {
049        super(config);
050
051        String typeName = config.getTypeName();
052        if (StrUtil.isNotBlank(typeName)) {
053            Map<String, JbootRocketmqConfig> configModels = ConfigUtil.getConfigModels(JbootRocketmqConfig.class);
054            if (!configModels.containsKey(typeName)) {
055                throw new JbootIllegalConfigException("Please config \"jboot.mq.rocket." + typeName + ".namesrvAddr\" in your jboot.properties.");
056            }
057            rocketmqConfig = configModels.get(typeName);
058        } else {
059            rocketmqConfig = Jboot.config(JbootRocketmqConfig.class);
060        }
061    }
062
063    @Override
064    protected void onStartListening() {
065        try {
066            startQueueConsumer();
067            startBroadcastConsumer();
068        } catch (MQClientException e) {
069            LOG.error(e.toString(), e);
070        }
071    }
072
073    @Override
074    protected void onStopListening() {
075        if (queueConsumer != null) {
076            queueConsumer.shutdown();
077        }
078
079        if (broadcastConsumer != null) {
080            broadcastConsumer.shutdown();
081        }
082    }
083
084
085    public void startQueueConsumer() throws MQClientException {
086        // 实例化消费者
087        queueConsumer = new DefaultMQPushConsumer(rocketmqConfig.getConsumerGroup());
088
089        // 设置NameServer的地址
090        queueConsumer.setNamesrvAddr(rocketmqConfig.getNamesrvAddr());
091        queueConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
092
093        if (StrUtil.isNotBlank(rocketmqConfig.getNamespace())) {
094            queueConsumer.setNamespace(rocketmqConfig.getNamespace());
095        }
096
097        if (rocketmqConfig.getConsumeMessageBatchMaxSize() != null) {
098            queueConsumer.setConsumeMessageBatchMaxSize(rocketmqConfig.getConsumeMessageBatchMaxSize());
099        }
100
101        for (String channel : channels) {
102            queueConsumer.subscribe(channel, rocketmqConfig.getSubscribeSubExpression());
103        }
104
105        // 注册回调实现类来处理从broker拉取回来的消息
106        queueConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
107            RokectmqMessageContext msgContext = new RokectmqMessageContext(this, msgs, context);
108            if (msgs != null && !msgs.isEmpty()) {
109                for (MessageExt messageExt : msgs) {
110                    notifyListeners(messageExt.getTopic(), getSerializer().deserialize(messageExt.getBody()), msgContext);
111                }
112            }
113
114            return msgContext.getReturnStatus();
115        });
116
117
118        queueConsumer.start();
119
120    }
121
122
123    public void startBroadcastConsumer() throws MQClientException {
124        // 实例化消费者
125        broadcastConsumer = new DefaultMQPushConsumer(rocketmqConfig.getBroadcastChannelPrefix() + rocketmqConfig.getConsumerGroup());
126
127        // 设置NameServer的地址
128        broadcastConsumer.setNamesrvAddr(rocketmqConfig.getNamesrvAddr());
129
130        if (StrUtil.isNotBlank(rocketmqConfig.getNamespace())) {
131            broadcastConsumer.setNamespace(rocketmqConfig.getNamespace());
132        }
133
134        broadcastConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
135        broadcastConsumer.setMessageModel(MessageModel.BROADCASTING);
136
137        if (rocketmqConfig.getConsumeMessageBatchMaxSize() != null) {
138            broadcastConsumer.setConsumeMessageBatchMaxSize(rocketmqConfig.getConsumeMessageBatchMaxSize());
139        }
140
141        for (String channel : channels) {
142            broadcastConsumer.subscribe(rocketmqConfig.getBroadcastChannelPrefix() + channel, rocketmqConfig.getSubscribeSubExpression());
143        }
144
145        final int len = rocketmqConfig.getBroadcastChannelPrefix().length();
146        broadcastConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
147            RokectmqMessageContext rokectMqMessageInfo = new RokectmqMessageContext(this, msgs, context);
148            if (msgs != null && !msgs.isEmpty()) {
149                for (MessageExt messageExt : msgs) {
150                    String topic = messageExt.getTopic();
151                    notifyListeners(topic.substring(len), getSerializer().deserialize(messageExt.getBody()), rokectMqMessageInfo);
152                }
153            }
154            return rokectMqMessageInfo.getReturnStatus();
155        });
156
157        broadcastConsumer.start();
158    }
159
160
161    @Override
162    public void enqueue(Object message, String toChannel) {
163        doSendMessage(message, toChannel);
164    }
165
166
167    @Override
168    public void publish(Object message, String toChannel) {
169        doSendMessage(message, rocketmqConfig.getBroadcastChannelPrefix() + toChannel);
170    }
171
172
173    public void doSendMessage(Object message, String topic) {
174        try {
175            Message sendMsg = null;
176            if (message instanceof Message) {
177                sendMsg = (Message) message;
178            } else {
179                sendMsg = new Message(topic, getSerializer().serialize(message));
180            }
181
182            SendResult result = getMQProducer().send(sendMsg);
183            // if (result.getSendStatus() != SendStatus.SEND_OK) {
184            // 只要不等于 null 就是发送成功
185            if (result == null) {
186                LOG.warn("Rockect mq send message fail!!!");
187            }
188        } catch (Exception e) {
189            LOG.error(e.toString(), e);
190        }
191    }
192
193
194    public MQProducer getMQProducer() throws MQClientException {
195        if (mqProducer == null) {
196            synchronized (this) {
197                if (mqProducer == null) {
198                    createMqProducer();
199                }
200            }
201        }
202        return mqProducer;
203    }
204
205
206    public void createMqProducer() throws MQClientException {
207        DefaultMQProducer producer = new DefaultMQProducer(rocketmqConfig.getProducerGroup());
208        producer.setNamesrvAddr(rocketmqConfig.getNamesrvAddr());
209
210        if (StrUtil.isNotBlank(rocketmqConfig.getNamespace())) {
211            producer.setNamespace(rocketmqConfig.getNamespace());
212        }
213
214        if (StrUtil.isNotBlank(rocketmqConfig.getInstanceName())) {
215            producer.setInstanceName(rocketmqConfig.getInstanceName());
216        }
217
218        if (StrUtil.isNotBlank(rocketmqConfig.getClientIP())) {
219            producer.setClientIP(rocketmqConfig.getClientIP());
220        }
221
222        if (StrUtil.isNotBlank(rocketmqConfig.getCreateTopicKey())) {
223            producer.setCreateTopicKey(rocketmqConfig.getCreateTopicKey());
224        }
225
226        if (rocketmqConfig.getUseTLS() != null) {
227            producer.setUseTLS(rocketmqConfig.getUseTLS());
228        }
229
230        if (rocketmqConfig.getSendLatencyFaultEnable() != null) {
231            producer.setSendLatencyFaultEnable(rocketmqConfig.getSendLatencyFaultEnable());
232        }
233
234        if (rocketmqConfig.getSendMessageWithVIPChannel() != null) {
235            producer.setSendMessageWithVIPChannel(rocketmqConfig.getSendMessageWithVIPChannel());
236        }
237
238        if (rocketmqConfig.getSendMsgTimeout() != null) {
239            producer.setSendMsgTimeout(rocketmqConfig.getSendMsgTimeout());
240        }
241
242        if (rocketmqConfig.getRetryAnotherBrokerWhenNotStoreOK() != null) {
243            producer.setRetryAnotherBrokerWhenNotStoreOK(rocketmqConfig.getRetryAnotherBrokerWhenNotStoreOK());
244        }
245
246        if (rocketmqConfig.getRetryTimesWhenSendAsyncFailed() != null) {
247            producer.setRetryTimesWhenSendAsyncFailed(rocketmqConfig.getRetryTimesWhenSendAsyncFailed());
248        }
249
250        if (rocketmqConfig.getRetryTimesWhenSendFailed() != null) {
251            producer.setRetryTimesWhenSendFailed(rocketmqConfig.getRetryTimesWhenSendFailed());
252        }
253
254        mqProducer = producer;
255        producer.start();
256    }
257}
258
259