引入maven依赖
<!-- activemq -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
为了便于管理mq这里统一在xml中配置:
<mq-clients>
<producer>
<id>demo.test</id>
<topic>MQ_TEST</topic>
<mq.type>1</mq.type>
<delivery.mode>1</delivery.mode>
<acknowledge>1</acknowledge>
</producer>
<producer>
<id>demo.test2</id>
<topic>MQ_TEST2</topic>
<mq.type>2</mq.type>
<delivery.mode>1</delivery.mode>
<acknowledge>1</acknowledge>
</producer>
<consumer>
<id>demo.consumer.test1</id>
<topic>MQ_TEST</topic>
<mq.type>1</mq.type>
<message.listener>com.ule.microtopup.mq.listener.ActiveMQMessageListener</message.listener>
</consumer>
<consumer>
<id>demo.consumer.test2</id>
<topic>MQ_TEST2</topic>
<mq.type>2</mq.type>
<message.listener>com.ule.microtopup.mq.listener.ActiveMQMessageListener2</message.listener>
</consumer>
</mq-clients>
XMLUtil:用来读取xml
/**
* 获取所有节点
* @param root 根节点
* @param map 记录每个节点及值
*/
@SuppressWarnings("unchecked")
private static void getNode(Element root, LinkedHashMap<String, String> map) {
List<Element> list = root.elements();
Iterator<Element> iterator = list.iterator();
while (iterator.hasNext()) {
Element element = iterator.next();
if (element.elements() != null && element.elements().size() > 0) {
System.out.println("element:"+element.getName());
getNode(element, map);
} else {
map.put(element.getParent().getName() + "." + element.getName(),
element.getTextTrim());
}
}
}
/**
* 读XML文件指定节点内容
* @param xmlName xml文件名
* @param nodeName 指定节点
* @return
* @throws Exception
*/
public static Map<String, String> reader(String xmlName,String nodeName)throws Exception{
if(StringUtils.isEmpty(xmlName)){
throw new NullPointerException("xmlName cannot be null!");
}
LinkedHashMap<String, String> returnValue = new LinkedHashMap<String, String>();
InputStream in = XMLUtil.class.getClassLoader().getResourceAsStream(xmlName);
SAXReader reader = new SAXReader();
Document document = reader.read(in);
Element root = document.getRootElement();
if(StringUtils.isNotEmpty(nodeName)){
root = document.getRootElement().element(nodeName);
}
//获取节点
getNode(root, returnValue);
if (returnValue.size()>0) {
for (String key : returnValue.keySet()) {
System.out.println("key:" + key + " ,value:" + returnValue.get(key));
}
}
return returnValue;
}
/**
* 读XML文件所有内容,并将文件转成对象
* @param xmlName 文件名
* @param cls
* @return
* @throws Exception
*/
@SuppressWarnings("unchecked")
public static <T> T readerXmlToBean(String xmlName ,Class<?>...cls)throws Exception{
if(StringUtils.isEmpty(xmlName)){
throw new NullPointerException("xmlName cannot be null!");
}
InputStream in = XMLUtil.class.getClassLoader().getResourceAsStream(xmlName);
JAXBContext context = JAXBContext.newInstance(cls);// 获取上下文对象
Unmarshaller unmarshaller = context.createUnmarshaller();
T t = (T)unmarshaller.unmarshal(in);
return t;
}
Producer:
@XmlRootElement(name="producer")
public class Producer {
private String id;
// 主题
private String topic;
// 类型,1-queue,2-topic
private Integer mqType;
// 持久化方式 :1-非持久,2-持久化
private Integer deliveryMode;
// 签收方式:1-自动签收,2-客户端确认,3-自动批量确认,0-事务提交并确认
private Integer acknowledge;
//省略get set
}
Consumer:
@XmlRootElement(name = "consumer")
public class Consumer {
private String id;
private String topic;
private Integer mqType;
private Class<? extends MessageListener> messageListener;
...
}
MessageUtil:mq消息集中处理类,包括发送消息,启动消费监听等
private static MqConnectionFactory mqFactory = MqConnectionFactory.INSTANCE;
private static Connection conn = null;
private static Session session = null;
public static void init() {
try {
// 获取一个连接
if (conn == null) {
conn = mqFactory.getConnection();
}
conn.start();
// 自动提交事务
if (session == null) {
/*
* Session.AUTO_ACKNOWLEDGE 消息自动签收
* Session.CLIENT_ACKNOWLEDGE 客戶端调用acknowledge方法手动签收
* Session.DUPS_OK_ACKNOWLEDGE 不是必须签收,消息可能会重复发送。在第二次重新传送消息的时候,消息
* 头的JmsDelivered会被置为true标示当前消息已经传送过一次,客户端需要进行消息的重复处理控制。
*/
session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
*
* @param obj 序列化对象
* @param topic
* @param isQueue
* @throws Exception
*/
public static void sendObjectMessage(Serializable obj, String id)
throws Exception {
init();
Producer p = getProducerById(id);
MessageProducer producer = getMessageProducer(getDestination(p), p.getDeliveryMode());
producer.send(session.createObjectMessage(obj));
destroy(producer);
}
private static Producer getProducerById(String id) {
Producer p = MQUtil.getProducerById(id);
if (p == null) {
throw new NullPointerException("according to id:" + id + ", not found produer.");
}
return p;
}
public static void sendTextMessage(String mes, String id)
throws Exception {
init();
Producer p = getProducerById(id);
MessageProducer producer = getMessageProducer(getDestination(p), p.getDeliveryMode());
producer.send(session.createTextMessage(mes));
destroy(producer);
}
private static MessageProducer getMessageProducer(Destination destination, Integer deliveryMode)
throws Exception {
MessageProducer producer = session.createProducer(destination);
/**
* PERSISTENT(持久性消息):
* 这是ActiveMQ的默认传送模式,此模式保证这些消息只被传送一次和成功使用一次。对于这些消息,可靠性是优先考虑的因素。
* 可靠性的另一个重要方面是确保持久性消息传送至目标后,消息服务在向消费者传送它们之前不会丢失这些消息。这意味着在持久性消息传送至目标时,
* 消息服务将其放入持久性数据存储。如果消息服务由于某种原因导致失败,
* 它可以恢复此消息并将此消息传送至相应的消费者。虽然这样增加了消息传送的开销,但却增加了可靠性。
* NON_PERSISTENT(非持久性消息):
* 保证这些消息最多被传送一次。对于这些消息,可靠性并非主要的考虑因素。
* 此模式并不要求持久性的数据存储,也不保证消息服务由于某种原因导致失败后消息不会丢失。
*
*/
producer.setDeliveryMode(deliveryMode);
return producer;
}
private static Destination getDestination(Producer p) throws Exception {
return getDestination(p.getMqType(), p.getTopic());
}
private static Destination getDestination(Consumer c) throws Exception {
return getDestination(c.getMqType(), c.getTopic());
}
private static Destination getDestination(Integer mqType, String topic) throws Exception {
Destination destination = null;
if (ActiveMqType.QUEUE == mqType)
destination = session.createQueue(topic);
else if (ActiveMqType.TOPIC == mqType)
destination = session.createTopic(topic);
else
throw new IllegalArgumentException("mqType must be 1 or 2.");
return destination;
}
/**
* 启动所有监听
* @param c
* @throws Exception
*/
public static void startConsumer(Consumer c) throws Exception {
init();
MessageConsumer consumer = session.createConsumer(MessageUtil.getDestination(c));
MessageListener listener = c.getMessageListener().newInstance();
consumer.setMessageListener(listener);
}
private static void destroy(MessageProducer producer) throws JMSException {
if (producer != null) {
producer.close();
}
if (session != null) {
session.close();
session = null;
}
if (conn != null) {
conn.close();
conn = null;
}
}
public static void destroy(MessageConsumer consumer) throws JMSException {
if (consumer != null) {
consumer.close();
consumer = null;
}
if (session != null) {
session.close();
session = null;
}
if (conn != null) {
conn.close();
conn = null;
}
}
细节不在赘述,具体代码已上传至码云:https://gitee.com/savage_xiao/boot.demo/tree/master
有兴趣可以下载下来看一下,其中有包含其他springboot的研究
测试代码:
public static void main(String[] args) {
try {
for(int i = 101; i<200;i++){
MessageUtil.sendTextMessage("hello world!"+","+(i+1), "demo.test");
}
} catch (Exception e) {
e.printStackTrace();
}
}
-----------------------
public static void main(String[] args) {
try {
for(int i = 0; i<100;i++){
MessageUtil.sendTextMessage("hello world2!"+","+(i+1), "demo.test2");
}
} catch (Exception e) {
e.printStackTrace();
}
}
测试结果:
listener2:hello world2!,1
listener2:hello world2!,2
listener2:hello world2!,3
listener2:hello world2!,4
listener2:hello world2!,5
....
....略
listener2:hello world2!,98
listener2:hello world2!,99
listener2:hello world2!,100
listener:hello world!,102
listener:hello world!,103
...
...略
listener:hello world!,197
listener:hello world!,198
listener:hello world!,199
listener:hello world!,200