前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >ActiveMQ点对点消息传递

ActiveMQ点对点消息传递

作者头像
用户4919348
发布2019-04-02 11:24:42
1.2K1
发布2019-04-02 11:24:42
举报
文章被收录于专栏:波波烤鸭波波烤鸭

  上篇文章中详细介绍了ActiveMQ。本文继续介绍ActiveMQ的具体操作

ActiveMQ

处理对象消息

1.定义消息载体对象

代码语言:javascript
复制
/**
 * Order Bean
 * 定义消息载体类型. 即要在ActiveMQ中传递的数据实体类型.
 * 消息载体对象必须实现接口java.io.Serializable, 因为消息需要在网络中传递,要求必须可序列化
 * @author dengp
 *
 */
public class Order implements Serializable{

	private static final long serialVersionUID = 1L;
	
	private String id;
	private String nick;
	private Long price;
	private Date createTime;
	public String getId() {
		return id;
	}
	public void setId(String id) {
		this.id = id;
	}
	public String getNick() {
		return nick;
	}
	public void setNick(String nick) {
		this.nick = nick;
	}
	public Long getPrice() {
		return price;
	}
	public void setPrice(Long price) {
		this.price = price;
	}
	public Date getCreateTime() {
		return createTime;
	}
	public void setCreateTime(Date createTime) {
		this.createTime = createTime;
	}
	public static long getSerialversionuid() {
		return serialVersionUID;
	}
	@Override
	public String toString() {
		return "Order [id=" + id + ", nick=" + nick + ", price=" + price + ", createTime=" + createTime + "]";
	}
}

2.定义消息生产者

代码语言:javascript
复制
/**
 * ActiveMQ中的生产者(Producer)
 * @author dengp
 *
 */
public class OrderProducer {

	public void sendhello2ActiveMq(Order messageObject) {
		ConnectionFactory factory = null;
		Connection conn = null;
		Session session = null;
		Destination destination = null;
		MessageProducer producer = null;
		Message message = null;
		try {
			
			factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.88.121:61616");
			// 创建链接对象
			conn = factory.createConnection();
			// 启动连接对象
			conn.start();
			
			session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
			// 创建目的地,目的地的命名既是队列的命令
			destination = session.createQueue("MQ-Hello-Object");
			producer = session.createProducer(destination);
			// 创建消息对象. 此消息是对象消息, 其中保存数据为对象.
			message = session.createObjectMessage(messageObject);
			// 发送消息
			producer.send(message);
		} catch (Exception e) {
			e.printStackTrace();
			System.out.println("访问ActiveMQ服务发生错误!!");
		} finally {
			try {
				// 回收消息发送者资源
				if (null != producer)
					producer.close();
			} catch (JMSException e) {
				e.printStackTrace();
			}
			try {
				// 回收会话资源
				if (null != session)
					session.close();
			} catch (JMSException e) {
				e.printStackTrace();
			}
			try {
				// 回收链接资源
				if (null != conn)
					conn.close();
			} catch (JMSException e) {
				e.printStackTrace();
			}
		}
	}
}

3.定义消息消费者

代码语言:javascript
复制
/**
 * ActiveMQ中的消费者(Consumer)
 * @author dengp
 *
 */
public class OrderConsumer {

	public void reciveOrderFormActiveMq() {
		ConnectionFactory factory = null;
		Connection conn = null;
		Session session = null;
		Destination destination = null;
		MessageConsumer consumer = null;
		Message message = null;
		try {
			factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.88.121:61616");
			conn = factory.createConnection();
			conn.start();
			session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
			destination = session.createQueue("MQ-Hello-Object");
			// 创建消息消费者, 创建的消息消费者与某目的地对应, 即方法参数目的地.
			consumer = session.createConsumer(destination);
			// 从ActiveMQ中获取消息
			message = consumer.receive();
			Object obj = ((ObjectMessage)message).getObject();
			System.out.println("ActiveMQ获取的消息是:"+obj);
		} catch (Exception e) {
			e.printStackTrace();
			System.out.println("访问ActiveMQ服务发生错误!!");
		} finally {
			try {
				// 回收消息发送者资源
				if (null != consumer)
					consumer.close();
			} catch (JMSException e) {
				e.printStackTrace();
			}
			try {
				// 回收会话资源
				if (null != session)
					session.close();
			} catch (JMSException e) {
				e.printStackTrace();
			}
			try {
				// 回收链接资源
				if (null != conn)
					conn.close();
			} catch (JMSException e) {
				e.printStackTrace();
			}
		}
	}
}

