首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >ActiveMQ学习总结------Spring整合ActiveMQ 04

ActiveMQ学习总结------Spring整合ActiveMQ 04

作者头像
Arebirth
发布2020-06-19 15:36:38
4600
发布2020-06-19 15:36:38
举报

通过前几篇的学习,相信大家已经对我们的ActiveMQ的原生操作已经有了个深刻的概念,

那么这篇文章就来带领大家一步一步学习下ActiveMQ结合Spring的实战操作


注:本文将省略一部分与ActiveMQ无关的spring、mvc等代码,学习者需有SSM框架基础

  所有的注释均写在代码里面,请阅读代码并多多阅读注释!

一 创建生产者

1 所需依赖jar包

     <activemq.version>5.9.0</activemq.version>
        <xbean.version>4.5</xbean.version>
        <jms.version>4.1.6.RELEASE</jms.version>
        <activemq-pool.version>5.9.0</activemq-pool.version>

2.配置与spring整合的配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!--
        需要创建一个链接工厂,链接ActiveMQ,ActiveMQConnectionFactory
        需要依赖ActiveMQ提供的amq标签

        amq:connectionFactory 是bean标签的子标签,会在spring容器中创建一个bean对象,
        可以为对象命名为
            类似:<bean id="" class="ActiveMQConnectionFactory" />

        所以我们这里边使用简便的方式amq:connectionFactory
        -->
    <!--  <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
          <property name="brokerURL" value="tcp://169.254.18.20:61616"/>
          <property name="userName" value="admin"/>
          <property name="password" value="admin"/>
      </bean>-->
    <amq:connectionFactory brokerURL="tcp://169.254.18.20:61616"
                           userName="admin" password="admin" id="amqConnectionFactory"/>


    <!--
        配置池化的ConnectionFactory,为链接ActiveMQ的connectionFactory提供连接池
        我们一般不直接用链接工厂,原因是:这个connectionFactory不会复用connection、session、produce
            consumer,每次连接都需要重新创建conneciton,再创建session,然后调用session创建新的
            producer或者consumer然后用完之后依次关闭,比较浪费资源。
        我们一般用这个链接工厂作为其他拥有更高级(缓存)的链接工厂的参数。

        又因为PooledConnectionFactory会缓存conneciton,session,producer,不会缓存consumer,所以更适合发送者
      -->
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
        <!--引用上面的链接工厂-->
        <property name="connectionFactory" ref="amqConnectionFactory"/>
        <!--连接数量-->
        <property name="maxConnections" value="10"/>
        <!--超时时间,最后使用时间+idleTimeout > 当前时间,连接关闭-->
        <property name="idleTimeout" value="30000"/>
        <!--回收时间,连接创建时间+expirtyTimeout > 当前时间,连接关闭-->
        <property name="expiryTimeout" value="30000"/>
        <!--如果连接池是满的,则阻塞-->
        <property name="blockIfSessionPoolIsFull" value="true"/>
        <!-- 每个链接最大的session(会话)数量-->
        <property name="maximumActiveSessionPerConnection" value="10"/>
    </bean>

    <!--
        Spring管理JMS相关代码的时候,必须依赖jms标签库、spring-jms提供的标签库

        定义Spring-JMS中的连接工厂对象 CachingConnectionFactory -spring框架提供的连接工厂对象
        不能真正的访问MOM容器,类似一个工厂的代理对象  需要提供一个真实工厂,实现MOM
        容器的连接访问

        配置有缓存的ConnectionFactory,session的缓存大小可以指定

        默认情况下cachingConnnectionFactory之缓存一个session,针对低并发足够了
    -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="amqConnectionFactory"/>
        <property name="sessionCacheSize" value="3"/>
    </bean>

    <!--  jmsTemplate 点对点  -->
    <bean id="jmsQuequTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!--     给定连接工厂,必须是spring创建的连接工厂   -->
        <property name="connectionFactory" ref="connectionFactory"/>
        <!--     可选  默认目的地命名   -->
        <property name="defaultDestinationName" value="test-spring-topic"/>
        <!-- 设置消息确认机制-
             * * Session.AUTO_ACKNOWLEDGE:自动消息确认机制
             * * Session.CLIENT_ACKNOWLEDGE:客户端确认机制
             * * Session.DUPS_OK_ACKNOWLEDGE:由副本的客户端确认消息机制
        -->
        <property name="sessionAcknowledgeModeName" value="AUTO_ACKNOWLEDGE"/>

        <!--开启事务,则Ack无效-->
        <!--        <property name="sessionTransacted" value="true"/>-->

        <!-- 由于receive方法时同步的,所以这里对接收设置超时时间-->
        <property name="receiveTimeout" value="60000"/>

        <!--    开启订阅    -->
        <!--        <property name="pubSubDomain" value="true"/>-->
    </bean>

    <!--  jmsTemplate Topic模式  -->
    <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!--     给定连接工厂,必须是spring创建的连接工厂   -->
        <property name="connectionFactory" ref="connectionFactory"/>
        <!--     可选  默认目的地命名   -->
        <property name="defaultDestinationName" value="test-spring-topic"/>
        <!-- 设置消息确认机制-
             * * Session.AUTO_ACKNOWLEDGE:自动消息确认机制
             * * Session.CLIENT_ACKNOWLEDGE:客户端确认机制
             * * Session.DUPS_OK_ACKNOWLEDGE:由副本的客户端确认消息机制
        -->
        <property name="sessionAcknowledgeModeName" value="AUTO_ACKNOWLEDGE"/>

        <!--开启事务,则Ack无效-->
        <!--<property name="sessionTransacted" value="true"/>-->
        <!-- 开启订阅 也就是Topic模式-->
        <property name="pubSubDomain" value="true"/>
    </bean>

    <!--  JMSTemplate消息生产者  -->
    <bean id="producer" class="cn.arebirth.mq.provider.Producer"/>
