续谈ActiveMQ之java如何操作ActiveMQ(springBoot项目)

引入maven依赖
         <!-- activemq -->
		 <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>

为了便于管理mq这里统一在xml中配置:

<mq-clients>
	<producer>
		<id>demo.test</id>
		<topic>MQ_TEST</topic>
		<mq.type>1</mq.type>
		<delivery.mode>1</delivery.mode>
		<acknowledge>1</acknowledge>
	</producer>
	<producer>
		<id>demo.test2</id>
		<topic>MQ_TEST2</topic>
		<mq.type>2</mq.type>
		<delivery.mode>1</delivery.mode>
		<acknowledge>1</acknowledge>
	</producer>
	<consumer>
		<id>demo.consumer.test1</id>
		<topic>MQ_TEST</topic>
		<mq.type>1</mq.type>
		<message.listener>com.ule.microtopup.mq.listener.ActiveMQMessageListener</message.listener>
	</consumer>
	<consumer>
		<id>demo.consumer.test2</id>
		<topic>MQ_TEST2</topic>
		<mq.type>2</mq.type>
		<message.listener>com.ule.microtopup.mq.listener.ActiveMQMessageListener2</message.listener>
	</consumer>
</mq-clients>

XMLUtil:用来读取xml

/**
	 * 获取所有节点
	 * @param root 根节点
	 * @param map  记录每个节点及值
	 */
	@SuppressWarnings("unchecked")
	private static void getNode(Element root, LinkedHashMap<String, String> map) {
		List<Element> list = root.elements();
		Iterator<Element> iterator = list.iterator();
		while (iterator.hasNext()) {
			Element element = iterator.next();
			if (element.elements() != null && element.elements().size() > 0) {
				System.out.println("element:"+element.getName());
				getNode(element, map);

			} else {
				map.put(element.getParent().getName() + "." + element.getName(),
				        element.getTextTrim());
			}
		}

	}
	/**
	 * 读XML文件指定节点内容
	 * @param xmlName  xml文件名
	 * @param nodeName 指定节点
	 * @return
	 * @throws Exception
	 */
	public static Map<String, String> reader(String xmlName,String nodeName)throws Exception{
		if(StringUtils.isEmpty(xmlName)){
			throw new NullPointerException("xmlName cannot be null!");
		}
		
		LinkedHashMap<String, String> returnValue = new LinkedHashMap<String, String>();
		InputStream in = XMLUtil.class.getClassLoader().getResourceAsStream(xmlName);
		SAXReader reader = new SAXReader();
		Document document = reader.read(in);
		Element root = document.getRootElement();
		if(StringUtils.isNotEmpty(nodeName)){
			root = document.getRootElement().element(nodeName);
		}
		//获取节点
		getNode(root, returnValue);
		
		if (returnValue.size()>0) {
			for (String key : returnValue.keySet()) {
				System.out.println("key:" + key + " ,value:" + returnValue.get(key));
			}

		}
		return returnValue;
	}
	
	
	/**
	 * 读XML文件所有内容,并将文件转成对象
	 * @param xmlName 文件名
	 * @param cls
	 * @return
	 * @throws Exception
	 */
	@SuppressWarnings("unchecked")
    public static <T> T readerXmlToBean(String xmlName ,Class<?>...cls)throws Exception{
		if(StringUtils.isEmpty(xmlName)){
			throw new NullPointerException("xmlName cannot be null!");
		}
		InputStream in = XMLUtil.class.getClassLoader().getResourceAsStream(xmlName);
		JAXBContext context = JAXBContext.newInstance(cls);// 获取上下文对象  
        Unmarshaller unmarshaller = context.createUnmarshaller();
        T t =  (T)unmarshaller.unmarshal(in);
		return t;
	}

Producer:

@XmlRootElement(name="producer")  
public class Producer {
	private String id;
	// 主题
	private String topic;
	// 类型,1-queue,2-topic
	private Integer mqType;
	// 持久化方式 :1-非持久,2-持久化
	private Integer deliveryMode;
	// 签收方式:1-自动签收,2-客户端确认,3-自动批量确认,0-事务提交并确认
	private Integer acknowledge;

//省略get set
}

