前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabitMQ&Java使用说明

RabitMQ&Java使用说明

作者头像
俺也想起舞
发布2020-07-01 16:30:33
4170
发布2020-07-01 16:30:33
举报

RabbitMQ的安装以及五种队列模式

RabitMQ&Java使用说明

RabbitMQ简介

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而群集和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端

RabbitMQ安装

docker一键安装

代码语言:javascript
复制
# 拉去镜像(后缀为management表示为带图形化管理界面的版本)
docker pull docker.io/rabbitmq:3.8-management
# 启动镜像
docker run -d --name rabbitmq3.7.7 -p 5672:5672 -p 15672:15672 -v `pwd`/data:/var/lib/rabbitmq --hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=my_vhost  -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin [ent_id]
# -d 后台运行容器;
# --name 指定容器名;
# -p 指定服务运行的端口(5672:应用访问端口;15672:控制台Web端口号);
# -v 映射目录或文件;
# --hostname  主机名(RabbitMQ的一个重要注意事项是它根据所谓的 “节点名称” 存储数据,默认为主机名);
# -e 指定环境变量;(RABBITMQ_DEFAULT_VHOST:默认虚拟机名;RABBITMQ_DEFAULT_USER:默认的用户名;RABBITMQ_DEFAULT_PASS:默认用户名的密码)

RabbitMQ中的五种队列

  • Simplest Queue
  • Work Queue
  • Publish/Subscibe
  • Routing
  • Topics

导入依赖

代码语言:javascript
复制
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>3.4.1</version>
</dependency>
SimplestQueue(简单队列)
  • p(producing):生产者,用于生产消息并推送到队列中
  • 红色:消息队列,用于缓存生产者推送的消息,消费者可以从中取出消息
  • c(Consuming):消费者,读取队列中的消息

代码

工具方法码

代码语言:javascript
复制
//用于返回一个连接
public static Connection getConnection() throws Exception {
        //定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置服务地址
        factory.setHost("localhost");
        //端口
        factory.setPort(5672);
        //设置账号信息,用户名、密码、vhost
        factory.setVirtualHost("testhost");
        factory.setUsername("admin");
        factory.setPassword("admin");
        // 通过工程获取连接
        Connection connection = factory.newConnection();
        return connection;
   }

生产者

代码语言:javascript
复制
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
// 从连接中创建通道
Channel channel = connection.createChannel();

// 声明(创建)队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 消息内容
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
//关闭通道和连接
channel.close();
connection.close();

消费者

代码语言:javascript
复制
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
// 从连接中创建通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);

// 监听队列
channel.basicConsume(QUEUE_NAME, true, consumer);

// 获取消息
while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    String message = new String(delivery.getBody());
    System.out.println(" [x] Received '" + message + "'");
}
Work Queue
  • 一个生产者、两个消费者
  • 一条消息只能被一个消费者读取

生产者

代码语言:javascript
复制
String QUEUE_NAME = "test_queue_work"; 

// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();

// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

for (int i = 0; i < 100; i++) {
    // 消息内容
    String message = "" + i;
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");

    Thread.sleep(i * 10);
}

channel.close();
connection.close();

消费者X2

代码语言:javascript
复制
String QUEUE_NAME = "test_queue_work";
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();

// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 同一时刻服务器只会发一条消息给消费者
//channel.basicQos(1);

// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,false表示手动返回完成状态,true表示自动
channel.basicConsume(QUEUE_NAME, true, consumer);

// 获取消息
while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    String message = new String(delivery.getBody());
    System.out.println(" [y] Received '" + message + "'");
    // 返回确认状态,注释掉表示使用自动确认模式
    //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}

两种分发模式

  • 轮询分发 :使用任务队列的优点之一就是可以轻易的并行工作。如果我们积压了好多工作,我们可以通过增加工作者(消费者)来解决这一问题,使得系统的伸缩性更加容易。在默认情况下,RabbitMQ将逐个发送消息到在序列中的下一个消费者(而不考虑每个任务的时长等等,且是提前一次性分配,并非一个一个分配)。平均每个消费者获得相同数量的消息。这种方式分发消息机制称为Round-Robin(轮询)。
  • 公平分发 :虽然上面的分配法方式也还行,但是有个问题就是:比如:现在有2个消费者,所有的奇数的消息都是繁忙的,而偶数则是轻松的。按照轮询的方式,奇数的任务交给了第一个消费者,所以一直在忙个不停。偶数的任务交给另一个消费者,则立即完成任务,然后闲得不行。而RabbitMQ则是不了解这些的。这是因为当消息进入队列,RabbitMQ就会分派消息。它不看消费者为应答的数目,只是盲目的将消息发给轮询指定的消费者。

默认情况下是使用的轮询分发模式。将上述代码注释移除,并将channel.basicConsume(QUEUE_NAME, false, consumer);设置为false,则会采用公平分发