</beans>

3 为了松耦合抽取出provider

package cn.arebirth.mq.provider;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

import javax.annotation.Resource;

public class Producer {
    @Resource(name = "jmsTopicTemplate")
    private JmsTemplate jmsTemplate;

    /**
     * 发送消息
     *
     * @param destinationName 目的地名称
     * @param messageCreator  消息
     */
    public void sendMessage(String destinationName, MessageCreator messageCreator) {
        if (null != destinationName) {
            jmsTemplate.send(messageCreator);
            return;
        }
        jmsTemplate.send(destinationName, messageCreator);
    }
}

4 service

通过调用自定义的provider,然后使用了里面的匿名类来创建了一个对象消息,

Uuser为我自己定义的对象,可以任意自定义

package cn.arebirth.service.impl;

import cn.arebirth.mq.provider.Producer;
import cn.arebirth.pojo.Users;
import cn.arebirth.service.UserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

@Service
public class UserServiceImpl implements UserService {

    //获取自定义的provider
    @Autowired
    private Producer producer;

    @Override
    public void addUser(String destinationName, Users user) {
        producer.sendMessage(destinationName, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createObjectMessage(user);
            }
        });
    }
}

5 Controller

@Controller
@RequestMapping("/user")
public class UserController {

    @Autowired
    private UserService userService;
    
    @RequestMapping("/addUser")
    public String addUser(Users user){
        this.userService.addUser("test-spring-topic",user);
        return "ok";
    }
}

然后我们启动tomcat执行,

 由于我们的是Topic模式,所以我们需要在Topic模式里面查看。

 出现这样的内容,证明我们的provider发布成功了!

仔细看上面的配置文件我们可以发现

jmsQueueTemplate和jmsTopicTemplate还是有区别的

在我们的topic里面会有这行代码

如果pubSubDomain为true则代表为topic模式,false为queue也就是点对点,我们可以看下源码介绍

/**
 * Configure the destination accessor with knowledge of the JMS domain used.
 * Default is Point-to-Point (Queues).  默认为点对点也就是queue模式
 * <p>This setting primarily indicates what type of destination to resolve
 * if dynamic destinations are enabled.
 * @param pubSubDomain "true" for the Publish/Subscribe domain ({@link javax.jms.Topic Topics}), 
 * "false" for the Point-to-Point domain ({@link javax.jms.Queue Queues})
 * @see #setDestinationResolver
 */
public void setPubSubDomain(boolean pubSubDomain) {
   this.pubSubDomain = pubSubDomain;
}

这是官方代码介绍,默认是queue模式,设置为true的话就是topic模式

二 创建消费者

1 添加pom文件依赖

与provider不同的是,不需要连接池

 <activemq.version>5.9.0</activemq.version>
        <xbean.version>4.5</xbean.version>
        <jms.version>4.1.6.RELEASE</jms.version>
<!-- ActiveMQ客户端完整jar包依赖 -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>${activemq.version}</version>
</dependency>
<!-- ActiveMQ和Spring整合配置文件标签处理jar包依赖 -->
<dependency>
    <groupId>org.apache.xbean</groupId>
    <artifactId>xbean-spring</artifactId>
    <version>${xbean.version}</version>
</dependency>
<!-- Spring-JMS插件相关jar包依赖 -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>${jms.version}</version>
</dependency>

2 整合spring的配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- 需要创建一个连接工厂,连接ActiveMQ. ActiveMQConnectionFactory. 需要依赖ActiveMQ提供的amq标签 -->
    <!-- amq:connectionFactory 是bean标签的子标签, 会在spring容器中创建一个bean对象.
        可以为对象命名. 类似: <bean id="" class="ActiveMQConnectionFactory"></bean>
     -->
    <amq:connectionFactory brokerURL="tcp://169.254.18.20:61616"
                           userName="admin" password="admin" id="amqConnectionFactory"/>

    <!-- spring管理JMS相关代码的时候,必须依赖jms标签库. spring-jms提供的标签库. -->
    <!-- 定义Spring-JMS中的连接工厂对象
        CachingConnectionFactory - spring框架提供的连接工厂对象. 不能真正的访问MOM容器.
            类似一个工厂的代理对象. 需要提供一个真实工厂,实现MOM容器的连接访问.

        默认情况下,cachingConnectionFactory之缓存一个session,对于低并发足以
     -->

    <bean id="connectionFactory"
          class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="amqConnectionFactory"/>
        <property name="sessionCacheSize" value="3"/>
    </bean>

    <!--
        注册监听器      DefaultMessageListenerContainer
        jms:listener-container 相当于DefaultMessageListenerContainer
        负责将messageListener注册到connectionFactory的destination,
        一旦destination中有消息,就会将消息推送给messageListener
        开始注册监听:
        需要的参数有:
            acknowledge -消息确认机制
            container-type 容器类型 default|simple
                simple:SimpleMessageListenerContainer最简单的消息监听容器,只能处理固定数量的JMS会话,而且不支持事务
                default:DefaultMessageListenerContainer 是一个用于异步消息监听器容器,且支持事务
            destination-type 目的地类型,使用队列作为目的地 queue  topic
            connection-factory 连接工厂,spring-jms使用的连接工厂,必须是spring自主创建的
            不能使用三方工具创建的工程,如:ActiveMQConnectionFactory
    -->
    <jms:listener-container acknowledge="auto" container-type="default"
                            destination-type="topic"
                            connection-factory="connectionFactory">
        <!-- 在监听容器中注册某监听器对象
            destination -设置目的地命名
            ref-指定监听器对象
            -->
        <jms:listener destination="test-spring-topic" ref="myListener"/>  <!--这个myListener是我们自定义的一个监听类,下边代码可以看到-->
    </jms:listener-container>

</beans>

3 service

很简单就是一个简单输出

package cn.arebirth.service.impl;

import cn.arebirth.pojo.Users;
import cn.arebirth.service.UserService;
import org.springframework.stereotype.Service;


@Service
public class UserServiceImpl implements UserService {

    @Override
    public void showUser(Users user) {
        System.out.println(user);
    }

}

4 监听处理消息类Listener

因为我们在配置文件里面已经引用了此对象,我们只需要实现MessageListener消息监听类即可

package cn.arebirth.listener;

import cn.arebirth.pojo.Users;
import cn.arebirth.service.UserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;

/**
 * 消息服务监听器
 */
@Component(value = "myListener")
public class MyMessageListener implements MessageListener {

    @Autowired
    private UserService userService;

    @Override
    public void onMessage(Message message) {
        //处理消息
        ObjectMessage objectMessage = (ObjectMessage) message;
        Users users = null;
        try {
            users = (Users) objectMessage.getObject();
        } catch (JMSException e) {
            e.printStackTrace();
        }
        this.userService.showUser(users);

    }
}

5 启动测试

package cn.arebirth;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Start {
    public static void main(String[] args) {
        String[] content = {"classpath:applicationContext-jms.xml","classpath:applicationContext-service.xml"};
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(content);
        context.start();
    }
}

启动后,我们发现没有任何消息啊,是不是代码错了啊!

!!我们的是topic模式,所以我们要先启动consumer,然后在进行发布消息,否则,provider发不完可不管你收不收到的

然后我们在重新发布一条消息

内容

我们来看consumer

已经收到消息了,就此我们的整合就已经完毕了


我们的具体精华都在配置文件里面,详细的注释也都在里面,需要多看,并且敲一遍,运行一遍,然后在看一遍,你会有新的收货!

参考博客:

https://www.cnblogs.com/zackzhuzi/p/10050506.html

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019-10-15 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 通过前几篇的学习,相信大家已经对我们的ActiveMQ的原生操作已经有了个深刻的概念,
  • 那么这篇文章就来带领大家一步一步学习下ActiveMQ结合Spring的实战操作
  • 一 创建生产者
  • 二 创建消费者
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档