专栏首页大大的微笑续谈ActiveMQ之java如何操作ActiveMQ(springBoot项目)

续谈ActiveMQ之java如何操作ActiveMQ(springBoot项目)

引入maven依赖
         <!-- activemq -->
		 <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>

为了便于管理mq这里统一在xml中配置:

<mq-clients>
	<producer>
		<id>demo.test</id>
		<topic>MQ_TEST</topic>
		<mq.type>1</mq.type>
		<delivery.mode>1</delivery.mode>
		<acknowledge>1</acknowledge>
	</producer>
	<producer>
		<id>demo.test2</id>
		<topic>MQ_TEST2</topic>
		<mq.type>2</mq.type>
		<delivery.mode>1</delivery.mode>
		<acknowledge>1</acknowledge>
	</producer>
	<consumer>
		<id>demo.consumer.test1</id>
		<topic>MQ_TEST</topic>
		<mq.type>1</mq.type>
		<message.listener>com.ule.microtopup.mq.listener.ActiveMQMessageListener</message.listener>
	</consumer>
	<consumer>
		<id>demo.consumer.test2</id>
		<topic>MQ_TEST2</topic>
		<mq.type>2</mq.type>
		<message.listener>com.ule.microtopup.mq.listener.ActiveMQMessageListener2</message.listener>
	</consumer>
</mq-clients>

XMLUtil:用来读取xml

/**
	 * 获取所有节点
	 * @param root 根节点
	 * @param map  记录每个节点及值
	 */
	@SuppressWarnings("unchecked")
	private static void getNode(Element root, LinkedHashMap<String, String> map) {
		List<Element> list = root.elements();
		Iterator<Element> iterator = list.iterator();
		while (iterator.hasNext()) {
			Element element = iterator.next();
			if (element.elements() != null && element.elements().size() > 0) {
				System.out.println("element:"+element.getName());
				getNode(element, map);

			} else {
				map.put(element.getParent().getName() + "." + element.getName(),
				        element.getTextTrim());
			}
		}

	}
	/**
	 * 读XML文件指定节点内容
	 * @param xmlName  xml文件名
	 * @param nodeName 指定节点
	 * @return
	 * @throws Exception
	 */
	public static Map<String, String> reader(String xmlName,String nodeName)throws Exception{
		if(StringUtils.isEmpty(xmlName)){
			throw new NullPointerException("xmlName cannot be null!");
		}
		
		LinkedHashMap<String, String> returnValue = new LinkedHashMap<String, String>();
		InputStream in = XMLUtil.class.getClassLoader().getResourceAsStream(xmlName);
		SAXReader reader = new SAXReader();
		Document document = reader.read(in);
		Element root = document.getRootElement();
		if(StringUtils.isNotEmpty(nodeName)){
			root = document.getRootElement().element(nodeName);
		}
		//获取节点
		getNode(root, returnValue);
		
		if (returnValue.size()>0) {
			for (String key : returnValue.keySet()) {
				System.out.println("key:" + key + " ,value:" + returnValue.get(key));
			}

		}
		return returnValue;
	}
	
	
	/**
	 * 读XML文件所有内容,并将文件转成对象
	 * @param xmlName 文件名
	 * @param cls
	 * @return
	 * @throws Exception
	 */
	@SuppressWarnings("unchecked")
    public static <T> T readerXmlToBean(String xmlName ,Class<?>...cls)throws Exception{
		if(StringUtils.isEmpty(xmlName)){
			throw new NullPointerException("xmlName cannot be null!");
		}
		InputStream in = XMLUtil.class.getClassLoader().getResourceAsStream(xmlName);
		JAXBContext context = JAXBContext.newInstance(cls);// 获取上下文对象  
        Unmarshaller unmarshaller = context.createUnmarshaller();
        T t =  (T)unmarshaller.unmarshal(in);
		return t;
	}

Producer:

@XmlRootElement(name="producer")  
public class Producer {
	private String id;
	// 主题
	private String topic;
	// 类型,1-queue,2-topic
	private Integer mqType;
	// 持久化方式 :1-非持久,2-持久化
	private Integer deliveryMode;
	// 签收方式:1-自动签收,2-客户端确认,3-自动批量确认,0-事务提交并确认
	private Integer acknowledge;

//省略get set
}

