1.使用同步方式:消费者会一直等待生产者发送消息或者超市。因其是阻塞式接收消息,故当第一次接收生产者发送过来的消息并消费后,第二次生产者提供的消息不再消费。 2.使用异步监听方式:消费者通过注册监听器,每当生产者有新的消息提供过来是会触发MessageListener的回调方法onMessage()方法。便于后续消息处理。 小结:使用同步方式相当于一次性消费,在实际生产环境中不可能采用这种方式,不至于每次推送数据都重启不服务,不符合实际运用场景,推荐使用注册监听方式。
<!-- activemq 相关maven依赖 -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>4.3.1.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>4.5</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>4.3.14.RELEASE</version>
</dependency>
applicationContext-jms.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">
<!-- ActiveMQ 连接工厂 -->
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
<!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码-->
<amq:connectionFactory id="amqConnectionFactory"
brokerURL="${activemq.brokerURL}"
userName="${activemq.userName}"
password="${activemq.password}"/>
<!-- Spring Caching连接工厂 -->
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="amqConnectionFactory"/>
<!-- Session缓存数量 -->
<property name="sessionCacheSize" value="100"/>
</bean>
<!-- 定义消息队列(Queue) -->
<bean id="demoQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<!-- 设置消息队列的名字 -->
<constructor-arg>
<value>ZA_GAMES</value>
</constructor-arg>
</bean>
<!-- Spring JmsTemplate 的消息生产者 start-->
<!-- 定义JmsTemplate的Queue类型 -->
<!--<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">-->
<!--<!– 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 –>-->
<!--<constructor-arg ref="connectionFactory"/>-->
<!--<!– 非pub/sub模型(发布/订阅),即队列模式 (这种模式被概括为:只有一个消费者将获得消息)–>-->
<!--<property name="pubSubDomain" value="false"/>-->
<!--</bean>-->
<!-- 配置JMS模板(Queue),Spring提供的JMS工具类,它发送、接收消息。 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="defaultDestination" ref="demoQueueDestination"/>
<property name="receiveTimeout" value="10000"/>
<!-- true是topic,false是queue,默认是false,此处显示写出false -->
<property name="pubSubDomain" value="false"/>
</bean>
<!-- 定义JmsTemplate的Topic类型 -->
<!--<bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">-->
<!--<!– 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 –>-->
<!--<constructor-arg ref="connectionFactory"/>-->
<!--<!– pub/sub模型(发布/订阅),即发布者/订阅者模型(这种模式被概括为:多个消费者可以获得消息) –>-->
<!--<property name="pubSubDomain" value="true"/>-->
<!--</bean>-->
<!--Spring JmsTemplate 的消息生产者 end-->
<!-- 配置消息队列监听者(Queue) -->
<bean id="queueMessageListener" class="com.???.QueueMessageListener"/>
<!-- 消息消费者 采用异步接收 start-->
<!-- 显示注入消息监听容器(Queue) -->
<bean id="queueListenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<!--配置连接工厂-->
<property name="connectionFactory" ref="connectionFactory"/>
<!--自定义监听器-->
<property name="messageListener" ref="queueMessageListener"/>
<!--监听目标-->
<property name="destination" ref="demoQueueDestination"/>
</bean>
<!--<!– 定义Topic监听器 –>-->
<!--<jms:listener-container destination-type="topic">-->
<!--<jms:listener destination="test.topic" ref="topicReceiver1"/>-->
<!--<jms:listener destination="test.topic" ref="topicReceiver2"/>-->
<!--</jms:listener-container>-->
<!-- 消息消费者 end -->
</beans>
import com.sun.xml.internal.bind.v2.TODO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.*;
/**
* QueueMessageListener
*
* @author weixiang.wu
* @date 2018 -04-03 10:01
*/
public class QueueMessageListener implements MessageListener {
private final static Logger logger = LoggerFactory.getLogger(QueueMessageListener.class);
/**
* 当收到消息时,自动调用该方法。
*
* @param message
*/
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
TextMessage tm = (TextMessage) message;
try {
logger.info("QueueMessageListener 监听到" +
"队列[" + String.valueOf(message.getJMSDestination()) + "] 消息:" + tm.getText());
} catch (JMSException e) {
logger.error("队列监听发生异常", e.getMessage());
}
}
if (message instanceof ObjectMessage) {
ObjectMessage objectMessage = (ObjectMessage) message;
logger.info("当前队列监听到对象,待有相关处理...");
// TODO do something ...
}
}
}