前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >一篇文章让你了解JMS以及中间件之ActiveMQ

一篇文章让你了解JMS以及中间件之ActiveMQ

作者头像
@派大星
发布2023-07-15 13:37:00
5720
发布2023-07-15 13:37:00
举报
文章被收录于专栏:码上遇见你码上遇见你

JMS(Java Message Service)

JMS的组成特点

  1. JMS provider 实现JMS接口和规范的消息中间件,也就是我们的MQ服务器
  2. JMS producer 消息生产者 创建和发送JMS消息的客户端应用
  3. JMS consumer 消息消费者,接收和处理JMS消息的客户端应用
  4. JMS message
  • 消息头
    • JMS Destination 消息发送的目的地,主要是指Queue和Topic
    • JMS DeliverMode 持久模式和非持久模式 一条持久性的消息:应该被传送"一次仅仅一次",这就意味着如果JMS提供者出现故障,该消息并不会丢失,它会在服务器恢复之前再次传递 一条非持久性消息:最多会传送一次,这就意味着服务器出现故障,该消息将永远消失
    • JMS Expiration 过期时间 默认永不过期
    • JMS Priority 优先级 0-9十个级别,0-4是普通消息 5-9是加急消息 JMS不要求MQ严格按照这十个优先级来发送消息,但必须保证加急消息要优先于普通消息 默认4级
    • JMS MessageID(幂等性会用到) 唯一识别每个消息的标识由MQ产生
  • 消息体 发送和接收的消息体类型必须一直对应 封装具体的消息数据 5种消息体格式
    • TextMessage 普通字符串消息,包含一个String
    • MapMessage 一个Map类型的消息,key为Srting类型,而值为Java的基本类型
    • BytesMessage 二进制组消息,包含一个byte[]
    • StreamMessage Java数据流消息,用标准流操作来顺序的填充和读取
    • ObjectMessage 对象消息,包含一个可序列化的Java对象
  • 消息属性 如果需要除消息头字段外的值,那么可以使用消息属性 识别/去重/重点标注等操作非常有用的方法

JMS的可靠性(事务)

  • PERSISTENT:持久性
    • 参数设置(队列默认持久) 非持久:messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 非持久化:当服务器宕机,消息不存在 持久:messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT); 持久化:当服务器宕机,消息依然存在
    • 持久性的Queue
    • 持久的Topic
  • Transacction 事务:
    • false
    • true
    • 只要执行send,就进入到队列中。
    • 关闭事务,那第2个签收参数的设置需要有效
    • 先执行send再执行commit,消息才被真正的提交到队列中(session.commit() session.rolllback())
    • 消息需要批量发送,需要缓冲区处理
    • producer提交时的事务
    • 事务偏生产者/签收偏消费者
  • Acknowledge:签收
    • 在事务性会话中,当一个事务被成功提交则消息被自动签收。 如果事务回滚。则消息会被再次传送
    • 非事务性会话中,消息何时被确认取决于创建会话时的应答模式(acknowledgement mode)
    • 生产事务开启,只有commit后才能将全部消息变为以消费
    • 消息生产者
    • 消息消费者
    • 自动签收(默认)
    • 手动签收
    • 允许重复消息
    • Session.AUTO_ACKNOWLEDGE
    • Session.CLIENT_ACKNOWLEDGE
    • 客户端调用acknowledge()方法手动签收
    • Session.DUPS_OK_ACKNOWLEDGE
    • 非事务
    • 事务
    • 签收和事务的关系

JMS开发基本步骤

JMS点对点总结

点对点模型是基于队列的,生产者发消息到队列,消费者从队列接收消息,队列的存在使得消息的异步传输成为可能。

和我们平时给朋友发送短信类似

  1. 如果在Session关闭时有部分消息已被收到但还没有被签收(acknowledged),那当前消费者下次连接到相同队列时,这些消息还会被再次签收
  2. 队列可以长久的保存消息直到消费者收到消息,消费者不需要因为担心消息会丢失而时刻和队列保持激活的连接状态,充分体现了异步传输模式的优势

JMS发布订阅总结

JMS Pub/Sub模型定义了如何向一个内容节点发布和订阅消息,这些节点被称作topic

主题可以被认为是消息的传输中介,发布者(publisher)发布消息到主题,订阅者(subscribe)从主题订阅消息。

主题使得消息订阅者和消息发布者保持互相独立,不需要接触即可保证消息的传送。

非持久

非持久订阅只有当客户端处于激活状态,也就是和MQ保持连接状态才能收到发送某个主题的消息。

如果消费者处于离线状态,生产者发送的主题将会丢失作废,消费者永远不会收到

一句话:先要订阅注册才能接收到发布,只给订阅者发布消息

持久

客户端首先向MQ注册一个自己的身份ID识别号,当客户端处于离线时,生产者会为这个ID保存所有发送到主题的消息,

当客户端再次连接到MQ时会根据消费者的ID得到所有当自己处于离线时发送到主题的消息

非持久订阅状态下,不能恢复或重新派送一个未签收的消息。

持久订阅才能恢复或重新派送一个未签收的消息

JMS编码总体架构(类似JDBC编码)

JavaEE

Active MQ

MQ中间件的落地产品有哪些?

消息队列的详细比较

参考资料

官网:https://activemq.apache.org/

特性:

  • api发送和接收
  • MQ的高可用性
  • MQ的集群和容错配置
  • MQ的持久性
  • 延时发送/定时发送
  • 签收机制
  • Spring整合
  • 编程语言Java

安装

  1. 官网下载;https://activemq.apache.org/components/classic/download/
  2. 上传到Linux下的opt目录
代码语言:javascript
复制
# 解压
tar -zxvf apache-activemq-5.16.1-bin.tar.gz
# 在根目录下创建文件夹
mkdir my-activeMQ
# 将解压的文件copy到自己新创建的目录下
cp -r /opt/apache-activemq-5.16.1 /my-activeMQ/
# 进入activeMQ文件夹
cd /my-activeMQ/apache-activemq-5.16.1/bin
# 启动 普通启动  默认进程端口 61616
./activemq start
# 带有日志启动 
./activemq start > /my-activeMQ/myrunmq.log

ActiveMQ控制台

代码语言:javascript
复制
# 连接之前要将防火墙关闭  或者设置白名单  service iptables stop
http://ip:8161/admin/                        #账号 admin    密码 admin

Java实现(两种模式)

POM依赖

代码语言:javascript
复制
<!--需要引入的依赖-->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.15.9</version>
</dependency>
<!--junit/log4j等基础使用配置-->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.25</version>
</dependency>
<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.2.3</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.16.18</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
</dependency>

Java代码(1对1 Queue):

代码语言:javascript
复制
// 消息生产者的代码
public static final String ACTIVEMQ_URL = "tcp://ip:61616";
public static final String QUEUE_NAME = "queue01";

public static void main(Stringp[] args){
    //1 创建连接工厂 按照给定的url地址 采用默认的用户名和密码  如果用户名密码改了 也可以传进去  接受3参
    ActiveMQConenctionFactory activeMQConenctionFactory = new ActiveMQConenctionFactory(ACTIVEMQ_URL);
    //2 通过连接工厂 获得连接connection并启动访问 抛异常
    Connection connection = activeMQConenctionFactory.createConnection();
    connection.start();
    //3 创建会话session
    // 两个参数 第一个叫事务 第二个叫签收
    Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
    //4 创建目的地 (具体是队列还是主题)  注意这里的包是jms
    Queue queue = session.createQueue(QUEUE_NAME);
    //5 创建消息的生产者
    MessageProducer messageProducer = session.createProducer(queue);
    //6 通过消息生产者messageProducer生产3条消息发送到MQ的队列里面
    for (int i = 1; i<=3; i++){
        //7 创建消息 
        TextMessage textMessage = session.createTextMessage("msg ---"+i);//理解为一个字符串
        //8 通过messageProducer发送给mq
        messageProducer.send(textMessage);
    }
    //9 关闭资源
    messageProducer.close();
    session.close();
    connection.close();
    
    System.out.println("*******消息发布到MQ完成");
}

控制台说明:

代码语言:javascript
复制
// 消息消费者的代码 Consumer

public static final String ACTIVEMQ_URL = "tcp://ip:61616";
public static final String QUEUE_NAME = "queue01";


public static void main(String[] args){
    System.out.println("***********我是1号消费者")
    //1 创建连接工厂 按照给定的url地址 采用默认的用户名和密码  如果用户名密码改了 也可以传进去  接受3参
    ActiveMQConenctionFactory activeMQConenctionFactory = new ActiveMQConenctionFactory(ACTIVEMQ_URL);
    //2 通过连接工厂 获得连接connection并启动访问 抛异常
    Connection connection = activeMQConenctionFactory.createConnection();
    connection.start();
    //3 创建会话session
    // 两个参数 第一个叫事务 第二个叫签收
    Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
    //4 创建目的地 (具体是队列还是主题)  注意这里的包是jms
    Queue queue = session.createQueue(QUEUE_NAME);
    //5 创建消费者
    MessageConsumer messageConsumer = session.createConsumer(queue);
    
    
    /*
    // 1 种
    同步阻塞方式receive()
    订阅者或接收者调用MessageConsumer的receive()的方法来接受消息,receive方法在能够接收到消息之前(或超时之前)将一直阻塞
    while(true){
        // receive有两种  1 无超时时间  2  超时时间
        TextMessage textMessage = (TextMessage)messageConsumer.receive();
        // TextMessage textMessage = (TextMessage)messageConsumer.receive(4000L);
        if (null != textmessage){
            System.out.println("*****消费者接收到消息:"+textMessage.getText());
        }else{
            break;
        }
    }
    messageConsumer.close();
    session.close();
    connection.close();
    **/
    
    
    //第2种 通过监听的方式来消费消息 MessageConsumer messageConsumer = session.createConsumer(queue);
    // 异步非阻塞方式(监听器onMessage())
    //订阅者或接收者通过MessageConsumer的setMessageListener(MessageListener listener)注册一个消息监听器
    //当消息到达之后 系统自动调用监听器MessageListener的onMessage(Message message)方法
    messageConsumer.setMessageListener(new MessageListener(){
        @Override
        pubilc void onMessage(Message message){
            if(null != message && message instanceof TextMessage){
                TextMessage textMessage = (TextMessage)message;
                // 这里有异常  try catch
                System.out.println("*****消费者接收到消息:"+textMessage.getText());
            }
        }
    });
    // 抛异常  保证控制台不灭
    System.in.read();
    messageConsumer.close();
    session.close();
    connection.close();
    
    /*
    *1 先生产 只 启动1号消费者 问题:1号消费者能消费到消息吗?Yes
    *2 先生产 先 启动1号消费者 再启动2号消费者 问题:2号消费者还能消费到消息吗?No
    *    1号可以消费
    *    2号不可以消费
    *3 先启动2个消费者,再生产6条消息  请问消息情况如何?一人一半
    */
}

Queue总结:

两种消费方式:

  • 同步阻塞方式receive() 订阅者或接收者调用MessageConsumer的receive()的方法来接受消息,receive方法在能够接收到消息之前(或超时之前)将一直阻塞
  • 异步非阻塞方式(监听器onMessage())订阅者或接收者通过MessageConsumer的setMessageListener(MessageListener listener)注册一个消息监听器//当消息到达之后 系统自动调用监听器MessageListener的onMessage(Message message)方法

在点对点的消息传递种,目的地被称为队列(queue)

点对点消息传递域的特点如下:

  • 每个消息只能有一个消费者,类似1对1的关系。好比个人快递自己领取自己的
  • 消息的生产者和消费者之间没有时间上的对应性。无论消费者在生产者发送消息的时候是否处于运行状态,消费者都可以提取消息。好比我们的发送短息,发送者发送后不见得接收者会即收即看
  • 消息被消费后队列不会再存储,所以消费者不会消费到已经被消费掉的消息

Java代码(1对多 Topic):

非持久化
代码语言:javascript
复制
// 消息生产者的代码
public static final String ACTIVEMQ_URL = "tcp://ip:61616";
public static final String TOPIC_NAME = "topic01";

public static void main(Stringp[] args){
    //1 创建连接工厂 按照给定的url地址 采用默认的用户名和密码  如果用户名密码改了 也可以传进去  接受3参
    ActiveMQConenctionFactory activeMQConenctionFactory = new ActiveMQConenctionFactory(ACTIVEMQ_URL);
    //2 通过连接工厂 获得连接connection并启动访问 抛异常
    Connection connection = activeMQConenctionFactory.createConnection();
    connection.start();
    //3 创建会话session
    // 两个参数 第一个叫事务 第二个叫签收
    Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
    //4 创建目的地 (具体是队列还是主题)  注意这里的包是jms
    Topic topic = session.createTopic(TOPIC_NAME);
    //5 创建消息的生产者
    MessageProducer messageProducer = session.createProducer(queue);
    // messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT)  持久化的topic
    // connection.start(); 持久化的topic
    //6 通过消息生产者messageProducer生产3条消息发送到MQ的队列里面
    for (int i = 1; i<=3; i++){
        //7 创建消息 
        TextMessage textMessage = session.createTextMessage("TOPIC_NAMEmsg ---"+i);//理解为一个字符串
        //8 通过messageProducer发送给mq
        messageProducer.send(textMessage);
    }
    //9 关闭资源
    messageProducer.close();
    session.close();
    connection.close();
    
    System.out.println("*******TOPIC_NAME消息发布到MQ完成");
}
代码语言:javascript
复制
// 消息消费者的代码 Consumer

public static final String ACTIVEMQ_URL = "tcp://ip:61616";
public static final String TOPIC_NAME = "topic01";


public static void main(String[] args){
    System.out.println("***********我是1号消费者")
    //1 创建连接工厂 按照给定的url地址 采用默认的用户名和密码  如果用户名密码改了 也可以传进去  接受3参
    ActiveMQConenctionFactory activeMQConenctionFactory = new ActiveMQConenctionFactory(ACTIVEMQ_URL);
    //2 通过连接工厂 获得连接connection并启动访问 抛异常
    Connection connection = activeMQConenctionFactory.createConnection();
    connection.start();
    
    // 持久化 connection.setClientID("zs")
    
    //3 创建会话session
    // 两个参数 第一个叫事务 第二个叫签收
    Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
    //4 创建目的地 (具体是队列还是主题)  注意这里的包是jms
    Topic topic = session.createTopic(TOPIC_NAME);
    
    //TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic,"remark")   //主题订阅者
    
    //5 创建消费者
    MessageConsumer messageConsumer = session.createConsumer(topic);    
    
    //通过监听的方式来消费消息
    /*messageConsumer.setMessageListener(new MessageListener(){
        @Override
        pubilc void onMessage(Message message){
            if(null != message && message instanceof TextMessage){
                TextMessage textMessage = (TextMessage)message;
                // 这里有异常  try catch
                System.out.println("*****消费者接收到消息:"+textMessage.getText());
            }
        }
    });*/
    // lomdba 表达式
    messageConsumer.setMessageListener((Message message) ->{
        if(null != message && message instanceof TextMessage){
                TextMessage textMessage = (TextMessage)message;
                // 这里有异常  try catch
                System.out.println("*****消费者接收到消息:"+textMessage.getText());
            }
    })
    // 抛异常  保证控制台不灭
    System.in.read();
    messageConsumer.close();
    session.close();
    connection.close();
}

持久化
代码语言:javascript
复制
// 消息生产者的代码
public static final String ACTIVEMQ_URL = "tcp://ip:61616";
public static final String TOPIC_NAME = "topic01";

public static void main(Stringp[] args){
    //1 创建连接工厂 按照给定的url地址 采用默认的用户名和密码  如果用户名密码改了 也可以传进去  接受3参
    ActiveMQConenctionFactory activeMQConenctionFactory = new ActiveMQConenctionFactory(ACTIVEMQ_URL);
    //2 通过连接工厂 获得连接connection并启动访问 抛异常
    Connection connection = activeMQConenctionFactory.createConnection();
    //3 创建会话session
    // 两个参数 第一个叫事务 第二个叫签收
    Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
    //4 创建目的地 (具体是队列还是主题)  注意这里的包是jms
    Topic topic = session.createTopic(TOPIC_NAME);
    //5 创建消息的生产者
    MessageProducer messageProducer = session.createProducer(queue);
    messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT)  // 持久化的topic
    
        connection.start(); // 持久化的topic
    //6 通过消息生产者messageProducer生产3条消息发送到MQ的队列里面
    for (int i = 1; i<=3; i++){
        //7 创建消息 
        TextMessage textMessage = session.createTextMessage("TOPIC_NAMEmsg ---"+i);//理解为一个字符串
        //8 通过messageProducer发送给mq
        messageProducer.send(textMessage);
    }
    //9 关闭资源
    messageProducer.close();
    session.close();
    connection.close();
    
    System.out.println("*******TOPIC_NAME消息发布到MQ完成");
}
代码语言:javascript
复制
// 消息消费者的代码 Consumer  持久化的

public static final String ACTIVEMQ_URL = "tcp://ip:61616";
public static final String TOPIC_NAME = "topic01";


public static void main(String[] args){
    System.out.println("***********z3")
    //1 创建连接工厂 按照给定的url地址 采用默认的用户名和密码  如果用户名密码改了 也可以传进去  接受3参
    ActiveMQConenctionFactory activeMQConenctionFactory = new ActiveMQConenctionFactory();
    //2 通过连接工厂 获得连接connection并启动访问 抛异常
    Connection connection = activeMQConenctionFactory.createConnection();
   
    // 持久化 
    connection.setClientID("z3")
    
    //3 创建会话session
    // 两个参数 第一个叫事务 第二个叫签收
    Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
    //4 创建目的地 (具体是队列还是主题)  注意这里的包是jms
    Topic topic = session.createTopic(TOPIC_NAME);
    
    TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic,"remark")   //主题订阅者
    
    connection.start();
    Message message = topicSubscriber.receive();
    while(null != message){
        TextMessage textMessage = TextMessage(message);
        System.out.println("******收到的持久化Topic"+textMessage.getText());
        message = topicSubscriber.receive(1000L);
    }
    
    session.close();
    connection.close();
}
总结:

一定要先运行一次消费,等于向MQ注册,类似我订阅了这个主题。

然后再运行生产者发送是那个消息,此时

无论消费者是否在线,都会接收到,不在线的话,下次连接的时候,会把没有收过的消息都接收下来。

Topic总结:

发布/订阅消息传递域的特点如下:

  • 生产者将消息发布到topic中,每个消息可以有多个消费者,属于1:N的关系
  • 生产者和消费者之间有时间上的相关性。订阅某一主题的消费者只能消费自它订阅之后发布的消息
  • 生产者生产时,topic不保存消息它是 无状态的 不落地,假如无人订阅就去生产,那就是一条废消息,所以,一般先启动消费者再启动生产者。

两大模式比较

ActiveMQ的Broker

相当于一个ActiveMQ的服务器实例

说白了,Broker其实就是实现了用代码形式的启动ActiveMQ将MQ嵌入到Java代码中,以便随时用随时启动

再用的时候再去启动这样节省了资源,也保证了可靠性。

代码语言:javascript
复制
./activemq start xbean:file:/my-activeMQ/apache-activemq-5.16.1/conf/activemq02.xml            # 指定配置文件启动

嵌入式Broker

引入依赖

代码语言:javascript
复制
<!--不引入该依赖可能报错  ClassNotFoundException 。。。。。。。-->
<dependency>
    <groupId>com.faster.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.9.5</version>
</dependency>
<dependency>
    <groupId>org.apache.xbean</groupId>
    <artifactId>xbean-spring</artifactId>
    <version>3.16</version>
</dependency>
代码语言:javascript
复制
public class EmbedBroker{
    public static void main(String[] args){
        // 用ActiveMQ Broker作为独立的消息服务器来构建Java应用
        // ActiveMQ也支持在vm中通信基于嵌入式的Broker
        BrokerService brokerService = new BrokerService();
        brokerService.setUseJmx(true);
        brokerService.addConnector("tcp://localhost:61616");
        brokerService.start();
    }
}

Spring整合ActiveMQ

pom依赖

代码语言:javascript
复制
<!--需要引入的依赖-->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.15.9</version>
</dependency>
<dependency>
    <groupId>org.apache.xbean</groupId>
    <artifactId>xbean-spring</artifactId>
    <version>3.16</version>
</dependency>

<!--不引入该依赖可能报错  ClassNotFoundException 。。。。。。。-->
<dependency>
    <groupId>com.faster.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.9.5</version>
</dependency>

<!--activemq 对jms的支持,整合spring和ActiveMQ-->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>4.3.23</version>
</dependency>

<!--activemq 所需要的pool包-->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
    <version>5.15.9</version>
</dependency>

<!--Spring AOP 相关jar-->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-core</artifactId>
    <version>4.3.23.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>4.3.23.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-aop</artifactId>
    <version>4.3.23.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-orm</artifactId>
    <version>4.3.23.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.aspectj</groupId>
    <artifactId>aspectjrt</artifactId>
    <version>1.6.1</version>
</dependency>
<dependency>
    <groupId>cglib</groupId>
    <artifactId>cglib</artifactId>
    <version>2.1_2</version>
</dependency>


<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.16.18</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
</dependency>

spring xml配置文件

applicationContext.xml

代码语言:javascript
复制
<context:component-scan base-package="com.vipbbo.activemq"/>
<!--配置生产者-->
<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destory-method="stop">
    <property name="connectionFactory">
        <!--真正可以产生Connection的ConnectionFactory,由对应的JMS服务厂商提供-->
        <bean class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://ip.....:61616"/>
        </bean>
    </property>
    <property name="maxConnections"value="100"></property>
</bean>
<!--这个队列目的地,点对点的-->
<bean id="destinationQueue" class="org.apache.activeqm.command.ActiveMQQueue">
    <construtor-arg index="0" value="spring-active-queue"/>
</bean>
<!--spring 提供的jms工具类 它可以进行消息发送 接收等-->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="jmsFactory"/>
    <property name="defaultDestination" ref="destinationQueue"/>
    <property name="messageConverter">
        <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
    </property>
</bean>

一对一的Queue代码实现

代码语言:javascript
复制
// 生产者
@Service
public class SpringMQ_Produe{
    
    @Autowired
    private JmsTemplate jmsTemplate;
    
    public static void main(String[] args){
        ApplicationContext ctx = new ClassPathXmlApplicationContext("applicationContext.xml")
        SpringMQ_Produe produce = (SpringMQ_Produce)ctx.getBean("springMQ_Produce");
        /*
        produce.jmsTemplate.send(new MessageCreator(){
            @Override
            pubilc Message createMessage(Session session) throw JMSException{
                TextMessage textMessage = session.createTextMessage("***spring和ActiveMQ的整合case...")
                return textMessage;
            }
        })
        */
        //表达式
        produce.jmsTemplate.send(new MessageCreator(session -> ){
            TextMessage textMessage = session.createTextMessage("***spring和ActiveMQ的整合case...")
            return textMessage;
        })
            System.out.println("***********send task over")
    }
}
代码语言:javascript
复制
// 消费者
@Service
public class SpringMQ_Consumer{
        
    @Autowired
    private JmsTemplate jmsTemplate;
    
    public static void main(String[] args){
        ApplicationContext ctx = new ClassPathXmlApplicationContext("applicationContext.xml")
        SpringMQ_Consumer consumer = (SpringMQ_Produce)ctx.getBean("SpringMQ_Consumer");
        String retValue = (String)consumer.jmsTemplate.receiveAndConvert();
        System.out.println("*****消费者收到的消息:"+retValue);
    }
}

一对多的代码实现Topic

添加Topic在配置文件中

代码语言:javascript
复制
<!--这个主题目的地,点对点的-->
<bean id="destinationTopic" class="org.apache.activeqm.command.ActiveMQTopic">
    <construtor-arg index="0" value="spring-active-topic"/>
</bean>

<!--spring 提供的jms工具类 它可以进行消息发送 接收等-->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="jmsFactory"/>
    <property name="defaultDestination" ref="destinationTopic"/>
    <property name="messageConverter">
        <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
    </property>
</bean>

在Spring里面实现消费者不启动,直接通过配置监听完成

修改applicationContext.xml

代码语言:javascript
复制
<!--配置监听程序-->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="jmsFactory"/>
    <!--注意这里的ref  要监听的   -->
    <property name="destination" ref="destinationTopic"/>
    <!-- public class MyMessageListener implements MessageListener-->
    <property name="messageListener" ref="myMessageListener"/>
</bean>
<!--自己定义的包名类名  或者在类上使用 @Component注解-->

<!--
<bean id="myMessageListener" class="com.vipbbo.activemq.spring.MyMessageListener"/>
-->

代码语言:javascript
复制
@Component
public class MyMessageListener implements MessageListener{
    @Override
    pubilc void onMessage(Message message){
        if(null != message && message instanceof TextMessage){
            TextMessage textMessage = (TextMessage)message;
            try{
                System,out.println("textMessage.getText()");
            } catch (JMSException e){
                e.printStackTracr();
            }
            
        }
    }
}

Spring Boot整合ActiveMQ

Queue

boot 使用的是 2.1.5

pom依赖
代码语言:javascript
复制
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.5.RELEASE</version>
    <relativePath/>
</parent>

<groupId>com.vipbbo.activemq</groupId>
<artifactId>boot_mq_produce</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
    <project.bulid.sourceEncoding>UTF-8</project.bulid.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
    </dependency>
    <!--activemq  依赖-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-activemq</artifactId>
        <version>2.1.5.RELEASE</version>
    </dependency>
</dependencies>
application.yml
代码语言:javascript
复制
server:
  port: 7777

spring:
  activemq:
    broker-url: tcp://ip:61616 # 自己的mq服务器地址
    user: admin
    password: admin
  jms:
    pub-sub-domain: false    # false = Queue  true = Topic


# 自定义队列名称
myqueque: boot-activemq-queue
配置Bean
代码语言:javascript
复制
@Component
@EnableJms   //一定要开启
public class ConfigBean{
    
    @Value("${myqueue}")
    private String myQueue;
    
    @Bean // 相当于<bean id ="" class="" >
    public Queue queue(){
        return new ActiveMQQueue(myQueue)
    }
}

生产者

代码语言:javascript
复制
// 生产者
@Component
public class Queue_Produce{
    
    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;
    
    @Autowired
    private Queue queue;
    
    public void produceMsg(){
        jmsMessagingTemplate.convertAndSend(queue,UUID.randomUUID().toString().subString(0,6));
    }
}

测试类

代码语言:javascript
复制
@SpringBootTest(classes= Main_Produce.class)  // 主类
@RunWith(SpringJUnit4ClassRunner.class)
@WebAppConfiguration

public class TestActiveMQ{
    
    @Resource
    private Queue_Produce queue_produce;
    
    @Test
    public void testSend() throw Exception{
        queue_produce.produceMsg();
    }
}

消费者

pom文件和生产者一样 端口号8888

代码语言:javascript
复制
@Component
public class Queue_Consumer
{
    @JmsListener(destination = "${myqueue}")
    public void receive(TextMessage textMessage) throw JMSException
    {
        System.out.println("********消费者收到消息:"+textMessage.getText());
    }
}

Topic

pom依赖

和Queue 的一样

application.yml
代码语言:javascript
复制
server:
  port: 6666

spring:
  activemq:
    broker-url: tcp://ip:61616 # 自己的mq服务器地址
    user: admin
    password: admin
  jms:
    pub-sub-domain: true    # false = Queue  true = Topic


# 自定义队列名称
myTopic: boot-activemq-topic
配置Bean
代码语言:javascript
复制
@Component
@EnableJms   //一定要开启
public class ConfigBean
{
    
    @Value("${myTopic}")
    private String topicName;
    
    @Bean // 相当于<bean id ="" class="" >
    public Topic topic()
    {
        return new ActiveMQTopic(topicName)
    }
}

生产者

代码语言:javascript
复制
// 生产者  topic
@Component
public class Topic_Produce{
    
    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;
    
    @Autowired
    private Topic topic;
    
    @Schdeuled(fixedDelay = 3000)   // 在主启动动类开启 加注解  @EnableScheduling
    public void produceTopic()
{
        jmsMessagingTemplate.convertAndSend(topic,"主题"+UUID.randomUUID().toString().subString(0,6));
    }
}

消费者

pom文件和生产者一样

application.yml
代码语言:javascript
复制
#   第一个配置
server:
  port: 5566

spring:
  activemq:
    broker-url: tcp://ip:61616 # 自己的mq服务器地址
    user: admin
    password: admin
  jms:
    pub-sub-domain: true    # false = Queue  true = Topic


# 自定义队列名称
myTopic: boot-activemq-topic



#   第二个配置
server:
  port: 5566

spring:
  activemq:
    broker-url: tcp://ip:61616 # 自己的mq服务器地址
    user: admin
    password: admin
  jms:
    pub-sub-domain: true    # false = Queue  true = Topic


# 自定义队列名称
myTopic: boot-activemq-topic

ActiveMQ的传输协议

Transmission Control Protocol-(TCP)

  1. 这是默认的Broker配置,TCP的client监听端口61616
  2. 在网络传输数据之前,必须要序列化数据,消息是通过一个叫wire protocol的来序列化成字节流。默认情况下ActiveMQ把wire protocol叫做OpenWire,它的目的是促使网络上的效率和数据快速交互。
  3. TCP连接的URL形式如:tcp://hostname:port?key=value&key=value,后面的参数是可选
  4. TCP传输的优点
    • TCP协议传输可靠性高,稳定性强
    • 高效性:字节流方式传递,效率很高
    • 有效性、可用性:应用广泛,支持任何平台
  5. 关于Transport协议的可配置参数可以参考官网:https://activemq.apache.org/configuring-version-5-transports

New(no) I/O API Protocol-(NIO)

介绍
  1. NIO协议和TCP协议类似但NIO更侧重于底层的访问操作。它允许开发人员对同一资源可有更多的client调用和服务端有更多的负载。
  2. 适合使用NIO协议的场景:
    • 可能有大量的Client去连接到Broker上,一般情况下,大量的Client去连接Broker是被操作系统的线程所限制的。因此,NIO的实现比实现TCP需要更少的线程去运行,所以建议使用NIO协议
    • 可能对于Broker有一个很迟钝的网络传输,NIO比TCP提供更好的性能。
  3. NIO连接的URL形式:nio//hostname:port?key=value
  4. Transport Connector配置实例,参考官网:https://activemq.apache.org/configuring-version-5-transports
配置
修改activemq.xml

如果你不特别指定ActiveMQ的网络监听端口,那么这些端口都将使用BIO网络IO模型,(OpenWire,STOMP,AMQP...)所以为了首先提高单节点的网络吞吐性能,我们需要明确指定Active的网络IO模型

如下所示:URL格式以nio开头,表示这个端口使用以TCP协议为基础的NIO网络IO模型

代码语言:javascript
复制
<broker>
  ...
  <transportConnectors>
    <transportConnector name="nio" uri="nio://0.0.0.0:61618?trace=true"/>  
  </<transportConnectors>
  ...
</broker>

Java代码

代码语言:javascript
复制
public static final String ACTIVEMQ_URL = "nio://ip:61618";
NIO增强

如何解决让这个端口支持NIO网络IO模型,又让它支持多个协议呢?

解决:

参考:https://activemq.apache.org/auto

使用auto关键字。

使用"+"符号来为端口设置多种特性

如果我们既需要某一个端口支持NIO网络IO模型,又需要它支持多个协议

代码语言:javascript
复制
<transportConnector name="nio" uri="auto+nio://0.0.0.0:61608?maximumConnection=1000&amp;wireFormat.maxFrameSize=104857600&amp;org.apache.activemq.transport.nio.SelectorManager.corePoolSize=20&amporg.apache.activemq.transport.nio.SelectorManager.maximumPoolSize=50"/>  

