/**
 * Copyright (c) 2015-2022, Michael Yang 杨福海 (fuhai999@gmail.com).
 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.jboot.components.mq.aliyunmq;

import com.aliyun.openservices.ons.api.*;
import com.jfinal.log.Log;
import io.jboot.Jboot;
import io.jboot.components.mq.Jbootmq;
import io.jboot.components.mq.JbootmqBase;
import io.jboot.components.mq.JbootmqConfig;
import io.jboot.exception.JbootIllegalConfigException;
import io.jboot.utils.ConfigUtil;
import io.jboot.utils.StrUtil;

import java.util.Map;
import java.util.Properties;


/**
 * {@link Jbootmq} implementation that sends and receives messages through the
 * Aliyun ONS client ({@link ONSFactory}).
 * <p>
 * {@link #enqueue(Object, String)} sends to the channel name as given, while
 * {@link #publish(Object, String)} sends to the channel name prefixed with
 * {@code aliyunmqConfig.getBroadcastChannelPrefix()}. The two consumers started by
 * {@link #onStartListening()} subscribe to the matching topic names.
 */
public class JbootAliyunmqImpl extends JbootmqBase implements Jbootmq {

    private static final Log LOG = Log.getLog(JbootAliyunmqImpl.class);

    /**
     * Lazily created by {@link #getProducer()}. Declared volatile because it is read
     * outside the lock in the double-checked initialization there.
     */
    private volatile Producer producer;

    /** Consumer for the un-prefixed topics, i.e. the ones {@link #enqueue} sends to. */
    private Consumer consumer;

    /** Consumer for the prefixed topics, i.e. the ones {@link #publish} sends to. */
    private Consumer broadcastConsumer;

    private final JbootAliyunmqConfig aliyunmqConfig;

    /**
     * Resolves the Aliyun MQ configuration to use for this instance.
     * <p>
     * When {@code config.getTypeName()} is not blank, the configuration model registered
     * under that name is used; otherwise the default {@link JbootAliyunmqConfig} is loaded.
     *
     * @param config the generic MQ configuration handed to {@link JbootmqBase}
     * @throws JbootIllegalConfigException if a type name is given but no
     *                                     {@link JbootAliyunmqConfig} model exists under it
     */
    public JbootAliyunmqImpl(JbootmqConfig config) {
        super(config);
        String typeName = config.getTypeName();
        if (StrUtil.isNotBlank(typeName)) {
            Map<String, JbootAliyunmqConfig> configModels = ConfigUtil.getConfigModels(JbootAliyunmqConfig.class);
            if (!configModels.containsKey(typeName)) {
                throw new JbootIllegalConfigException("Please config \"jboot.mq.aliyun." + typeName + ".addr\" in your jboot.properties.");
            }
            aliyunmqConfig = configModels.get(typeName);
        } else {
            aliyunmqConfig = Jboot.config(JbootAliyunmqConfig.class);
        }
    }


    /**
     * Starts the queue consumer and then the broadcast consumer.
     */
    @Override
    protected void onStartListening() {
        startQueueConsumer();
        startBroadCastConsumer();
    }

    /**
     * Shuts down every consumer that was started and clears the references.
     * <p>
     * Both consumers are tracked separately; a single shared field would be overwritten
     * by the second start call and leave the first consumer running forever.
     */
    @Override
    protected void onStopListening() {
        if (consumer != null) {
            consumer.shutdown();
            consumer = null;
        }
        if (broadcastConsumer != null) {
            broadcastConsumer.shutdown();
            broadcastConsumer = null;
        }
    }


    /**
     * Creates and starts the consumer for messages sent with {@link #enqueue}.
     * <p>
     * It subscribes to each channel under its plain name, which is the topic
     * {@link #enqueue} uses when it builds the outgoing {@link Message}.
     */
    public void startQueueConsumer() {
        Properties properties = createProperties();
        consumer = ONSFactory.createConsumer(properties);
        subscribeChannels(consumer, "");
        consumer.start();
    }


