前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMQ:消息分发模型

RabbitMQ:消息分发模型

作者头像
栗筝i
发布2022-12-27 17:19:09
4150
发布2022-12-27 17:19:09
举报
文章被收录于专栏:迁移内容迁移内容迁移内容

Work queues,也被称为 Task queues,任务模型,当消息处理比较耗时时的时候,可能产生消息的速度会远远大于消息的消费速度,消息会堆积越来越多,无法及时处理,此时就可以使用work模型:让多个消费者绑定一个队列,共同消费队列中的消息,队列中的消息一旦消费,就会消失,因此任务不会被重复执行。 RabbitMQ 单生产单消费模型主要有以下三个角色构成:

  • 生产者(producer/ publisher):一个发送消息的用户应用程序。
  • 消费者1(consumer):消费和接收有类似的意思,消费者是一个主要用来等待接收消息的用户应用程序,领取任务并完成
  • 消费者2(consumer):领取任务并完成…
  • 队列:RabbitMQ 内部类似于邮箱的一个概念。虽然消息流经 RabbitMQ 和你的应用程序,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。许多生产者可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。

~ 本篇内容包括:RabbitMQ 消息分发模型、RabbitMQ 消息分发模型实现、RabbitMQ 手动消息确认


文章目录


一、RabbitMQ 消息分发模型

1、消息分发模型(Work Queue 模型)

Work queues,也被称为 Task queues,任务模型,当消息处理比较耗时时的时候,可能产生消息的速度会远远大于消息的消费速度,消息会堆积越来越多,无法及时处理,此时就可以使用work模型:让多个消费者绑定一个队列,共同消费队列中的消息,队列中的消息一旦消费,就会消失,因此任务不会被重复执行。

image-20221212152117164
image-20221212152117164
2、消息分发模型组成

RabbitMQ 单生产单消费模型主要有以下四个角色构成:

  • 生产者(producer/ publisher):一个发送消息的用户应用程序。
  • 消费者1(consumer):消费和接收有类似的意思,消费者是一个主要用来等待接收消息的用户应用程序,领取任务并完成
  • 消费者2(consumer):领取任务并完成…
  • 队列:RabbitMQ 内部类似于邮箱的一个概念。虽然消息流经 RabbitMQ 和你的应用程序,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。许多生产者可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。

二、RabbitMQ 消息分发模型实现

1、添加 Maven 依赖

# 在 pom.xml 文件中添加以下依赖

<dependencies>
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.16.0</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.13.2</version>
        <scope>test</scope>
    </dependency>
  	...
</dependencies>
2、封装工具类 ConnectionUtil

# 连接工具类封装-ConnectionUtil

package com.lizhengi.work.demo1;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @author liziheng
 * @version 1.0.0
 * @description 连接工具类封装
 * @date 2022-12-23 11:29 上午
 **/
public class ConnectionUtil {

    /**
     * 创建 MQ 连接工厂对象
     */
    private static final ConnectionFactory CONNECTION_FACTORY;

    // 类加载时,重量级资源加载
    static {
        CONNECTION_FACTORY = new ConnectionFactory();
        // 设置连接MQ主机
        CONNECTION_FACTORY.setHost("127.0.0.1");
        // 设置连接MQ端口号
        CONNECTION_FACTORY.setPort(5672);
        // 设置连接MQ虚拟主机
        CONNECTION_FACTORY.setVirtualHost("/test");
        // 设置连接MQ虚拟主机的用户名密码
        CONNECTION_FACTORY.setUsername("admin");
        CONNECTION_FACTORY.setPassword("123456");
    }