AMQP协议

即Advance Message Queuing Protocol,一个提供同意消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件的限制。

参考地址:https://activemq.apache.org/amqp

STOMP协议

STOMP,Stream Text Orientated Message Protocol,是流文本定向消息协议,是一种为MOM(Message Oriented Middleware,面向消息的中间件)设计的简单文本协议。

参考地址:https://activemq.apache.org/stomp

Secure Sockets Layer Protocol(SSL)

  1. 连接的URL形式:ssl://hostname:port?key=value
  2. Transport Connetor 配置示例:参考:https://activemq.apache.org/ssl-transport-reference

MQTT协议

MQTT(Message Queuing Telemetry ,消息队列遥测传输)是IBM开发的一个即时通讯协议,有可能成为物联网的重要组成部分。该协议支持所有平台,几乎可以把所有互联网和外部连接起来,被用来当做传感器和致动器(比如通过Twitter让房屋联网)的通信协议。

参考:https://activemq.apache.org/mqtt

参考资料:https://github.com/fusesource/mqtt-client

WS协议

总结 :

ActiveMQ的消息存储和持久化

官网:https://activemq.apache.org/persistence 可以在conf文件下的注释中找到

ActiveMQ的持久化机制

  1. Replicated LevelDB Store
  2. KahaDB

为了避免意外宕机后丢失信息,需要做到重启后可以恢复消息队列,消息系统一般都会采用持久化机制。ActiveMQ的消息持久化机制有JDBC,AMQ,KahaDB和LevelDB,无论使用哪种持久化方式,消息的存储逻辑都是一致的。

就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件、内存数据库或者远程数据库等再试图将消息发送给接收者,成功则将消息从存储中删除,失败则继续尝试发送。

消息中心启动以后首先要检查指定的存储位置,如果有未发送成功的消息,则需把消息发送出去。

消息存储机制

AMQ Message Store(了解)

基于文件的存储方式,是以前的默认消息存储,现在不用了

AMQ Message是一种文件存储形式,它具有写入速度快和容易恢复的特点。消息存储在一个个文件夹中,文件的默认大小为32M,当一个存储文件中的消息已经全被消费,那么这个文件将被表示为可删除,在下一个清除阶段,这个文件被删除。AMQ适用于ActiveMQ5.3之前的版本

KahaDB消息存储(默认)
  • 基于日志文件,从ActiveMQ5.4开始默认的持久化插件(类似于redis的aof rdb)
kahaDB官网:

https://activemq.apache.org/kahadb

验证:可以在activeMQ的conf目录下的activemq.xml看见

代码语言:javascript
复制
<kahaDB directory="${activemq.data}/kafadb}"/>
说明:

KahaDB是目前默认的存储方式,可用于任何场景,提高了性能和恢复能力。

消息存储使用一个事务日志和仅仅用一个索引文件来存储它所有的地址

kahaDB是一个专门针对消息持久化的解决方案,它对典型的消息使用模式进行了优化。

数据被追加到data logs中。当不再需要log文件中的数据的时候,log文件会被丢弃。

kahaDB的 存储原理

