我们都知道使用Spring可以简化我们的开发,同样的使用Spring也可以集成JMS来连接ActiveMQ,这里说明一下几个需要用到的类:
1.首先是 ConnectionFactory 的实现类,Spring封装了两个连接工厂的实现类。因为JmsTemplate每次发消息都会重新创建连接、会话和productor,所以Spring提供的这两个实现类都是具有连接池的功能的。这两个实现类分别是 SingleConnectionFactory 和 CachingConnectionFactory:
SingleConnectionFactory:对于建立JMS服务器链接的请求只会一直返回同一个Connection,并且会忽略Connection的close方法调用。(org.springframework.jms.connection.SingleConnectionFactory) CachingConnectionFactory:继承了SingleConnectionFactory,所以它拥有SingleConnectionFactory的所有功能,同时它还新增了缓存功能,它可以缓存Session、MessageProducer和MessageConsumer。(org.springframework.jms.connection.CachingConnectionFactory)
2.JmsTemplate 这是Spring提供的用于发送和接收消息的模板类,只需向Spring容器内注册这个类就可以使用JmsTemplate方便的操作jms,JmsTemplate 类是线程安全的,我们可以在整个应用范围使用。
3.MessageListener 消息监听器,实现一个onMessage方法,该方法只接受一个Message参数,在该方法内对消息进行处理。
创建一个Maven工程,在pom.xml文件中,添加如下依赖:
<properties>
<spring.version>4.2.5.RELEASE</spring.version>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
<exclusions>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
然后创建Spring的配置文件,不同的角色我们希望使用不同的配置文件,但这些配置文件有些配置是一致的,所以我们先创建一个通用的配置文件把可复用的部分抽取出来,内容如下:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
<context:annotation-config/>
<context:component-scan base-package="org.zero01.jms"/>
<!-- ActiveMQ为我们提供的ConnectionFactory -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"
p:brokerURL="tcp://192.168.190.129:61616"
/>
<!-- SpringJMS为我们提供的连接池 -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"
p:targetConnectionFactory-ref="targetConnectionFactory"
/>
<!-- 一个队列的目的地,点对点的 -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<!-- 目的地的名称 -->
<constructor-arg value="queue"/>
</bean>
</beans>
然后创建消息生产者的配置文件,内容如下:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
<import resource="common.xml"/>
<context:component-scan base-package="org.zero01.jms"/>
<!-- SpringJMS提供的用于发送和接收消息的模板类 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"
p:connectionFactory-ref="connectionFactory"
/>
</beans>
基本的配置完成之后,我们先来创建消息发送服务。新建一个 ProducerService 接口,代码如下:
package org.zero01.jms.producer;
public interface ProducerService {
void sendMessage(String message);
}
新建该接口的实现类,代码如下:
package org.zero01.jms.producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.TextMessage;
/**
* @program: jms-spring
* @description: 消息发送服务
* @author: 01
* @create: 2018-05-27 09:28
**/
@Service("producerService")
public class ProducerServiceImpl implements ProducerService {
@Autowired
private JmsTemplate jmsTemplate;
@Resource(name = "queueDestination")
private Destination destination;
/**
* 发送消息
* @param message
*/
public void sendMessage(String message) {
// 使用JmsTemplate来发送消息
jmsTemplate.send(destination, (session) -> {
// 创建一个文本类型的消息体
return session.createTextMessage(message);
});
System.out.println("发送消息: " + message);
}
}
然后新建一个消息生产者,代码如下:
package org.zero01.jms.producer;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
* @program: jms-spring
* @description: 消息生产者
* @author: 01
* @create: 2018-05-27 10:11
**/
public class AppProducer {
public static void main(String[] args) {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");
ProducerService producerService = (ProducerService) context.getBean("producerService");
for (int i = 0; i < 100; i++) {
producerService.sendMessage("test" + i);
}
context.close();
}
}
运行消息生产者的代码,到ActiveMQ管理界面上,确认消息已发送到队列中,如下:
如此一来,我们的消息生产者就开发好了。
在上文中,我们已经开发好了生产者,并且也成功发送了消息到队列中。我们接下来开发一个消费者来消费这些消息,首先我们需要实现消息监听器接口:
package org.zero01.jms.consumer;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
* @program: jms-spring
* @description: 消费者消息监听器
* @author: 01
* @create: 2018-05-27 10:51
**/
public class ConsumerMessageListener implements MessageListener {
/**
* 接收消息
* @param message
*/
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("接收消息: " + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
然后新建一个运行类,作为我们的消息消费者。代码如下:
package org.zero01.jms.consumer;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
* @program: jms-spring
* @description: 消息消费者
* @author: 01
* @create: 2018-05-27 10:57
**/
public class AppConsumer {
public static void main(String[] args){
ApplicationContext context = new ClassPathXmlApplicationContext("consumer.xml");
}
}
新建consumer.xml配置文件,配置内容如下:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<import resource="common.xml"/>
<bean id="consumerMessageListener" class="org.zero01.jms.consumer.ConsumerMessageListener"/>
<!-- 配置消息监听容器 -->
<bean id="defaultMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"
p:connectionFactory-ref="connectionFactory"
p:destination-ref="queueDestination"
p:messageListener-ref="consumerMessageListener"
/>
</beans>
运行消息消费者的代码,到ActiveMQ管理界面上,确认能够成功从消息队列中消费消息
如此一来,我们的消息消费者也开发好了。
以上演示的是队列模式的开发,接下来我们简单演示一下主题模式。主题模式的代码和队列模式的代码几乎是一样的,区别只在于目的地的配置。在common.xml配置文件中,新增主题模式的目的地:
<!-- 一个主题的目的地,发布/订阅模式 -->
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
<!-- 目的地的名称 -->
<constructor-arg value="topic"/>
</bean>
修改consumer.xml配置文件中,目的地的配置:
<!-- 配置消息监听容器 -->
<bean id="defaultMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"
p:connectionFactory-ref="connectionFactory"
p:destination-ref="topicDestination"
p:messageListener-ref="consumerMessageListener"
/>
最后修改一下 ProducerServiceImpl 实现类中,指定注入的目的地名称:
...
@Service("producerService")
public class ProducerServiceImpl implements ProducerService {
@Resource(name = "topicDestination")
private Destination destination;
...
}
完成以上修改后,就已经从队列模式切换到主题模式了。接下来进行一个简单的测试,由于是主题模式的原因,所以我们先运行消费者的代码,然后再运行生产者的代码。运行完毕后,到ActiveMQ管理界面上,确认消费者能够成功从主题中订阅消息,如下: