前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >五分钟快速了解ActiveMQ,案例简单且详细!

五分钟快速了解ActiveMQ,案例简单且详细!

作者头像
Mshu
发布2019-07-02 15:44:23
9030
发布2019-07-02 15:44:23
举报
文章被收录于专栏:咸鱼不闲咸鱼不闲

最近得闲,探索了一下ActiveMQ。

ActiveMQ消息队列,信息收发的容器,作用有异步消息,流量削锋,应用耦合。 同行还有 Kafka、RabbitMQ、RocketMQ、ZeroMQ、MetaMQ 。

安装

下载地址:http://activemq.apache.org/co...

window版本的解压后双击/bin/activemq.bat 即可启动。 或者以服务方式启动:右键管理员运行InstallService.bat,然后在Windows系统的服务中启动。

它有自己的可视化页面:http://localhost:8161/admin/

默认访问密码是:admin/admin 如果需要修改在:/conf/jetty-realm.properties 中修改

JmsTemplate

springboot上整合的,使用spring 的JmsTemplate来操作ActiveMQ 一、首先在pom文件中导入所需的jar包坐标:

代码语言:javascript
复制
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-activemq</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-pool</artifactId>
    </dependency>

二、新增一个ActiveMQ的配置文件spring-jms.xml

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd">

    <!-- 配置JMS连接工厂 -->
    <bean id="innerConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="${spring.activemq.broker-url}" />
    </bean>
    <!--配置连接池-->
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
        destroy-method="stop">
        <property name="connectionFactory" ref="innerConnectionFactory" />
        <property name="maxConnections" value="100"></property>
    </bean>
    <!-- 配置JMS模板,Spring提供的JMS工具类 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="pooledConnectionFactory" />
        <property name="defaultDestination" ref="JmsSenderDestination" />
        <property name="receiveTimeout" value="10000" />
    </bean>
</beans>

三、在启动类上配置以生效

代码语言:javascript
复制
@ImportResource(locations={"classpath:/config/spring-jms.xml"})

四、在application.properties中配置ActiveMQ 的连接地址

代码语言:javascript
复制
spring.activemq.broker-url=tcp://localhost:61616

准备就绪;开始写生产者和消费者,我这里把生产者和消费者写在一个项目里面。在这之前需要明白两个概念 队列(Queue)和主题(Topic)

传递模型

队列(Queue)和主题(Topic)是JMS支持的两种消息传递模型:

  1. 点对点(point-to-point,简称PTP)Queue消息传递模型: 一个消息只能被一个消费者消费
  2. 发布/订阅(publish/subscribe,简称pub/sub)Topic消息传递模型: 一个消息会被多个消费者消费

Queue

1.先在spring-jms.xml里添加配置一个队列名称Queue_love

代码语言:javascript
复制
<bean id="JmsSenderDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
    <value>Queue_love</value>
</constructor-arg>
</bean>

2.创建一个生产者来发送消息;@Qualifier("JmsSenderDestination")指定了发送到上面配置的Queue_love队列

代码语言:javascript
复制
@Component
public class JmsSender {
@Autowired
private JmsTemplate jmsTemplate;

@Qualifier("JmsSenderDestination")
@Autowired
protected Destination destination;

public void sendMessage(final String msg) {

    logger.info("QUEUE destination :" + destination.toString() + ", 发送消息:" + msg);
    jmsTemplate.send(destination, new MessageCreator() {
        @Override
        public Message createMessage(final Session session) throws JMSException {
            return session.createTextMessage(msg);
        }
    });
}}

3.创建一个消费者来消费消息:

代码语言:javascript
复制
@Component
public class JmsTemplateListener implements MessageListener {

@Override
public void onMessage(Message message) {
    final TextMessage tm = (TextMessage) message;
    try {
        logger.info("QUEUE接收信息==="+tm.getText());
    } catch (JMSException e) {
        e.printStackTrace();
    }
}}
 

4.消费者需要在spring-jms.xml配置一下并指明该消费者需要消费哪个队列的消息

代码语言:javascript
复制
<!-- 配置消息队列监听者 -->
<bean id="JmsListener" class="com.mashu.activeMq.jmsTemplate.JmsTemplateListener" />

<!-- 使用spring进行配置 监听 -->
<bean id="JmsTemplateListenerContainer"
  class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="pooledConnectionFactory"></property>
<property name="destination" ref="JmsSenderDestination"></property>
<property name="messageListener" ref="JmsListener"></property>
<property name="sessionTransacted" value="false"></property>
<property name="concurrentConsumers" value="6"></property>
<property name="concurrency" value="2-4"></property>
<property name="maxConcurrentConsumers" value="10"></property>
</bean>

这样一个简单的消息队列收发程序已经写好了,调用生产者方法,看看结果

发出的消息立马就被消费了! 我们可以先把消费者注释掉,只用生产者发送消息就可以在可视化页面上看到还没有被消费的消息内容。

Topic

Topic的方式和Queue类似,只需要在定义队列的时候calss=org.apache.activemq.command.ActiveMQTopic即可

代码语言:javascript
复制
<bean id="JmsSenderTDestination" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg>
        <value>Topic_love</value>
    </constructor-arg>
</bean>

Message

除了使用createTextMessage()方法发送纯字符串消息,还有

  1. 序列化对象的形式 session.createObjectMessage();
  2. 流的形式,可以用来传递文件 session.createStreamMessage();
  3. 字节的形式 session.createBytesMessage();
  4. map的形式 session.createMapMessage();

安全配置

ActiveMQ在使用的时候和MySQL一样,也可以配置用户名密码,默认不没有,我们可以打开:

1.在conf/activemq.xml添加以下信息(务必在<systemUsage>标签上面)

代码语言:javascript
复制
<plugins>
         <simpleAuthenticationPlugin>
             <users>
                 <authenticationUser username="${activemq.username}"
                  password="${activemq.password}" groups="users,admins"/>
             </users>
         </simpleAuthenticationPlugin>
     </plugins>

2.对应的用户名密码在/conf/credentials.properties中配置

代码语言:javascript
复制
activemq.username=admin
activemq.password=123456
guest.password=password

3.那么我们在项目中的application.properties需要加上也要用户名密码:

代码语言:javascript
复制
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=123456

消息持久化

ActiveMQ的持久化机制包含JDBC,KahaDB(默认)、LevelDB 默认保存的消息在\data\kahadb目录下;下面方法修改为用MySQL(JDBC)保存, 生产者发送消息存储到MySQL数据库,消费者消费后消息从数据库消失。

1.修改/conf/activemq.xml 将:

代码语言:javascript
复制
<persistenceAdapter>
            <kahaDB directory="${activemq.data}/kahadb"/>
    </persistenceAdapter>

修改为:

代码语言:javascript
复制
<persistenceAdapter>
    <jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="true"/>
</persistenceAdapter>
 

createTablesOnStartup默认值是true,每次ActiveMQ启动的时候都重新创建数据表,一般是首次启动设置为true,之后设置为false。

2.添加:在/conf/activemq.xml 中加入MySQL连接信息

代码语言:javascript
复制
<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost:3306/db_activemq?relaxAutoCommit=true"/>
<property name="username" value="root"/>
<property name="password" value="123456"/>
<property name="poolPreparedStatements" value="true"/>
</bean>

3.添加 mysql-connector-java.jar 到/bin目录

4.新建数据库db_activemq

重启ActiveMQ后数据库产生三个表activemq_acksactivemq_lockactivemq_msgs

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 安装
  • JmsTemplate
  • 传递模型
  • Queue
  • Topic
  • Message
  • 安全配置
  • 消息持久化
相关产品与服务
云数据库 SQL Server
腾讯云数据库 SQL Server (TencentDB for SQL Server)是业界最常用的商用数据库之一,对基于 Windows 架构的应用程序具有完美的支持。TencentDB for SQL Server 拥有微软正版授权,可持续为用户提供最新的功能,避免未授权使用软件的风险。具有即开即用、稳定可靠、安全运行、弹性扩缩等特点。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档