kahadb在消息保存目录中只有4类文件和一个lock,跟ActiveMQ的其他几种文件存储引擎相比较这就非常简洁了。

  1. db-< Number >.log KahaDB存储消息到预定义大小的数据记录文件中,文件命名为db-< Number >.log。当数据文件已满时,一个新的文件随之创建,number数值也会随之递增,它随着消息数量的增多,比如每32M一个文件,文件名按照数字进行编号,如db-1.log、db-2.log、db-3.log ......。当不再有引用到数据文件中的任何消息时,文件会被删除或归档。
  1. db.data该文件包含了持久化的BTree索引,索引了消息数据记录中的消息,它是消息的索引文件,本质上是B-Tree(B数),使用B-Tree作为索引指向db-< Number >.log里面的存储的消息。
  2. db.free 当前db.data文件里面哪些页面是空闲的,文件具体内容是所有空闲页的ID (保证索引的连续性没有碎片
  3. db.redo 用来进行消息恢复,如果KahaDB消息存储在强制退出后启动,用于恢复BTree索引
  4. lock文件锁,表示当前获得kahadb读写权限的broker
LevelDB消息存储

这种文件系统是从ActiveMQ5.8之后引进的,它和KahaDB非常相似,也是基于文件的本地数据库储存形式,但是它提供比KahaDB更快的持久性

但它不使用自定义B-Tree实现来索引预写日志,而是基于LevelDB的索引

默认配置如下

代码语言:javascript
复制
<persistenceAdapter>
    <leveleDBdirectory="activemq-data"/>
</persistenceAdapter>
JDBC消息存储
配置
  1. MQ+MySQL
  1. 添加mysql数据库的驱动包到lib文件夹下(如果你使用的是其他连接池 需要将连接池的相关jar包拷贝进来) cp mysql-connector-java-5.1.3.jar /my-activeMQ/apache-activemq-5.16.1/bin
  2. jdbcPersistenceAdapter配置 在/my-activeMQ/apache-activemq-5.16.1/conf路径下修改activemq.xml配置文件,按照如下修改:
代码语言:javascript
复制
<!--修改前-->
<persistenceAdapter>
    <kahaDB directory="${activemq-data}/kahadb"/>
</persistenceAdapter>

<!--修改后     mysql-ds  createTablesOnStartuo默认是true  新建一套表  第一次是true  第二次要设置是false-->
<persistenceAdapter>
    <jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartuo="true"/>
</persistenceAdapter>

数据库连接池配置

代码语言:javascript
复制
<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" detroy-methid="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
    <property name="url" value="jdbc:mysql://自己数据库IP:3306/activemq?relaxAutoCommit=true"/>
    <property name="username" value="自己的数据库账号"/>
    <property name="password" value="自己数据库的密码"/>
    <property name="maxTotal" value="200"/>
    <property name="poolPreparedStatements" value="true"/>
</bean>

配置在activemq.xmlbroker标签之外import标签的里面

建仓SQL和建表说明

  • 建一个名为activemq的数据库
  • 三张表的说明

ACTIVEMQ_MSGS

代码语言:javascript
复制
ID                        # 自增的数据库主键
CONTAINER                # 消息的Destination
MSGID_PROD                # 消息发送者的主键
MSG_SEQ                    # 是发送消息的顺序,MSGID_PROD+MSG_SEQ可以组成JMS的MessageID
EXPIRATION                # 消息的过期时间,存储的是从1970-01-01到现在的毫秒数
MSG                        # 消息本地的Java序列化对象的二进制数据
PRIORITY                # 优先级 从0-9 数值越大优先级越高

ACTIVEMQ_ACKS

activemq_acks # 用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存。数据字段如下:

代码语言:javascript
复制
CONTAINER                        # 消息的Destination
SUB_DEST                        # 如果使用的是static集群,这个字段会有集群其他系统的信息
CLIENT_ID                        # 每一个订阅者都必须有一个唯一的客户端ID用以区分
SUB_NAME                        # 订阅者名称
SELECTOR                        # 选择器,可以选择之消费满足条件的消息。条件可以用自定义属性来实现,可支持多属性AND和OR操作
LAST_ACKED_ID                    # 记录消费过的消息ID

ACTIVEMQ_LOCK

表activemq_lock 在集群环境下才有用,只有一个Broker可以获得消息,称为Master Broker,其他的只能作为备份等待Master Broker不可用,才可能成为下一个Master Broker。这个表用于记录哪个Broker是当前的Master Broker

注意 : 如果新建数据库OK + 上述配置OK + 代码运行 OK ,3张表会自动生成 万一情况,手动建表SQL(如果配置好不需要手动,应急)

代码实战
  1. 代码运行验证
    • 要开启持久化(生产者)_ Queue_
代码语言:javascript
复制
messageProducer.setDeliverMode(DeliveryMode.PERSISTENT)
代码语言:javascript
复制
// 消息生产者的代码
public static final String ACTIVEMQ_URL = "tcp://ip:61616";
public static final String QUEUE_NAME = "jdbc01";

public static void main(Stringp[] args){
    //1 创建连接工厂 按照给定的url地址 采用默认的用户名和密码  如果用户名密码改了 也可以传进去  接受3参
    ActiveMQConenctionFactory activeMQConenctionFactory = new ActiveMQConenctionFactory();
    //2 通过连接工厂 获得连接connection并启动访问 抛异常
    Connection connection = activeMQConenctionFactory.createConnection();
    connection.start();
    //3 创建会话session
    // 两个参数 第一个叫事务 第二个叫签收
    Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
    //4 创建目的地 (具体是队列还是主题)  注意这里的包是jms
    Queue queue = session.createQueue(QUEUE_NAME);
    //5 创建消息的生产者
    MessageProducer messageProducer = session.createProducer(queue);
    
    messageProducer.setDeliverMode(DeliveryMode.PERSISTENT)
        
    //6 通过消息生产者messageProducer生产3条消息发送到MQ的队列里面
    for (int i = 1; i<=3; i++){
        //7 创建消息 
        TextMessage textMessage = session.createTextMessage("msg ---"+i);//理解为一个字符串
        //8 通过messageProducer发送给mq
        messageProducer.send(textMessage);
    }
    //9 关闭资源
    messageProducer.close();
    session.close();
    connection.close();
    
    System.out.println("*******消息发布到MQ完成");
    
}



// 消费者不用动

要开启持久化 Topic

代码语言:javascript
复制
 messageProducer.setDeliverMode(DeliveryMode.PERSISTENT)
  1. 数据库情况
    • 点对点模式Queue:当DeliveryMode设置为NON_PERSISTENCE时,消息被保存在内存中,当DeliveryMode设置为PEREISTENCE时,消息保存在broker的相应的文件或者数据库中.而且点对点模型中消息一旦被Consemer消费就从Broker中删除 看activemq_msgs
    • topic :先启动消费者订阅消息再运行生产,看activemq_acks
    • 运行生产code
  2. 小总结 Queue: 在没有消费者消费的情况下会将消息保存到activemq_msgs表中,只要有任意一个消费者已经消费过了,消费之后这些消息就会立即被删除 Topic:一般是先启动消费者订阅然后再生成的情况 下会将消息保存到activemq_msgs并且不会删除 activemq_acks表记录了订阅者的信息
  3. 开发有坑 在配置关系型数据库作为ActiveMQ的持久化存储方案时,有坑: 数据库jar包 默认dbcp2 记得需要将使用到的相关jar文件放置到ActiveMQ安装目录下的lib目录。mysql-jdbc驱动的jar包和对应的数据库连接池jar包 createTablesOnStartup 在jdbcPersistenceAdapter标签中设置了createTablesOnStartup属性时在第一次启动ActiveMQ时,ActiveMQ服务节点会自动创建所需要的数据表,启动完成后可以去掉这个属性,或者更改createTablesOnStartup属性为false 下划线 java.lang.IIIegalStateException:BeanFactory not initialized already closed 这是因为你的操作系统的机器名中有"__"符号。请更改机器名并且重启后即可解决。
JDBC Message Store with ActiveMQ Journal
说明:

这种方式克服了JDBC Store的不足,JDBC每次消息过来,都需要去写库和读库

ActiveMQ Journal,使用高速缓存写入技术,大大提高了性能

当消费者的消费速度能够及时跟上生产者消息的产生速度时,journal文件就能够大大减少需要写入到DB的消息。

降低了MySQL的操作,降低MySQL负担

配置:

/my-activeMQ/apache-activemq-5.16.1/conf路径下修改activemq.xml配置文件,按照如下修改:

代码语言:javascript
复制
<!--修改前     mysql-ds  createTablesOnStartuo默认是true  新建一套表  第一次是true  第二次要设置是false-->
<persistenceAdapter>
    <jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartuo="true"/>
</persistenceAdapter>

<!--修改后   -->
<persistenceAdapter>
    <jdbcPersistenceAdapterFactory journalLogFiles="4"
                                   journalLogFileSize="32768"
                                   useJournal="true"
                                   useQuickJournal="true"
                                   dataSource="#mysql-ds"
                                   dataDirectory="activemq-data"/>
</persistenceAdapter>

ActiveMQ持久化机制小总结

持久化消息主要是指:

MQ所在的服务器down了消息不会丢失的机制

持久化机制演化过程:

从最初的AMQ Message Store 方案到ActiveMQ V4版本中推出的High performance journal (高性能事务支持)附件并且同步推出了关系型数据库的存储方案。ActiveMQ5.3版本中又推出了KahaDB的支持(V5.4版本后成为ActiveMQ默认的持久化方案),后来ActiveMQ V5.8版本开始支持LevelDB,到现在,V5.9+版本提供了标准的zookeeper+LevelDB集群化方案。本次重点了解KahaDB、LevelDB、和mysql数据库这三种持久化存储方案。

ActiveMQ的消息持久化机制有:

AMQ 基于日志文件 KahaDB 基于日志文件,从ActiveMQ5.4开始默认的持久化插件 JDBC 基于第三方数据库 LevelDB 基于文件的本地数据库储存,从ActiveMQ5.8版本之后又推出了LevelDB的持久化引擎性能高于KahaDB Replicated LevelDB Store 从ActiveMQ5.9 提供了基于LevelDB和zookeeper的数据复制方式,用于Master-slave方式的首选数据复制方案。

无论使用哪种持久化方式,消息的存储逻辑都是一致的:

就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件、内存数据库或者远程数据库等,然后试图将消息发送给接收者,发送成功则将消息从存储中删除,失败则继续尝试。消息中心启动后首先要检查指定的存储位置,如果有未发送成功的消息,则需要把消息发送出去。

ActiveMQ多点集群

官网:https://activemq.apache.org/masterslave

引入消息队列之后如何保证其高可用性 单点故障 (集群)

基于Zookeeper和LevelDB搭建ActiveMQ集群。集群仅提供主备方式的高可用集群功能,避免单点故障。

Zookeeper+replicated-leveldb-store的主从集群

三种集群方式对比

基于sharedFileSystem共享文件系统(KahaDB) 基于JDBC 基于可复制的LevelDB

基于ZK+Replicated LevelDB Store的案例

官网:https://activemq.apache.org/replicated-leveldb-store

部署规划和步骤
  • 环境和版本
  • 关闭防火墙并保证win可以ping通过ActiveMQ服务器
  • 要求具备zk集群并可以成功开启
  • 集群规划部署列表
  • 创建3台集群目录
代码语言:javascript
复制
mkdir /mq_cluster/
cd /mq_cluster/
cp -r /opt/apache/-activemq-5.15.9 mq_node01
cp -r mq_node01 mq_node02
cp -r mq_node01 mq_node03
  • 💡修改管理控制台端口
代码语言:javascript
复制
# mq_node01默认不动
# mq_node02   mq_node03和2一样操作
cd /mq_cluster/mq_node02/conf
vim jetty.xml                        # 里面有一个port
  • hostname名字映射
代码语言:javascript
复制
 vim /etc/hosts
  • ActiveMQ集群配置
代码语言:javascript
复制
# 01 02 03 一样的操作 3个节点的brokerName 要求一致
cd /mq_cluster/mq_node01/conf
vim activemq.xml                        # broker标签的brokerName="zzyymq"

3个节点的持久化配置

参考官网:

1号机:(vi /mq_cluster/mq_node01/conf/activemq.xml)

代码语言:javascript
复制
<persistenceAdapter>
    <replicatedLevelDB
      directory="${activemq-data}/leveldb"
      replicas="3"
      bind="tcp://0.0.0.0:63631"
      zkAddress="localhost:2191,localhost:2192,localhost:2193"
      zkPath="/activemq/leveldb-stores"
      hostname="zzyymq-server"
      sync="local_disk"
      />
</persistenceAdapter>

2号机(vi /mq_cluster/mq_node01/conf/activemq.xml)

代码语言:javascript
复制
<persistenceAdapter>
    <replicatedLevelDB
      directory="${activemq-data}/leveldb"
      replicas="3"
      bind="tcp://0.0.0.0:63632"
      zkAddress="localhost:2191,localhost:2192,localhost:2193"
      zkPath="/activemq/leveldb-stores"
      hostname="zzyymq-server"
      sync="local_disk"
      />
</persistenceAdapter>

3号机(vi /mq_cluster/mq_node01/conf/activemq.xml)

代码语言:javascript
复制
<persistenceAdapter>
    <replicatedLevelDB
      directory="${activemq-data}/leveldb"
      replicas="3"
      bind="tcp://0.0.0.0:63633"
      zkAddress="localhost:2191,localhost:2192,localhost:2193"
      zkPath="/activemq/leveldb-stores"
      hostname="zzyymq-server"
      sync="local_disk"
      />
</persistenceAdapter>
  • 修改各节点的消息端口
代码语言:javascript
复制
1 号不用动
vi /mq_cluster/mq_node02/conf/activemq.xml   # 2号修改tcp端口号为61617  3号修改tcp端口号为61618


  • 按顺序启动3个ActiveMQ节点,到这步前提是zk集群已经成功启动运行
代码语言:javascript
复制
# 查看zookeeper的运行数量
ps -ef |grep zookeeper|grep -V grep|wc -l
  • 批量启动脚本
代码语言:javascript
复制
#!bin/sh

cd /mq_cluster/mq_node01/bin
./activemq start

cd /mq_cluster/mq_node02/bin
./activemq start

cd /mq_cluster/mq_node03/bin
./activemq start
  • zk集群的节点状态说明
代码语言:javascript
复制
cd /myzookeeper/zk01/bin
./zkCli.sh -server 127.0.0.1:2191    # 查看activemq是否注册成功
ls /activemq/leveldb-stores             # 里面应该有3个

get /activemq/leveldb-stores/00000000000        # 查看elected 是否有值  有值的为master  null为slaver
测试
代码语言:javascript
复制
// 生产者 
public static final String ACTIVEMQ_URL = "failover:(tcp://ip:61616,tcp://ip:61617,tcp://ip:61618)randomize=false";
public static final String QUEUE_NAME = "jdbc01";

// 消费者 
public static final String ACTIVEMQ_URL = "failover:(tcp://ip:61616,tcp://ip:61617,tcp://ip:61618)randomize=false";
public static final String QUEUE_NAME = "jdbc01";

💡难点

引入消息队列后该如何保证其高可用性

ZK+Replicated LevelDB Store

异步投递Async Sends

对于一个Slow Consumer,使用同步发送消息可能出现Producer堵塞等情况,慢消费者适合使用异步发送

ActiveMQ支持同步、异步两种发送的模式将消息发送到broker,模式的选择对发送延时有巨大的影响。producer能达到怎样的产出率(产出率=发送数据总量/时间)主要受发送延时的影响,使用异步发送可以显著的提高发送的性能。

ActiveMQ默认使用异步发送的模式:除非明确指定使用同步发送的方式或者在未使用事务的前提下发送持久化的消息,这两种情况都是同步发送。

如果你没有使用事务并且发送的是持久化消息,每一次发送都是同步发送的且会阻塞producer直到broker返回一个确认,表示消息已经被安全的持久化到磁盘。确认机制提供了消息安全的保障,但同时会阻塞客户端带来很大的延时。

很多高性能的应用,允许在失败的情况下有少量的数据丢失。如果你的应用满足这个特点,你可以使用异步发送来提高生产率,即使发送的是持久化消息。

异步发送

它可以最大化producer端的发送效率。我们通常在发送消息量比较密集的情况下使用异步发送,它可以很大的提升Producer性能;不过也带来了额外的问题。

就是需要消耗较多的Client端内存同时也会导致broker端性能消耗增加:

此外它不能有效地确保消息的发送成功。在useAsyncSend=true的情况下客户端需要容忍消息丢失的可能。

官网配置

参考#:https://activemq.apache.org/async-sends

代码语言:javascript
复制
// Configuring Async Send using a Connection URI
// You can use the Connection Configuration URI to configure async sends as follows
cf = new ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true");

// Configuring Async Send at the ConnectionFactory Level
// You can enable this feature on the ActiveMQConnectionFactory object using the property.
((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);

// Configuring Async Send at the Connection Level
// Configuring the dispatchAsync setting at this level overrides the settings at the connection factory level.
//You can enable this feature on the ActiveMQConnection object using the property.
((ActiveMQConnection)connection).setUseAsyncSend(true);
💡异步发送如何确认发送成功

异步发送丢失消息的场景是:生产者设置UseAsyncSend=true,使用producer.send(msg)持续发送消息。 由于消息不阻塞,生产者会认为所有的send消息均被成功发送至MQ。 如果MQ突然宕机,此时生产者端内存中尚未被发送至MQ的消息都会丢失。 所以,正确的异步发送方法是需要接收回调的 同步发送和异步发送的区别就在此 同步发送等send不阻塞了就表示一定发送成功了 ,异步发送需要接收回执并由客户端再判断一次是否发送成功。

代码语言:javascript
复制
 ActiveMQMessageProducer activeMQMessageProducer = (ActiveMQMessageProducer)session.createProducer(queue);
  TextMessage message =null;
  for (int i = 1; i<=3; i++){
        //7 创建消息 
         message = session.createTextMessage("TOPIC_NAMEmsg ---"+i);//理解为一个字符串
         message.setJMSMessageID(UUID.randomUUID().toString()+"------order--");
      String msgID = message.getJMSMessageID();
      activeMQMessageProducer.send(message,new AsyncCallback()
                                   {
                                       @Override
                                       public void onSuccess(){
                                           System.out.prinitln(msgID+"has been ok send")
                                       }
                                       @Override
                                       public void onException(JMSException exception){
                                           System.out.prinitln(msgID+"fail to send to mq");
                                       }
                                   })
        //8 通过messageProducer发送给mq
        messageProducer.send(textMessage);
    }

延迟投递和定时投递

参考官网:https://activemq.apache.org/delay-and-schedule-message-delivery

四大属性

AMQ_SCHEDULED_DELAY long 延迟投递的时间 AMQ_SCHEDULED_PERIOD long 重复投递的时间间隔 AMQ_SCHEDULED_REPEAT int 重复投递的次数 AMQ_SCHEDULED_CRON String Cron表达式

案例演示:

要在activemq.xml中配置schedulerSupport属性为True

代码语言:javascript
复制
<!--放在broker标签中-->
scheduleSupport="true" 

Java代码里面封装的辅助消息类型:ScheduleMessage

代码

代码语言:javascript
复制
// JMSProducer_DelayAndSchedule

// 消息生产者的代码
public static final String ACTIVEMQ_URL = "tcp://ip:61616";
public static final String QUEUE_NAME = "queue-delay";

public static void main(Stringp[] args){
    //1 创建连接工厂 按照给定的url地址 采用默认的用户名和密码  如果用户名密码改了 也可以传进去  接受3参
    ActiveMQConenctionFactory activeMQConenctionFactory = new ActiveMQConenctionFactory(ACTIVEMQ_URL);
    //2 通过连接工厂 获得连接connection并启动访问 抛异常
    Connection connection = activeMQConenctionFactory.createConnection();
    connection.start();
    //3 创建会话session
    // 两个参数 第一个叫事务 第二个叫签收
    Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
    //4 创建目的地 (具体是队列还是主题)  注意这里的包是jms
    Queue queue = session.createQueue(QUEUE_NAME);
    //5 创建消息的生产者
    MessageProducer messageProducer = session.createProducer(queue);
    Long delay = 3 * 1000;
    long period = 4 * 1000;
    int repeat = 5;
    
    //6 通过消息生产者messageProducer生产3条消息发送到MQ的队列里面
    for (int i = 1; i<=3; i++){
        //7 创建消息 
        TextMessage textMessage = session.createTextMessage("delay msg ---"+i);//理解为一个字符串
        
        message.setLongProperty(ScheduleMessage.AMQ_SCHEDULED_DELAY,delay);
        message.setLongtProperty(ScheduleMessage.AMQ_SCHEDULED_PERIOD,period);
        message.setIntProperty(ScheduleMessage.AMQ_SCHEDULED_REPEAT,repeat);
        
        //8 通过messageProducer发送给mq
        messageProducer.send(textMessage);
    }
    //9 关闭资源
    messageProducer.close();
    session.close();
    connection.close();
    
    System.out.println("*******消息发布到MQ完成");
}
代码语言:javascript
复制
// JMSConsumer_DelayAndSchedule

分发策略(未写)

ActiveMQ消费重试机制

具体哪些情况会引起消息的重发

  1. Client用了transaction且在session中调用了rollback()
  2. Client用了transactionns且在调用commit()之前关闭或者没有commit()
  3. Client在CLIENT_ACKNOWLEDGE的传递模式下,在session中调用了recover()

请说说消息重发时间间隔和重发次数吗?

间隔:1

次数:6

有毒消息Poison ACK谈谈你的理解

参考官网:https://activemq.apache.org/redelivery-policy

一个消息被redelivedred超过默认的最大重试次数(默认6次)时,消费端会给MQ发送一个"poison ack" 表示这个消息有毒,告诉broker不要再发了。这个时候broker会把这个消息放到DLQ(死信队列)

代码语言:javascript
复制
collisionAvoidanceFactor            # 设置防止冲突范围的正负百分比,只有启用useCllisionAvoidance参数时才生效,也就是在延迟时间上再加一个时间波动范围。默认值为0.15
maximumRedeliveries                    # 最大重试次数,达到最大重试次数后抛出异常。为-1时不限制次数,为0时表示不进行重传,默认值为6
maximumRedeliveryDelay                # 最大传送延迟,只在useExponentialBackOff为true时有效(V5.5),假设首次重连间隔为10ms,倍数为2,那么第二次重连时间间隔为20ms,第三次重连时间间隔为40ms,当重连时间间隔是最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。默认为-1.
initialRedeliveryDelay                # 初始重发延迟时间,默认为1000L
redeliveryDelay                        # 重发延迟时间,当initialRedeliveryDelay=0时生效,默认为1000L
useCollisionAvoidance                # 启用防止冲突功能,默认为false
useExponentialBackOff                # 启用指数倍数递增的方式增加延迟时间,默认false
backOffMultiplier                    # 重连时间间隔递增倍数,只有值大于1和启用useExponentialBackOff参数时才生效,默认为5
整合Spring如何使用

死信队列

参考官网:https://activemq.apache.org/message-redelivery-and-dlq-handling

说明:

ActiveMQ中引入了死信队列(Dead Letter Queue)的概念。即一条消息在被重发了多次后(默认重发6次redeliveryCounter==6),将会被ActiveMQ移入"死信队列"。开发人员可以在这个Queue中查看处理出错的消息,进行人工干预

重新执行发货和配送的逻辑

配置:

SharedDeadLetterStrategy(共享的死信队列)

将所有的DeadLetter保存在一个共享的队列中,这是ActiveMQ broker端默认的策略

共享队列默认为"ActiveMQ DLQ",可以通过"deadLetterQueue"属性来设定。

代码语言:javascript
复制
<deadLetterStrategy>
    <sharedDeadLetterStrategy deadLetterQueue="DLQ-QUEUE"/>
</deadLetterStrategy>

IndividualDeadLetterStrategy(个性化的死信队列)

把DeadLetter放入各自的死信通道中,

对于Queue而言,死信通道的前缀默认为"ActiveMQ.DLQ.Queue.";

对于Topic而言,死信通道的前缀默认为"ActiveMQ.DLQ.Topic.";

比如队列Order,那么它对应的 死信通道为ActiveMQ.DLQ.Queue.Order。我们使用"queuePrefix" "topicPrefix"来指定上述前缀。

默认情况下,无论是Topic还是Queue ,broker将使用Queue来保存DeadLeader,即死信通道通常为Queue;不过开发者也可以指定Topic。

代码语言:javascript
复制
<policyEntry queue="order">
    <deadLetterStrategy>
        <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="false"/>
    </deadLetterStrategy>
</policyEntry>

将队列order中出现的DeadLetter保存在DLQ.Order中,不过此时DLQ.Order为Topic

属性"useQueueForTopicMessages" 此值表示是否将Topic的DeadLetter保存在Queue中默认为true

自动删除过期消息

有时需要直接删除过期的消息而不需要发送到死信队列中,"processExpired"表示是否将过期消息放入到死信队列,默认为true

代码语言:javascript
复制
<policyEntry queue=">">
    <deadLetterStrategy>
        <sharedDeadLetterStratgy processExpired = "false"/>
    </deadLetterStrategy>
</policyEntry>
存放非持久的消息到死信队列

默认情况下,ActiveMQ不会把非持久的死消息发送到死信队列中。

processNonPersistent 表示是否将"非持久化"消息放入到死信队列中,默认为false

非持久性如果你想要把非持久的消息发送到死信队列中,需要设置属性processNonPersistent= "true"

代码语言:javascript
复制
<!-- queue=">"  中的 >  类似于sql中的*  就是全部-->
<policyEntry queue=">">
    <deadLetterStrategy>
        <sharedDeadLetterStrategy processNonPersistent="true"/>
    </deadLetterStrategy>
</policyEntry>

如何保证消息不被重复消费呢?幂等性

由于网络延迟传输中,会造成进行MQ重试中,在重试过程中,可能会造成重复消费。

如果消息是做数据库的插入操作,给这个消息做一个唯一主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。

另一种是token的方式

如果上面两种情况还不行,准备一个第三服务方来做消费记录。以redis为例,给消息分配一个全局id,只要消费过该消息,将< id,message>以K-V形式写入redis。那消费者开始消费,先去redis中查询有没有消费记录即可。

以上笔记获取方式:

代码语言:javascript
复制
链接:https://pan.baidu.com/s/1S0sObSiEAVC42FrfYT589g 
提取码:ndhk

过期请私信

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-05-22,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码上遇见你 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • JMS(Java Message Service)
    • JMS的组成特点
      • JMS的可靠性(事务)
        • JMS开发基本步骤
          • JMS点对点总结
            • JMS发布订阅总结
            • JavaEE
            • Active MQ
              • MQ中间件的落地产品有哪些?
                • 消息队列的详细比较
              • 参考资料
                • 安装
                  • Java实现(两种模式)
                    • POM依赖
                    • Java代码(1对1 Queue):
                    • Queue总结:
                    • Java代码(1对多 Topic):
                    • Topic总结:
                    • 两大模式比较
                  • ActiveMQ的Broker
                    • 嵌入式Broker
                  • Spring整合ActiveMQ
                    • pom依赖
                    • spring xml配置文件
                    • 一对一的Queue代码实现
                    • 一对多的代码实现Topic
                    • 在Spring里面实现消费者不启动,直接通过配置监听完成
                  • Spring Boot整合ActiveMQ
                    • Queue
                    • Topic
                  • ActiveMQ的传输协议
                    • Transmission Control Protocol-(TCP)
                    • New(no) I/O API Protocol-(NIO)
                    • AMQP协议
                    • STOMP协议
                    • MQTT协议
                    • WS协议
                    • 总结 :
                  • ActiveMQ的消息存储和持久化
                    • ActiveMQ的持久化机制
                    • 消息存储机制
                    • ActiveMQ持久化机制小总结
                  • ActiveMQ多点集群
                    • Zookeeper+replicated-leveldb-store的主从集群
                  • 💡难点
                    • 引入消息队列后该如何保证其高可用性
                    • 异步投递Async Sends
                    • 延迟投递和定时投递
                    • 分发策略(未写)
                    • ActiveMQ消费重试机制
                    • 死信队列
                    • 如何保证消息不被重复消费呢?幂等性
                相关产品与服务
                消息队列 TDMQ
                消息队列 TDMQ (Tencent Distributed Message Queue)是腾讯基于 Apache Pulsar 自研的一个云原生消息中间件系列,其中包含兼容Pulsar、RabbitMQ、RocketMQ 等协议的消息队列子产品,得益于其底层计算与存储分离的架构,TDMQ 具备良好的弹性伸缩以及故障恢复能力。
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档