前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【深度知识】RabbitMQ死信队列的原理及GO实现

【深度知识】RabbitMQ死信队列的原理及GO实现

作者头像
辉哥
发布2021-01-29 10:43:01
1.6K0
发布2021-01-29 10:43:01
举报
文章被收录于专栏:区块链入门

RabbitMQ2

1. 摘要

本文按照以下目前讲解RabbitMQ死信队列的内容,包括: (1)死信队列是什么? (2)如何配置死信队列? (3)死信队列代码实现演示(GO版本/JAV版本) (3)死信队列的应用场景? 网上Java版本的死信队列演示代码较多,特定找了GO版本的代码供大家演示使用。

2. 内容

2.1 死信队列是什么?

注意:业务队列与死信交换机的绑定是在构建业务队列时,通过参数(x-dead-letter-exchange和x-dead-letter-routing-key)的形式进行指定。

死信,在官网中对应的单词为“Dead Letter”,可以看出翻译确实非常的简单粗暴。那么死信是个什么东西呢?

“死信”是RabbitMQ中的一种消息机制,当你在消费消息时,如果队列里的消息出现以下情况之一: (1)消息被否定确认,使用 channel.basicNack 或 channel.basicReject ,并且此时requeue 属性被设置为false。 (2)消息在队列的存活时间超过设置的TTL时间。 (3)消息队列的消息数量已经超过最大队列长度。 那么该消息将成为“死信”。

“死信”消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。

2.2 如何配置死信队列?

这一部分将是本文的关键,如何配置死信队列呢?其实很简单,大概可以分为以下步骤: (1)配置业务队列,绑定到业务交换机上 (2)为业务队列配置死信交换机和路由key (3)为死信交换机配置死信队列

注意,并不是直接声明一个公共的死信队列,然后所以死信消息就自己跑到死信队列里去了。而是为每个需要使用死信的业务队列配置一个死信交换机,这里同一个项目的死信交换机可以共用一个,然后为每个业务队列分配一个单独的路由key。

有了死信交换机和路由key后,接下来,就像配置业务队列一样,配置死信队列,然后绑定在死信交换机上。也就是说,死信队列并不是什么特殊的队列,只不过是绑定在死信交换机上的队列。死信交换机也不是什么特殊的交换机,只不过是用来接受死信的交换机,所以可以为任何类型【Direct、Fanout、Topic】。一般来说,会为每个业务队列分配一个独有的路由key,并对应的配置一个死信队列进行监听,也就是说,一般会为每个重要的业务队列配置一个死信队列。

2.3 RabbitMQ死信队列GO代码实现

JAVA版本的代码实现参考该篇文章 《【RabbitMQ】一文带你搞定RabbitMQ死信队列》 和代码 https://github.com/MFrank2016/dead-letter-demo​,本文只讲GO代码实现。

(1)生产者

代码语言:javascript
复制
package product

import (
    "fmt"
    "github.com/streadway/amqp"
)

func ProducerDlx()  {
    var(
        conn *amqp.Connection
        err error
        ch *amqp.Channel
    )
    if conn, err = amqp.Dial("amqp://liulong:liulong@127.0.0.1:5672/"); err!=nil{
        fmt.Println("amqp.Dial err :", err)
        return
    }
    defer conn.Close()

    if ch, err = conn.Channel(); err!=nil{
        fmt.Println("conn.Channel err: ", err)
        return
    }

    defer ch.Close()
    
    //func (ch *Channel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args Table) error
    //声明交换器
    if err = ch.ExchangeDeclare(
        "long_abc", // Exchange names
        amqp.ExchangeDirect,//"direct", "fanout", "topic" and "headers"
        true,
        false,//Durable and Non-Auto-Deleted exchanges会一直保留
        false,
        false,
        nil,
    ); err!=nil{
        fmt.Println("ch.ExchangeDeclare err: ", err)
        return
    }
    
    //func (ch *Channel) Publish(exchange, key string, mandatory, immediate bool, msg Publishing) error
    //发送消息
    if err = ch.Publish(
        "long_abc",
        "zhe_mess",
        false,
        false,
        amqp.Publishing{
            Headers: amqp.Table{},
            ContentType:"text/plain",
            Body:[]byte("hello world dlx"),
            DeliveryMode:amqp.Persistent,//需要做持久化保留
            Priority:0,
        },
    ); err!=nil{
        fmt.Println("ch.Publish err: ", err)
        return
    }
}

(2)消费正常队列

代码语言:javascript
复制
package consum

import (
    "fmt"
    "github.com/streadway/amqp"
    "time"
)


