前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMq 总结

RabbitMq 总结

作者头像
leon公众号精选
发布2022-04-27 16:09:58
4360
发布2022-04-27 16:09:58
举报
  • 基本介绍
  • 交换机类型
  • 公共参数说明
  • 消息手动签收
  • 消费者和生产者时间依赖关系
  • 消费端获取消息模式
  • 解决重复消费问题
  • 死信队列
  • 消息延时推送

基本介绍

  • Broker(消息代理):接受客户端的链接,实现AMQP实体服务
  • Producer:消息生产者
  • Consumer:消息消费者
  • Connection(连接):producer 和 consumer 与 broker的tcp连接
  • Channel(网络信道):基于 connection 创建,消息读写都是在 channel 中进行。客户端可以建立多个channel,每个channel代表一个会话任务
  • VirtualHost(虚拟主机) :一个broker里可以开设多个虚拟主机,用于进行逻辑隔离,最上层的消息路由。类似mysql的database
  • Exchange(交换机) :接收消息,根据路由键转单消息到绑定队列
  • Queue(消息队列) :是 RabbitMQ 的内部对象,用于存储消息。每个消息都会被投入到一个或多个队列。且多个消费者可以订阅同一个 Queue(这时 Queue 中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理)
  • Binding(绑定) :Exchange和队列Queue之间的虚拟链接。
  • Routing Key(路由键) :消息发送给 Exchange时,消息将拥有一个路由键(默认为空), Exchange根据这个路由键将消息发送到匹配的队列中
  • Binding Key(绑定键):将消息路由到所有绑定到该Exchange的Queue,但fanout下bindingkey会失效 图1-1

交换机类型

fanout

消息会发送到所有与该交换机绑定的队列中

图2-1

direct

消息通过RoutingKey精准匹配对应的队列中

图2-2

topic

消息通过RoutingKey模糊匹配到对应的队列中

  • RoutingKey用"."分割字符串
  • *:匹配一个单词
  • #:匹配0个或多个单词 图2-3
headers

不依赖于路由键的匹配规则路由消息,根据发送的消息内容headers属性进行完全匹配(键值对形式)。性能差,基本不使用。

公共参数说明

队列参数

channel.QueueDeclare方法中arguments参数,队列一旦声明,参数将无法更改,添加,删除

参数名称

描述

Features

x-message-ttl

队列中的消息的生存周期,单位毫秒

TTL

x-expires

队列在指定的时间内没有被使用(访问)就会被删除

Exp

x-max-length

设置队列最大长度(先进先丢)

Lim

x-max-length-bytes

队列可以容纳的消息的最大字节数,超过这个字节数,队列头部的消息将会被丢弃

Lim B

x-overflow

队列中的消息溢出时,(默认drop-head)丢弃队列头部的消息或(reject-publish)拒绝接收后面生产者发送过来的所有消息

Ovfl

x-single-active-consumer

一次只能有一个消费者消费消息

SAC

x-dead-letter-exchange

设置当前队列的死信交换机

DLX

x-dead-letter-routing-key

设置死信交换机的路由key,死信交换机会根据该值去找到死信消息存放的队列

DLK

x-max-priority

队列中的消息的优先级最大值,不设置的队列就不支持优先级发送消息

Pri

x-queue-mode

懒人模式的队列会先将交换机推送过来的消息(尽可能多的)保存在磁盘上,以减少内存的占用。当消费者开始消费的时候才加载到内存中。

Args

x-queue-master-locator

master queue host 的分配策略:min-masters、client-local和random

消息参数

参数名称

描述

content_type

消息内容的类型

content_encoding

消息内容的编码格式

priority

消息的优先级

correlation_id

用于将RPC响应与请求相关联

reply_to

回调队列

expiration

消息过期时间,单位毫秒.该参数值优先级>队列参数设置中的消息生存期

message_id

消息id

timestamp

消息的时间戳

type

类型

user_id

用户id

app_id

应用程序id

cluster_id

集群id

