001/** 002 * Copyright (c) 2015-2022, Michael Yang 杨福海 (fuhai999@gmail.com). 003 * <p> 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * <p> 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * <p> 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package io.jboot.components.mq.rabbitmq; 017 018import com.rabbitmq.client.*; 019import io.jboot.Jboot; 020import io.jboot.app.JbootApplicationConfig; 021import io.jboot.utils.ConfigUtil; 022import io.jboot.components.mq.Jbootmq; 023import io.jboot.components.mq.JbootmqBase; 024import io.jboot.components.mq.JbootmqConfig; 025import io.jboot.exception.JbootException; 026import io.jboot.exception.JbootIllegalConfigException; 027import io.jboot.utils.StrUtil; 028 029import java.io.IOException; 030import java.util.Map; 031import java.util.concurrent.ConcurrentHashMap; 032 033/** 034 * doc : http://www.rabbitmq.com/api-guide.html 035 */ 036public class JbootRabbitmqImpl extends JbootmqBase implements Jbootmq { 037 038 039 private Connection connection; 040 private Map<String, Channel> channelMap = new ConcurrentHashMap<>(); 041 042 private JbootRabbitmqConfig rabbitmqConfig; 043 private JbootApplicationConfig appConfig; 044 045 046 public JbootRabbitmqImpl(JbootmqConfig config) { 047 super(config); 048 049 String typeName = config.getTypeName(); 050 if (StrUtil.isNotBlank(typeName)) { 051 Map<String, JbootRabbitmqConfig> configModels = ConfigUtil.getConfigModels(JbootRabbitmqConfig.class); 052 if (!configModels.containsKey(typeName)) { 053 throw new JbootIllegalConfigException("Please config \"jboot.mq.rabbitmq." + typeName + ".host\" in your jboot.properties."); 054 } 055 rabbitmqConfig = configModels.get(typeName); 056 } else { 057 rabbitmqConfig = Jboot.config(JbootRabbitmqConfig.class); 058 } 059 060 try { 061 ConnectionFactory factory = new ConnectionFactory(); 062 factory.setHost(rabbitmqConfig.getHost()); 063 factory.setPort(rabbitmqConfig.getPort()); 064 065 if (StrUtil.isNotBlank(rabbitmqConfig.getVirtualHost())) { 066 factory.setVirtualHost(rabbitmqConfig.getVirtualHost()); 067 } 068 069 if (StrUtil.isNotBlank(rabbitmqConfig.getUsername())) { 070 factory.setUsername(rabbitmqConfig.getUsername()); 071 } 072 073 if (StrUtil.isNotBlank(rabbitmqConfig.getPassword())) { 074 factory.setPassword(rabbitmqConfig.getPassword()); 075 } 076 077 connection = factory.newConnection(); 078 079 } catch (Exception e) { 080 throw new JbootException("Can not connection rabbitmq server", e); 081 } 082 } 083 084 085 @Override 086 protected void onStartListening() { 087 for (String toChannel : channels) { 088 089 //广播通道 090 if (rabbitmqConfig.isBroadcastEnable()) { 091 Channel broadcastChannel = getChannel(toChannel, false); 092 bindChannel(broadcastChannel, buildBroadcastChannelName(toChannel), toChannel); 093 } 094 095 //队列通道 096 if (rabbitmqConfig.isQueueEnable()) { 097 final Channel queueChannel = getChannel(toChannel, true); 098 bindChannel(queueChannel, toChannel, toChannel); 099 } 100 } 101 } 102 103 @Override 104 protected void onStopListening() { 105 connection.abort(); 106 } 107 108 109 public void bindChannel(Channel channel, String name, String orginaChannelName) { 110 if (channel != null) { 111 try { 112 channel.basicConsume(name, rabbitmqConfig.isAutoAck(), new DefaultConsumer(channel) { 113 @Override 114 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 115 Object o = getSerializer().deserialize(body); 116 notifyListeners(orginaChannelName, o, new RabbitmqMessageContext(JbootRabbitmqImpl.this, channel, consumerTag, envelope, properties)); 117 } 118 }); 119 } catch (IOException e) { 120 e.printStackTrace(); 121 } 122 } 123 } 124 125 126 public synchronized Channel getChannel(String toChannel, boolean queueMode) { 127 Channel channel = channelMap.get(toChannel + queueMode); 128 if (channel == null) { 129 try { 130 channel = connection.createChannel(); 131 132 //队列模式,只需要创建 队列就可以了,不需要定义交换机 133 if (queueMode) { 134 channel.queueDeclare(toChannel 135 , rabbitmqConfig.isQueueDeclareDurable() 136 , rabbitmqConfig.isQueueDeclareExclusive() 137 , rabbitmqConfig.isQueueDeclareAutoDelete() 138 , null); 139 } 140 141 //广播模式,需要定义交换机,发送者直接把消息发送到交换机里 142 else { 143 channel.queueDeclare(buildBroadcastChannelName(toChannel) 144 , rabbitmqConfig.isBroadcastQueueDeclareDurable() 145 , rabbitmqConfig.isBroadcastQueueDeclareExclusive() 146 , rabbitmqConfig.isBroadcastQueueDeclareAutoDelete() 147 , null); 148 149 150 BuiltinExchangeType exchangeType = BuiltinExchangeType.FANOUT; 151 for (BuiltinExchangeType type : BuiltinExchangeType.values()) { 152 if (type.getType().equals(rabbitmqConfig.getBroadcastExchangeDeclareExchangeType())) { 153 exchangeType = type; 154 } 155 } 156 channel.exchangeDeclare(toChannel, exchangeType, rabbitmqConfig.isBroadcastExchangeDeclareDurable()); 157 channel.queueBind(buildBroadcastChannelName(toChannel), toChannel, rabbitmqConfig.getBroadcastChannelRoutingKey()); 158 } 159 160 } catch (Exception ex) { 161 throw new JbootException("Can not create rabbit mq channel.", ex); 162 } 163 164 channelMap.put(toChannel + queueMode, channel); 165 } 166 167 return channel; 168 } 169 170 public String buildBroadcastChannelName(String channel) { 171 return rabbitmqConfig.getBroadcastChannelPrefix() + channel; 172 } 173 174 175 @Override 176 public void enqueue(Object message, String toChannel) { 177 Channel channel = getChannel(toChannel, true); 178 try { 179 byte[] bytes = getSerializer().serialize(message); 180 channel.basicPublish("", toChannel, MessageProperties.BASIC, bytes); 181 } catch (IOException e) { 182 e.printStackTrace(); 183 } 184 } 185 186 187 @Override 188 public void publish(Object message, String toChannel) { 189 Channel channel = getChannel(toChannel, false); 190 try { 191 byte[] bytes = getSerializer().serialize(message); 192 channel.basicPublish(toChannel, "", MessageProperties.BASIC, bytes); 193 } catch (IOException e) { 194 e.printStackTrace(); 195 } 196 } 197 198 199}