func Consumer()  {
    var(
        conn *amqp.Connection
        err error
        ch *amqp.Channel
        queue amqp.Queue
        dlxExchangeName string
        delvers <- chan amqp.Delivery
        message amqp.Delivery
        ok bool
    )
    if conn, err = amqp.Dial("amqp://liulong:liulong@127.0.0.1:5672/"); err!=nil{
        fmt.Println("amqp.Dial err :", err)
        return
    }
    defer conn.Close()

    if ch, err = conn.Channel(); err!=nil{
        fmt.Println("conn.Channel err: ", err)
        return
    }

    defer ch.Close()

    //func (ch *Channel) Qos(prefetchCount, prefetchSize int, global bool) error
    //设置未确认的最大消息数
    if err = ch.Qos(3, 0, false); err!=nil{
        fmt.Println("ch.Qos err: ", err)
        return
    }

    dlxExchangeName = "dlx_exchange"

    //声明交换器
    if err = ch.ExchangeDeclare(
        "long_abc",
        amqp.ExchangeDirect,
        true,
        false,
        false,
        false,
        nil,
    ); err!=nil{
        fmt.Println("ch.ExchangeDeclare err: ", err)
        return
    }

    argsQue := make(map[string]interface{})
        //添加死信队列交换器属性
    argsQue["x-dead-letter-exchange"] = dlxExchangeName
    //指定死信队列的路由key,不指定使用队列路由键
    //argsQue["x-dead-letter-routing-key"] = "zhe_mess"
    //添加过期时间
    argsQue["x-message-ttl"] = 6000  //单位毫秒
    //声明队列
    queue, err = ch.QueueDeclare("zhe_123", true, false, false, false, argsQue)
    if err !=nil{
        fmt.Println("ch.QueueDeclare err :", err)
        return
    }

    //绑定交换器/队列和key
    //func (ch *Channel) QueueBind(name, key, exchange string, noWait bool, args Table) error
    if err = ch.QueueBind(queue.Name, "zhe_mess", "long_abc", false, nil);err!=nil{
        fmt.Println("ch.QueueBind err: ", err)
        return
    }
    //开启推模式消费
    //func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error)
    delvers, err = ch.Consume(
        queue.Name,
        "",
        false,
        false,
        false,
        false,
        nil,
    )

    if err!=nil{
        fmt.Println("ch.Consume err: ", err)
    }

    //消费接收到的消息
    for{
        select {
        case message, ok = <- delvers:
            if !ok{
                continue
            }
            go func() {
                //处理消息
                time.Sleep(time.Second*2)
                //确认接收到的消息
                if err = message.Ack(true); err!=nil{
                //TODD: 获取到消息后,在过期时间内如果未进行确认,此消息就会流入到死信队列,此时进行消息确认就会报错
                    fmt.Println("d.Ack err: ", err)
                    return
                }
                fmt.Println("已确认", string(message.Body))
            }()
        case <-time.After(time.Second*1):

        }
    }

(3)消费死信队列

代码语言:javascript
复制
package consum

import (
    "fmt"
    "github.com/streadway/amqp"
    "time"
)

func ConsumerDlx()  {

    var(
        conn *amqp.Connection
        ch *amqp.Channel
        queue amqp.Queue
        err error
        delvers <- chan amqp.Delivery
        message amqp.Delivery
        ok bool
    )

    //链接rbmq
    if conn, err = amqp.Dial("amqp://liulong:liulong@52.83.64.102:5672/");err!=nil{
        fmt.Println("amqp.Dial err: ", err)
        return
    }

    //声明信道
    if ch, err = conn.Channel(); err!=nil{
        fmt.Println("conn.Channel err: ", err)
        return
    }


    //声明交换机
    if err = ch.ExchangeDeclare(
        "dlx_exchange",
        amqp.ExchangeFanout,   //交换机模式fanout
        true,          //持久化
        false,      //自动删除
        false,        //是否是内置交互器,(只能通过交换器将消息路由到此交互器,不能通过客户端发送消息
        false,
        nil,
    ); err!=nil{
        fmt.Println("ch.ExchangeDeclare: ", err)
        return
    }

    //声明队列
    if queue, err = ch.QueueDeclare(
        "dlx_queue",       //队列名称
        true,      //是否是持久化
        false,   //是否不需要确认,自动删除消息
        false,   //是否是排他队列
        false,    //是否等待服务器返回ok
        nil,
    ); err!=nil{
        fmt.Println("ch.QueueDeclare err: ", err)
        return
    }


    //将交换器和队列/路由key绑定
    if err = ch.QueueBind(queue.Name, "", "dlx_exchange", false, nil); err!=nil{
        fmt.Println("ch.QueueBind err: ", err)
        return
    }



    //开启推模式消费
    delvers, err = ch.Consume(
        queue.Name,
        "",
        false,
        false,
        false,
        false,
        nil,
    )

    if err!=nil{
        fmt.Println("ch.Consume err: ", err)
    }

    //消费接收到的消息
    for{
        select {
        case message, ok = <- delvers:
            if !ok{
                continue
            }
            go func() {
                //处理消息
                time.Sleep(time.Second*3)
                //确认接收到的消息
                if err = message.Ack(true); err!=nil{
                    fmt.Println("dlx d.Ack err: ", err)
                    return
                }
                fmt.Println("已确认dlx", string(message.Body))
            }()
        case <-time.After(time.Second*1):

        }
    }
}

(4)死信消息变化

那么“死信”被丢到死信队列中后,会发生什么变化呢?

如果队列配置了参数 x-dead-letter-routing-key 的话,“死信”的路由key将会被替换成该参数对应的值。如果没有设置,则保留该消息原有的路由key。

举个例子:

如果原有消息的路由key是testA,被发送到业务Exchage中,然后被投递到业务队列QueueA中,如果该队列没有配置参数x-dead-letter-routing-key,则该消息成为死信后,将保留原有的路由keytestA,如果配置了该参数,并且值设置为testB,那么该消息成为死信后,路由key将会被替换为testB,然后被抛到死信交换机中。

另外,由于被抛到了死信交换机,所以消息的Exchange Name也会被替换为死信交换机的名称。

消息的Header中,也会添加很多奇奇怪怪的字段,修改一下上面的代码,在死信队列的消费者中添加一行日志输出:

代码语言:javascript
复制
log.info("死信消息properties:{}", message.getMessageProperties());

然后重新运行一次,即可得到死信消息Header中被添加的信息:

死信消息properties:

代码语言:javascript
复制
死信消息properties:MessageProperties [headers={x-first-death-exchange=dead.letter.demo.simple.business.exchange, x-death=[{reason=rejected, count=1, exchange=dead.letter.demo.simple.business.exchange, time=Sun Jul 14 16:48:16 CST 2019, routing-keys=[], queue=dead.letter.demo.simple.business.queuea}], x-first-death-reason=rejected, x-first-death-queue=dead.letter.demo.simple.business.queuea}, correlationId=1, replyTo=amq.rabbitmq.reply-to.g2dkABZyYWJiaXRAREVTS1RPUC1DUlZGUzBOAAAPQAAAAAAB.bLbsdR1DnuRSwiKKmtdOGw==, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=dead.letter.demo.simple.deadletter.exchange, receivedRoutingKey=dead.letter.demo.simple.deadletter.queuea.routingkey, deliveryTag=1, consumerTag=amq.ctag-NSp18SUPoCNvQcoYoS2lPg, consumerQueue=dead.letter.demo.simple.deadletter.queuea]

Header中看起来有很多信息,实际上并不多,只是值比较长而已。下面就简单说明一下Header中的值:

字段名

含义

x-first-death-exchange

第一次被抛入的死信交换机的名称

x-first-death-reason

第一次成为死信的原因,rejected:消息在重新进入队列时被队列拒绝,由于default-requeue-rejected 参数被设置为false。expired :消息过期。maxlen : 队列内消息数量超过队列最大容量

x-first-death-queue

第一次成为死信前所在队列名称

x-death

历次被投入死信交换机的信息列表,同一个消息每次进入一个死信交换机,这个数组的信息就会被更新

3.参考

(1)RabbitMQ系列(十三)RabbitMQ的死信队列 https://juejin.cn/post/6844903823643934727

(2)【RabbitMQ】一文带你搞定RabbitMQ死信队列 https://www.cnblogs.com/mfrank/p/11184929.html

(3)go 调用rabbitmq 死信队列 https://juejin.cn/post/6844903942661373965

(4)RABBITMP的GO消息接口 https://godoc.org/github.com/streadway/amqp

(5)死信队列的官方介绍 https://www.rabbitmq.com/dlx.html

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 摘要
  • 2. 内容
    • 2.1 死信队列是什么?
      • 2.2 如何配置死信队列?
        • 2.3 RabbitMQ死信队列GO代码实现
          • (1)生产者
          • (2)消费正常队列
          • (3)消费死信队列
          • (4)死信消息变化
      • 3.参考
      相关产品与服务
      消息队列 CMQ
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档