摘要: 原创出处 https://juejin.im/post/5ad46f34518825651d08265c 「预流」欢迎转载,保留摘要,谢谢!
ActiveMQ 是由 Apache 出品的一款开源消息中间件,旨在为应用程序提供高效、可扩展、稳定、安全的企业级消息通信。它的设计目标是提供标准的、面向消息的、多语言的应用集成消息通信中间件。ActiveMQ 实现了 JMS 1.1 并提供了很多附加的特性,比如 JMX 管理、主从管理、消息组通信、消息优先级、延迟接收消息、虚拟接收者、消息持久化、消息队列监控等等。其主要特性有:
因为 ActiveMQ 是完整支持 JMS 1.1 的,所以从 Java 使用者的角度其基本概念与 JMS 1.1 规范是一致的。
ActiveMQ 使用时包含的基本组件各与 JMS 是相同的:
由于这些概念在 JMS 中已介绍过,这里不再详细介绍。
ActiveMQ Broker 的主要作用是为客户端应用提供一种通信机制,为此 ActiveMQ 提供了一种连接机制,并用连接器(connector)来描述这种连接机制。ActiveMQ 中连接器有两种,一种是用于客户端与消息代理服务器(client-to-broker)之间通信的传输连接器(transport connector),一种是用于消息代理服务器之间(broker-to-broker)通信的网络连接器(network connector)。connector 使用 URI(统一资源定位符)来表示,URI 格式为:<schema name>:<hierarchical part>[?<query>][#<fragment>]
schema name 表示协议,例如:foo://username:password@example.com:8042/over/there/index.dtb?type=animal&name=narwhal#nose
其中 schema name 部分是 foo,hierarchical part 是 username:password@example.com:8042/over/there/index.dtb,query 是 type=animal&name=narwhal,fragment 是 nose。
<transportConnectors>
<transportConnector name="openwire" uri="tcp://localhost:61616" discoveryUri="multicast://default"/>
<transportConnector name="ssl" uri="ssl://localhost:61617"/>
<transportConnector name="stomp" uri="stomp://localhost:61613"/>
<transportConnector name="ws" uri="ws://localhost:61614/" />
</transportConnectors>
传输连接器定义在<transportConnectors>
元素中,一个<transportConnector>
元素定义一个特定的连接器,一个连接器必须有自己唯一的名字和 URI 属性,但discoveryUri
属性是可选的。目前在 ActiveMQ 最新的5.15版本中常用的传输连接器连接协议有:vm、tcp、udp、multicast、nio、ssl、http、https、websocket、amqp、mqtt、stomp 等等
每个协议的具体配置见官网(http://activemq.apache.org/uri-protocols.html )。除了以上这些基本协议之外 ActiveMQ 还支持一些高级协议也可以通过 URI 的方式进行配置,比如 Failover 和 Fanout 。
使用网络连接器的简单场景
如图所示,服务器 S1 和 S2 通过 NewworkConnector 相连,生产者 P1 发送的消息,消费者 C3 和 C4 都可以接收到,而生产者 P3 发送的消息,消费者 C1 和 C2 也可以接收到。要使用网络连接器的功能需要在服务器 S1 的 activemq.xml 中的 broker 节点下添加如下配置(假设192.168.11.23:61617 为 S2 的地址):
<networkConnectors>
<networkConnector uri="static:(tcp://192.168.11.23:61617)"/>
</networkConnectors>
如果只是这样,S1 可以将消息发送到 S2,但这只是单方向的通信,发送到 S2 上的的消息还不能发送到 S1 上。如果想 S1 也收到从 S2 发来的消息需要在 S2 的 activemq.xml 中的 broker 节点下也添加如下配置(假设192.168.11.45:61617为 S1 的地址):
<networkConnectors>
<networkConnector uri="static:(tcp://192.168.11.45:61617)"/>
</networkConnectors>
这样,S1和S2就可以双向通信了。目前在 ActiveMQ 最新的5.15版本中常用的网络连接器协议有 static 和 multicast 两种。
static://(tcp://ip:61616,tcp://ip2:61616)
对这块感兴趣的话可以看官方文档:http://activemq.apache.org/networks-of-brokers.html
JMS 规范中消息的分发方式有两种:非持久化和持久化。对于非持久化消息 JMS 实现者须保证尽最大努力分发消息,但消息不会持久化存储;而持久化方式分发的消息则必须进行持久化存储。非持久化消息常用于发送通知或实时数据,当你比较看重系统性能并且即使丢失一些消息并不影响业务正常运作时可选择非持久化消息。持久化消息被发送到消息服务器后如果当前消息的消费者并没有运行则该消息继续存在,只有等到消息被处理并被消息消费者确认之后,消息才会从消息服务器中删除。
对以上这两种方式 ActiveMQ 都支持,并且还支持通过缓存在内存中的中间状态消息的方式来恢复消息。概括起来看 ActiveMQ 的消息存储有三种:存储到内存、存储到文件、存储到数据库。具体使用上 ActiveMQ 提供了一个插件式的消息存储机制,类似于消息的多点传播,主要实现了如下几种:
JMS 规范中传递消息的方式有两种,一种是点对点模型的队列(Queue)方式,另一种是发布订阅模型的主题(Topic)方式。下面看下用 ActiveMQ 以主题方式传递消息的 Java 示例。
Java 工程中需要引入 ActiveMQ 包的依赖,jar 包版本同你安装 ActiveMQ 版本一致即可:
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.2</version>
</dependency>
package org.study.mq.activeMQ;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class TopicPublisher {
/**
* 默认用户名
*/
public static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/**
* 默认密码
*/
public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/**
* 默认连接地址
*/
public static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
public static void main(String[] args) {
//创建连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
try {
//创建连接
Connection connection = connectionFactory.createConnection();
//开启连接
connection.start();
//创建会话,不需要事务
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建 Topic,用作消费者订阅消息
Topic myTestTopic = session.createTopic("activemq-topic-test1");
//消息生产者
MessageProducer producer = session.createProducer(myTestTopic);
for (int i = 1; i <= 3; i++) {
TextMessage message = session.createTextMessage("发送消息 " + i);
producer.send(myTestTopic, message);
}
//关闭资源
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
在 Topic 模式中消息生产者是用于发布消息的,绝大部分代码与 Queue 模式中相似,不同的是本例中基于 Session 创建的是主题(Topic),该主题作为消费者消费消息的目的地。
package org.study.mq.activeMQ;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class TopicSubscriber {
/**
* 默认用户名
*/
public static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/**
* 默认密码
*/
public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/**
* 默认连接地址
*/
public static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
public static void main(String[] args) {
//创建连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
try {
//创建连接
Connection connection = connectionFactory.createConnection();
//开启连接
connection.start();
//创建会话,不需要事务
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建 Topic
Topic myTestTopic = session.createTopic("activemq-topic-test1");
MessageConsumer messageConsumer = session.createConsumer(myTestTopic);
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
System.out.println("消费者1 接收到消息:" + ((TextMessage) message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
MessageConsumer messageConsumer2 = session.createConsumer(myTestTopic);
messageConsumer2.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
System.out.println("消费者2 接收到消息:" + ((TextMessage) message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
MessageConsumer messageConsumer3 = session.createConsumer(myTestTopic);
messageConsumer3.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
System.out.println("消费者3 接收到消息:" + ((TextMessage) message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//让主线程休眠100秒,使消息消费者对象能继续存活一段时间从而能监听到消息
Thread.sleep(100 * 1000);
//关闭资源
session.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
为了展示主题模式中消息广播给多个订阅者的功能,这里创建了三个消费者对象并订阅了同一个主题,比较特殊的是最后让主线程休眠了一段时间,这么做的目的是让消费者对象能继续存活,从而使控制台能打印出监听到的消息内容。
在 ActiveMQ 的 bin 目录下直接执行activemq start
即启动了 ActiveMQ
需要先运行 TopicSubscriber 类的 main 方法,这样发布者发布消息的时候订阅者才能接收到消息,如果将执行顺序倒过来则消息先发布出去但没有任何订阅者在运行,则看不到消息被消费了。
接着运行 TopicPublisher 类的 main 方法,向主题中发布3条消息,然后可以在 TopicSubscriber 后台看到接收到的消息内容:
消费者接收到消息
在实际项目中如果使用原生的 ActiveMQ API 开发显然比较啰嗦,这中间创建连接工厂、创建连接之类代码完全可以抽取出来由框架统一做,这些事情 Spring 也想到了并帮我们做了。ActiveMQ 完全支持基于 Spring 的方式 配置 JMS 客户端和服务器,下面的例子展示一下在 Spring 中如何使用队列模式和主题模式传递消息。
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.2</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>4.3.10.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.15.0</version>
</dependency>
工程中除了 activemq 的包之外还要添加 Spring 支持 JMS 的包。由于 connection、session、producer 的创建会消耗大量系统资源,为此这里使用 连接池 来复用这些资源,所以还要添加 activemq-pool 的依赖。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd">
<context:component-scan base-package="org.study.mq.activeMQ.spring"/>
<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL">
<value>tcp://localhost:61616</value>
</property>
</bean>
</property>
<property name="maxConnections" value="100"></property>
</bean>
<bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="jmsFactory"/>
<property name="sessionCacheSize" value="1"/>
</bean>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="cachingConnectionFactory"/>
<property name="messageConverter">
<bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
</property>
</bean>
<bean id="testQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg name="name" value="spring-queue"/>
</bean>
<bean id="testTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg index="0" value="spring-topic"/>
</bean>
<bean id="queueListener" class="org.study.mq.activeMQ.spring.QueueListener"/>
<bean id="topic1Listener" class="org.study.mq.activeMQ.spring.Topic1Listener"/>
<bean id="topic2Listener" class="org.study.mq.activeMQ.spring.Topic2Listener"/>
<bean id="queueContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="cachingConnectionFactory"/>
<property name="destination" ref="testQueue"/>
<property name="messageListener" ref="queueListener"/>
</bean>
<bean id="topic1Container"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="cachingConnectionFactory"/>
<property name="destination" ref="testTopic"/>
<property name="messageListener" ref="topic1Listener"/>
</bean>
<bean id="topic2Container"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="cachingConnectionFactory"/>
<property name="destination" ref="testTopic"/>
<property name="messageListener" ref="topic2Listener"/>
</bean>
</beans>
下面的项目示例中的 Java 代码采用注解的方式,这也是现在很多程序员的习惯用法,所以在配置文件一开始定义注解扫描包路径org.study.mq.activeMQ.spring
,您可以根据自己实际情况修改包名称,本例中的所有 Java 代码都放在该包之下。
接下来定义了一个 JMS 工厂 bean,采用的是池化连接工厂类org.apache.activemq.pool.PooledConnectionFactory
,实际就是对内部的 ActiveMQ 连接工厂增加了连接池的功能,从其内部配置可以看到就是对org.apache.activemq.ActiveMQConnectionFactory
的功能封装,而ActiveMQConnectionFactory
类则比较熟悉了,就是上面 Java 访问 ActiveMQ 示例一开始创建连接工厂时使用的类。brokerURL 属性配置的就是连接服务器的协议和服务器地址。接下来的 cachingConnectionFactory 是实际项目代码中常用的,对连接工厂的又一层增强,使用连接的缓存功能以提升效率,读者可酌情选择使用。
jmsTemplate 就是 Spring 解决 JMS 访问时冗长重复代码的方案,它需要配置的两个主要属性是 connectionFactory 和 messageConverter,通过 connectionFactory 获取连接、会话等对象,messageConverter 则是配置消息转换器,因为通常消息在发送前和接收后都需要进行一个前置和后置处理,转换器便进行这个工作。这样实际代码直接通过 jmsTemplate 来发送和接收消息,而每次发送接收消息时创建连接工厂、创建连接、创建会话等工作都由 Spring 框架做了。
有了 JMS 模板还需要知道队列和主题作为实际发送和接收消息的目的地,所以接下来定义了 testQueue 和 testTopic 作为两种模式的示例。而异步接收消息时则需要提供 MessageListener 的实现类,所以定义了 queueListener 作为队列模式下异步接收消息的监听器,topic1Listener 和 topic2Listener 作为主题模式下异步接收消息的监听器,主题模式用两个监听器是为了演示多个消费者时都能收到消息。最后的 queueContainer、topic1Container、topic2Container 用于将消息监听器绑定到具体的消息目的地上。
下面是使用 JMS 模板处理消息的消息服务类
package org.study.mq.activeMQ.spring;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import javax.jms.*;
@Service
public class MessageService {
@Resource(name = "jmsTemplate")
private JmsTemplate jmsTemplate;
@Resource(name = "testQueue")
private Destination testQueue;
@Resource(name = "testTopic")
private Destination testTopic;
//向队列发送消息
public void sendQueueMessage(String messageContent) {
jmsTemplate.send(testQueue, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage msg = session.createTextMessage();
// 设置消息内容
msg.setText(messageContent);
return msg;
}
});
}
//向主题发送消息
public void sendTopicMessage(String messageContent) {
jmsTemplate.send(testTopic, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage msg = session.createTextMessage();
// 设置消息内容
msg.setText(messageContent);
return msg;
}
});
}
}
@Service 将该类声明为一个服务,实际项目中很多服务代码也类似。通过 Resource 注解直接将上面配置文件中定义的 jmsTemplate 引入到 MessageService 类中就可以直接使用了,testQueue 和 testTopic 也是类似,服务类中直接引入配置文件中定义好的队列和主题。重点是下面的两个发送消息的方法,sendQueueMessage 向队列发送消息,sendTopicMessage 向主题发送消息,两种模式都使用了 jmsTemplate 的 send 方法,send 方法第1个参数是javax.jms.Destination
类型,表示消息目的地。由于javax.jms.Queue
和javax.jms.Topic
都继承了javax.jms.Destination
接口,所以该方法对队列模式和主题模式都适用。send 方法的第2个参数是org.springframework.jms.core.MessageCreator
,这里使用了匿名内部类的方式创建对象,从支持的 Session 对象中创建文本消息,这样就可以发送消息了。可以看到无论是队列还是主题,通过 Spring 框架来发送消息的代码比之前的 Java 代码示例简洁了很多。
package org.study.mq.activeMQ.spring;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
public class QueueListener implements MessageListener {
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
try {
TextMessage txtMsg = (TextMessage) message;
String messageStr = txtMsg.getText();
System.out.println("队列监听器接收到文本消息:" + messageStr);
} catch (JMSException e) {
e.printStackTrace();
}
} else {
throw new IllegalArgumentException("只支持 TextMessage 类型消息!");
}
}
}
队列消息监听器在收到消息时校验是否是文本消息类型,是的话则打印出内容。
package org.study.mq.activeMQ.spring;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
public class Topic1Listener implements MessageListener {
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
try {
TextMessage txtMsg = (TextMessage) message;
String messageStr = txtMsg.getText();
System.out.println("主题监听器1 接收到文本消息:" + messageStr);
} catch (JMSException e) {
e.printStackTrace();
}
} else {
throw new IllegalArgumentException("只支持 TextMessage 类型消息!");
}
}
}
package org.study.mq.activeMQ.spring;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
public class Topic2Listener implements MessageListener {
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
try {
TextMessage txtMsg = (TextMessage) message;
String messageStr = txtMsg.getText();
System.out.println("主题监听器2 接收到文本消息:" + messageStr);
} catch (JMSException e) {
e.printStackTrace();
}
} else {
throw new IllegalArgumentException("只支持 TextMessage 类型消息!");
}
}
}
主题监听器的代码与队列监听器类似,只是打印时通过不同字符串表示当前是不同监听器接收的消息。
为了演示例子,写了一个 StartApplication 类,在 main 方法中加载 Spring ,获取到 MessageService 服务之后调用 sendQueueMessage 和 sendTopicMessage 方法发送消息。
package org.study.mq.activeMQ.spring;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class StartApplication {
public static void main(String[] args) {
ApplicationContext ctx = new ClassPathXmlApplicationContext("spring-context.xml");
MessageService messageService = (MessageService) ctx.getBean("messageService");
messageService.sendQueueMessage("我的测试消息1");
messageService.sendTopicMessage("我的测试消息2");
messageService.sendTopicMessage("我的测试消息3");
}
}
启动好 activeMQ 服务之后运行 StartApplication 类,在控制台看到接收到文本消息:
接收到文本消息
队列监听器监听到了一条消息,两个主题监听器分别监听到了两条消息。