Consumer:

@XmlRootElement(name = "consumer")
public class Consumer {
	private String id;
	private String topic;
	private Integer mqType;
	private Class<? extends MessageListener> messageListener;
...
}

MessageUtil:mq消息集中处理类,包括发送消息,启动消费监听等

private static MqConnectionFactory mqFactory = MqConnectionFactory.INSTANCE;
	private static Connection conn = null;
	private static Session session = null;

	public static void init() {
		try {
			// 获取一个连接
			if (conn == null) {
				conn = mqFactory.getConnection();
			}
			conn.start();
			// 自动提交事务
			if (session == null) {
				/*
				 * Session.AUTO_ACKNOWLEDGE 消息自动签收
				 * Session.CLIENT_ACKNOWLEDGE 客戶端调用acknowledge方法手动签收
				 * Session.DUPS_OK_ACKNOWLEDGE 不是必须签收,消息可能会重复发送。在第二次重新传送消息的时候,消息
				 * 头的JmsDelivered会被置为true标示当前消息已经传送过一次,客户端需要进行消息的重复处理控制。
				 */
				session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	/**
	 * 
	 * @param obj 序列化对象
	 * @param topic
	 * @param isQueue
	 * @throws Exception
	 */
	public static void sendObjectMessage(Serializable obj, String id)
	        throws Exception {
		init();
		Producer p = getProducerById(id);
		MessageProducer producer = getMessageProducer(getDestination(p), p.getDeliveryMode());
		producer.send(session.createObjectMessage(obj));
		destroy(producer);
	}

	private static Producer getProducerById(String id) {
		Producer p = MQUtil.getProducerById(id);
		if (p == null) {
			throw new NullPointerException("according to id:" + id + ", not found produer.");
		}
		return p;
	}

	public static void sendTextMessage(String mes, String id)
	        throws Exception {
		init();
		Producer p = getProducerById(id);
		MessageProducer producer = getMessageProducer(getDestination(p), p.getDeliveryMode());
		producer.send(session.createTextMessage(mes));
		destroy(producer);
	}

	private static MessageProducer getMessageProducer(Destination destination, Integer deliveryMode)
	        throws Exception {
		MessageProducer producer = session.createProducer(destination);
		/**
		 * PERSISTENT(持久性消息):
		 * 这是ActiveMQ的默认传送模式,此模式保证这些消息只被传送一次和成功使用一次。对于这些消息,可靠性是优先考虑的因素。
		 * 可靠性的另一个重要方面是确保持久性消息传送至目标后,消息服务在向消费者传送它们之前不会丢失这些消息。这意味着在持久性消息传送至目标时,
		 * 消息服务将其放入持久性数据存储。如果消息服务由于某种原因导致失败,
		 * 它可以恢复此消息并将此消息传送至相应的消费者。虽然这样增加了消息传送的开销,但却增加了可靠性。
		 * NON_PERSISTENT(非持久性消息):
		 * 保证这些消息最多被传送一次。对于这些消息,可靠性并非主要的考虑因素。
		 * 此模式并不要求持久性的数据存储,也不保证消息服务由于某种原因导致失败后消息不会丢失。
		 * 
		 */
		producer.setDeliveryMode(deliveryMode);
		return producer;
	}

	private static Destination getDestination(Producer p) throws Exception {
		return getDestination(p.getMqType(), p.getTopic());
	}

	private static Destination getDestination(Consumer c) throws Exception {
		return getDestination(c.getMqType(), c.getTopic());
	}

	private static Destination getDestination(Integer mqType, String topic) throws Exception {
		Destination destination = null;
		if (ActiveMqType.QUEUE == mqType)
			destination = session.createQueue(topic);
		else if (ActiveMqType.TOPIC == mqType)
			destination = session.createTopic(topic);
		else
			throw new IllegalArgumentException("mqType must be 1 or 2.");
		return destination;
	}
	/**
	 * 启动所有监听
	 * @param c
	 * @throws Exception
	 */
	public static void startConsumer(Consumer c) throws Exception {
		init();
		MessageConsumer consumer = session.createConsumer(MessageUtil.getDestination(c));
		MessageListener listener = c.getMessageListener().newInstance();
		consumer.setMessageListener(listener);
	}

	private static void destroy(MessageProducer producer) throws JMSException {
		if (producer != null) {
			producer.close();
		}
		if (session != null) {
			session.close();
			session = null;
		}
		if (conn != null) {
			conn.close();
			conn = null;
		}
	}

	public static void destroy(MessageConsumer consumer) throws JMSException {
		if (consumer != null) {
			consumer.close();
			consumer = null;
		}
		if (session != null) {
			session.close();
			session = null;
		}
		if (conn != null) {
			conn.close();
			conn = null;
		}
	}

细节不在赘述,具体代码已上传至码云:https://gitee.com/savage_xiao/boot.demo/tree/master

有兴趣可以下载下来看一下,其中有包含其他springboot的研究

测试代码:

public static void main(String[] args) {
		try {
			for(int i = 101; i<200;i++){
				MessageUtil.sendTextMessage("hello world!"+","+(i+1), "demo.test");
			}
        } catch (Exception e) {
	        e.printStackTrace();
        }
	}

-----------------------

public static void main(String[] args) {
		try {
			for(int i = 0; i<100;i++){
				MessageUtil.sendTextMessage("hello world2!"+","+(i+1), "demo.test2");
			}
        } catch (Exception e) {
	        e.printStackTrace();
        }
	}

测试结果:

listener2:hello world2!,1
listener2:hello world2!,2
listener2:hello world2!,3
listener2:hello world2!,4
listener2:hello world2!,5
....
....略
listener2:hello world2!,98
listener2:hello world2!,99
listener2:hello world2!,100
listener:hello world!,102
listener:hello world!,103
...
...略
listener:hello world!,197
listener:hello world!,198
listener:hello world!,199
listener:hello world!,200

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 返回执行结果的任务队列:ExecutorCompletionService

    有时候我们需要展示一些内容,如果等所有内容都加载完毕再展示这样反而会降低用户体验; 因为如果消耗时间长那么用户需要瞪着空白的页面,反而会失去兴趣; 所以我们...

    用户1215919
  • java根据wait,notify,synchronize关键字自定义队列

    这里定义了一个有界队列 ,保证先进先出,在队列达到上限时应该阻塞,直到队列有元素被取出才能继续往里继续添加 完整代码: package com.comic....

    用户1215919
  • spring boot加载复杂的yml文件获取不到值的问题

    今天使用spring boot读取yml文件,这种多层嵌套的竟然无法读取到(value注解spring.redis.pool.max.wait),即便加上全名...

    用户1215919
  • 自己写的一个代码自动生成工具_java版_源码下载

    这里要实现的功能是,当我们给出了bean,如:Admin,User,People等实体类后,

    Hongten
  • Java中的static关键字解析

      static关键字是很多朋友在编写代码和阅读代码时碰到的比较难以理解的一个关键字,也是各大公司的面试官喜欢在面试时问到的知识点之一。下面就先讲述一下stat...

    Java团长
  • Java下static关键字用法详解

      本文章介绍了java下static关键字的用法,大部分内容摘自原作者,在此学习并分享给大家。

    哲洛不闹
  • 【Java学习笔记之十五】Java中的static关键字解析

    Java中的static关键字解析   static关键字是很多朋友在编写代码和阅读代码时碰到的比较难以理解的一个关键字,也是各大公司的面试官喜欢在面试时问到的...

    Angel_Kitty
  • JS 中有趣的事实

    使用 JS 有很多有趣的地方。尽管工程师们每天都在与之打交道,但仍有一些语言没有被开发出来。本广将介绍一些你可能想不到的JS特性。

    前端小智@大迁世界
  • JS 中有趣的事实

    使用 JS 有很多有趣的地方。尽管工程师们每天都在与之打交道,但仍有一些语言没有被开发出来。本广将介绍一些你可能想不到的JS特性。

    双面人
  • 深入理解static关键字

    如果给一个属性加上static,那么这个属性不再属于某一个对象了,而是属于N个对象,共用同一个static属性。

    爱学习的孙小白

扫码关注云+社区

领取腾讯云代金券