001/**
002 * Copyright (c) 2015-2022, Michael Yang 杨福海 (fuhai999@gmail.com).
003 * <p>
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 * <p>
008 * http://www.apache.org/licenses/LICENSE-2.0
009 * <p>
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package io.jboot.components.mq.aliyunmq;
017
018import com.aliyun.openservices.ons.api.*;
019import com.jfinal.log.Log;
020import io.jboot.Jboot;
021import io.jboot.components.mq.Jbootmq;
022import io.jboot.components.mq.JbootmqBase;
023import io.jboot.components.mq.JbootmqConfig;
024import io.jboot.exception.JbootIllegalConfigException;
025import io.jboot.utils.ConfigUtil;
026import io.jboot.utils.StrUtil;
027
028import java.util.Map;
029import java.util.Properties;
030
031
032public class JbootAliyunmqImpl extends JbootmqBase implements Jbootmq {
033    private static final Log LOG = Log.getLog(JbootAliyunmqImpl.class);
034
035    private Producer producer;
036    private Consumer consumer;
037    private JbootAliyunmqConfig aliyunmqConfig;
038
039    public JbootAliyunmqImpl(JbootmqConfig config) {
040        super(config);
041        String typeName = config.getTypeName();
042        if (StrUtil.isNotBlank(typeName)) {
043            Map<String, JbootAliyunmqConfig> configModels = ConfigUtil.getConfigModels(JbootAliyunmqConfig.class);
044            if (!configModels.containsKey(typeName)) {
045                throw new JbootIllegalConfigException("Please config \"jboot.mq.aliyun." + typeName + ".addr\" in your jboot.properties.");
046            }
047            aliyunmqConfig = configModels.get(typeName);
048        } else {
049            aliyunmqConfig = Jboot.config(JbootAliyunmqConfig.class);
050        }
051    }
052
053
054    @Override
055    protected void onStartListening() {
056        startQueueConsumer();
057        startBroadCastConsumer();
058    }
059
060    @Override
061    protected void onStopListening() {
062        if (consumer != null) {
063            consumer.shutdown();
064            consumer = null;
065        }
066    }
067
068
069    public void startQueueConsumer() {
070        Properties properties = createProperties();
071        consumer = ONSFactory.createConsumer(properties);
072        for (String channel : channels) {
073            consumer.subscribe(aliyunmqConfig.getBroadcastChannelPrefix() + channel, aliyunmqConfig.getSubscribeSubExpression(), (message, consumeContext) -> {
074                AliyunmqMessageContext context = new AliyunmqMessageContext(this, message, consumeContext);
075                notifyListeners(channel, getSerializer().deserialize(message.getBody())
076                        , context);
077                return context.getReturnAction();
078            });
079        }
080        consumer.start();
081    }
082
083
084    public void startBroadCastConsumer() {
085        Properties properties = createProperties();
086        properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
087        consumer = ONSFactory.createConsumer(properties);
088        for (String channel : channels) {
089            consumer.subscribe(channel, aliyunmqConfig.getSubscribeSubExpression(), (message, consumeContext) -> {
090                AliyunmqMessageContext aliyunmqMessageInfo = new AliyunmqMessageContext(this, message, consumeContext);
091                notifyListeners(channel, getSerializer().deserialize(message.getBody())
092                        , aliyunmqMessageInfo);
093                return aliyunmqMessageInfo.getReturnAction();
094            });
095        }
096        consumer.start();
097    }
098
099
100    @Override
101    public void enqueue(Object message, String toChannel) {
102        Message sendMsg = null;
103        if (message instanceof Message) {
104            sendMsg = (Message) message;
105        } else {
106            byte[] bytes = getSerializer().serialize(message);
107            sendMsg = new Message(toChannel, "*", bytes);
108        }
109        SendResult result = getProducer().send(sendMsg);
110        if (result == null) {
111            LOG.warn("Rockect mq send message fail!!!");
112        }
113    }
114
115
116    @Override
117    public void publish(Object message, String toChannel) {
118        Message sendMsg = null;
119        if (message instanceof Message) {
120            sendMsg = (Message) message;
121        } else {
122            byte[] bytes = getSerializer().serialize(message);
123            sendMsg = new Message(aliyunmqConfig.getBroadcastChannelPrefix() + toChannel, "*", bytes);
124        }
125        SendResult result = getProducer().send(sendMsg);
126        if (result == null) {
127            LOG.warn("Rockect mq send message fail!!!");
128        }
129    }
130
131
132    public Producer getProducer() {
133        if (producer == null) {
134            synchronized (this) {
135                if (producer == null) {
136                    createProducer();
137                }
138            }
139        }
140        return producer;
141    }
142
143
144    public void createProducer() {
145        Properties properties = createProperties();
146        producer = ONSFactory.createProducer(properties);
147        producer.start();
148    }
149
150
151    public Properties createProperties() {
152
153        Properties properties = new Properties();
154        properties.put(PropertyKeyConst.AccessKey, aliyunmqConfig.getAccessKey());//AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
155        properties.put(PropertyKeyConst.SecretKey, aliyunmqConfig.getSecretKey());//SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
156        properties.put(PropertyKeyConst.ProducerId, aliyunmqConfig.getProducerId());//您在控制台创建的Producer ID
157        properties.put(PropertyKeyConst.NAMESRV_ADDR, aliyunmqConfig.getAddr());
158        properties.put(PropertyKeyConst.InstanceName, aliyunmqConfig.getInstanceName());
159        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, aliyunmqConfig.getSendMsgTimeoutMillis());//设置发送超时时间,单位毫秒
160
161
162        return properties;
163    }
164}