在第一个教程中,我们 编写程序以从命名队列发送和接收消息。在此 我们将创建一个将用于分发的工作队列 多个工作人员之间的耗时任务。
工作队列(又名:任务队列)背后的主要思想是避免 立即执行资源密集型任务,必须等待 它要完成。相反,我们将任务安排在以后完成。我们将任务封装为消息并将其发送到队列。正在运行的工作进程 在后台将弹出任务并最终执行 工作。当您运行许多工作线程时,任务将在它们之间共享。
这个概念在 Web 应用程序中特别有用,因为它 在短 HTTP 请求期间无法处理复杂的任务。
在本教程的上一部分,我们发送了一条消息,其中包含 “你好世界!”现在我们将发送代表复杂的字符串 任务。我们没有现实世界的任务,例如要调整大小的图像或 要渲染的 pdf 文件,所以我们假装通过时间睡眠 sleep
来表示正在处理一个复杂的任务。
我们将稍微修改前面示例中的 send.go
代码, 以允许从命令行发送任意消息。这 程序会将任务调度到我们的工作队列中,因此我们将其命名为 new_task.go
:
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
body := bodyFrom(os.Args)
err = ch.PublishWithContext(ctx,
"", // exchange
q.Name, // routing key
false, // mandatory
false,
amqp.Publishing {
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
下面是 bodyFrom
函数:
func bodyFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "hello"
} else {
s = strings.Join(args[1:], " ")
}
return s
}
我们旧的 receive.go
脚本也需要一些更改:它需要 为邮件正文中的每个点伪造一秒钟的工作。它会弹出 来自队列的消息并执行任务,因此我们将其称为 worker.go
:
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
var forever chan struct{}
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
dotCount := bytes.Count(d.Body, []byte("."))
t := time.Duration(dotCount)
time.Sleep(t * time.Second)
log.Printf("Done")
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
请注意,我们的假任务模拟执行时间。
接下来,我们创建两个 shell 终端,一个是消息的生产者终端,一个是消息的消费者终端:
# 消费者终端
go run worker.go
# 生产者终端
go run new_task.go
使用任务队列的优点之一是能够轻松 并行工作。如果我们正在积压工作,我们可以 添加更多消费者角色,来消费队列中的消息。
首先,让我们尝试同时运行两个 worker.go
脚本。他们 两者都会从队列中获取消息,但究竟如何?你需要打开三个终端,两个运行 work.go
脚本。这两个控制台我们称之为消费者 C1 和 C2 。
# shell 1
go run worker.go
# shell 2
go run worker.go
接下来,创建一个新的终端来充当消息的生产者,负责任务的发送。
# shell 3
go run new_task.go First message.
go run new_task.go Second message..
go run new_task.go Third message...
go run new_task.go Fourth message....
go run new_task.go Fifth message.....
接下来,去查看终端1和终端2的输出打印结果:
# shell 1
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'
# shell 2
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'
通过上面的两个消费者 C1 和 C2 终端的输出结果可以看出,队列中的消息不同的分配给了两个消费者。这是因为 RabbitMQ 默认情况下,会将每条消息发送给下一个消费者,每个消费者将获得相同数量的 消息。这种分发消息的方式称为轮询。
在消费者处理消息的过程中,并不是一帆风顺的,当消费者出现异常时,消息没被正常的处理结束,这很容易导致消息的丢失。默认情况下,一旦 RabbitMQ 向消费者传递消息,它立即将其标记为删除。
针对这个问题发生时,我们希望该条未被正常处理的消息,能够重新分配给其他的消费者进行处理。
为了确保消息永远不会丢失,RabbitMQ 支持消息确认。一个确认(现在)被发回 消费者告诉 RabbitMQ 已收到特定消息处理,RabbitMQ 可以自由删除它。
如果消费者死亡(其通道关闭,连接关闭,或 TCP连接丢失)如果不发送确认,RabbitMQ 将了解消息未完全处理,并将重新排队。如果同时有其他消费者在线,它将迅速重新交付给另一个消费者。这样您就可以确保不会丢失任何消息。
在使用消息确认时,可能会存在确认超时的场景(默认为30分钟),这有助于检测从不确认的消费者,同时也可以使用延迟机制来增加超时的控制。
在本教程中,我们将通过消息确认参数 auto-ack
设置为 false
,来实现手动的消息确认。一旦确认之后,才表示消费者把消息进行了正常消费。
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
var forever chan struct{}
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
dotCount := bytes.Count(d.Body, []byte("."))
t := time.Duration(dotCount)
time.Sleep(t * time.Second)
log.Printf("Done")
d.Ack(false)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
使用此代码,您可以确保即使您使用 CTRL+C 来终止正在处理的消费者时,也不不会丢失任何内容。很快工作线程终止后,将重新传递所有未确认的消息。
使用消息确认机制需要特别注意,当大量消息未被确认时,这可能导致越来越多的内存被占用。可以使用 RabbitMQ 内置命令进行检测。
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged`
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
我们已经学会了如何确保即使消费者死亡, 任务不会丢失。但是如果 RabbitMQ 服务器停止,我们的任务仍然会丢失。
当 RabbitMQ 退出或崩溃时,它会忘记队列和消息。需要做两件事来确保 消息不会丢失:我们需要将队列和消息都标记为可持久化
。
q, err := ch.QueueDeclare(
"hello", // name
true, // 将其设置为true,标识可持久化到磁盘中
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
虽然这个命令本身是正确的,但它在我们目前不起作用。这是因为我们已经定义了一个名为 hello
的队列,它不持久。
RabbitMQ 不允许您重新定义现有队列 使用不同的参数,并将向任何程序返回错误
。要解决这个办法,最简单的方式就是重新声明一个队列。
q, err := ch.QueueDeclare(
"task_queue", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
durable的选项值,需要生产者和消费者保持一致的配置。
需要注意的时,虽然在声明队列时设置了持久化,这并不能完全保证消息的不丢失。这是因为消息可能是保存在缓存中,还需要产生系统调用才能写入到磁盘中。不过开启持久化的配置,基本能降低消息丢失的几率。如果你需要更强的保证,可以使用消息发布确认模式
。
您可能已经注意到调度仍然无法如我们所愿。例如,在有两个消费者的情况下,当所有 奇数消息很重,偶数消息很轻,一个消费者将是 经常忙碌,另一个几乎不会做任何工作。井 RabbitMQ对此一无所知,仍然会调度 消息均匀。
发生这种情况是因为 RabbitMQ 只是在消息时调度消息 进入队列。它不看未确认的数量 面向消费者的消息。它只是盲目地发送每 n 条消息 给第 n 个消费者。
为了解决这个问题,我们可以用 prefetch
值为1解决。这告诉 RabbitMQ 一次给消费者一条消息。换句话说,在一个消费者还未完全处理完消息时,不要向其分发新的消息。相反,它会将消息调度给下一个尚未繁忙的消费者。
err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)
failOnError(err, "Failed to set QoS")