4.测试

生产者测试

代码语言:javascript
复制
public static void main(String[] args) {
	OrderProducer pro = new OrderProducer();
	Order order = new Order();
	order.setId("100");
	order.setNick("波波烤鸭");
	order.setPrice(9999l);
	order.setCreateTime(new Date());
	pro.sendhello2ActiveMq(order);
}
在这里插入图片描述
在这里插入图片描述

消费者测试 获取到了相关的信息

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

实现队列服务监听

1.观察者模式

1.1事件源

事件发生的源头。 监听器监听的具体位置。

1.2事件

具体触发的事件。 如: 单击事件, 双击事件 等。 其中必然包含事件源信息。

1.3监听器

处理事件的代码逻辑。 Java观察者模式(Observer)

2.定义生成者代码

代码语言:javascript
复制
package com.dpb.observe;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * ActiveMQ中的生产者(Producer)
 * @author dengp
 *
 */
public class MyProducer {

	public void sendhello2ActiveMq(String messageText) {
		// 连接工厂,用于创建Connection对象
		ConnectionFactory factory = null;
		// activeMQ 连接对象
		Connection conn = null;
		// 一次和ActiveMQ的持久会话对象
		Session session = null;
		// 目的地
		Destination destination = null;
		// 消息发送者
		MessageProducer producer = null;
		// 封装消息的对象
		Message message = null;
		try {
			/*
			 * 创建链接工厂 ActiveMQConnectionFactory -由ActiveMQ实现的ConnectionFactory接口实现类. 
			 * 构造方法: public ActiveMQConnectionFactory(String userName, String password,
			 * String brokerURL) 
			 * userName - 访问ActiveMQ服务的用户名,用户名可以通过jetty-realm.properties配置文件配置. 
			 * password - 访问ActiveMQ服务的密码,密码可以通过jetty-realm.properties配置文件配置. 
			 * brokerURL -访问ActiveMQ服务的路径地址. 路径结构为 - 协议名://主机地址:端口号 此链接基于TCP/IP协议.
			 */
			factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.88.121:61616");
			// 创建链接对象
			conn = factory.createConnection();
			// 启动连接对象
			conn.start();
			/*
			 * 创建会话对象 
			 * 方法 - connection.createSession(boolean transacted, int acknowledgeMode); 
			 * transacted - 是否使用事务, 
			 * 可选值为true|false 
			 * true - 使用事务, 当设置此变量值, 则acknowledgeMode参数无效, 
			 * 建议传递的acknowledgeMode参数值为 Session.SESSION_TRANSACTED 
			 * false - 不使用事务, 设置此变量值,则acknowledgeMode参数必须设置. 
			 * acknowledgeMode - 消息确认机制, 可选值为:
			 * Session.AUTO_ACKNOWLEDGE - 自动确认消息机制 
			 * Session.CLIENT_ACKNOWLEDGE -客户端确认消息机制 
			 * Session.DUPS_OK_ACKNOWLEDGE - 有副本的客户端确认消息机制
			 */
			session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
			// 创建目的地,目的地的命名既是队列的命令
			destination = session.createQueue("MQ-Hello-observe");
			// 创建消息生成者, 创建的消息生成者与某目的地对应, 即方法参数目的地.
			producer = session.createProducer(destination);
			// 创建消息对象,创建一个文本消息对象。此消息对象中保存要传递的文本数据.
			message = session.createTextMessage(messageText);

			// 发送消息
			producer.send(message);
		} catch (Exception e) {
			e.printStackTrace();
			System.out.println("访问ActiveMQ服务发生错误!!");
		} finally {
			try {
				// 回收消息发送者资源
				if (null != producer)
					producer.close();
			} catch (JMSException e) {
				e.printStackTrace();
			}
			try {
				// 回收会话资源
				if (null != session)
					session.close();
			} catch (JMSException e) {
				e.printStackTrace();
			}
			try {
				// 回收链接资源
				if (null != conn)
					conn.close();
			} catch (JMSException e) {
				e.printStackTrace();
			}
		}
	}
}

3.定义消费者代码

代码语言:javascript
复制
package com.dpb.observe;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * ActiveMQ中的消费者(Consumer)
 * @author dengp
 *
 */
public class MyConsumer {

