JMS(JAVA Message Service,java消息服务)是java的消息服务,JMS的客户端之间可以通过JMS服务进行异步的消息传输。JMS(JAVA Message Service,java消息服务)API是一个消息服务的标准或者说是规范,允许应用程序组件基于JavaEE平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。
JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。
使用队列(Queue)作为消息通信载体;满足生产者与消费者模式,一条消息只能被一个消费者使用,未被消费的消息在队列中保留直到被消费或超时。比如:我们生产者发送100条消息的话,两个消费者来消费一般情况下两个消费者会按照消息发送的顺序各自消费一半(也就是你一个我一个的消费。)
MOM(Message Oriented Middleware),分布式系统的集成,指的是利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。
其实也就是为什么要使用MQ。
AMQP即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消 息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件的限制。
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是IBM开发的一个即时 通讯协议,有可能成为物联网的重要组成部分。该协议支持所有平台,几乎可以把所有联网 物品和外部连接起来,被用来当做传感器和致动器(比如通过Twitter让房屋联网)的通信协 议。
STOMP(Streaming Text Orientated Message Protocol)是流文本定向消息协议,是一种为 MOM(Message Oriented Middleware,面向消息的中间件)设计的简单文本协议。STOMP提 供一个可互操作的连接格式,允许客户端与任意STOMP消息代理(Broker)进行交互。
OpenWire is our cross language Wire Protocol to allow native access to ActiveMQ from a number of different languages and platforms. The Java OpenWire transport is the default transport in ActiveMQ 4.x or later. For other languages see the following...
对于ActiveMQ的上述协议,每种协议端口都不一样,可以自行修改。
编辑activemq.xml,在transportConnectors标签中注销、修改或删除不使用的协议。
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
三、Quick start
a、到这个ActiveMQ官网下http://activemq.apache.org/activemq-5158-release.html下载ActiveMQ
b、解压直接执行bin下面的start.cmd文件
c、输入localhost:8161 检查是否正常打开
添加Maven依赖
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.8</version>
</dependency>
生产者发送消息测试方法:
@Test
public void testQueueProducer() throws Exception {
// 1、创建一个连接工厂对象,需要指定服务的ip及端口。
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.155:61616");
// 2、使用工厂对象创建一个Connection对象。
Connection connection = connectionFactory.createConnection();
// 3、开启连接,调用Connection对象的start方法。
connection.start();
// 4、创建一个Session对象。
// 第一个参数:是否开启事务。如果true开启事务,第二个参数无意义。一般不开启事务false。
// 第二个参数:应答模式。自动应答或者手动应答。一般自动应答。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5、使用Session对象创建一个Destination对象。两种形式queue、topic,现在应该使用queue
Queue queue = session.createQueue("test-queue ");
// 6、使用Session对象创建一个Producer对象。
MessageProducer producer = session.createProducer(queue);
// 7、创建一个Message对象,可以使用TextMessage。
for (int i = 0; i < 50; i++) {
TextMessage textMessage = session.createTextMessage("第 "+i+ "一个ActiveMQ队列目的地的消息 ");
// 8、发送消息
producer.send(textMessage);
}
// 9、关闭资源
producer.close();
session.close();
connection.close();
}
消费者消费消息测试方法
@Test
public void testQueueConsumer() throws Exception {
// 创建一个ConnectionFactory对象连接MQ服务器
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.155:61616");
// 创建一个连接对象
Connection connection = connectionFactory.createConnection();
// 开启连接
connection.start();
// 使用Connection对象创建一个Session对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建一个Destination对象。queue对象
Queue queue = session.createQueue("test-queue ");
// 使用Session对象创建一个消费者对象。
MessageConsumer consumer = session.createConsumer(queue);
// 接收消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
// 打印结果
TextMessage textMessage = (TextMessage) message;
String text;
try {
text = textMessage.getText();
System.out.println("这是接收到的消息: " + text);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
// 等待接收消息
System.in.read();
// 关闭资源
consumer.close();
session.close();
connection.close();
}
生产者发送消息测试方法:
@Test
public void testTopicProducer() throws Exception {
// 1、创建一个连接工厂对象,需要指定服务的ip及端口。
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.155:61616");
// 2、使用工厂对象创建一个Connection对象。
Connection connection = connectionFactory.createConnection();
// 3、开启连接,调用Connection对象的start方法。
connection.start();
// 4、创建一个Session对象。
// 第一个参数:是否开启事务。如果true开启事务,第二个参数无意义。一般不开启事务false。
// 第二个参数:应答模式。自动应答或者手动应答。一般自动应答。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5、使用Session对象创建一个Destination对象。两种形式queue、topic,现在应该使用topic
Topic topic = session.createTopic("test-topic ");
// 6、使用Session对象创建一个Producer对象。
MessageProducer producer = session.createProducer(topic);
// 7、创建一个Message对象,可以使用TextMessage。
for (int i = 0; i < 50; i++) {
TextMessage textMessage = session.createTextMessage("第 "+i+ "一个ActiveMQ队列目的地的消息 ");
// 8、发送消息
producer.send(textMessage);
}
// 9、关闭资源
producer.close();
session.close();
connection.close();
}
消费者消费消息测试方法:
@Test
public void testTopicConsumer() throws Exception {
// 创建一个ConnectionFactory对象连接MQ服务器
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.155:61616 ");
// 创建一个连接对象
Connection connection = connectionFactory.createConnection();
// 开启连接
connection.start();
// 使用Connection对象创建一个Session对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建一个Destination对象。topic对象
Topic topic = session.createTopic("test-topic ");
// 使用Session对象创建一个消费者对象。
MessageConsumer consumer = session.createConsumer(topic);
// 接收消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
// 打印结果
TextMessage textMessage = (TextMessage) message;
String text;
try {
text = textMessage.getText();
System.out.println("这是接收到的消息: " + text);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
System.out.println("topic消费者启动。。。。 ");
// 等待接收消息
System.in.read();
// 关闭资源
consumer.close();
session.close();
connection.close();
}
先运行两个消费者进程(提前订阅,不然收不到发送的消息),然后运行生产者测试方法发送消息。
结果是: 两个消费者进程都可以接收到生产者发送过来的所有消息。
我们从上面代码就可以看出,点对点通信和发布订阅通信模式的区别就是创建生产者和消费者对象时提供的Destination对象不同,如果是点对点通信创建的Destination对象是Queue,发布订阅通信模式通信则是Topic。
步骤:
a、在消费端设置连接对象的clientID
b、为订阅的topic指定一个对应clientID
c、使用持久订阅的情况下面,生产者发送的消息必须是持久的。
实现代码如下:
生产者:
public class TopicProducer {
@Test
public void testTopicProducer() throws Exception {
// 1、创建一个连接工厂对象,需要指定服务的ip及端口。
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 2、使用工厂对象创建一个Connection对象。
Connection connection = connectionFactory.createConnection();
// 3、开启连接,调用Connection对象的start方法。
connection.start();
// 4、创建一个Session对象。
// 第一个参数:是否开启事务。如果true开启事务,第二个参数无意义。一般不开启事务false。
// 第二个参数:应答模式。自动应答或者手动应答。一般自动应答。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5、使用Session对象创建一个Destination对象。两种形式queue、topic,现在应该使用topic
Topic topic = session.createTopic("test-topic ");
// 6、使用Session对象创建一个Producer对象。
MessageProducer producer = session.createProducer(topic);
// 生产者发送的消息默认是持久的
// producer.setDeliveryMode(DeliveryMode.PERSISTENT);
// 7、创建一个Message对象,可以使用TextMessage。
for (int i = 0; i < 5; i++) {
TextMessage textMessage = session.createTextMessage("第 "+i+ "一个ActiveMQ队列目的地的消息 ");
// 8、发送消息
producer.send(textMessage);
}
// 9、关闭资源
producer.close();
session.close();
connection.close();
}
}
消费者:
public class TopicPersistenceConsumer {
@Test
public void testTopicConsumer() throws Exception {
// 创建一个ConnectionFactory对象连接MQ服务器
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 创建一个连接对象
Connection connection = connectionFactory.createConnection();
// ★★★设置CLientID
connection.setClientID("rm-vip1");
// 开启连接
connection.start();
// 使用Connection对象创建一个Session对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建一个Destination对象。topic对象
Topic topic = session.createTopic("test-topic ");
// ★★★使用Session对象创建一个持久订阅消费者对象。
MessageConsumer consumer = session.createDurableSubscriber(topic, "rm-vip1");
// 生产者发送的消息默认是持久的(注意点)
// producer.setDeliveryMode(DeliveryMode.PERSISTENT);
// 接收消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
// 打印结果
TextMessage textMessage = (TextMessage) message;
String text;
try {
text = textMessage.getText();
System.out.println("这是接收到的消息: " + text);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
System.out.println("topic消费者启动。。。。 ");
// 等待接收消息
System.in.read();
// 关闭资源
consumer.close();
session.close();
connection.close();
}
}
整合spring除了我们上面依赖的Jar包还要依赖
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>4.2.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
<version>4.2.7.RELEASE</version>
</dependency>
发送消息的配置文件:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://192.168.25.155:61616" />
</bean>
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="targetConnectionFactory" />
</bean>
<!-- 配置生产者 -->
<!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
<property name="connectionFactory" ref="connectionFactory" />
</bean>
<!--这个是队列目的地,点对点的 -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>spring-queue</value>
</constructor-arg>
</bean>
<!--这个是主题目的地,一对多的 -->
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="topic" />
</bean>
</beans>
发送消息的测试方法:
@Test
public void testSpringActiveMq() throws Exception {
//初始化spring容器
ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml");
//从spring容器中获得JmsTemplate对象
JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class);
//从spring容器中取Destination对象
Destination destination = (Destination) applicationContext.getBean("queueDestination");
//使用JmsTemplate对象发送消息。
jmsTemplate.send(destination, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
//创建一个消息对象并返回
TextMessage textMessage = session.createTextMessage("spring activemq queue message");
return textMessage;
}
});
}
我们上面直接ApplicationContext的getBean方法获取的对象,实际在项目使用依赖注入即可。
创建一个MessageListener的实现类。
public class MyMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
//取消息内容
String text = textMessage.getText();
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
接收消息的配置文件:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://192.168.25.168:61616" />
</bean>
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="targetConnectionFactory" />
</bean>
<!--这个是队列目的地,点对点的 -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>spring-queue</value>
</constructor-arg>
</bean>
<!--这个是主题目的地,一对多的 -->
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="topic" />
</bean>
<!-- 接收消息 -->
<!-- 配置监听器 -->
<bean id="myMessageListener" class="cn.e3mall.search.listener.MyMessageListener" />
<!-- 消息监听容器 -->
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="queueDestination" />
<property name="messageListener" ref="myMessageListener" />
</bean>
</beans>
测试接收消息的代码
@Test
public void testQueueConsumer() throws Exception {
//初始化spring容器
ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml");
//等待
System.in.read();
}
JMS消息只有在被确认之后,才认为已经被成功地消费了。消息的成功消费通常包含三个阶段:客户接收消息、客户处理消息和消息被确认。在事务性会话中,当一个事务被提交的时候,确认自动发生。在非事务性会话中,消息何时被确认取决于创建会话时的应答模式(acknowledgement mode)。该参数有以下三个可选值:
Session.AUTO_ACKNOWLEDGE。当客户成功的从receive方法返回的时候,或者从MessageListener.onMessage方法成功返回的时候,会话自动确认客户收到的消息。
Session.CLIENT_ACKNOWLEDGE。 客户通过消息的acknowledge方法确认消息。需要注意的是,在这种模式中,确认是在会话层上进行:确认一个被消费的消息将自动确认所有已被会话消 费的消息。例如,如果一个消息消费者消费了10个消息,然后确认第5个消息,那么所有前5个消息都会被确认。
Session.DUPS_ACKNOWLEDGE。 该选择只是会话迟钝第确认消息的提交。如果JMS provider失败,那么可能会导致一些重复的消息。如果是重复的消息,那么JMS provider必须把消息头的JMSRedelivered字段设置为true。
ActiveMQSession,实现了JMS的session,QueueSession, TopicSession
ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE 每条消息都必须显式调用acknowledge方法确认消息。
JMS 支持以下两种消息提交模式:
DeliveryMode.PERSISTENT 指示JMS provider持久保存消息,以保证消息不会因为JMS provider的失败而丢失。 消息持久化在硬盘中,ActiveMQ持久化有三种方式:AMQ、KahaDB、JDBC。
DeliveryMode.NON_PERSISTENT 不要求JMS provider持久保存消息,消息存放在内存中,读写速度快,在JMS服务停止后消息会消失,没有持久化到硬盘。
1、kahaDB(默认的持久方式)(key-value)
2、AMQ [基于文件的存储方式]
activemq.xml
<persistenceAdapter>
<amqPersistenceAdapter directory="${activmq.data}/amq" maxFileLength="32mb" />
</persistenceAdapter>
写入速度比较快,容易恢复
文件大写默认为32mb
3、JDBC(基于数据库持久化)
1、在activemq.xml中配置基于jdbc持久化
注意:createTablesOnstartup="true" 代表是每次重启mq的borker都会重新创建表,这样会导致原理存储的数据丢失,所有正确做法应该是,第一次运行的时候设置为true,如果数据库中生成了表,就需要把它改为false。
<persistenceAdapter>
<jdbcPersistenceAdapter datasource="#mysqlDatasource" createTablesOnstartup="true" />
</persistenceAdapter>
2、把连接数据所需依赖加入到activemq的lib目录下面
3、在broker同级的元素下面配置数据源
<bean id="mysqlDatasource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver" />
<property name="url" value="jdbc:mysql://localhost:3306/mq_test?useSSL=false" />
<property name="username" value="root" />
<property name="password" value="123" />
</bean>
activemq_acks 存储持久订阅的信息表
activemq_msgs 消息表
activemq_lock 锁表(用来做集群使用的,实现master选举的表)
4、基于内存持久化。
5、基于activemq的ha集群levelDB持久化。
ActiveMQ在接收消息的Client有以下几种操作的时候,需要重新传递消息:
a、Client用了transactions(事务),且在session中调用了rollback()
b、Client用了transactions,且在调用commit()之前关闭
c、Client在CLIENT_ACKNOWLEDGE的传递模式下,在session中调用了recover()
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<!-- 定义ReDelivery(重发机制)机制 ,重发时间间隔是100毫秒,最大重发次数是3次 -->
<bean id="activeMQRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
<!--是否在每次尝试重新发送失败后,增长这个等待时间 -->
<property name="useExponentialBackOff" value="true"></property>
<!--重发次数,默认为6次 这里设置为1次 -->
<property name="maximumRedeliveries" value="2"></property>
<!--重发时间间隔,默认为1秒 -->
<property name="initialRedeliveryDelay" value="1000"></property>
<!--第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是value -->
<property name="backOffMultiplier" value="2"></property>
<!--最大传送延迟,只在useExponentialBackOff为true时有效(V5.5),假设首次重连间隔为10ms,倍数为2,那么第
二次重连时间间隔为 20ms,第三次重连时间间隔为40ms,当重连时间间隔大的最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。 -->
<property name="maximumRedeliveryDelay" value="1000"></property>
</bean>
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616" />
<property name="redeliveryPolicy" ref="activeMQRedeliveryPolicy" />
</bean>
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="targetConnectionFactory" />
</bean>
<!--这个是队列目的地,点对点的 -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>activemq-queue</value>
</constructor-arg>
</bean>
<!--这个是主题目的地,一对多的 -->
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="topic" />
</bean>
<!-- 接收消息 -->
<!-- 配置监听器 -->
<bean id="myMessageListener" class="com.rm.vip.mq.demo.receiver.MyMessageListener" />
<!-- 消息监听容器 -->
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="queueDestination" />
<property name="messageListener" ref="myMessageListener" />
<!-- 配置应答模式 -->
<property name="sessionAcknowledgeMode" value="4" />
</bean>
</beans>
消费端的监听类编写
/**
* @ProjectName: mq-demo
* @Auther: GERRY
* @Date: 2019/2/25 21:35
* @Description: 消息处理类
*/
public class MyMessageListener implements SessionAwareMessageListener {
//测试方法
public void onMessage(Message message, Session session) throws JMSException {
System.out.println("--------------------开始接受消息------------------------");
TextMessage textMsg = (TextMessage) message;
try {
if ("重发机制".equals(textMsg.getText())) {
System.out.println("----------------");
throw new RuntimeException("故意抛出的异常");
}
System.out.println(textMsg.getText());
System.out.println("--------------------接受消息结束------------------------");
} catch (Exception e) {
// 触发重发信息使用,当该代码注释掉之后,该队列消息不能移出队列
session.recover();
System.out.println("异常");
}
}
}
最后进行测试:
如果发送的信息为“重发机制”,将会触发重发测试,否则正常消费
正常发送结果:
重发结果:
a.先修改节点名称,这里三台机器必须一致(下边的brokerName)
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="activemqCluster" dataDirectory="${activemq.data}">
b.三台机器的activemq.xml的持久化配置如下(leveldb)
<persistenceAdapter>
<replicatedLevelDB
directory="${activemq.data}/leveldb"
replicas="3"
bind="tcp://0.0.0.0:62621"
zkAddress="localhost:2181"
hostname="localhost"
zkPath="/activemq/leveldb-stores"
/>
</persistenceAdapter>
c.修改三台机器的jetty.xml的端口为8161、8162、8163
d.传输了连接器,三个都需要修改61616、61617、61618
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
e.启动zookeeper服务器
f.启动ActiveMQ
a服务器配置(中转服务器,桥接)
必须加在persistenceAdapter元素前
static:(master,slave)
<networkConnectors>
<networkConnector name="local_network" uri="static:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)"/>
</networkConnectors>
b、c服务器
<networkConnectors>
<networkConnector name="local_a" uri="static:(tcp://127.0.0.1:61616)"/>
</networkConnectors>
<persistenceAdapter>
<kahaDB directory="D:/J2EE/mq/kahadb"/>
</persistenceAdapter>
注意:高性能方案可能丢失消息
最佳解决方案,高可用+高性能配置
可参考:https://blog.csdn.net/qq279071978/article/details/74315951