Consumer:

@XmlRootElement(name = "consumer")
public class Consumer {
	private String id;
	private String topic;
	private Integer mqType;
	private Class<? extends MessageListener> messageListener;
...
}

MessageUtil:mq消息集中处理类,包括发送消息,启动消费监听等

private static MqConnectionFactory mqFactory = MqConnectionFactory.INSTANCE;
	private static Connection conn = null;
	private static Session session = null;

	public static void init() {
		try {
			// 获取一个连接
			if (conn == null) {
				conn = mqFactory.getConnection();
			}
			conn.start();
			// 自动提交事务
			if (session == null) {
				/*
				 * Session.AUTO_ACKNOWLEDGE 消息自动签收
				 * Session.CLIENT_ACKNOWLEDGE 客戶端调用acknowledge方法手动签收
				 * Session.DUPS_OK_ACKNOWLEDGE 不是必须签收,消息可能会重复发送。在第二次重新传送消息的时候,消息
				 * 头的JmsDelivered会被置为true标示当前消息已经传送过一次,客户端需要进行消息的重复处理控制。
				 */
				session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	/**
	 * 
	 * @param obj 序列化对象
	 * @param topic
	 * @param isQueue
	 * @throws Exception
	 */
	public static void sendObjectMessage(Serializable obj, String id)
	        throws Exception {
		init();
		Producer p = getProducerById(id);
		MessageProducer producer = getMessageProducer(getDestination(p), p.getDeliveryMode());
		producer.send(session.createObjectMessage(obj));
		destroy(producer);
	}

	private static Producer getProducerById(String id) {
		Producer p = MQUtil.getProducerById(id);
		if (p == null) {
			throw new NullPointerException("according to id:" + id + ", not found produer.");
		}
		return p;
	}

	public static void sendTextMessage(String mes, String id)
	        throws Exception {
		init();
		Producer p = getProducerById(id);
		MessageProducer producer = getMessageProducer(getDestination(p), p.getDeliveryMode());
		producer.send(session.createTextMessage(mes));
		destroy(producer);
	}

	private static MessageProducer getMessageProducer(Destination destination, Integer deliveryMode)
	        throws Exception {
		MessageProducer producer = session.createProducer(destination);
		/**
		 * PERSISTENT(持久性消息):
		 * 这是ActiveMQ的默认传送模式,此模式保证这些消息只被传送一次和成功使用一次。对于这些消息,可靠性是优先考虑的因素。
		 * 可靠性的另一个重要方面是确保持久性消息传送至目标后,消息服务在向消费者传送它们之前不会丢失这些消息。这意味着在持久性消息传送至目标时,
		 * 消息服务将其放入持久性数据存储。如果消息服务由于某种原因导致失败,
		 * 它可以恢复此消息并将此消息传送至相应的消费者。虽然这样增加了消息传送的开销,但却增加了可靠性。
		 * NON_PERSISTENT(非持久性消息):
		 * 保证这些消息最多被传送一次。对于这些消息,可靠性并非主要的考虑因素。
		 * 此模式并不要求持久性的数据存储,也不保证消息服务由于某种原因导致失败后消息不会丢失。
		 * 
		 */
		producer.setDeliveryMode(deliveryMode);
		return producer;
	}

	private static Destination getDestination(Producer p) throws Exception {
		return getDestination(p.getMqType(), p.getTopic());
	}

	private static Destination getDestination(Consumer c) throws Exception {
		return getDestination(c.getMqType(), c.getTopic());
	}

	private static Destination getDestination(Integer mqType, String topic) throws Exception {
		Destination destination = null;
		if (ActiveMqType.QUEUE == mqType)
			destination = session.createQueue(topic);
		else if (ActiveMqType.TOPIC == mqType)
			destination = session.createTopic(topic);
		else
			throw new IllegalArgumentException("mqType must be 1 or 2.");
		return destination;
	}
	/**
	 * 启动所有监听
	 * @param c
	 * @throws Exception
	 */
	public static void startConsumer(Consumer c) throws Exception {
		init();
		MessageConsumer consumer = session.createConsumer(MessageUtil.getDestination(c));
		MessageListener listener = c.getMessageListener().newInstance();
		consumer.setMessageListener(listener);
	}

	private static void destroy(MessageProducer producer) throws JMSException {
		if (producer != null) {
			producer.close();
		}
		if (session != null) {
			session.close();
			session = null;
		}
		if (conn != null) {
			conn.close();
			conn = null;
		}
	}

	public static void destroy(MessageConsumer consumer) throws JMSException {
		if (consumer != null) {
			consumer.close();
			consumer = null;
		}
		if (session != null) {
			session.close();
			session = null;
		}
		if (conn != null) {
			conn.close();
			conn = null;
		}
	}

细节不在赘述,具体代码已上传至码云:https://gitee.com/savage_xiao/boot.demo/tree/master

有兴趣可以下载下来看一下,其中有包含其他springboot的研究

测试代码:

public static void main(String[] args) {
		try {
			for(int i = 101; i<200;i++){
				MessageUtil.sendTextMessage("hello world!"+","+(i+1), "demo.test");
			}
        } catch (Exception e) {
	        e.printStackTrace();
        }
	}

-----------------------

public static void main(String[] args) {
		try {
			for(int i = 0; i<100;i++){
				MessageUtil.sendTextMessage("hello world2!"+","+(i+1), "demo.test2");
			}
        } catch (Exception e) {
	        e.printStackTrace();
        }
	}

测试结果:

listener2:hello world2!,1
listener2:hello world2!,2
listener2:hello world2!,3
listener2:hello world2!,4
listener2:hello world2!,5
....
....略
listener2:hello world2!,98
listener2:hello world2!,99
listener2:hello world2!,100
listener:hello world!,102
listener:hello world!,103
...
...略
listener:hello world!,197
listener:hello world!,198
listener:hello world!,199
listener:hello world!,200

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏蓝天

IDL编译器实现入门

本文不对词法和语法、以及flex和bison进行介绍,如有需要,可以阅读《RPC的实现》。本文试图用直接的方式,以最短的篇幅介绍一个最简单的IDL编译器实现。...

25530
来自专栏Python研发

Django之Model世界

django为使用一种新的方式,即:关系对象映射(Object Relational Mapping,简称ORM)

15420
来自专栏与神兽党一起成长

jFinal路由解析源码分析

jFinal的路由解析是在JFinalFilter中做的,这个Filter也需要在web.xml中配置。JFinalFilter实现了javax.servlet...

17020
来自专栏Java架构师学习

带你深入了解Java线程中的那些事

引言 说到Thread大家都很熟悉,我们平常写并发代码的时候都会接触到,那么我们来看看下面这段代码是如何初始化以及执行的呢? public class Thre...

35080
来自专栏Hongten

java开发_UUID(Universally Unique Identifier,全局唯一标识符)和GUID(Globally Unique Identifier,全球唯一标识符)

GUID: 即Globally Unique Identifier(全球唯一标识符) 也称作 UUID(Universally Unique IDentifie...

12710
来自专栏坚毅的PHP

my php & mysql FAQ

php中文字符串长度及定长截取问题使用str_len("中国") 结果为6,php系统默认一个中文字符长度为3,可改用mb_strlen函数获得长度,mb_su...

39860
来自专栏MasiMaro 的技术博文

Windows平台下的内存泄漏检测

在C/C++中内存泄漏是一个不可避免的问题,很多新手甚至有许多老手也会犯这样的错误,下面说明一下在windows平台下如何检测内存泄漏。 在windows平...

26320
来自专栏知了

ijst:基于反射的 C++ JSON 反序列化库

ijst (iJsonStruct) 一个是 C++ Json 序列化/反序列化库:

35550
来自专栏Jed的技术阶梯

zookeeper案例之4个逻辑思维训练小题目

14120
来自专栏haifeiWu与他朋友们的专栏

造个轮子之基于 Netty 实现自己的 RPC 框架

服务端开发都会或多或少的涉及到 RPC 的使用,当然如果止步于会用,对自己的成长很是不利,所以楼主今天本着知其然,且知其所以然的精神来探讨一下 RPC 这个东西...

16730

扫码关注云+社区

领取腾讯云代金券