前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >ActiveMQ的入门程序

ActiveMQ的入门程序

作者头像
用户5927264
发布2019-08-01 10:09:20
2990
发布2019-08-01 10:09:20
举报
文章被收录于专栏:OSChina
代码语言:javascript
复制
package com.shi.page;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.junit.Test;

/**
 * 
 * @author: SHF
 * @date: 2018年3月16日 上午8:48:10
 * @Description:消息队列测试类
 */
public class ActiveMQTest {

	/**
	 * 点到点形式 发送 消息 生产者
	 * @throws Exception
	 */
	@Test
	public void queueProducerTest()throws Exception{
		//1.创建一个连接工厂对象,需要指定服务的ip和端口
		ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.36.40:61616");
		//2.使用工厂对象创建一个Connection对象
		Connection connection = connectionFactory.createConnection();
		//3.开启连接,调用Connection对象的start方法
		connection.start();
		//4.创建一个Session对象
				//第一个参数:是否开启事物。如果开启事物第二个参数无意义。一般不开启事物。
		 		//第二个参数:应答模式,一般:自动应答,手动应答。
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		//5.使用Session对象创建一个Destination对象,俩种形式queue,topic,现在使用queue
		Queue queue = session.createQueue("test-queue");//Queue extends Destination
		//6.使用Session对象创建一个producer对象
		MessageProducer producer = session.createProducer(queue);
		//7.创建一个Message对像,可以使用TextMessage
		/*TextMessage textMessage=new ActiveMQTextMessage();
		textMessage.setText("你要发送的消息");*/
		TextMessage textMessage = session.createTextMessage("queue你要发送的消息");
		//8.发送消息
		producer.send(textMessage);
		//9.关闭资源
		producer.close();
		session.close();
		connection.close();
	}
	
	/**
	 * 点到点接受消息  消费者
	 * @throws Exception
	 */
	@Test
	public void queueConsumerTest()throws Exception{
		//1 创建一个ConnectionFactory对象连接MQ服务器
		ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.36.40:61616");
		//2 创建一个连接对象
		Connection connection = connectionFactory.createConnection();
		//3 开启连接
		connection.start();
		//4 使用Connection对象创建一个Session对象
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		//5 创建一个Destination对象 queue对象
		Queue queue = session.createQueue("test-queue");
		//6 使用Session对象创建一个消费者对象
		MessageConsumer consumer = session.createConsumer(queue);
		//7 接受消息
		consumer.setMessageListener(new MessageListener() {
			
			@Override
			public void onMessage(Message paramMessage) {
				// 接受到消息的回调函数
				TextMessage testMessage=(TextMessage) paramMessage;
				try {
					//8 打印消息
					System.out.println(testMessage.getText());
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		});
		System.in.read();//等待接受消息
		//9 关闭连接
		consumer.close();
		session.close();
		connection.close();
	}
	
	/**
	 * 一对多 发送消息  生产者
	 * @throws Exception
	 */
	@Test
	public void topicProducerTest()throws Exception{
		//1.创建一个连接工厂对象,需要指定服务的ip和端口
		ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.36.40:61616");
		//2.使用工厂对象创建一个Connection对象
		Connection connection = connectionFactory.createConnection();
		//3.开启连接,调用Connection对象的start方法
		connection.start();
		//4.创建一个Session对象
				//第一个参数:是否开启事物。如果开启事物第二个参数无意义。一般不开启事物。
		 		//第二个参数:应答模式,一般:自动应答,手动应答。
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		//5.使用Session对象创建一个Destination对象,俩种形式queue,topic,现在使用topic
		Topic topic = session.createTopic("test-topic");//Queue extends Destination
		//6.使用Session对象创建一个producer对象
		MessageProducer producer = session.createProducer(topic);
		//7.创建一个Message对像,可以使用TextMessage
		/*TextMessage textMessage=new ActiveMQTextMessage();
		textMessage.setText("你要发送的消息");*/
		TextMessage textMessage = session.createTextMessage("topic你要发送的消息");
		//8.发送消息
		producer.send(textMessage);
		//9.关闭资源
		producer.close();
		session.close();
		connection.close();
	}
	
	
	/**
	 * 一对多接受消息  消费者
	 * @throws Exception
	 */
	@Test
	public void topicConsumerTest()throws Exception{
		//1 创建一个ConnectionFactory对象连接MQ服务器
		ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.36.40:61616");
		//2 创建一个连接对象
		Connection connection = connectionFactory.createConnection();
		//3 开启连接
		connection.start();
		//4 使用Connection对象创建一个Session对象
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		//5 创建一个Destination对象 topic对象
		Topic topic = session.createTopic("test-topic");
		//6 使用Session对象创建一个消费者对象
		MessageConsumer consumer = session.createConsumer(topic);
		//7 接受消息
		consumer.setMessageListener(new MessageListener() {
			
			@Override
			public void onMessage(Message paramMessage) {
				// 接受到消息的回调函数
				TextMessage testMessage=(TextMessage) paramMessage;
				try {
					//8 打印消息
					System.out.println(testMessage.getText());
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		});
		System.out.println("topic消费者3 已经启动...");
		System.in.read();//等待接受消息
		//9 关闭连接
		consumer.close();
		session.close();
		connection.close();
	}
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云服务器
云服务器(Cloud Virtual Machine,CVM)提供安全可靠的弹性计算服务。 您可以实时扩展或缩减计算资源,适应变化的业务需求,并只需按实际使用的资源计费。使用 CVM 可以极大降低您的软硬件采购成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档