    /**
     * 建立与RabbitMQ连接 工具方法
     *
     * @return Connection
     */
    public static Connection getConnection() {
        try {
            // 返回连接对象
            return CONNECTION_FACTORY.newConnection();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 关闭通道和连接 工具方法
     *
     * @param channel    Channel
     * @param connection Connection
     */
    public static void closeConnectionAndChanel(Channel channel, Connection connection) {
        try {
            if (channel != null) {
                channel.close();
            }
            if (connection != null) {
                connection.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
3、生产者实现
package com.lizhengi.work.demo1;

import com.lizhengi.hello.demo2.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.testng.annotations.Test;

import java.io.IOException;

/**
 * @author liziheng
 * @version 1.0.0
 * @description Rabbit 消息分发模型 生产者
 * @date 2022-12-23 11:32 上午
 **/
public class Producer {

    /**
     * 消息生产
     *
     * @throws IOException IOException
     */
    @Test
    public void testSendMessage() throws IOException {
        // 获取连接对象
        Connection connection = ConnectionUtil.getConnection();
        // 获取通道
        assert connection != null;
        Channel channel = connection.createChannel();
        // 通过通道声明队列
        channel.queueDeclare("WorkQueue", true, false, false, null);
        // 生产消息
        for (int i = 0; i < 100; i++) {
            channel.basicPublish("", "WorkQueue", null, ("Hello RabbitMQ " + i).getBytes());
        }
        // 资源关闭
        ConnectionUtil.closeConnectionAndChanel(channel, connection);
    }
}
4、消费者-1 实现
package com.lizhengi.work.demo1;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @author liziheng
 * @version 1.0.0
 * @description Rabbit 消息分发模型 消费者-1
 * @date 2022-12-23 11:32 上午
 **/
public class Customer1 {
    public static void main(String[] msg) throws IOException {

        Connection connection = ConnectionUtil.getConnection();

        assert connection != null;
        Channel channel = connection.createChannel();

        channel.queueDeclare("WorkQueue", true, false, false, null);

        channel.basicConsume("WorkQueue", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                System.out.println("消费者1:" + new String(body));
                try {
                    Thread.sleep(1000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

    }
}
5、消费者-2 实现
package com.lizhengi.work.demo1;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @author liziheng
 * @version 1.0.0
 * @description Rabbit 消息分发模型 消费者-2
 * @date 2022-12-23 11:32 上午
 **/
public class Customer2 {
    public static void main(String[] msg) throws IOException {

        Connection connection = ConnectionUtil.getConnection();

        assert connection != null;
        Channel channel = connection.createChannel();

        channel.queueDeclare("WorkQueue", true, false, false, null);

        channel.basicConsume("WorkQueue", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                System.out.println("消费者2:" + new String(body));
            }
        });

    }
}
6、消息队列的循环机制

运行上面的消费者与生产者,观察消费者打印:

消费者1:Hello RabbitMQ 1 消费者1:Hello RabbitMQ 3 消费者1:Hello RabbitMQ 5 消费者1:Hello RabbitMQ 7 消费者1:Hello RabbitMQ 9

消费者2:Hello RabbitMQ 0 消费者2:Hello RabbitMQ 2 消费者2:Hello RabbitMQ 4 消费者2:Hello RabbitMQ 6 消费者2:Hello RabbitMQ 8

可以观察到,两个消费者依次打印了队列中的消息,哪怕是在其中一个加上 Thread.sleep(1000); 依然是这种循环机制!


三、RabbitMQ 手动消息确认

1、消费者-1 实现
package com.lizhengi.work.demo2;

import com.lizhengi.work.demo1.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @author liziheng
 * @version 1.0.0
 * @description Rabbit 消息分发模型 消费者-1
 * @date 2022-12-23 11:32 上午
 **/
public class Customer1 {
    public static void main(String[] msg) throws IOException {

        Connection connection = ConnectionUtil.getConnection();

        assert connection != null;
        Channel channel = connection.createChannel();
        // 设置每次只能消费一个消息
        channel.basicQos(1);

        channel.queueDeclare("WorkQueue", true, false, false, null);
        // 参数2:自动消息确认关闭
        channel.basicConsume("WorkQueue", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(1000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println("消费者1:" + new String(body));
                /*  手动消息确认
                 *  参数1:确认队列中的那个具体消息
                 *  参数2:是否开启多个消息同时确认
                 */
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });

    }
}
2、消费者-2 实现
package com.lizhengi.work.demo2;

import com.lizhengi.work.demo1.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @author liziheng
 * @version 1.0.0
 * @description Rabbit 消息分发模型 消费者-2
 * @date 2022-12-23 11:32 上午
 **/
public class Customer2 {
    public static void main(String[] msg) throws IOException {

        Connection connection = ConnectionUtil.getConnection();

        assert connection != null;
        Channel channel = connection.createChannel();

        //设置每次只能消费一个消息
        channel.basicQos(1);

        channel.queueDeclare("WorkQueue", true, false, false, null);
        channel.basicConsume("WorkQueue", false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });

    }
}
3、实现能者多劳

运行上面的消费者与生产者,观察消费者打印:

消费者1:Hello RabbitMQ 0 消费者1:Hello RabbitMQ 2 消费者1:Hello RabbitMQ 3 消费者1:Hello RabbitMQ 4 消费者1:Hello RabbitMQ 5 消费者1:Hello RabbitMQ 6 消费者1:Hello RabbitMQ 7 …

消费者2:Hello RabbitMQ 1 消费者2:Hello RabbitMQ 64 消费者2:Hello RabbitMQ 76 消费者2:Hello RabbitMQ 89

可以观察到,消费者1开启了手动消费机制后进行了更大比重消息数量的消费!

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022-12-26,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 文章目录
  • 一、RabbitMQ 消息分发模型
    • 1、消息分发模型(Work Queue 模型)
      • 2、消息分发模型组成
      • 二、RabbitMQ 消息分发模型实现
        • 1、添加 Maven 依赖
          • 2、封装工具类 ConnectionUtil
            • 3、生产者实现
              • 4、消费者-1 实现
                • 5、消费者-2 实现
                  • 6、消息队列的循环机制
                  • 三、RabbitMQ 手动消息确认
                    • 1、消费者-1 实现
                      • 2、消费者-2 实现
                        • 3、实现能者多劳
                        相关产品与服务
                        消息队列 CMQ 版
                        消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
                        领券
                        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档