消息手动签收

  • 签收异常,没有调用basic.ack;当前会话处于连接状态时,消息转变为unacked状态,其他消费者消费不到,当前会话断开,unacked的消息会重新变为ready状态,其他消费者才能够重新消费
  • 签收正常,成功调用basic.ack,队列中立即删除消息
  • basic.reject方法拒绝deliveryTag对应的消息,第二个参数是否requeue,true则重新入队列,否则丢弃或者进入死信队列
  • basic.nack方法为不确认deliveryTag对应的消息,第二个参数是否应用于多消息,第三个参数是否requeue,与basic.reject区别就是同时支持多个消息,可以nack该消费者先前接收未ack的所有消息。nack后的消息也会被自己消费到
  • basic.recover是否恢复消息到队列,参数是是否requeue,true则重新入队列,并且尽可能的将之前recover的消息投递给其他消费者消费,而不是自己再次消费。false则消息会重新被投递给自己

消费者和生产者时间依赖关系

  • 消费者和生产者已知,消费者和生产者之间没有时间依赖关系
  • 生产者已知,消费者未知,需要消费者订阅后才能接收消息

消费端获取消息模式

  • **推模式:**消息中间件主动将消息推送给消费者,消费者需要设置一个缓冲区缓存消息,效率高,但缓冲区可能会溢出
  • **拉模式:**消费者主动从消息中间件拉取消息,网络开销会增加消息延迟,降低系统吞吐量 拉模式适用场景
    • 消费者在某个条件成立时才能消费消息
    • 需要批量拉取消息进行处理,连续调用basicGet方法拉取多条消息,处理完毕一次性返回ACK

解决重复消费问题

  • 利用数据库主键去重
  • 利用Redis的原子性去实现

redis是单线程的,但是性能好也有很多原子性的命令,比如setnx命令,在接收到消息后将消息ID作为key去执行setnx命令,如果执行成功则表示没有执行过这条消息,可以进行消费(setnx命令特点:当且仅当key不存在,将key值设为value值;若key已存在该命令不做任何操作)

  • 使用全局ID区分消息,解决幂等性

生产者在请求头设置messageId,可以用随机ID或业务逻辑唯一ID

死信队列

  • 消息被拒(basicreject or basicnack)并且没有重新入队(requeue=false);
  • 当前队列中的消息数量已经超过最大长度
  • 消息在队列中过期

配置死信队列

代码语言:javascript
复制
    public static void SendMessage()
        {
            var exchangeA = "exchange";
            var routeA = "routekey";
            var queueA = "queue";

            var exchangeD = "dlx.exchange";
            var routeD = "dlx.route";
            var queueD = "dlx.queue";

            var connection = RabbitMQHelper.GetConnection();
            {
                var channel = connection.CreateModel();
                {
                    // 创建死信交换机
                    channel.ExchangeDeclare(exchangeD, type: "fanout", durable: true, autoDelete: false);
                    // 创建死信队列
                    channel.QueueDeclare(queueD, durable: true, exclusive: false, autoDelete: false);
                    // 绑定死信交换机和队列
                    channel.QueueBind(queueD, exchangeD, routeD);

                    channel.ExchangeDeclare(exchangeA, type: "fanout", durable: true, autoDelete: false);
                    channel.QueueDeclare(queueA, durable: true, exclusive: false, autoDelete: false, arguments: 
                                        new Dictionary<string, object> {
                                             { "x-dead-letter-exchange",exchangeD}, //设置当前队列的DLX
                                             { "x-dead-letter-routing-key",routeD}, //设置DLX的路由key,DLX会根据该值去找到死信消息存放的队列
                                           //  { "x-message-ttl",10000}, //设置消息的存活时间,即过期时间
                                            { "x-max-length",5}//设置队列最大长度
                                         });
                    channel.QueueBind(queueA, exchangeA, routeA);


                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;
                    //发布消息
                    channel.BasicPublish(exchange: exchangeA,
                                         routingKey: routeA,
                                         basicProperties: properties,
                                         body: Encoding.UTF8.GetBytes("hello rabbitmq message"));
                }
            }
            
        } 

重试失败特定次数后放入死信队列

