前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMQ中文系列教程三

RabbitMQ中文系列教程三

作者头像
Mandy的名字被占用了
发布2023-02-28 09:58:55
5640
发布2023-02-28 09:58:55
举报

在第一个教程中,我们 编写程序以从命名队列发送和接收消息。在此 我们将创建一个将用于分发的工作队列 多个工作人员之间的耗时任务。

工作队列(又名:任务队列)背后的主要思想是避免 立即执行资源密集型任务,必须等待 它要完成。相反,我们将任务安排在以后完成。我们将任务封装为消息并将其发送到队列。正在运行的工作进程 在后台将弹出任务并最终执行 工作。当您运行许多工作线程时,任务将在它们之间共享。

这个概念在 Web 应用程序中特别有用,因为它 在短 HTTP 请求期间无法处理复杂的任务。

在本教程的上一部分,我们发送了一条消息,其中包含 “你好世界!”现在我们将发送代表复杂的字符串 任务。我们没有现实世界的任务,例如要调整大小的图像或 要渲染的 pdf 文件,所以我们假装通过时间睡眠 sleep 来表示正在处理一个复杂的任务。

我们将稍微修改前面示例中的 send.go 代码, 以允许从命令行发送任意消息。这 程序会将任务调度到我们的工作队列中,因此我们将其命名为 new_task.go

代码语言:javascript
复制
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

body := bodyFrom(os.Args)
err = ch.PublishWithContext(ctx,
  "",           // exchange
  q.Name,       // routing key
  false,        // mandatory
  false,
  amqp.Publishing {
    DeliveryMode: amqp.Persistent,
    ContentType:  "text/plain",
    Body:         []byte(body),
  })
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)

下面是 bodyFrom 函数:

代码语言:javascript
复制
func bodyFrom(args []string) string {
    var s string
    if (len(args) < 2) || os.Args[1] == "" {
        s = "hello"
    } else {
        s = strings.Join(args[1:], " ")
    }
    return s
}

我们旧的 receive.go 脚本也需要一些更改:它需要 为邮件正文中的每个点伪造一秒钟的工作。它会弹出 来自队列的消息并执行任务,因此我们将其称为 worker.go

代码语言:javascript
复制
msgs, err := ch.Consume(
  q.Name, // queue
  "",     // consumer
  true,   // auto-ack
  false,  // exclusive
  false,  // no-local
  false,  // no-wait
  nil,    // args
)
failOnError(err, "Failed to register a consumer")
代码语言:javascript
复制
var forever chan struct{}

go func() {
  for d := range msgs {
    log.Printf("Received a message: %s", d.Body)
    dotCount := bytes.Count(d.Body, []byte("."))
    t := time.Duration(dotCount)
    time.Sleep(t * time.Second)
    log.Printf("Done")
  }
}()

log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever

请注意,我们的假任务模拟执行时间。

接下来,我们创建两个 shell 终端,一个是消息的生产者终端,一个是消息的消费者终端:

代码语言:javascript
复制
# 消费者终端
go run worker.go
代码语言:javascript
复制
# 生产者终端
go run new_task.go

循环调度

使用任务队列的优点之一是能够轻松 并行工作。如果我们正在积压工作,我们可以 添加更多消费者角色,来消费队列中的消息。

首先,让我们尝试同时运行两个 worker.go 脚本。他们 两者都会从队列中获取消息,但究竟如何?你需要打开三个终端,两个运行 work.go 脚本。这两个控制台我们称之为消费者 C1 和 C2 。

代码语言:javascript
复制
# shell 1
go run worker.go
代码语言:javascript
复制
# shell 2
go run worker.go

接下来,创建一个新的终端来充当消息的生产者,负责任务的发送。

代码语言:javascript
复制
# shell 3
go run new_task.go First message.
go run new_task.go Second message..
go run new_task.go Third message...
go run new_task.go Fourth message....
go run new_task.go Fifth message.....

接下来,去查看终端1和终端2的输出打印结果:

代码语言:javascript
复制
# shell 1
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'
代码语言:javascript
复制
# shell 2
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'

通过上面的两个消费者 C1 和 C2 终端的输出结果可以看出,队列中的消息不同的分配给了两个消费者。这是因为 RabbitMQ 默认情况下,会将每条消息发送给下一个消费者,每个消费者将获得相同数量的 消息。这种分发消息的方式称为轮询。

消息确认

在消费者处理消息的过程中,并不是一帆风顺的,当消费者出现异常时,消息没被正常的处理结束,这很容易导致消息的丢失。默认情况下,一旦 RabbitMQ 向消费者传递消息,它立即将其标记为删除。

针对这个问题发生时,我们希望该条未被正常处理的消息,能够重新分配给其他的消费者进行处理。

