001/**
002 * Copyright (c) 2015-2022, Michael Yang 杨福海 (fuhai999@gmail.com).
003 * <p>
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 * <p>
008 * http://www.apache.org/licenses/LICENSE-2.0
009 * <p>
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package io.jboot.components.mq.qpidmq;
017
018import com.jfinal.log.Log;
019import io.jboot.Jboot;
020import io.jboot.utils.ConfigUtil;
021import io.jboot.components.mq.Jbootmq;
022import io.jboot.components.mq.JbootmqBase;
023import io.jboot.components.mq.JbootmqConfig;
024import io.jboot.exception.JbootException;
025import io.jboot.exception.JbootIllegalConfigException;
026import io.jboot.utils.ArrayUtil;
027import io.jboot.utils.StrUtil;
028import org.apache.qpid.client.AMQAnyDestination;
029import org.apache.qpid.client.AMQConnection;
030import org.apache.qpid.jms.Connection;
031
032import javax.jms.*;
033import java.util.Map;
034
035/**
036 * @author 徐海峰 (27533892@qq.com)
037 * @version V1.0
038 */
039public class JbootQpidmqImpl extends JbootmqBase implements Jbootmq {
040
041    private static final Log LOG = Log.getLog(JbootQpidmqImpl.class);
042
043    private Connection connection = null;
044    private boolean serializerEnable = true;
045    private JbootQpidmqConfig qpidConfig = null;
046
047    private Thread queueThread;
048    private Thread topicThread;
049
050    public JbootQpidmqImpl(JbootmqConfig config) {
051        super(config);
052
053        String typeName = config.getTypeName();
054        if (StrUtil.isNotBlank(typeName)) {
055            Map<String, JbootQpidmqConfig> configModels = ConfigUtil.getConfigModels(JbootQpidmqConfig.class);
056            if (!configModels.containsKey(typeName)) {
057                throw new JbootIllegalConfigException("Please config \"jboot.mq.qpid." + typeName + ".host\" in your jboot.properties.");
058            }
059            qpidConfig = configModels.get(typeName);
060        } else {
061            qpidConfig = Jboot.config(JbootQpidmqConfig.class);
062        }
063
064
065        serializerEnable = qpidConfig.isSerializerEnable();
066
067        try {
068            String url = getConnectionUrl();
069            connection = new AMQConnection(url);
070            connection.start();
071
072        } catch (Exception e) {
073            throw new JbootException("can not connection qpidmq server", e);
074        }
075    }
076
077    @Override
078    protected void onStartListening() {
079        try {
080            startReceiveMsgThread();
081        } catch (Exception e) {
082            throw new JbootException(e.toString(), e);
083        }
084    }
085
086    @Override
087    protected void onStopListening() {
088        queueThread.interrupt();
089        topicThread.interrupt();
090    }
091
092
093    @Override
094    public void enqueue(Object message, String toChannel) {
095        String addr = getQueueAddr(toChannel);
096        sendMsg(addr, message);
097    }
098
099    @Override
100    public void publish(Object message, String toChannel) {
101        String addr = getTopicAddr(toChannel);
102        sendMsg(addr, message);
103    }
104
105    public void sendMsg(String addr, Object message) {
106        try {
107            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
108            Destination destination = new AMQAnyDestination(addr.toString());
109            MessageProducer producer = session.createProducer(destination);
110            producer.setTimeToLive(30000);
111
112            Message sendMsg = null;
113            if (message instanceof Message) {
114                sendMsg = (Message) message;
115            } else if (!serializerEnable) {
116                sendMsg = session.createTextMessage((String) message);
117            } else {
118                byte[] data = getSerializer().serialize(message);
119                sendMsg = session.createBytesMessage();
120                sendMsg.setIntProperty("data-len", data.length);
121                ((BytesMessage) sendMsg).writeBytes(data);
122            }
123
124            producer.send(sendMsg);
125
126        } catch (Exception e) {
127            LOG.error(e.toString(), e);
128        }
129    }
130
131    public String getConnectionUrl() {
132        StringBuffer url = new StringBuffer();
133        url.append("amqp://");
134        url.append(qpidConfig.getUsername());
135        url.append(":");
136        url.append(qpidConfig.getPassword());
137        url.append("@");
138        url.append("/");
139        url.append(qpidConfig.getVirtualHost());
140        url.append("?failover='roundrobin'");
141        url.append("&brokerlist='");
142
143        String host = qpidConfig.getHost();
144        String[] hosts = host.split(",");
145        for (String h : hosts) {
146            if (StrUtil.isBlank(h)) {
147                continue;
148            }
149            url.append("tcp://" + h + ";");
150        }
151
152        url.append("'");
153
154        return url.toString();
155    }
156
157    public String getQueueAddr(String channel) {
158        StringBuffer addr = new StringBuffer();
159        addr.append("ADDR:");
160        addr.append(channel);
161        addr.append(";{create:always}");
162
163        return addr.toString();
164    }
165
166    public String getTopicAddr(String channel) {
167        StringBuffer addr = new StringBuffer();
168        addr.append("ADDR:amq.topic/");
169        addr.append(channel);
170
171        return addr.toString();
172    }
173
174    public void startReceiveMsgThread() throws Exception {
175        if (ArrayUtil.isNullOrEmpty(this.channels)) {
176            return;
177        }
178
179        for (String channel : this.channels) {
180            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
181
182            String queueAddr = getQueueAddr(channel);
183            Destination queue = new AMQAnyDestination(queueAddr);
184            MessageConsumer queueConsumer = session.createConsumer(queue);
185            queueThread = new Thread(new ReceiveMsgThread(queueConsumer, channel, serializerEnable));
186            queueThread.start();
187
188            String topicAddr = getTopicAddr(channel);
189            Destination topic = new AMQAnyDestination(topicAddr);
190            MessageConsumer topicConsumer = session.createConsumer(topic);
191            topicThread = new Thread(new ReceiveMsgThread(topicConsumer, channel, serializerEnable));
192            topicThread.start();
193        }
194    }
195
196    private class ReceiveMsgThread implements Runnable {
197        private MessageConsumer consumer;
198        private String channel;
199        private boolean serializerEnable;
200
201        public ReceiveMsgThread(MessageConsumer consumer, String channel, boolean serializerEnable) {
202            this.consumer = consumer;
203            this.channel = channel;
204            this.serializerEnable = serializerEnable;
205        }
206
207        @Override
208        public void run() {
209            try {
210                while (isStarted) {
211                    Message message = consumer.receive();
212                    if (message == null) {
213                        continue;
214                    }
215
216                    Object object = null;
217                    if (!serializerEnable) {
218                        TextMessage textMessage = (TextMessage) message;
219                        object = textMessage.getText();
220                    } else {
221                        BytesMessage bytesMessage = (BytesMessage) message;
222                        int dataLen = bytesMessage.getIntProperty("data-len");
223                        byte[] data = new byte[dataLen];
224                        if (dataLen != bytesMessage.readBytes(data)) {
225                            continue;
226                        }
227                        object = getSerializer().deserialize(data);
228                    }
229
230                    if (object != null) {
231                        notifyListeners(channel, object, new QpidmqMessageContext(JbootQpidmqImpl.this, message));
232                    }
233                }
234            } catch (Exception e) {
235                LOG.error(e.toString(), e);
236            }
237        }
238    }
239}