在前面的教程中,我们创建了一个工作队列。工作队列背后的假设是每个任务都是只交付给一个消费者。在这一部分中,我们将做一些完全不同的事情。我们将向多个消费者传递消息。此模式被称为“发布/订阅”。
为了说明这种模式,我们将构建一个简单的日志记录 系统。它将由两个程序组成,第一个将发出日志消息,第二个将接收并打印这些消息。
在我们的日志记录系统中,接收器程序的每个运行副本都将获取消息。这样我们就能够运行一个接收器和将日志定向到磁盘;同时我们将能够运行另一个接收器,并在屏幕上查看日志。
本质上,已发布的日志消息将消息通过广播的形式传递给所有接收器。
在本教程的前几部分中,我们只是简单的向队列中发送或者获取消息。现在是时候引入完整的消息传递模型了。
让我们快速回顾一下前面教程中介绍的内容:
RabbitMQ 中消息传递模型的核心思想是,生产者从不将任何消息直接
发送到队列。实际上,很多时候,生产者甚至不知道消息是否会传递给人。
相反,生产者只能交换机所发送消息。交换机是一件非常简单的事情。一方面,它接收来自生产者和另一边,它把消息推到队列中。交换机必须确切地知道如何处理它收到的消息。应该是附加到特定队列?是否应该将其附加到许多队列中?或者应该丢弃它。其规则由交换类型定义。
有几种可用的交换类型:direct
、topic
、headers
和 fanout
。这一章节,我们将专注于最后一个类型 - fanout
。让我们创建这种类型的交换机,并调用它日志:
err = ch.ExchangeDeclare(
"logs", // name
"fanout", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
fanout
这种交换机的类型非常简单。正如你从名称中知道的,它只是将收到的所有消息广播到所有消费者排队。这正是我们对记录器所需要的。
要列出服务器上的交换机,可以使用 rabbitmqctl
命令:
sudo rabbitmqctl list_exchanges
在此列表中,将有一些 amq.* 交换机和默认(未命名) 的交换机。这些是默认创建的,但您不太可能需要立即使用它们。在本教程的前几部分中,我们对交换一无所知, 但仍然能够将消息发送到队列。因为我们使用的是默认交换,它由空字符串 (“”) 标识。
回想一下我们之前是如何发布的消息的:
err = ch.PublishWithContext(ctx,
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
这里我们使用默认或无名称的交换机:消息是路由到具有 routing_key
参数指定的名称的队列(如果存在)。
现在,我们将之前的代码改为,发布到我们命名的交换机:
err = ch.ExchangeDeclare(
"logs", // name
"fanout", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
body := bodyFrom(os.Args)
err = ch.PublishWithContext(ctx,
"logs", // exchange
"", // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
您可能还记得以前我们使用的队列具有具体名称(还记得 hello
和 task_queue
吗?能够命名队列,对我们来说至关重要 - 我们需要将消费者指向相同的队列。在以下情况下,为队列命名很重要,希望在生产者和消费者之间共享队列。
但对于我们的记录器来说,情况并非如此。我们想听听所有日志消息,而不仅仅是其中的子集。我们也只对当前流动的消息感兴趣,而不是旧的。为了解决这个问题,我们需要两件事。
首先,每当我们连接到Rabbit时,我们都需要一个新的空队列。为此,我们可以创建一个具有随机名称的队列,或者让服务器为我们选择一个随机队列名称。
其次,一旦我们断开了消费者的连接,队列应该是自动删除。在 amqp
客户端中,当我们提供队列名称时,作为一个空字符串,我们创建一个具有生成名称的非持久队列:
q, err := ch.QueueDeclare(
"", // name
false, // durable
false, // delete when unused
true, // exclusive
false, // no-wait
nil, // arguments
)
当该方法返回时,队列实例包含一个随机队列名称,该队列名称由 RabbitMQ 自动生成。例如,它可能看起来像 amq.gen-JzTY20BRgKO-HjmUJj0wLg
的形式。当声明它的连接关闭时,队列将被删除,因为它被声明为排他性。您可以了解有关独占标志和其他队列的更多信息。
我们已经创建了一个 fanout
类型的交换机和一个队列。现在我们需要告诉交换机将消息发送到我们的队列。这种关系,交换和队列之间称为绑定。
err = ch.QueueBind(
q.Name, // queue name
"", // routing key
"logs", // exchange
false,
nil,
)
从现在开始,日志交换机会将消息附加到我们的队列中。
你可以通过使用下面的命令,来罗列出所有的绑定关系。
rabbitmqctl list_bindings
发出日志消息的生产者程序看起来不多。最重要的变化是,我们现在希望将消息发布到我们的日志交换机。我们需要在发送时提供一个路由密钥,但它对于 fanout
类型的交换机,将忽略值。下面是 emit_log.go
脚本的代码:
package main
import (
"context"
"log"
"os"
"strings"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
func failOnError(err error, msg string) {
if err != nil {
log.Panicf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
err = ch.ExchangeDeclare(
"logs", // name
"fanout", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
body := bodyFrom(os.Args)
err = ch.PublishWithContext(ctx,
"logs", // exchange
"", // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
}
func bodyFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "hello"
} else {
s = strings.Join(args[1:], " ")
}
return s
}
如您所见,建立连接后,我们声明了交换机。此步骤是必需的,如果交换机不存在,则会被禁止发送消息。如果还没有队列绑定到交换机,消息将丢失。但这对我们来说没关系(当消息不重要时),如果没有消费者在监听,我们可以安全地丢弃该消息。下面是 receive_logs.go
的代码:
package main
import (
"log"
amqp "github.com/rabbitmq/amqp091-go"
)
func failOnError(err error, msg string) {
if err != nil {
log.Panicf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
err = ch.ExchangeDeclare(
"logs", // name
"fanout", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
q, err := ch.QueueDeclare(
"", // name
false, // durable
false, // delete when unused
true, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
err = ch.QueueBind(
q.Name, // queue name
"", // routing key
"logs", // exchange
false,
nil,
)
failOnError(err, "Failed to bind a queue")
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
var forever chan struct{}
go func() {
for d := range msgs {
log.Printf(" [x] %s", d.Body)
}
}()
log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
<-forever
}
如果要将日志保存到文件,只需打开控制台并键入:
go run receive_logs.go > logs_from_rabbit.log
如果您希望在屏幕上查看日志,请生成一个新终端并运行:
go run receive_logs.go
当然,也要运行日志的生产者脚本:
go run emit_log.go
使用 rabbitmqctl list_bindings
命令您可以验证代码是否确实如此,根据需要创建绑定和队列。运行两个 receive_logs.go
程序后,使用该命令,您应该看到类似以下内容:
sudo rabbitmqctl list_bindings
# => Listing bindings ...
# => logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue []
# => logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue []
# => ...done.
结果的解释很简单:数据来自交换机日志将转到具有服务器分配名称的两个队列。