1、什么是ActiveMQ
1 ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
2 主要特点:
3 1). 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
4 2). 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)。
5 3.) 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性。
6 4.) 通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上。
7 5). 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA。
8 6). 支持通过JDBC和journal提供高速的消息持久化。
9 7). 从设计上保证了高性能的集群,客户端-服务器,点对点。
10 8). 支持Ajax。
11 9). 支持与Axis的整合。
12 10). 可以很容易得调用内嵌JMS provider,进行测试。
2、JMS介绍:
1 1)、JMS的全称是Java Message Service,即Java消息服务。用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。
2
3 2)、它主要用于在生产者和消费者之间进行消息传递,生产者负责产生消息,而消费者负责接收消息。把它应用到实际的业务需求中的话我们可以在特定的时候利用生产者生成一消息,并进行发送,对应的消费者在接收到对应的消息后去完成对应的业务逻辑。
3、ActiveMQ的两种消息形式。
1 1)、对于消息的传递有两种类型。
2 a)、一种是点对点的,即一个生产者和一个消费者一一对应。
3 b)、另一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。
4
5 2)、JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。
6 a)、 StreamMessage -- Java原始值的数据流。
7 b)、 MapMessage--一套名称-值对。
8 c)、 TextMessage--一个字符串对象。
9 d)、 ObjectMessage--一个序列化的 Java对象。
10 e)、 BytesMessage--一个字节的数据流。
4、ActiveMQ的安装。官方网址:http://activemq.apache.org/
由于ActiveMQ是java开发的,所以需要先安装jdk(注意:安装jdk,需要jdk1.7以上版本)的哦。这里使用的是apache-activemq-5.12.0-bin.tar.gz版本的。
开始进行解压缩操作。
1 [root@localhost package]# ls
2 apache-activemq-5.12.0-bin.tar.gz apache-activemq-5.12.0-bin.zip apache-tomcat-7.0.47.tar.gz IK Analyzer 2012FF_hf1 IK Analyzer 2012FF_hf1.rar jdk-7u55-linux-i586.tar.gz solr-4.10.3.tgz.tgz zookeeper-3.4.6.tar.gz
3 [root@localhost package]# tar -zxvf apache-activemq-5.12.0-bin.tar.gz -C /home/hadoop/soft/
解压缩完以后进入bin目录。开始进行启动操作。
启动:[root@localhost bin]# ./activemq start
停止:[root@localhost bin]# ./activemq stop
查看状态:[root@localhost bin]# ./activemq status
1 [root@localhost soft]# cd apache-activemq-5.12.0/
2 [root@localhost apache-activemq-5.12.0]# ls
3 activemq-all-5.12.0.jar bin conf data docs examples lib LICENSE NOTICE README.txt webapps webapps-demo
4 [root@localhost apache-activemq-5.12.0]# ll
5 total 9384
6 -rwxr-xr-x. 1 root root 9524668 Aug 10 2015 activemq-all-5.12.0.jar
7 drwxr-xr-x. 5 root root 4096 Sep 15 00:39 bin
8 drwxr-xr-x. 2 root root 4096 Sep 15 00:39 conf
9 drwxr-xr-x. 2 root root 4096 Sep 15 00:39 data
10 drwxr-xr-x. 2 root root 4096 Sep 15 00:39 docs
11 drwxr-xr-x. 8 root root 4096 Sep 15 00:39 examples
12 drwxr-xr-x. 6 root root 4096 Sep 15 00:39 lib
13 -rw-r--r--. 1 root root 40580 Aug 10 2015 LICENSE
14 -rw-r--r--. 1 root root 3334 Aug 10 2015 NOTICE
15 -rw-r--r--. 1 root root 2610 Aug 10 2015 README.txt
16 drwxr-xr-x. 7 root root 4096 Sep 15 00:39 webapps
17 drwxr-xr-x. 3 root root 4096 Sep 15 00:39 webapps-demo
18 [root@localhost apache-activemq-5.12.0]# cd bin/
19 [root@localhost bin]# ls
20 activemq activemq-diag activemq.jar env linux-x86-32 linux-x86-64 macosx wrapper.jar
21 [root@localhost bin]# ./activemq start
22 INFO: Loading '/home/hadoop/soft/apache-activemq-5.12.0//bin/env'
23 INFO: Using java '/home/hadoop/soft/jdk1.7.0_55/bin/java'
24 INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details
25 INFO: pidfile created : '/home/hadoop/soft/apache-activemq-5.12.0//data/activemq.pid' (pid '9318')
26 [root@localhost bin]# ./activemq status
27 INFO: Loading '/home/hadoop/soft/apache-activemq-5.12.0//bin/env'
28 INFO: Using java '/home/hadoop/soft/jdk1.7.0_55/bin/java'
29 ActiveMQ is running (pid '9318')
30 [root@localhost bin]#
然后你可以访问后台管理界面,账号和密码默认都是admin的。访问地址:http://192.168.110.142:8161/admin
Home是当前的欢迎页,Queues是点到点形式,Topics是发布订阅模式,Subscribers话题消息的发布与订阅,Connections客户端链接,Network当前网络的链接状态,Scheduled计划任务,Send可以测试发送消息。
5、ActiveMQ的使用方法,JMS消息发送模式。
注意: 1)、在点对点或队列模型下,一个生产者向一个特定的队列发布消息,一个消费者从该队列中读取消息。这里,生产者知道消费者的队列,并直接将消息发送到消费者的队列。这种模式被概括为:只有一个消费者将获得消息。生产者不需要在接收者消费该消息期间处于运行状态,接收者也同样不需要在消息发送时处于运行状态。每一个成功处理的消息都由接收者签收。 2)、发布者/订阅者模型支持向一个特定的消息主题发布消息。0或多个订阅者可能对接收来自特定消息主题的消息感兴趣。在这种模型下,发布者和订阅者彼此不知道对方。这种模式好比是匿名公告板。这种模式被概括为:多个消费者可以获得消息,在发布者和订阅者之间存在时间依赖性。发布者需要建立一个订阅(subscription),以便客户能够购订阅。订阅者必须保持持续的活动状态以接收消息,除非订阅者建立了持久的订阅。在那种情况下,在订阅者未连接时发布的消息将在订阅者重新连接时重新发布。
6、JMS应用程序接口。
1 1)、ConnectionFactory 接口(连接工厂)
2 用户用来创建到JMS提供者的连接的被管对象。JMS客户通过可移植的接口访问连接,这样当下层的实现改变时,代码不需要进行修改。 管理员在JNDI名字空间中配置连接工厂,这样,JMS客户才能够查找到它们。根据消息类型的不同,用户将使用队列连接工厂,或者主题连接工厂。
3 2)、Connection 接口(连接)
4 连接代表了应用程序和消息服务器之间的通信链路。在获得了连接工厂后,就可以创建一个与JMS提供者的连接。根据不同的连接类型,连接允许用户创建会话,以发送和接收队列和主题到目标。
5 3)、Destination 接口(目标)
6 目标是一个包装了消息目标标识符的被管对象,消息目标是指消息发布和接收的地点,或者是队列,或者是主题。JMS管理员创建这些对象,然后用户通过JNDI发现它们。和连接工厂一样,管理员可以创建两种类型的目标,点对点模型的队列,以及发布者/订阅者模型的主题。
7 4)、MessageConsumer 接口(消息消费者)
8 由会话创建的对象,用于接收发送到目标的消息。消费者可以同步地(阻塞模式),或异步(非阻塞)接收队列和主题类型的消息。
9 5)、MessageProducer 接口(消息生产者)
10 由会话创建的对象,用于发送消息到目标。用户可以创建某个目标的发送者,也可以创建一个通用的发送者,在发送消息时指定目标。
11 6)、Message 接口(消息)
12 是在消费者和生产者之间传送的对象,也就是说从一个应用程序创送到另一个应用程序。一个消息有三个主要部分:
13 消息头(必须):包含用于识别和为消息寻找路由的操作设置。
14 一组消息属性(可选):包含额外的属性,支持其他提供者和用户的兼容。可以创建定制的字段和过滤器(消息选择器)。
15 一个消息体(可选):允许用户创建五种类型的消息(文本消息,映射消息,字节消息,流消息和对象消息)。
16 消息接口非常灵活,并提供了许多方式来定制消息的内容。
17 7)、Session 接口(会话)
18 表示一个单线程的上下文,用于发送和接收消息。由于会话是单线程的,所以消息是连续的,就是说消息是按照发送的顺序一个一个接收的。会话的好处是它支持事务。如果用户选择了事务支持,会话上下文将保存一组消息,直到事务被提交才发送这些消息。在提交事务之前,用户可以使用回滚操作取消这些消息。一个会话允许用户创建消息生产者来发送消息,创建消息消费者来接收消息。
7、如何使用java操作activeMQ呢,把ActiveMQ依赖的jar包添加到工程中。
使用maven工程,则添加jar包的依赖:
1 <dependency>
2 <groupId>org.apache.activemq</groupId>
3 <artifactId>activemq-all</artifactId>
4 <version>5.11.2</version>
5 </dependency>
然后你就可以愉快得开发了。是不是很开森呢。
8、ActiveMQ点对点模式(point-to-point)。
ActiveMq的点对点生产者。
1 package com.taotao.activemq;
2
3 import javax.jms.Connection;
4 import javax.jms.ConnectionFactory;
5 import javax.jms.JMSException;
6 import javax.jms.MessageProducer;
7 import javax.jms.Queue;
8 import javax.jms.Session;
9 import javax.jms.TextMessage;
10
11 import org.apache.activemq.ActiveMQConnectionFactory;
12 import org.apache.activemq.command.ActiveMQTextMessage;
13 import org.junit.Test;
14
15 /**
16 *
17 * @ClassName: ActiveMqMain.java
18 * @author: biehl
19 * @since: 2019年9月15日 下午4:44:57
20 * @Copyright: ©2019 biehl 版权所有
21 * @version: 0.0.1
22 * @Description:
23 */
24 public class ActiveMqMain {
25
26 // activeMq得点对点生产者
27 @Test
28 public void queueProducer() throws JMSException {
29 // 1、创建一个连接工厂对象ConnectionFactory对象。需要指定mq服务得ip以及端口号61616。
30 String brokerURL = "tcp://192.168.110.142:61616";
31 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
32 // 2、使用ConnectionFactory创建一个连接Connection对象。
33 Connection connection = connectionFactory.createConnection();
34 // 3、开启连接。调用Connection对象得start方法。
35 connection.start();
36 // 4、使用Connection对象创建一个Session对象。
37 // 参数一是否开启事务,一般不开启事务,保证数据得最终一致性,可以使用消息队列实现数据最终一致性。如果第一个参数为true,第二个参数自动忽略
38 // 参数二是消息得应答模式。两种模式,自动应答和手动应答。一般使用自动应答。
39 boolean transacted = false;// 不开启事务
40 int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;// 1
41 Session session = connection.createSession(transacted, acknowledgeMode);
42 // 5、使用Session对象创建一个Destination对象。两种形式queue、topic。现在应该使用queue。
43 String queueName = "queue1";// 当前消息队列得名称
44 Queue queue = session.createQueue(queueName);
45 // 6、使用Session对象创建一个Producer对象。
46 // interface Queue extends Destination。destination是一个接口。
47 MessageProducer producer = session.createProducer(queue);
48 // 7、创建一个TextMessage对象。
49 // 创建TextMessage方式一
50 // TextMessage textMessage = new ActiveMQTextMessage();
51 // textMessage.setText("hello activeMq......");
52 // 方式二
53 TextMessage textMessage = session.createTextMessage("hello activeMq......");
54 // 8、发送消息。
55 producer.send(textMessage);
56 // 9、关闭资源。
57 producer.close();// 关闭producer
58 session.close();// 关闭session
59 connection.close();// 关闭connection
60 }
61
62 }
ActiveMQ的点对点消息生产成功以后,可以在ActiveMQ提供的web界面可以看到一些信息。
activeMq的点对点消费者。
1 package com.taotao.activemq;
2
3 import java.io.IOException;
4
5 import javax.jms.Connection;
6 import javax.jms.ConnectionFactory;
7 import javax.jms.JMSException;
8 import javax.jms.Message;
9 import javax.jms.MessageConsumer;
10 import javax.jms.MessageListener;
11 import javax.jms.MessageProducer;
12 import javax.jms.Queue;
13 import javax.jms.Session;
14 import javax.jms.TextMessage;
15
16 import org.apache.activemq.ActiveMQConnectionFactory;
17 import org.apache.activemq.command.ActiveMQTextMessage;
18 import org.junit.Test;
19
20 /**
21 *
22 * @ClassName: ActiveMqMain.java
23 * @author: biehl
24 * @since: 2019年9月15日 下午4:44:57
25 * @Copyright: ©2019 biehl 版权所有
26 * @version: 0.0.1
27 * @Description:
28 */
29 public class ActiveMqMain {
30
31 // activeMq的点对点生产者
32 @Test
33 public void queueProducer() throws JMSException {
34 // 1、创建一个连接工厂对象ConnectionFactory对象。需要指定mq服务得ip以及端口号61616。
35 String brokerURL = "tcp://192.168.110.142:61616";
36 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
37 // 2、使用ConnectionFactory创建一个连接Connection对象。
38 Connection connection = connectionFactory.createConnection();
39 // 3、开启连接。调用Connection对象得start方法。
40 connection.start();
41 // 4、使用Connection对象创建一个Session对象。
42 // 参数一是否开启事务,一般不开启事务,保证数据得最终一致性,可以使用消息队列实现数据最终一致性。如果第一个参数为true,第二个参数自动忽略
43 // 参数二是消息得应答模式。两种模式,自动应答和手动应答。一般使用自动应答。
44 boolean transacted = false;// 不开启事务
45 int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;// 1
46 Session session = connection.createSession(transacted, acknowledgeMode);
47 // 5、使用Session对象创建一个Destination对象。两种形式queue、topic。现在应该使用queue。
48 String queueName = "queue1";// 当前消息队列得名称
49 Queue queue = session.createQueue(queueName);
50 // 6、使用Session对象创建一个Producer对象。
51 // interface Queue extends Destination。destination是一个接口。
52 MessageProducer producer = session.createProducer(queue);
53 // 7、创建一个TextMessage对象。
54 // 创建TextMessage方式一
55 // TextMessage textMessage = new ActiveMQTextMessage();
56 // textMessage.setText("hello activeMq......");
57 // 方式二
58 TextMessage textMessage = session.createTextMessage("hello activeMq......");
59 // 8、发送消息。
60 producer.send(textMessage);
61 // 9、关闭资源。
62 producer.close();// 关闭producer
63 session.close();// 关闭session
64 connection.close();// 关闭connection
65 }
66
67 // activeMq的点对点消费者
68 @Test
69 public void queueConsumer() throws JMSException {
70 // 1、创建一个连接工厂ConnectionFactory 对象
71 String brokerURL = "tcp://192.168.110.142:61616";
72 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
73 // 2、使用连接工厂对象创建一个连接
74 Connection connection = connectionFactory.createConnection();
75 // 3、开启连接
76 connection.start();
77 // 4、使用连接对象创建一个Session对象
78 boolean transacted = false;// 关闭事务
79 int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;// 自动响应
80 Session session = connection.createSession(transacted, acknowledgeMode);
81 // 5、使用Session创建一个Destination,Destination应该和消息的发送端一致的。
82 String queueName = "queue1";
83 Queue queue = session.createQueue(queueName);
84 // 6、使用Session创建一个Consumer对象。
85 MessageConsumer consumer = session.createConsumer(queue);
86 // 7、向Consumer对象中设置一个MessageListener对象,用来接受消息。
87 // 匿名内部类,new 接口,后面加上{},相当于实现了这个接口的实现类。然后创建这个实现类的对象listener。
88 MessageListener listener = new MessageListener() {
89
90 @Override
91 public void onMessage(Message message) {
92 // 接受事件的。当消息到达就可以在这里接受到消息了的。
93 // 8、取出消息的内容。
94 if (message instanceof TextMessage) {
95 TextMessage textMessage = (TextMessage) message;
96 // 9、打印消息内容。
97 try {
98 String text = textMessage.getText();
99 System.out.println(text);
100 } catch (JMSException e) {
101 e.printStackTrace();
102 }
103 }
104 }
105 };
106 consumer.setMessageListener(listener);
107
108 // 关闭资源以前,系统等待,等待接受消息。
109 /*while (true) {
110 try {
111 Thread.sleep(100);
112 } catch (InterruptedException e) {
113 e.printStackTrace();
114 }
115 }*/
116
117 // 等待键盘输入。才回接着向下执行的。
118 try {
119 System.in.read();
120 } catch (IOException e) {
121 e.printStackTrace();
122 }
123
124
125 // 10、关闭资源。
126 consumer.close();// 关闭consumer
127 session.close();// 关闭session
128 connection.close();// 关闭connection
129 }
130
131 }
执行了activeMq的点对点消费者。可以在界面看到变化。可以看到有一个消费者,然后生产了7条消息,7条消息进队和7条消息出队。
9、ActiveMQ发布订阅模式(publish/subscribe)。
消费者有两种消费方法(这里使用异步消费): a、同步消费。通过调用消费者的receive方法从目的地中显式提取消息。receive方法可以一直阻塞到消息到达。
b、异步消费。客户可以为消费者注册一个消息监听器,以定义在消息到达时所采取的动作。
实现MessageListener接口,在MessageListener()方法中实现消息的处理逻辑。
1 package com.taotao.activemq;
2
3 import java.io.IOException;
4
5 import javax.jms.Connection;
6 import javax.jms.ConnectionFactory;
7 import javax.jms.JMSException;
8 import javax.jms.Message;
9 import javax.jms.MessageConsumer;
10 import javax.jms.MessageListener;
11 import javax.jms.MessageProducer;
12 import javax.jms.Session;
13 import javax.jms.TextMessage;
14 import javax.jms.Topic;
15
16 import org.apache.activemq.ActiveMQConnectionFactory;
17 import org.junit.Test;
18
19 /**
20 * Active的发布订阅模式
21 *
22 * @ClassName: ActiveMqTopics.java
23 * @author: biehl
24 * @since: 2019年9月19日 上午10:51:14
25 * @Copyright: ©2019 biehl 版权所有
26 * @version: 0.0.1
27 * @Description:
28 */
29 public class ActiveMqTopics {
30
31 // 发布订阅模式,生产者。topic生产者生产消息默认不持久化客户端的。
32 @Test
33 public void topicProducer() {
34 try {
35 // 1、创建一个连接工厂对象。需要指定mq服务的ip地址以及端口号61616
36 String brikerURL = "tcp://192.168.110.142:61616";
37 // 创建ConnectionFactory接口对象,实现类ActiveMQConnectionFactory
38 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brikerURL);
39
40 // 2、创建Connection连接
41 Connection connection = connectionFactory.createConnection();
42
43 // 3、开启连接,调用Connection的start方法。
44 connection.start();
45
46 // 4、创建Session,使用Connection对象创建一个session
47 // 参数一是否开启事务,一般不开启事务,保证数据得最终一致性,可以使用消息队列实现数据最终一致性。如果第一个参数为true,第二个参数自动忽略
48 boolean transacted = false;
49 // 参数二是消息得应答模式。两种模式,自动应答和手动应答。一般使用自动应答。
50 int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
51 Session session = connection.createSession(transacted, acknowledgeMode);
52
53 // 5、创建Destination,应该使用topic,区别于点对点的queue
54 String topicName = "topic01";
55 Topic topic = session.createTopic(topicName);
56
57 // 6、创建一个Producer对象
58 // interface Topic extends Destination.
59 // Destination是一个接口,Topic接口继承Destination这个接口。
60 MessageProducer producer = session.createProducer(topic);
61
62 // 7、创建一个TextMessage对象
63 String message = null;
64 TextMessage textMessage = null;
65 for (int i = 0; i < 100; i++) {
66 message = i + " ActiveMQ topics......";
67 textMessage = session.createTextMessage(message);
68
69 // 8、发送消息
70 producer.send(textMessage);
71 }
72
73 // 9、关闭资源
74 producer.close();// 关闭producer
75 session.close();// 关闭session
76 connection.close();// 关闭connection
77 } catch (JMSException e) {
78 e.printStackTrace();
79 }
80 }
81
82 // 发布订阅模式,消费者必须一直等待生产者生产的消息,因为发布订阅模式不持久化。
83 @Test
84 public void topicConsumer() {
85 try {
86 // 1、创建一个连接工厂对象
87 String brokerURL = "tcp://192.168.110.142:61616";
88 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
89
90 // 2、使用连接工厂对象创建一个连接
91 Connection connection = connectionFactory.createConnection();
92
93 // 3、开启连接
94 connection.start();
95
96 // 4、使用连接对象创建一个Session对象
97 // 参数一是否开启事务,一般不开启事务,保证数据得最终一致性,可以使用消息队列实现数据最终一致性。如果第一个参数为true,第二个参数自动忽略
98 boolean transacted = false;
99 // 参数二是消息得应答模式。两种模式,自动应答和手动应答。一般使用自动应答。
100 int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
101 Session session = connection.createSession(transacted, acknowledgeMode);
102
103 // 5、使用session创建destination,注意,destination应该和消息的发送端一致的。
104 String topicName = "topic01";
105 Topic topic = session.createTopic(topicName);
106
107 // 6、使用session创建一个consumer对象
108 MessageConsumer consumer = session.createConsumer(topic);
109
110 // 7、向Consumer对象中设置一个MessageListener对象,用来接受消息。
111 // 匿名内部类,new 接口,后面加上{},相当于实现了这个接口的实现类。然后创建这个实现类的对象listener。
112 MessageListener listener = new MessageListener() {
113 // 接受事件的。当消息到达就可以在这里接受到消息了的。
114 // 8、取出消息的内容。
115 @Override
116 public void onMessage(Message message) {
117 if (message instanceof TextMessage) {
118 TextMessage textMessage = (TextMessage) message;
119 // 9、打印消息内容。
120 try {
121 String text = textMessage.getText();
122 System.out.println(text);
123 } catch (JMSException e) {
124 e.printStackTrace();
125 }
126 }
127 }
128 };
129 consumer.setMessageListener(listener);
130
131 // 启动三次,模拟是三个消费者
132 System.out.println("消费者1.......");
133 // System.out.println("消费者2.......");
134 // System.out.println("消费者3.......");
135
136 // 等待键盘输入。才回接着向下执行的。
137 try {
138 System.in.read();
139 } catch (IOException e) {
140 e.printStackTrace();
141 }
142
143 // 9、关闭资源
144 consumer.close();// 关闭producer
145 session.close();// 关闭session
146 connection.close();// 关闭connection
147 } catch (JMSException e) {
148 e.printStackTrace();
149 }
150
151 }
152
153 }
执行了activeMq的发布订阅模式。可以在界面看到变化。可以看到有三个消费者,然后生产了201条消息,201条消息进队和603条消息出队。
10、ActiveMQ与Spring整合如下所示:
在pom.xml配置文件中引入自己的依赖的jar包。
1 <dependency>
2 <groupId>org.springframework</groupId>
3 <artifactId>spring-jms</artifactId>
4 </dependency>
5 <dependency>
6 <groupId>org.springframework</groupId>
7 <artifactId>spring-context-support</artifactId>
8 </dependency>
在配置文件applicationContext-activemq.xml里面配置ConnectionFactory。如下所示:
1 <?xml version="1.0" encoding="UTF-8"?>
2 <beans xmlns="http://www.springframework.org/schema/beans"
3 xmlns:context="http://www.springframework.org/schema/context"
4 xmlns:p="http://www.springframework.org/schema/p"
5 xmlns:aop="http://www.springframework.org/schema/aop"
6 xmlns:tx="http://www.springframework.org/schema/tx"
7 xmlns:jms="http://www.springframework.org/schema/jms"
8 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
9 xsi:schemaLocation="http://www.springframework.org/schema/beans
10 http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
11 http://www.springframework.org/schema/context
12 http://www.springframework.org/schema/context/spring-context-4.0.xsd
13 http://www.springframework.org/schema/aop
14 http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
15 http://www.springframework.org/schema/tx
16 http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
17 http://www.springframework.org/schema/jms
18 http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
19 http://www.springframework.org/schema/util
20 http://www.springframework.org/schema/util/spring-util-4.0.xsd">
21
22
23 <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
24 <bean id="targetConnectionFactory"
25 class="org.apache.activemq.ActiveMQConnectionFactory">
26 <property name="brokerURL"
27 value="tcp://192.168.110.142:61616" />
28 </bean>
29
30 <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
31 <bean id="connectionFactory"
32 class="org.springframework.jms.connection.SingleConnectionFactory">
33 <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
34 <property name="targetConnectionFactory"
35 ref="targetConnectionFactory" />
36 </bean>
37 </beans>
开始配置生产者的spring配置。
1 <?xml version="1.0" encoding="UTF-8"?>
2 <beans xmlns="http://www.springframework.org/schema/beans"
3 xmlns:context="http://www.springframework.org/schema/context"
4 xmlns:p="http://www.springframework.org/schema/p"
5 xmlns:aop="http://www.springframework.org/schema/aop"
6 xmlns:tx="http://www.springframework.org/schema/tx"
7 xmlns:jms="http://www.springframework.org/schema/jms"
8 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
9 xsi:schemaLocation="http://www.springframework.org/schema/beans
10 http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
11 http://www.springframework.org/schema/context
12 http://www.springframework.org/schema/context/spring-context-4.0.xsd
13 http://www.springframework.org/schema/aop
14 http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
15 http://www.springframework.org/schema/tx
16 http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
17 http://www.springframework.org/schema/jms
18 http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
19 http://www.springframework.org/schema/util
20 http://www.springframework.org/schema/util/spring-util-4.0.xsd">
21
22
23 <!-- 1、真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
24 <bean id="targetConnectionFactory"
25 class="org.apache.activemq.ActiveMQConnectionFactory">
26 <property name="brokerURL"
27 value="tcp://192.168.110.142:61616" />
28 </bean>
29
30 <!-- 2、Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
31 <bean id="connectionFactory"
32 class="org.springframework.jms.connection.SingleConnectionFactory">
33 <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
34 <!-- 给属性targetConnectionFactory传值 -->
35 <property name="targetConnectionFactory"
36 ref="targetConnectionFactory" />
37 </bean>
38
39 <!-- 3、开始配置生产者配置 -->
40 <!-- 配置生产者 -->
41 <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
42 <bean id="jmsTemplate"
43 class="org.springframework.jms.core.JmsTemplate">
44 <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
45 <!-- 给属性connectionFactory传值 -->
46 <property name="connectionFactory" ref="connectionFactory" />
47 </bean>
48
49 <!-- 4、配置消息的Destination对象 -->
50 <!-- 点对点模式 -->
51 <!-- 这个是队列目的地,点对点的。 -->
52 <bean id="queueDestination"
53 class="org.apache.activemq.command.ActiveMQQueue">
54 <constructor-arg>
55 <!-- 给ActiveMQQueue构造参数传递一个值为queue -->
56 <value>queue</value>
57 </constructor-arg>
58 </bean>
59
60 <!-- 发布订阅模式 -->
61 <!-- 这个是主题目的地,一对多的。 -->
62 <bean id="topicDestination"
63 class="org.apache.activemq.command.ActiveMQTopic">
64 <!-- 给ActiveMQTopic构造参数传递一个值为topic -->
65 <constructor-arg value="topic" />
66 </bean>
67
68 </beans>
生产者测试代码如下所示:
可以根据之前的消费者测试一下,消息的消费。
1 package com.taotao.activemq;
2
3 import javax.jms.Destination;
4 import javax.jms.JMSException;
5 import javax.jms.Message;
6 import javax.jms.Session;
7 import javax.jms.TextMessage;
8
9 import org.junit.Test;
10 import org.springframework.context.ApplicationContext;
11 import org.springframework.context.support.ClassPathXmlApplicationContext;
12 import org.springframework.jms.core.JmsTemplate;
13 import org.springframework.jms.core.MessageCreator;
14
15 /**
16 *
17 * @ClassName: SpringActiveMQ.java
18 * @author: biehl
19 * @since: 2019年9月19日 下午7:01:43
20 * @Copyright: ©2019 biehl 版权所有
21 * @version: 0.0.1
22 * @Description:
23 */
24 public class SpringActiveMQ {
25
26 // 使用spring与activemq整合,是哟个jmsTemplate发送消息
27 @Test
28 public void jmsTemplateProducer() {
29 // 1、初始化spring容器
30 ApplicationContext applicationContext = new ClassPathXmlApplicationContext(
31 "classpath:/spring/applicationContext-activemq.xml");
32 // 2、从容器中获得jmsTemplate对象。根据类型获取到bean的对象
33 JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class);
34 // 3、从容器中获得Destination对象。根据名称获取到bean的对象
35 Destination destination = (Destination) applicationContext.getBean("queueDestination");
36
37 // 4、发送消息
38 jmsTemplate.send(destination, new MessageCreator() {
39
40 @Override
41 public Message createMessage(Session session) throws JMSException {
42 // 定义一个消息
43 String message = "hello activeMq......";
44 // 发送消息
45 TextMessage textMessage = session.createTextMessage(message);
46 return textMessage;
47 }
48 });
49 }
50
51 }
效果如下所示:
开始配置消费者的spring配置。
1)、注意:那么消费者是通过Spring为我们封装的消息监听容器MessageListenerContainer实现的,它负责接收信息,并把接收到的信息分发给真正的MessageListener进行处理。每个消费者对应每个目的地都需要有对应的MessageListenerContainer。 2)、对于消息监听容器而言,除了要知道监听哪个目的地之外,还需要知道到哪里去监听,也就是说它还需要知道去监听哪个JMS服务器,这是通过在配置MessageConnectionFactory的时候往里面注入一个ConnectionFactory来实现的。 3)、所以在配置一个MessageListenerContainer的时候有三个属性必须指定: a、一个是表示从哪里监听的ConnectionFactory b、一个是表示监听什么的Destination; c、一个是接收到消息以后进行消息处理的MessageListener。 4)、常用的MessageListenerContainer实现类是DefaultMessageListenerContainer。
1 <?xml version="1.0" encoding="UTF-8"?>
2 <beans xmlns="http://www.springframework.org/schema/beans"
3 xmlns:context="http://www.springframework.org/schema/context"
4 xmlns:p="http://www.springframework.org/schema/p"
5 xmlns:aop="http://www.springframework.org/schema/aop"
6 xmlns:tx="http://www.springframework.org/schema/tx"
7 xmlns:jms="http://www.springframework.org/schema/jms"
8 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
9 xsi:schemaLocation="http://www.springframework.org/schema/beans
10 http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
11 http://www.springframework.org/schema/context
12 http://www.springframework.org/schema/context/spring-context-4.0.xsd
13 http://www.springframework.org/schema/aop
14 http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
15 http://www.springframework.org/schema/tx
16 http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
17 http://www.springframework.org/schema/jms
18 http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
19 http://www.springframework.org/schema/util
20 http://www.springframework.org/schema/util/spring-util-4.0.xsd">
21
22
23 <!-- 1、真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
24 <bean id="targetConnectionFactory"
25 class="org.apache.activemq.ActiveMQConnectionFactory">
26 <property name="brokerURL"
27 value="tcp://192.168.110.142:61616" />
28 </bean>
29
30 <!-- 2、Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
31 <bean id="connectionFactory"
32 class="org.springframework.jms.connection.SingleConnectionFactory">
33 <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
34 <!-- 给属性targetConnectionFactory传值 -->
35 <property name="targetConnectionFactory"
36 ref="targetConnectionFactory" />
37 </bean>
38
39 <!-- 3、配置消息的Destination对象。接受消息的目的地。 -->
40 <!-- 点对点模式 -->
41 <!-- 这个是队列目的地,点对点的。 -->
42 <bean id="queueDestination"
43 class="org.apache.activemq.command.ActiveMQQueue">
44 <constructor-arg>
45 <!-- 给ActiveMQQueue构造参数传递一个值为queue -->
46 <value>queue</value>
47 </constructor-arg>
48 </bean>
49
50 <!-- 发布订阅模式 -->
51 <!-- 这个是主题目的地,一对多的。 -->
52 <bean id="topicDestination"
53 class="org.apache.activemq.command.ActiveMQTopic">
54 <!-- 给ActiveMQTopic构造参数传递一个值为topic -->
55 <constructor-arg value="topic" />
56 </bean>
57
58 <!-- 4、配置消息接收者 -->
59 <!-- 配置一个监听器 -->
60 <bean id="activeMqMessageListener"
61 class="com.taotao.search.listener.ActiveMqMessageListener" />
62
63 <!-- 配置监听容器 -->
64 <bean
65 class="org.springframework.jms.listener.DefaultMessageListenerContainer">
66 <!-- 属性设置 -->
67 <!-- 一个是表示从哪里监听的ConnectionFactory -->
68 <property name="connectionFactory" ref="connectionFactory" />
69 <!-- 一个是表示监听什么的Destination -->
70 <property name="destination" ref="queueDestination" />
71 <!-- 一个是接收到消息以后进行消息处理的MessageListener -->
72 <property name="messageListener" ref="activeMqMessageListener" />
73 </bean>
74
75
76 </beans>
然后可以写消息监听器,用来监听生产者生产的消息,以便实现自己的业务逻辑。
1 package com.taotao.search.listener;
2
3 import java.text.SimpleDateFormat;
4 import java.util.Date;
5
6 import javax.jms.JMSException;
7 import javax.jms.Message;
8 import javax.jms.MessageListener;
9 import javax.jms.TextMessage;
10
11 /**
12 * 接受ActiveMQ发送的消息.
13 *
14 * @ClassName: ActiveMqMessageListener.java
15 * @author: biehl
16 * @since: 2019年9月19日 下午7:55:24
17 * @Copyright: ©2019 biehl 版权所有
18 * @version: 0.0.1
19 * @Description:
20 */
21 public class ActiveMqMessageListener implements MessageListener {
22
23 @Override
24 public void onMessage(Message message) {
25 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
26 System.out.println("监听生产者生产的消息,消费者进行消息消费.......");
27 // 消息到了onMessage就接受到了消息
28 if (message instanceof TextMessage) {
29 TextMessage textMessage = (TextMessage) message;
30 try {
31 String text = textMessage.getText();
32 System.out.println(sdf.format(new Date()) + " : " + text);
33 } catch (JMSException e) {
34 e.printStackTrace();
35 }
36 }
37 }
38
39 }
由于这里只是简单的测试,如果是正式项目的话,直接加载这个配置文件,然后就可以进行消息的监听消费,我这里只是加载一下这个配置文件即可。
1 package com.taotao.search.service;
2
3 import java.io.IOException;
4
5 import org.springframework.context.ApplicationContext;
6 import org.springframework.context.support.ClassPathXmlApplicationContext;
7
8 /**
9 *
10 * @ClassName: ActiveMqConsumer.java
11 * @author: biehl
12 * @since: 2019年9月19日 下午8:10:55
13 * @Copyright: ©2019 biehl 版权所有
14 * @version: 0.0.1
15 * @Description:
16 */
17 public class ActiveMqConsumer {
18
19 // 启动spring容器。就可以实现监听生产者发送消息,消费者消费小的目的地。
20 public static void main(String[] args) {
21 // 初始化spring容器
22 ApplicationContext applicationContext = new ClassPathXmlApplicationContext(
23 "classpath:/spring/applicationContext-activemq.xml");
24 System.out.println("spring容器加载完毕,开始监听生产者生产的消息.......");
25 try {
26 System.in.read();
27 } catch (IOException e) {
28 e.printStackTrace();
29 }
30 }
31 }
实现效果如下所示:
控制台打印如下所示,只要你生产消息,这里就可以进行消息的消费。
待续......