001/**
002 * Copyright (c) 2015-2022, Michael Yang 杨福海 (fuhai999@gmail.com).
003 * <p>
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 * <p>
008 * http://www.apache.org/licenses/LICENSE-2.0
009 * <p>
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package io.jboot.components.mq.rabbitmq;
017
018import com.rabbitmq.client.*;
019import io.jboot.Jboot;
020import io.jboot.app.JbootApplicationConfig;
021import io.jboot.utils.ConfigUtil;
022import io.jboot.components.mq.Jbootmq;
023import io.jboot.components.mq.JbootmqBase;
024import io.jboot.components.mq.JbootmqConfig;
025import io.jboot.exception.JbootException;
026import io.jboot.exception.JbootIllegalConfigException;
027import io.jboot.utils.StrUtil;
028
029import java.io.IOException;
030import java.util.Map;
031import java.util.concurrent.ConcurrentHashMap;
032
033/**
034 * doc : http://www.rabbitmq.com/api-guide.html
035 */
036public class JbootRabbitmqImpl extends JbootmqBase implements Jbootmq {
037
038
039    private Connection connection;
040    private Map<String, Channel> channelMap = new ConcurrentHashMap<>();
041
042    private JbootRabbitmqConfig rabbitmqConfig;
043    private JbootApplicationConfig appConfig;
044
045
046    public JbootRabbitmqImpl(JbootmqConfig config) {
047        super(config);
048
049        String typeName = config.getTypeName();
050        if (StrUtil.isNotBlank(typeName)) {
051            Map<String, JbootRabbitmqConfig> configModels = ConfigUtil.getConfigModels(JbootRabbitmqConfig.class);
052            if (!configModels.containsKey(typeName)) {
053                throw new JbootIllegalConfigException("Please config \"jboot.mq.rabbitmq." + typeName + ".host\" in your jboot.properties.");
054            }
055            rabbitmqConfig = configModels.get(typeName);
056        } else {
057            rabbitmqConfig = Jboot.config(JbootRabbitmqConfig.class);
058        }
059
060        try {
061            ConnectionFactory factory = new ConnectionFactory();
062            factory.setHost(rabbitmqConfig.getHost());
063            factory.setPort(rabbitmqConfig.getPort());
064
065            if (StrUtil.isNotBlank(rabbitmqConfig.getVirtualHost())) {
066                factory.setVirtualHost(rabbitmqConfig.getVirtualHost());
067            }
068
069            if (StrUtil.isNotBlank(rabbitmqConfig.getUsername())) {
070                factory.setUsername(rabbitmqConfig.getUsername());
071            }
072
073            if (StrUtil.isNotBlank(rabbitmqConfig.getPassword())) {
074                factory.setPassword(rabbitmqConfig.getPassword());
075            }
076
077            connection = factory.newConnection();
078
079        } catch (Exception e) {
080            throw new JbootException("Can not connection rabbitmq server", e);
081        }
082    }
083
084
085    @Override
086    protected void onStartListening() {
087        for (String toChannel : channels) {
088
089            //广播通道
090            if (rabbitmqConfig.isBroadcastEnable()) {
091                Channel broadcastChannel = getChannel(toChannel, false);
092                bindChannel(broadcastChannel, buildBroadcastChannelName(toChannel), toChannel);
093            }
094
095            //队列通道
096            if (rabbitmqConfig.isQueueEnable()) {
097                final Channel queueChannel = getChannel(toChannel, true);
098                bindChannel(queueChannel, toChannel, toChannel);
099            }
100        }
101    }
102
103    @Override
104    protected void onStopListening() {
105        connection.abort();
106    }
107
108
109    public void bindChannel(Channel channel, String name, String orginaChannelName) {
110        if (channel != null) {
111            try {
112                channel.basicConsume(name, rabbitmqConfig.isAutoAck(), new DefaultConsumer(channel) {
113                    @Override
114                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
115                        Object o = getSerializer().deserialize(body);
116                        notifyListeners(orginaChannelName, o, new RabbitmqMessageContext(JbootRabbitmqImpl.this, channel, consumerTag, envelope, properties));
117                    }
118                });
119            } catch (IOException e) {
120                e.printStackTrace();
121            }
122        }
123    }
124
125
126    public synchronized Channel getChannel(String toChannel, boolean queueMode) {
127        Channel channel = channelMap.get(toChannel + queueMode);
128        if (channel == null) {
129            try {
130                channel = connection.createChannel();
131
132                //队列模式,只需要创建 队列就可以了,不需要定义交换机
133                if (queueMode) {
134                    channel.queueDeclare(toChannel
135                            , rabbitmqConfig.isQueueDeclareDurable()
136                            , rabbitmqConfig.isQueueDeclareExclusive()
137                            , rabbitmqConfig.isQueueDeclareAutoDelete()
138                            , null);
139                }
140
141                //广播模式,需要定义交换机,发送者直接把消息发送到交换机里
142                else {
143                    channel.queueDeclare(buildBroadcastChannelName(toChannel)
144                            , rabbitmqConfig.isBroadcastQueueDeclareDurable()
145                            , rabbitmqConfig.isBroadcastQueueDeclareExclusive()
146                            , rabbitmqConfig.isBroadcastQueueDeclareAutoDelete()
147                            , null);
148
149
150                    BuiltinExchangeType exchangeType = BuiltinExchangeType.FANOUT;
151                    for (BuiltinExchangeType type : BuiltinExchangeType.values()) {
152                        if (type.getType().equals(rabbitmqConfig.getBroadcastExchangeDeclareExchangeType())) {
153                            exchangeType = type;
154                        }
155                    }
156                    channel.exchangeDeclare(toChannel, exchangeType, rabbitmqConfig.isBroadcastExchangeDeclareDurable());
157                    channel.queueBind(buildBroadcastChannelName(toChannel), toChannel, rabbitmqConfig.getBroadcastChannelRoutingKey());
158                }
159
160            } catch (Exception ex) {
161                throw new JbootException("Can not create rabbit mq channel.", ex);
162            }
163
164            channelMap.put(toChannel + queueMode, channel);
165        }
166
167        return channel;
168    }
169
170    public String buildBroadcastChannelName(String channel) {
171        return rabbitmqConfig.getBroadcastChannelPrefix() + channel;
172    }
173
174
175    @Override
176    public void enqueue(Object message, String toChannel) {
177        Channel channel = getChannel(toChannel, true);
178        try {
179            byte[] bytes = getSerializer().serialize(message);
180            channel.basicPublish("", toChannel, MessageProperties.BASIC, bytes);
181        } catch (IOException e) {
182            e.printStackTrace();
183        }
184    }
185
186
187    @Override
188    public void publish(Object message, String toChannel) {
189        Channel channel = getChannel(toChannel, false);
190        try {
191            byte[] bytes = getSerializer().serialize(message);
192            channel.basicPublish(toChannel, "", MessageProperties.BASIC, bytes);
193        } catch (IOException e) {
194            e.printStackTrace();
195        }
196    }
197
198
199}