JMS即Java Message Service,Java消息服务。主要用于在生产者和消费者之间进行消息传递,生产者负责产生消息,而消费者负责接收消息。
点对点模式(基于队列Queue):一条消息只会被一个消费者接收;
发布/订阅模式(基于主题Topic):一条消息会被所有订阅该主题的消费者接收。
Maven依赖(其中${spring-version}需要在pom.xml的<properties>中定义):
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring-version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${spring-version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>${spring-version}</version>
</dependency>
<dependency>
<groupId>javax.annotation</groupId>
<artifactId>jsr250-api</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
</dependency>
</dependencies>
下载ActiveMQ(http://activemq.apache.org/download.html),解压后运行bin目录下的activemq.bat(Windows环境)启动ActiveMQ,下文配置使用的broker地址为tcp://localhost:61616。
Spring提供的ConnectionFactory有两个:SingleConnectionFactory和CachingConnectionFactory。
SingleConnectionFactory:单连接,对于建立连接的请求始终返回同一个Connection,并且会忽略Connection的close方法调用。
CachingConnectionFactory:继承自SingleConnectionFactory,在其基础上新增了缓存功能,可以缓存Session、MessageProducer和MessageConsumer。
1 <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"/>
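完整配置(Spring提供的ConnectionFactory只用于管理连接,真正的Connection需要由JMS服务厂商提供的ConnectionFactory产生,并通过targetConnectionFactory属性注入):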
1 <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
2 <bean id="mqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
3 <property name="brokerURL" value="tcp://localhost:61616"/>
4 </bean>
5
6 <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
7 <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
8 <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
9 <property name="targetConnectionFactory" ref="mqConnectionFactory"/>
10 </bean>
当使用PooledConnectionFactory连接池时(该类位于activemq-pool包中,需要额外引入org.apache.activemq:activemq-pool依赖):
1 <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
2 <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
3 <property name="brokerURL" value="tcp://localhost:61616"/>
4 </bean>
5
6 <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
7 <property name="connectionFactory" ref="targetConnectionFactory"/>
8 <property name="maxConnections" value="10"/>
9 </bean>
10
11 <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
12 <property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
13 </bean>
1 <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
2 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
3 <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
4 <property name="connectionFactory" ref="connectionFactory"/>
5 </bean>
目的地(Destination)配置,包括队列和主题:
1 <!--这个是队列目的地,点对点的-->
2 <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
3 <constructor-arg>
4 <value>queue</value>
5 </constructor-arg>
6 </bean>
7 <!--这个是主题目的地,一对多的-->
8 <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
9 <constructor-arg value="topic"/>
10 </bean>
生产者:
1 package com.tiantian.springintejms.service.impl;
2
3 import javax.annotation.Resource;
4 import javax.jms.Destination;
5 import javax.jms.JMSException;
6 import javax.jms.Message;
7 import javax.jms.Session;
8
9 import org.springframework.jms.core.JmsTemplate;
10 import org.springframework.jms.core.MessageCreator;
11 import org.springframework.stereotype.Component;
12
13 import com.tiantian.springintejms.service.ProducerService;
14
15 @Component
16 public class ProducerServiceImpl implements ProducerService {
17
18 private JmsTemplate jmsTemplate;
19
20 public void sendMessage(Destination destination, final String message) {
21 System.out.println("---------------生产者发送消息-----------------");
22 System.out.println("---------------生产者发了一个消息:" + message);
23 jmsTemplate.send(destination, new MessageCreator() {
24 public Message createMessage(Session session) throws JMSException {
25 return session.createTextMessage(message);
26 }
27 });
28 }
29
30 public JmsTemplate getJmsTemplate() {
31 returnjmsTemplate;
32 }
33
34 @Resource
35 public void setJmsTemplate(JmsTemplate jmsTemplate) {
36 this.jmsTemplate = jmsTemplate;
37 }
38
39 }
定义处理消息的MessageListener:实现JMS规范中的MessageListener接口
1 package com.tiantian.springintejms.listener;
2
3 import javax.jms.JMSException;
4 import javax.jms.Message;
5 import javax.jms.MessageListener;
6 import javax.jms.TextMessage;
7
8 public class ConsumerMessageListener implements MessageListener {
9
10 public void onMessage(Message message) {
11 //这里我们知道生产者发送的就是一个纯文本消息,所以这里可以直接进行强制转换
12 TextMessage textMsg = (TextMessage) message;
13 System.out.println("接收到一个纯文本消息。");
14 try {
15 System.out.println("消息内容是:" + textMsg.getText());
16 } catch (JMSException e) {
17 e.printStackTrace();
18 }
19 }
20
21 }
消息监听容器配置:
1 <!--这个是队列目的地-->
2 <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
3 <constructor-arg>
4 <value>queue</value>
5 </constructor-arg>
6 </bean>
7 <!-- 消息监听器 -->
8 <bean id="consumerMessageListener" class="com.tiantian.springintejms.listener.ConsumerMessageListener"/>
9
10 <!-- 消息监听容器 -->
11 <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
12 <property name="connectionFactory" ref="connectionFactory" />
13 <property name="destination" ref="queueDestination" />
14 <property name="messageListener" ref="consumerMessageListener" />
15 </bean>
完整配置:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context-3.0.xsd
           http://www.springframework.org/schema/jms
           http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">

    <!-- Picks up annotated components such as the producer service. -->
    <context:component-scan base-package="com.tiantian" />

    <!-- Spring's JMS helper class, used for sending and receiving messages. -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- Refers to the Spring-provided ConnectionFactory bean defined below. -->
        <property name="connectionFactory" ref="connectionFactory"/>
    </bean>

    <!-- The ConnectionFactory that actually creates connections; supplied by the JMS vendor. -->
    <bean id="mqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616"/>
    </bean>

    <!-- Spring's ConnectionFactory, which manages the vendor ConnectionFactory. -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- The target is the real factory that produces JMS connections. -->
        <property name="targetConnectionFactory" ref="mqConnectionFactory"/>
    </bean>

    <!-- Queue destination. -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>queue</value>
        </constructor-arg>
    </bean>

    <!-- Message listener. -->
    <bean id="consumerMessageListener" class="com.tiantian.springintejms.listener.ConsumerMessageListener"/>

    <!-- Message listener container: wires the listener to the queue. -->
    <bean id="jmsContainer"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="queueDestination" />
        <property name="messageListener" ref="consumerMessageListener" />
    </bean>
</beans>
测试:
1 package com.tiantian.springintejms.test;
2
3 import javax.jms.Destination;
4
5 import org.junit.Test;
6 import org.junit.runner.RunWith;
7 import org.springframework.beans.factory.annotation.Autowired;
8 import org.springframework.beans.factory.annotation.Qualifier;
9 import org.springframework.test.context.ContextConfiguration;
10 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
11 import com.tiantian.springintejms.service.ProducerService;
12
13 @RunWith(SpringJUnit4ClassRunner.class)
14 @ContextConfiguration("/applicationContext.xml")
15 public class ProducerConsumerTest {
16
17 @Autowired
18 private ProducerService producerService;
19 @Autowired
20 @Qualifier("queueDestination")
21 private Destination destination;
22
23 @Test
24 public void testSend() {
25 for (int i=0; i<2; i++) {
26 producerService.sendMessage(destination, "你好,生产者!这是消息:" + (i+1));
27 }
28 }
29
30 }