    /**
     * Creates and starts the consumer for messages sent with {@link #publish}.
     * <p>
     * The consumer is created with the {@code BROADCASTING} message model and subscribes
     * to each channel under the broadcast prefix, which is the topic {@link #publish}
     * uses when it builds the outgoing {@link Message}.
     */
    public void startBroadCastConsumer() {
        Properties properties = createProperties();
        properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
        broadcastConsumer = ONSFactory.createConsumer(properties);
        subscribeChannels(broadcastConsumer, aliyunmqConfig.getBroadcastChannelPrefix());
        broadcastConsumer.start();
    }


    /**
     * Sends a message to {@code toChannel}.
     *
     * @param message   either a ready-made {@link Message}, which is sent unchanged, or any
     *                  object, which is serialized and wrapped in a new {@link Message}
     * @param toChannel the channel used as the topic of a newly built {@link Message}
     */
    @Override
    public void enqueue(Object message, String toChannel) {
        sendMessage(message, toChannel);
    }


    /**
     * Sends a message to the broadcast topic of {@code toChannel}.
     *
     * @param message   either a ready-made {@link Message}, which is sent unchanged (its own
     *                  topic is kept, the prefix is not applied), or any object, which is
     *                  serialized and wrapped in a new {@link Message}
     * @param toChannel the channel; the broadcast prefix is prepended to form the topic
     */
    @Override
    public void publish(Object message, String toChannel) {
        sendMessage(message, aliyunmqConfig.getBroadcastChannelPrefix() + toChannel);
    }


    /**
     * Returns the shared producer, creating and starting it on first use.
     *
     * @return the started producer
     */
    public Producer getProducer() {
        if (producer == null) {
            synchronized (this) {
                if (producer == null) {
                    createProducer();
                }
            }
        }
        return producer;
    }


    /**
     * Creates a producer from {@link #createProperties()}, starts it and stores it.
     * <p>
     * The field is assigned only after {@code start()} returns, so a thread reading the
     * field without the lock in {@link #getProducer()} never sees an unstarted producer.
     */
    public void createProducer() {
        Properties properties = createProperties();
        Producer created = ONSFactory.createProducer(properties);
        created.start();
        producer = created;
    }


    /**
     * Builds the client properties shared by the producer and the consumers from the
     * resolved {@link JbootAliyunmqConfig}.
     *
     * @return a new {@link Properties} instance on every call
     */
    public Properties createProperties() {
        Properties properties = new Properties();
        // AccessKey: Aliyun authentication, created in the Aliyun management console.
        properties.put(PropertyKeyConst.AccessKey, aliyunmqConfig.getAccessKey());
        // SecretKey: Aliyun authentication, created in the Aliyun management console.
        properties.put(PropertyKeyConst.SecretKey, aliyunmqConfig.getSecretKey());
        // The Producer ID created in the console.
        properties.put(PropertyKeyConst.ProducerId, aliyunmqConfig.getProducerId());
        properties.put(PropertyKeyConst.NAMESRV_ADDR, aliyunmqConfig.getAddr());
        properties.put(PropertyKeyConst.InstanceName, aliyunmqConfig.getInstanceName());
        // Send timeout, in milliseconds.
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, aliyunmqConfig.getSendMsgTimeoutMillis());
        return properties;
    }


    /**
     * Subscribes {@code target} to every configured channel under {@code topicPrefix + channel}
     * and forwards each received message to the listeners registered for the plain channel.
     */
    private void subscribeChannels(Consumer target, String topicPrefix) {
        for (String channel : channels) {
            target.subscribe(topicPrefix + channel, aliyunmqConfig.getSubscribeSubExpression(), (message, consumeContext) -> {
                AliyunmqMessageContext context = new AliyunmqMessageContext(this, message, consumeContext);
                notifyListeners(channel, getSerializer().deserialize(message.getBody()), context);
                // The listeners decide the outcome through the context.
                return context.getReturnAction();
            });
        }
    }


    /**
     * Sends {@code message} through the shared producer, wrapping it in a {@link Message}
     * addressed to {@code topic} unless it already is one, and logs a warning when the
     * producer returns no result.
     */
    private void sendMessage(Object message, String topic) {
        Message sendMsg;
        if (message instanceof Message) {
            sendMsg = (Message) message;
        } else {
            byte[] bytes = getSerializer().serialize(message);
            sendMsg = new Message(topic, "*", bytes);
        }
        SendResult result = getProducer().send(sendMsg);
        if (result == null) {
            LOG.warn("Rockect mq send message fail!!!");
        }
    }
}