为了确保消息永远不会丢失,RabbitMQ 支持消息确认。一个确认(现在)被发回 消费者告诉 RabbitMQ 已收到特定消息处理,RabbitMQ 可以自由删除它。

如果消费者死亡(其通道关闭,连接关闭,或 TCP连接丢失)如果不发送确认,RabbitMQ 将了解消息未完全处理,并将重新排队。如果同时有其他消费者在线,它将迅速重新交付给另一个消费者。这样您就可以确保不会丢失任何消息。

在使用消息确认时,可能会存在确认超时的场景(默认为30分钟),这有助于检测从不确认的消费者,同时也可以使用延迟机制来增加超时的控制。

在本教程中,我们将通过消息确认参数 auto-ack 设置为 false ,来实现手动的消息确认。一旦确认之后,才表示消费者把消息进行了正常消费。

代码语言:javascript
复制
msgs, err := ch.Consume(
  q.Name, // queue
  "",     // consumer
  false,  // auto-ack
  false,  // exclusive
  false,  // no-local
  false,  // no-wait
  nil,    // args
)
failOnError(err, "Failed to register a consumer")
代码语言:javascript
复制
var forever chan struct{}

go func() {
  for d := range msgs {
    log.Printf("Received a message: %s", d.Body)
    dotCount := bytes.Count(d.Body, []byte("."))
    t := time.Duration(dotCount)
    time.Sleep(t * time.Second)
    log.Printf("Done")
    d.Ack(false)
  }
}()

log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever

使用此代码,您可以确保即使您使用 CTRL+C 来终止正在处理的消费者时,也不不会丢失任何内容。很快工作线程终止后,将重新传递所有未确认的消息。

使用消息确认机制需要特别注意,当大量消息未被确认时,这可能导致越来越多的内存被占用。可以使用 RabbitMQ 内置命令进行检测。

  • unix操作系统
代码语言:javascript
复制
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged`
  • Windows操作系统
代码语言:javascript
复制
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

消息持久化

我们已经学会了如何确保即使消费者死亡, 任务不会丢失。但是如果 RabbitMQ 服务器停止,我们的任务仍然会丢失。

当 RabbitMQ 退出或崩溃时,它会忘记队列和消息。需要做两件事来确保 消息不会丢失:我们需要将队列和消息都标记为可持久化

代码语言:javascript
复制
q, err := ch.QueueDeclare(
  "hello",      // name
  true,         // 将其设置为true,标识可持久化到磁盘中
  false,        // delete when unused
  false,        // exclusive
  false,        // no-wait
  nil,          // arguments
)
failOnError(err, "Failed to declare a queue")

虽然这个命令本身是正确的,但它在我们目前不起作用。这是因为我们已经定义了一个名为 hello 的队列,它不持久。

RabbitMQ 不允许您重新定义现有队列 使用不同的参数,并将向任何程序返回错误。要解决这个办法,最简单的方式就是重新声明一个队列。

代码语言:javascript
复制
q, err := ch.QueueDeclare(
  "task_queue", // name
  true,         // durable
  false,        // delete when unused
  false,        // exclusive
  false,        // no-wait
  nil,          // arguments
)
failOnError(err, "Failed to declare a queue")

durable的选项值,需要生产者和消费者保持一致的配置。

需要注意的时,虽然在声明队列时设置了持久化,这并不能完全保证消息的不丢失。这是因为消息可能是保存在缓存中,还需要产生系统调用才能写入到磁盘中。不过开启持久化的配置,基本能降低消息丢失的几率。如果你需要更强的保证,可以使用消息发布确认模式

公平调度

您可能已经注意到调度仍然无法如我们所愿。例如,在有两个消费者的情况下,当所有 奇数消息很重,偶数消息很轻,一个消费者将是 经常忙碌,另一个几乎不会做任何工作。井 RabbitMQ对此一无所知,仍然会调度 消息均匀。

发生这种情况是因为 RabbitMQ 只是在消息时调度消息 进入队列。它不看未确认的数量 面向消费者的消息。它只是盲目地发送每 n 条消息 给第 n 个消费者。

为了解决这个问题,我们可以用 prefetch 值为1解决。这告诉 RabbitMQ 一次给消费者一条消息。换句话说,在一个消费者还未完全处理完消息时,不要向其分发新的消息。相反,它会将消息调度给下一个尚未繁忙的消费者。

代码语言:javascript
复制
err = ch.Qos(
  1,     // prefetch count
  0,     // prefetch size
  false, // global
)
failOnError(err, "Failed to set QoS")
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2023-01-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 菜鸟成长学习笔记 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 循环调度
  • 消息确认
  • 消息持久化
  • 公平调度
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档