代码语言:javascript
复制
 private static string _exchangeNormal = "Exchange.Normal";  //定义一个用于接收 正常 消息的交换机
        private static string _exchangeRetry = "Exchange.Retry";    //定义一个用于接收 重试 消息的交换机
        private static string _exchangeFail = "Exchange.Fail";      //定义一个用于接收 失败 消息的交换机
        private static string _queueNormal = "Queue.Noraml";        //定义一个用于接收 正常 消息的队列
        private static string _queueRetry = "Queue.Retry";          //定义一个用于接收 重试 消息的队列
        private static string _queueFail = "Queue.Fail";            //定义一个用于接收 失败 消息的队列

        public static void Test()
        {
            var connection = RabbitMQHelper.GetConnection();
            var channel = connection.CreateModel();

            //声明交换机
            channel.ExchangeDeclare(_exchangeNormal, "topic", true, false, null);
            channel.ExchangeDeclare(_exchangeRetry, "topic", true, false, null);
            channel.ExchangeDeclare(_exchangeFail, "topic", true, false, null);

            //定义队列参数
            var queueNormalArgs = new Dictionary<string, object>();
            {
                queueNormalArgs.Add("x-dead-letter-exchange", _exchangeFail);   //指定死信交换机,用于将 Normal 队列中失败的消息投递给 Fail 交换机
            }
            var queueRetryArgs = new Dictionary<string, object>();
            {
                queueRetryArgs.Add("x-dead-letter-exchange", _exchangeNormal);  //指定死信交换机,用于将 Retry 队列中超时的消息投递给 Normal 交换机
                queueRetryArgs.Add("x-message-ttl", 6000);                      //定义 queueRetry 的消息最大停留时间 (原理是:等消息超时后由 broker 自动投递给当前绑定的死信交换机)                                                                             //定义最大停留时间为防止一些 待重新投递 的消息、没有定义重试时间而导致内存溢出
            }
            var queueFailArgs = new Dictionary<string, object>();
            {
            }

            //声明队列
            channel.QueueDeclare(queue: _queueNormal, durable: true, exclusive: false, autoDelete: false, arguments: queueNormalArgs);
            channel.QueueDeclare(queue: _queueRetry, durable: true, exclusive: false, autoDelete: false, arguments: queueRetryArgs);
            channel.QueueDeclare(queue: _queueFail, durable: true, exclusive: false, autoDelete: false, arguments: queueFailArgs);

            //为队列绑定交换机
            channel.QueueBind(queue: _queueNormal, exchange: _exchangeNormal, routingKey: "#");
            channel.QueueBind(queue: _queueRetry, exchange: _exchangeRetry, routingKey: "#");
            channel.QueueBind(queue: _queueFail, exchange: _exchangeFail, routingKey: "#");

            #region 创建一个普通消息消费者
            {
                var consumer = new EventingBasicConsumer(channel);

                consumer.Received += (sender, e) =>
                {
                    var _sender = (EventingBasicConsumer)sender;            //消息传送者
                    var _channel = _sender.Model;                           //消息传送通道
                    var _message = (BasicDeliverEventArgs)e;                //消息传送参数
                    var _headers = _message.BasicProperties.Headers;        //消息头
                    var _content = Encoding.UTF8.GetString(_message.Body.ToArray());  //消息内容
                    var _death = default(Dictionary<string, object>);       //死信参数

                    if (_headers != null && _headers.ContainsKey("x-death"))
                        _death = (Dictionary<string, object>)(_headers["x-death"] as List<object>)[0];

                    try
                    #region 消息处理
                    {
                        Console.WriteLine();
                        Console.WriteLine($"{DateTime.Now.ToString("HH:mm:ss.fff")}\t(1.0)消息接收:\r\n\t[deliveryTag={_message.DeliveryTag}]\r\n\t[consumerID={_message.ConsumerTag}]\r\n\t[exchange={_message.Exchange}]\r\n\t[routingKey={_message.RoutingKey}]\r\n\t[content={_content}]");

                        throw new Exception("模拟消息处理失败效果。");

                        //处理成功时
                        Console.WriteLine($"{DateTime.Now.ToString("HH:mm:ss.fff")}\t(1.1)处理成功:\r\n\t[deliveryTag={_message.DeliveryTag}]");

                        //消息确认 (销毁当前消息)
                        _channel.BasicAck(deliveryTag: _message.DeliveryTag, multiple: false);
                    }
                    #endregion
                    catch (Exception ex)
                    #region 消息处理失败时
                    {
                        var retryCount = (long)(_death?["count"] ?? default(long)); //查询当前消息被重新投递的次数 (首次则为0)

                        Console.WriteLine($"{DateTime.Now.ToString("HH:mm:ss.fff")}\t(1.2)处理失败:\r\n\t[deliveryTag={_message.DeliveryTag}]\r\n\t[retryCount={retryCount}]");

                        if (retryCount >= 2)
                        #region 投递第3次还没消费成功时,就转发给 exchangeFail 交换机
                        {
                            //消息拒绝(投递给死信交换机,也就是上边定义的 ("x-dead-letter-exchange", _exchangeFail))
                            _channel.BasicNack(deliveryTag: _message.DeliveryTag, multiple: false, requeue: false);
                        }
                        #endregion
                        else
                        #region 否则转发给 exchangeRetry 交换机
                        {
                            var interval = (retryCount + 1) * 10; //定义下一次投递的间隔时间 (单位:秒)

                            //定义下一次投递的间隔时间 (单位:毫秒)
                            _message.BasicProperties.Expiration = (interval * 1000).ToString();

                            //将消息投递给 _exchangeRetry (会自动增加 death 次数)
                            _channel.BasicPublish(exchange: _exchangeRetry, routingKey: _message.RoutingKey, basicProperties: _message.BasicProperties, body: _message.Body);

                            //消息确认 (销毁当前消息)
                            _channel.BasicAck(deliveryTag: _message.DeliveryTag, multiple: false);
                        }
                        #endregion
                    }
                    #endregion
                };
                channel.BasicConsume(queue: _queueNormal, autoAck: false, consumer: consumer);
            }
            #endregion
        }