Publish/Subscibe(订阅模式)
  • 一个生产者,多个消费者
  • 每个消费者都有自己的队列
  • 生产者没有将消息直接发送到队列,而是发送到了交换机
  • 每个队列都要绑定到交换机
  • 生产者发送的消息,经过交换机到达队列,实现一个消息被多个消费者获取的目的

PS:一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费

生产者

代码语言:javascript
复制
// 交换机名称
String EXCHANGE_NAME = "test_exchange_fanout";
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();

// 声明exchange
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

// 消息内容
String message = "Hello World!";
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

channel.close();
connection.close();

PS:消息发送到没有队列绑定的交换机时,消息将丢失,因为,交换机没有存储消息的能力,消息只能存在在队列中。

消费者

代码语言:javascript
复制
//另一个消费则需要将队列名称换成另外一个例如test_queue_work2,其他代码相同
String QUEUE_NAME = "test_queue_work1";
String EXCHANGE_NAME = "test_exchange_fanout";

// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();

// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);

// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);

// 获取消息
while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    String message = new String(delivery.getBody());
    System.out.println(" [Recv] Received '" + message + "'");
    Thread.sleep(10);

    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
Routing(路由模式)

Publish/Subscibe模式中,所有的消息均会发送到所有的消费者,但是目前有这样一个场景,所有的日志记录必须发送到消费者A,用于记录消息,但是只有错误的日志需要发送到消费者B,这是就需Exchange有过滤功能 在Routing模式下,就可以实现这个功能

生产者

代码语言:javascript
复制
Connection connection = ConnectUtils.getConnection();
Channel channel = connection.createChannel();

//声明Exchange
channel.exchangeDeclare(EXCHANGE_NAME,"direct");

//分别发送两条消息到"delete"、"insert"渠道
channel.basicPublish(EXCHANGE_NAME,"delete",null,"删除商品".getBytes());
channel.basicPublish(EXCHANGE_NAME,"insert",null,"插入商品".getBytes());

channel.close();
connection.close();

消费者A

代码语言:javascript
复制
Connection connection = ConnectUtils.getConnection();
Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME1,false,false,false,null);

//绑定到交换机,接受"insert"、"delete"两个渠道的消息,也就是最终结果会受到两条消息
channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"insert");
channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"delete");

channel.basicQos(1);

QueueingConsumer consumer = new QueueingConsumer(channel);

// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME1, false, consumer);
// 获取消息
while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    String message = new String(delivery.getBody());
    System.out.println(" [Recv1] Received '" + message + "'");
    Thread.sleep(10);

    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}

消费者B

代码语言:javascript
复制
Connection connection = ConnectUtils.getConnection();
Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME2,false,false,false,null);

//绑定到交换机,只绑定了"delete"渠道,也就是只会受到一条消息
channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"delete");

channel.basicQos(1);

QueueingConsumer consumer = new QueueingConsumer(channel);

// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME2, false, consumer);
// 获取消息
while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    String message = new String(delivery.getBody());
    System.out.println(" [Recv2] Received '" + message + "'");
    Thread.sleep(10);

    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
Topics(主题模式)

主题模式是路由模式的一个升级,在过滤条件上更加灵活 主题模式是将路由键和某个模式进行匹配。此时队列需要绑定一个模式上。#匹配一个或多个词,*匹配不多不少一个词。因此audit.#能够匹配到audit.irs.corporate,但是audit.*只会匹配到audit.irs

生产者

代码语言:javascript
复制
Connection connection = ConnectUtils.getConnection();
Channel channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, "topic");

//发送两条消息
channel.basicPublish(EXCHANGE_NAME, "routkey.1", null, "routkey消息".getBytes());
channel.basicPublish(EXCHANGE_NAME, "common.1", null, "common消息".getBytes());

channel.close();
connection.close();

消费者A

代码语言:javascript
复制
Connection connection = ConnectUtils.getConnection();
Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME1, false, false, false, null);

//绑定到交换机
channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "routkey.#");
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列
channel.basicConsume(QUEUE_NAME1, true, consumer);
// 获取消息
while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    String message = new String(delivery.getBody());
    System.out.println(" [Recv1] Received '" + message + "'");
}

消费者B

代码语言:javascript
复制
Connection connection = ConnectUtils.getConnection();
Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME2, false, false, false, null);

//绑定到交换机
channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "#.#");

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME2, true, consumer);
// 获取消息
while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    String message = new String(delivery.getBody());
    System.out.println(" [Recv2] Received '" + message + "'");
}
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • RabitMQ&Java使用说明
    • RabbitMQ简介
      • RabbitMQ安装
        • RabbitMQ中的五种队列
          • SimplestQueue(简单队列)
          • Work Queue
          • Publish/Subscibe(订阅模式)
          • Routing(路由模式)
          • Topics(主题模式)
      相关产品与服务
      消息队列 CMQ 版
      消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档