前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >activeMQ 原

activeMQ 原

作者头像
wuweixiang
发布2018-08-14 14:23:10
4970
发布2018-08-14 14:23:10
举报
文章被收录于专栏:吴伟祥吴伟祥

ActiveMQ 消费者接收消息两种方式

1.使用同步方式:消费者会一直等待生产者发送消息或者超市。因其是阻塞式接收消息,故当第一次接收生产者发送过来的消息并消费后,第二次生产者提供的消息不再消费。 2.使用异步监听方式:消费者通过注册监听器,每当生产者有新的消息提供过来是会触发MessageListener的回调方法onMessage()方法。便于后续消息处理。 小结:使用同步方式相当于一次性消费,在实际生产环境中不可能采用这种方式,不至于每次推送数据都重启不服务,不符合实际运用场景,推荐使用注册监听方式。

代码语言:javascript
复制
       <!-- activemq 相关maven依赖 -->
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-core</artifactId>
                <version>5.7.0</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-jms</artifactId>
                <version>4.3.1.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>org.apache.xbean</groupId>
                <artifactId>xbean-spring</artifactId>
                <version>4.5</version>
            </dependency>


            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-jms</artifactId>
                <version>4.3.14.RELEASE</version>
            </dependency>

applicationContext-jms.xml

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">

    <!-- ActiveMQ 连接工厂 -->
    <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
    <!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码-->
    <amq:connectionFactory id="amqConnectionFactory"
                           brokerURL="${activemq.brokerURL}"
                           userName="${activemq.userName}"
                           password="${activemq.password}"/>

    <!-- Spring Caching连接工厂 -->
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
        <property name="targetConnectionFactory" ref="amqConnectionFactory"/>
        <!-- Session缓存数量 -->
        <property name="sessionCacheSize" value="100"/>
    </bean>


    <!-- 定义消息队列(Queue) -->
    <bean id="demoQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <!-- 设置消息队列的名字 -->
        <constructor-arg>
            <value>ZA_GAMES</value>
        </constructor-arg>
    </bean>


    <!-- Spring JmsTemplate 的消息生产者 start-->

    <!-- 定义JmsTemplate的Queue类型 -->
    <!--<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">-->
    <!--&lt;!&ndash; 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 &ndash;&gt;-->
    <!--<constructor-arg ref="connectionFactory"/>-->
    <!--&lt;!&ndash; 非pub/sub模型(发布/订阅),即队列模式 (这种模式被概括为:只有一个消费者将获得消息)&ndash;&gt;-->
    <!--<property name="pubSubDomain" value="false"/>-->
    <!--</bean>-->
    <!-- 配置JMS模板(Queue),Spring提供的JMS工具类,它发送、接收消息。 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="defaultDestination" ref="demoQueueDestination"/>
        <property name="receiveTimeout" value="10000"/>
        <!-- true是topic,false是queue,默认是false,此处显示写出false -->
        <property name="pubSubDomain" value="false"/>
    </bean>

    <!-- 定义JmsTemplate的Topic类型 -->
    <!--<bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">-->
    <!--&lt;!&ndash; 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 &ndash;&gt;-->
    <!--<constructor-arg ref="connectionFactory"/>-->
    <!--&lt;!&ndash; pub/sub模型(发布/订阅),即发布者/订阅者模型(这种模式被概括为:多个消费者可以获得消息) &ndash;&gt;-->
    <!--<property name="pubSubDomain" value="true"/>-->
    <!--</bean>-->

    <!--Spring JmsTemplate 的消息生产者 end-->


    <!-- 配置消息队列监听者(Queue) -->
    <bean id="queueMessageListener" class="com.???.QueueMessageListener"/>


    <!-- 消息消费者 采用异步接收 start-->
    <!-- 显示注入消息监听容器(Queue) -->
    <bean id="queueListenerContainer"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <!--配置连接工厂-->
        <property name="connectionFactory" ref="connectionFactory"/>
        <!--自定义监听器-->
        <property name="messageListener" ref="queueMessageListener"/>
        <!--监听目标-->
        <property name="destination" ref="demoQueueDestination"/>
    </bean>
    <!--&lt;!&ndash; 定义Topic监听器 &ndash;&gt;-->
    <!--<jms:listener-container destination-type="topic">-->
    <!--<jms:listener destination="test.topic" ref="topicReceiver1"/>-->
    <!--<jms:listener destination="test.topic" ref="topicReceiver2"/>-->
    <!--</jms:listener-container>-->

    <!-- 消息消费者 end -->
</beans>
代码语言:javascript
复制
import com.sun.xml.internal.bind.v2.TODO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.*;

/**
 * QueueMessageListener
 *
 * @author weixiang.wu
 * @date 2018 -04-03 10:01
 */
public class QueueMessageListener implements MessageListener {

    private final static Logger logger = LoggerFactory.getLogger(QueueMessageListener.class);

    /**
     * 当收到消息时,自动调用该方法。
     *
     * @param message
     */
    @Override
    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            TextMessage tm = (TextMessage) message;
            try {
                logger.info("QueueMessageListener 监听到" +
                    "队列[" + String.valueOf(message.getJMSDestination()) + "] 消息:" + tm.getText());
            } catch (JMSException e) {
                logger.error("队列监听发生异常", e.getMessage());
            }
        }
        if (message instanceof ObjectMessage) {
            ObjectMessage objectMessage = (ObjectMessage) message;
            logger.info("当前队列监听到对象,待有相关处理...");
//            TODO do something ...
        }


    }
}
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • ActiveMQ 消费者接收消息两种方式
相关产品与服务
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档