消息延时推送

  • 过期队列+死信交换机
  • RabbitMQ 3.6.x 开始可以使用延迟插件,交换机类型选择x-delayed-message(延迟将数据放入队列)
代码语言:javascript
复制
   public static void ConsumerMessage()
   {
            var connection = RabbitMQHelper.GetConnection();
            var channel = connection.CreateModel();

            var exchangeArgumets = new Dictionary<string, object>
            {
                { "x-delayed-type", "topic" }  //延迟交换机的类型
            };
            channel.ExchangeDeclare("delay_exchange", "x-delayed-message", true, false, exchangeArgumets);

            // 创建队列
            string queueName1 = "delay_queue1";
            channel.QueueDeclare(queueName1, false, false, false, null);
            string queueName2 = "delay_queue2";
            channel.QueueDeclare(queueName2, false, false, false, null);
            string queueName3 = "delay_queue3";
            channel.QueueDeclare(queueName3, false, false, false, null);
            // 绑定到交互机
            channel.QueueBind(queue: queueName1, exchange: "delay_exchange", routingKey: "delayed-direct1");
            channel.QueueBind(queue: queueName2, exchange: "delay_exchange", routingKey: "delayed-direct2");
            channel.QueueBind(queue: queueName3, exchange: "delay_exchange", routingKey: "delayed-direct3");

            var properties = channel.CreateBasicProperties();
            properties.Persistent = true; // 标记消息持久化

            Console.WriteLine(" [*] Waiting for messages.");

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body.ToArray());
                var routingKey = ea.RoutingKey;
                var _headers = ea.BasicProperties.Headers;        //消息头
                int delay = 0;
                if (_headers == null)
                {
                    ea.BasicProperties.Headers = new Dictionary<string, object>();
                }
                else  if ( _headers.ContainsKey("x-delay"))
                {
                    delay = Convert.ToInt32(ea.BasicProperties.Headers["x-delay"]);
                    delay = delay + 20000;
                }
                ea.BasicProperties.Headers["x-delay"] = delay; //消息头设置消息延迟的时间
                Console.WriteLine($" {DateTime.Now}=={delay}");
                Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message);
               
                channel.BasicPublish(ea.Exchange, ea.RoutingKey, basicProperties: ea.BasicProperties, body);
                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
            };

            channel.BasicConsume(queue: queueName3,
                                 autoAck: false,
                                 consumer: consumer);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-09-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 架构师高级俱乐部 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 交换机类型
    • fanout
      • direct
        • topic
          • headers
          • 公共参数说明
          • 消息手动签收
          • 消费者和生产者时间依赖关系
          • 消费端获取消息模式
          • 解决重复消费问题
          • 死信队列
          • 消息延时推送
          相关产品与服务
          消息队列 TDMQ
          消息队列 TDMQ (Tencent Distributed Message Queue)是腾讯基于 Apache Pulsar 自研的一个云原生消息中间件系列,其中包含兼容Pulsar、RabbitMQ、RocketMQ 等协议的消息队列子产品,得益于其底层计算与存储分离的架构,TDMQ 具备良好的弹性伸缩以及故障恢复能力。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档