	public void reciveHelloFormActiveMq() {
		// 连接工厂,用于创建Connection对象
		ConnectionFactory factory = null;
		// activeMQ 连接对象
		Connection conn = null;
		// 一次和ActiveMQ的持久会话对象
		Session session = null;
		// 目的地
		Destination destination = null;
		// 消息消费者
		MessageConsumer consumer = null;
		// 封装消息的对象
		Message message = null;
		try {
			/*
			 * 创建链接工厂 ActiveMQConnectionFactory -由ActiveMQ实现的ConnectionFactory接口实现类. 
			 * 构造方法: public ActiveMQConnectionFactory(String userName, String password,
			 * String brokerURL) 
			 * userName - 访问ActiveMQ服务的用户名,用户名可以通过jetty-realm.properties配置文件配置. 
			 * password - 访问ActiveMQ服务的密码,密码可以通过jetty-realm.properties配置文件配置. 
			 * brokerURL -访问ActiveMQ服务的路径地址. 路径结构为 - 协议名://主机地址:端口号 此链接基于TCP/IP协议.
			 */
			factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.88.121:61616");
			// 创建链接对象
			conn = factory.createConnection();
			// 启动连接对象
			conn.start();
			/*
			 * 创建会话对象 
			 * 方法 - connection.createSession(boolean transacted, int acknowledgeMode); 
			 * transacted - 是否使用事务, 
			 * 可选值为true|false 
			 * true - 使用事务, 当设置此变量值, 则acknowledgeMode参数无效, 
			 * 建议传递的acknowledgeMode参数值为 Session.SESSION_TRANSACTED 
			 * false - 不使用事务, 设置此变量值,则acknowledgeMode参数必须设置. 
			 * acknowledgeMode - 消息确认机制, 可选值为:
			 * Session.AUTO_ACKNOWLEDGE - 自动确认消息机制 
			 * Session.CLIENT_ACKNOWLEDGE -客户端确认消息机制 
			 * Session.DUPS_OK_ACKNOWLEDGE - 有副本的客户端确认消息机制
			 */
			session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
			// 创建目的地,目的地的命名既是队列的命令
			destination = session.createQueue("MQ-Hello-observe");
			// 创建消息消费者, 创建的消息消费者与某目的地对应, 即方法参数目的地.
			consumer = session.createConsumer(destination);
			// 监听ActiveMQ服务中的消息,当发现消息的时候,自动处理
			consumer.setMessageListener(new MessageListener() {
				/**
				 * 当用消息到来的时候触发该方法,在该方法中处理消息
				 */
				@Override
				public void onMessage(Message message) {
					TextMessage textMessage = (TextMessage) message;
					String messageString = null;
					try {
						messageString = textMessage.getText();
					} catch (JMSException e) {
						e.printStackTrace();
						messageString = "处理消息失败!!";
					}
					System.out.println("处理的消息内容是 : " + messageString);
				}
			});
			// 阻塞进程
			System.in.read();
		} catch (Exception e) {
			e.printStackTrace();
			System.out.println("访问ActiveMQ服务发生错误!!");
		} finally {
			try {
				// 回收消息发送者资源
				if (null != consumer)
					consumer.close();
			} catch (JMSException e) {
				e.printStackTrace();
			}
			try {
				// 回收会话资源
				if (null != session)
					session.close();
			} catch (JMSException e) {
				e.printStackTrace();
			}
			try {
				// 回收链接资源
				if (null != conn)
					conn.close();
			} catch (JMSException e) {
				e.printStackTrace();
			}
		}
	}
}

4.测试

生产者

代码语言:javascript
复制
public static void main(String[] args) {
	MyProducer pro = new MyProducer();
	pro.sendhello2ActiveMq("你好啊...listener");
}
在这里插入图片描述
在这里插入图片描述

消费者

代码语言:javascript
复制
public static void main(String[] args) {
	MyConsumer con = new MyConsumer();
	con.reciveHelloFormActiveMq();
}
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

web页面中可以看到还在线的消费者。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019年02月16日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • ActiveMQ
    • 处理对象消息
      • 1.定义消息载体对象
      • 2.定义消息生产者
      • 3.定义消息消费者
      • 4.测试
    • 实现队列服务监听
      • 1.观察者模式
      • 2.定义生成者代码
      • 3.定义消费者代码
      • 4.测试
相关产品与服务
消息队列 CMQ 版
消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档