001/** 002 * Copyright (c) 2015-2022, Michael Yang 杨福海 (fuhai999@gmail.com). 003 * <p> 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * <p> 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * <p> 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package io.jboot.components.mq.qpidmq; 017 018import com.jfinal.log.Log; 019import io.jboot.Jboot; 020import io.jboot.utils.ConfigUtil; 021import io.jboot.components.mq.Jbootmq; 022import io.jboot.components.mq.JbootmqBase; 023import io.jboot.components.mq.JbootmqConfig; 024import io.jboot.exception.JbootException; 025import io.jboot.exception.JbootIllegalConfigException; 026import io.jboot.utils.ArrayUtil; 027import io.jboot.utils.StrUtil; 028import org.apache.qpid.client.AMQAnyDestination; 029import org.apache.qpid.client.AMQConnection; 030import org.apache.qpid.jms.Connection; 031 032import javax.jms.*; 033import java.util.Map; 034 035/** 036 * @author 徐海峰 (27533892@qq.com) 037 * @version V1.0 038 */ 039public class JbootQpidmqImpl extends JbootmqBase implements Jbootmq { 040 041 private static final Log LOG = Log.getLog(JbootQpidmqImpl.class); 042 043 private Connection connection = null; 044 private boolean serializerEnable = true; 045 private JbootQpidmqConfig qpidConfig = null; 046 047 private Thread queueThread; 048 private Thread topicThread; 049 050 public JbootQpidmqImpl(JbootmqConfig config) { 051 super(config); 052 053 String typeName = config.getTypeName(); 054 if (StrUtil.isNotBlank(typeName)) { 055 Map<String, JbootQpidmqConfig> configModels = ConfigUtil.getConfigModels(JbootQpidmqConfig.class); 056 if (!configModels.containsKey(typeName)) { 057 throw new JbootIllegalConfigException("Please config \"jboot.mq.qpid." + typeName + ".host\" in your jboot.properties."); 058 } 059 qpidConfig = configModels.get(typeName); 060 } else { 061 qpidConfig = Jboot.config(JbootQpidmqConfig.class); 062 } 063 064 065 serializerEnable = qpidConfig.isSerializerEnable(); 066 067 try { 068 String url = getConnectionUrl(); 069 connection = new AMQConnection(url); 070 connection.start(); 071 072 } catch (Exception e) { 073 throw new JbootException("can not connection qpidmq server", e); 074 } 075 } 076 077 @Override 078 protected void onStartListening() { 079 try { 080 startReceiveMsgThread(); 081 } catch (Exception e) { 082 throw new JbootException(e.toString(), e); 083 } 084 } 085 086 @Override 087 protected void onStopListening() { 088 queueThread.interrupt(); 089 topicThread.interrupt(); 090 } 091 092 093 @Override 094 public void enqueue(Object message, String toChannel) { 095 String addr = getQueueAddr(toChannel); 096 sendMsg(addr, message); 097 } 098 099 @Override 100 public void publish(Object message, String toChannel) { 101 String addr = getTopicAddr(toChannel); 102 sendMsg(addr, message); 103 } 104 105 public void sendMsg(String addr, Object message) { 106 try { 107 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 108 Destination destination = new AMQAnyDestination(addr.toString()); 109 MessageProducer producer = session.createProducer(destination); 110 producer.setTimeToLive(30000); 111 112 Message sendMsg = null; 113 if (message instanceof Message) { 114 sendMsg = (Message) message; 115 } else if (!serializerEnable) { 116 sendMsg = session.createTextMessage((String) message); 117 } else { 118 byte[] data = getSerializer().serialize(message); 119 sendMsg = session.createBytesMessage(); 120 sendMsg.setIntProperty("data-len", data.length); 121 ((BytesMessage) sendMsg).writeBytes(data); 122 } 123 124 producer.send(sendMsg); 125 126 } catch (Exception e) { 127 LOG.error(e.toString(), e); 128 } 129 } 130 131 public String getConnectionUrl() { 132 StringBuffer url = new StringBuffer(); 133 url.append("amqp://"); 134 url.append(qpidConfig.getUsername()); 135 url.append(":"); 136 url.append(qpidConfig.getPassword()); 137 url.append("@"); 138 url.append("/"); 139 url.append(qpidConfig.getVirtualHost()); 140 url.append("?failover='roundrobin'"); 141 url.append("&brokerlist='"); 142 143 String host = qpidConfig.getHost(); 144 String[] hosts = host.split(","); 145 for (String h : hosts) { 146 if (StrUtil.isBlank(h)) { 147 continue; 148 } 149 url.append("tcp://" + h + ";"); 150 } 151 152 url.append("'"); 153 154 return url.toString(); 155 } 156 157 public String getQueueAddr(String channel) { 158 StringBuffer addr = new StringBuffer(); 159 addr.append("ADDR:"); 160 addr.append(channel); 161 addr.append(";{create:always}"); 162 163 return addr.toString(); 164 } 165 166 public String getTopicAddr(String channel) { 167 StringBuffer addr = new StringBuffer(); 168 addr.append("ADDR:amq.topic/"); 169 addr.append(channel); 170 171 return addr.toString(); 172 } 173 174 public void startReceiveMsgThread() throws Exception { 175 if (ArrayUtil.isNullOrEmpty(this.channels)) { 176 return; 177 } 178 179 for (String channel : this.channels) { 180 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 181 182 String queueAddr = getQueueAddr(channel); 183 Destination queue = new AMQAnyDestination(queueAddr); 184 MessageConsumer queueConsumer = session.createConsumer(queue); 185 queueThread = new Thread(new ReceiveMsgThread(queueConsumer, channel, serializerEnable)); 186 queueThread.start(); 187 188 String topicAddr = getTopicAddr(channel); 189 Destination topic = new AMQAnyDestination(topicAddr); 190 MessageConsumer topicConsumer = session.createConsumer(topic); 191 topicThread = new Thread(new ReceiveMsgThread(topicConsumer, channel, serializerEnable)); 192 topicThread.start(); 193 } 194 } 195 196 private class ReceiveMsgThread implements Runnable { 197 private MessageConsumer consumer; 198 private String channel; 199 private boolean serializerEnable; 200 201 public ReceiveMsgThread(MessageConsumer consumer, String channel, boolean serializerEnable) { 202 this.consumer = consumer; 203 this.channel = channel; 204 this.serializerEnable = serializerEnable; 205 } 206 207 @Override 208 public void run() { 209 try { 210 while (isStarted) { 211 Message message = consumer.receive(); 212 if (message == null) { 213 continue; 214 } 215 216 Object object = null; 217 if (!serializerEnable) { 218 TextMessage textMessage = (TextMessage) message; 219 object = textMessage.getText(); 220 } else { 221 BytesMessage bytesMessage = (BytesMessage) message; 222 int dataLen = bytesMessage.getIntProperty("data-len"); 223 byte[] data = new byte[dataLen]; 224 if (dataLen != bytesMessage.readBytes(data)) { 225 continue; 226 } 227 object = getSerializer().deserialize(data); 228 } 229 230 if (object != null) { 231 notifyListeners(channel, object, new QpidmqMessageContext(JbootQpidmqImpl.this, message)); 232 } 233 } 234 } catch (Exception e) { 235 LOG.error(e.toString(), e); 236 } 237 } 238 } 239}