前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Go之RabbitMQ(三)优先级队列

Go之RabbitMQ(三)优先级队列

作者头像
灰子学技术
发布2020-06-03 09:23:19
8330
发布2020-06-03 09:23:19
举报
文章被收录于专栏:灰子学技术灰子学技术

1. RabbitMQ优先级队列介绍:

RabbitMQ3.5.0之后官方版本已经实现了优先级队列。数值越大则优先级越高。

创建优先级队列,需要增加x-max-priority参数,指定一个优先级的数值大小,这里最好是0~10之间,用来表示这个queue的最大优先级。(备注:因为生产者和消费者都需要对queue进行声明,所以它们都需要设置这个参数)

生产者在发送消息的时候,需要设置priority属性,最好不要超过上面指定的最大的优先级,一旦超过了这个优先级,发送设置的优先级就不再生效了。在这个范围内的优先级,数字越大,优先级越高。

优先级队列处理的场景,是针对的生产者生产快,消费者消费慢,反之没有意义,毕竟只有queue中有消息堆积的时候,才会需要根据优先级策略进行调度。

创建了rabbitmq的优先级队列之后,界面查看的变化:

2. RabbitMQ优先级队列实现例子

下面是优先级队列实现的代码,生产者一次性创建6个消息,其中奇数优先级为2,偶数优先级为1,并阻塞到RabbitMQ上面。

消费者启动之后,会先消费优先级为2的那些消息,然后才消费优先级是1的那些消息,对于同等优先级的消息,则是按照先进先出的顺序进行消费。

1)生产者代码:

代码语言:javascript
复制
package main

import (
    "fmt"
    "log"
    "os"
    "strings"
    "strconv"
    "github.com/streadway/amqp"
)

func failOnError(err error, msg string){
    if err != nil {
        log.Fatalf("%s:%s", msg, err)
        panic(fmt.Sprintf("%s:%s", msg, err))
    }
}

func bodyFrom(args []string) string {
    var s string
    if (len(args) < 2) || os.Args[1] == "" {
        s = "hoge"
    } else {
        s = strings.Join(args[1:], " ")
    }

    return s
}

func main() {
    // step1: 作为生产者与amqp server 建立一个连接,rabbitmq的提供的默认端口是5672
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    // step2: 在这个连接上面创建channel
    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    // step3: 创建一个名字叫做hello的queue
    var args amqp.Table
    args = amqp.Table{"x-max-priority":int32(10)} // 设置优先级列表的最初优先级
    q, err := ch.QueueDeclare(
    "priqueue", //name
    true,  //durable
    false,  //delete when unused
    false,  //exclusive
    false,  //no wait
    args,    //arguments
    )
    failOnError(err, "Failed to declare q queue")

    err = ch.Qos(
            1,     // prefetch count
            0,     // prefetch size
            false, // global
    )
    failOnError(err, "Failed to set QoS")
    // step4: 向rabbitmq server发送"Hello"消息
    for i:=0;i<6;i++ {
        body := bodyFrom(os.Args)
        body += strconv.Itoa(i)
        pri := i%2 + 1
        err = ch.Publish(
            "",     //exchange
            q.Name,     // routing key
            false,  //mandatory
            false, //immediate
            amqp.Publishing{
                ContentType: "text/plain",
                Priority: uint8(pri),
                Body :      []byte(body),
            })
        failOnError(err, "Failed to publish a message")
        log.Printf(" [x] Sent %s", body)
    }

    return
}

执行结果:

代码语言:javascript
复制
$ ./send_priority world
2020/05/29 16:21:07  [x] Sent world0
2020/05/29 16:21:07  [x] Sent world1
2020/05/29 16:21:07  [x] Sent world2
2020/05/29 16:21:07  [x] Sent world3
2020/05/29 16:21:07  [x] Sent world4
2020/05/29 16:21:07  [x] Sent world5

2)消费者代码:

代码语言:javascript
复制
package main

import (
    "fmt"
    "log"
    "time"
    "bytes"
    "github.com/streadway/amqp"
)

func failOnError(err error, msg string){
    if err != nil {
        log.Fatalf("%s:%s", msg, err)
        panic(fmt.Sprintf("%s:%s", msg, err))
    }
}

func main() {

    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672")
    failOnError(err, "Failed to connect to server")
    defer conn.Close();

    ch, err := conn.Channel()
    failOnError(err, "Failed to connect to channel")
    defer ch.Close()
    var args amqp.Table
    args = amqp.Table{"x-max-priority":int32(10)} // 设置优先级列表的最初优先级
    q, err := ch.QueueDeclare(
        "priqueue",    //name
        true,      //durable
        false,      //delete when usused
        false,      // exclusive
        false,      //no-wait
        args,        // arguments
    )
    failOnError(err, "Failed to declare a queue")

    err = ch.Qos(
                1,     // prefetch count
                0,     // prefetch size
                false, // global
    )
    failOnError(err, "Failed to set QoS")

    msgs, err := ch.Consume(
        q.Name,     // queue
        "",         // consumer
        false,       // auto-ack
        false,      // exclusive
        false,      // no-local
        false,      // no-wait
        nil,        // arguments
    )
    failOnError(err, "Failed to register a consumer")

    forever := make(chan bool)
    go func(){
        for d:= range msgs{
            log.Printf("Received a message : %s", d.Body)
            dot_count := bytes.Count(d.Body, []byte("."))
            t := time.Duration(dot_count)
            time.Sleep(t* time.Second)
            log.Printf("Done")
            d.Ack(false)
        }
    }()

    log.Printf(" [*] Waiting for messages, To exit press CTRL+C")
    <-forever
    return
}

执行结果:

代码语言:javascript
复制
$ ./receive2
2020/05/29 16:21:12  [*] Waiting for messages, To exit press CTRL+C
2020/05/29 16:21:12 Received a message : world1
2020/05/29 16:21:12 Done
2020/05/29 16:21:12 Received a message : world3
2020/05/29 16:21:12 Done
2020/05/29 16:21:12 Received a message : world5
2020/05/29 16:21:12 Done
2020/05/29 16:21:12 Received a message : world0
2020/05/29 16:21:12 Done
2020/05/29 16:21:12 Received a message : world2
2020/05/29 16:21:12 Done
2020/05/29 16:21:12 Received a message : world4
2020/05/29 16:21:12 Done

参考文档:

https://www.jianshu.com/p/2a439db39687

https://www.rabbitmq.com/extensions.html

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-05-30,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 灰子学技术 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档