前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMQ中文系列教程四

RabbitMQ中文系列教程四

作者头像
Mandy的名字被占用了
发布2023-02-28 09:59:38
5100
发布2023-02-28 09:59:38
举报
文章被收录于专栏:菜鸟成长学习笔记

在前面的教程中,我们创建了一个工作队列。工作队列背后的假设是每个任务都是只交付给一个消费者。在这一部分中,我们将做一些完全不同的事情。我们将向多个消费者传递消息。此模式被称为“发布/订阅”。

为了说明这种模式,我们将构建一个简单的日志记录 系统。它将由两个程序组成,第一个将发出日志消息,第二个将接收并打印这些消息。

在我们的日志记录系统中,接收器程序的每个运行副本都将获取消息。这样我们就能够运行一个接收器和将日志定向到磁盘;同时我们将能够运行另一个接收器,并在屏幕上查看日志。

本质上,已发布的日志消息将消息通过广播的形式传递给所有接收器。

在本教程的前几部分中,我们只是简单的向队列中发送或者获取消息。现在是时候引入完整的消息传递模型了。

让我们快速回顾一下前面教程中介绍的内容:

  • 生产者是发送消息的用户应用程序。
  • 队列是存储消息的缓冲区。
  • 消费者是接收消息的用户应用程序。

RabbitMQ 中消息传递模型的核心思想是,生产者从不将任何消息直接发送到队列。实际上,很多时候,生产者甚至不知道消息是否会传递给人。

相反,生产者只能交换机所发送消息。交换机是一件非常简单的事情。一方面,它接收来自生产者和另一边,它把消息推到队列中。交换机必须确切地知道如何处理它收到的消息。应该是附加到特定队列?是否应该将其附加到许多队列中?或者应该丢弃它。其规则由交换类型定义。

有几种可用的交换类型:directtopicheadersfanout。这一章节,我们将专注于最后一个类型 - fanout。让我们创建这种类型的交换机,并调用它日志:

代码语言:javascript
复制
err = ch.ExchangeDeclare(
  "logs",   // name
  "fanout", // type
  true,     // durable
  false,    // auto-deleted
  false,    // internal
  false,    // no-wait
  nil,      // arguments
)

fanout 这种交换机的类型非常简单。正如你从名称中知道的,它只是将收到的所有消息广播到所有消费者排队。这正是我们对记录器所需要的。

交换机

要列出服务器上的交换机,可以使用 rabbitmqctl 命令:

代码语言:javascript
复制
sudo rabbitmqctl list_exchanges

在此列表中,将有一些 amq.* 交换机和默认(未命名) 的交换机。这些是默认创建的,但您不太可能需要立即使用它们。在本教程的前几部分中,我们对交换一无所知, 但仍然能够将消息发送到队列。因为我们使用的是默认交换,它由空字符串 (“”) 标识。

回想一下我们之前是如何发布的消息的:

代码语言:javascript
复制
err = ch.PublishWithContext(ctx,
  "",     // exchange
  q.Name, // routing key
  false,  // mandatory
  false,  // immediate
  amqp.Publishing{
    ContentType: "text/plain",
    Body:        []byte(body),
})

这里我们使用默认或无名称的交换机:消息是路由到具有 routing_key 参数指定的名称的队列(如果存在)。

现在,我们将之前的代码改为,发布到我们命名的交换机:

代码语言:javascript
复制
err = ch.ExchangeDeclare(
  "logs",   // name
  "fanout", // type
  true,     // durable
  false,    // auto-deleted
  false,    // internal
  false,    // no-wait
  nil,      // arguments
)
failOnError(err, "Failed to declare an exchange")
代码语言:javascript
复制
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

body := bodyFrom(os.Args)
err = ch.PublishWithContext(ctx,
  "logs", // exchange
  "",     // routing key
  false,  // mandatory
  false,  // immediate
  amqp.Publishing{
          ContentType: "text/plain",
          Body:        []byte(body),
  })

临时队列

您可能还记得以前我们使用的队列具有具体名称(还记得 hellotask_queue 吗?能够命名队列,对我们来说至关重要 - 我们需要将消费者指向相同的队列。在以下情况下,为队列命名很重要,希望在生产者和消费者之间共享队列。

但对于我们的记录器来说,情况并非如此。我们想听听所有日志消息,而不仅仅是其中的子集。我们也只对当前流动的消息感兴趣,而不是旧的。为了解决这个问题,我们需要两件事。

首先,每当我们连接到Rabbit时,我们都需要一个新的空队列。为此,我们可以创建一个具有随机名称的队列,或者让服务器为我们选择一个随机队列名称。

其次,一旦我们断开了消费者的连接,队列应该是自动删除。在 amqp 客户端中,当我们提供队列名称时,作为一个空字符串,我们创建一个具有生成名称的非持久队列:

代码语言:javascript
复制
q, err := ch.QueueDeclare(
  "",    // name
  false, // durable
  false, // delete when unused
  true,  // exclusive
  false, // no-wait
  nil,   // arguments
)

当该方法返回时,队列实例包含一个随机队列名称,该队列名称由 RabbitMQ 自动生成。例如,它可能看起来像 amq.gen-JzTY20BRgKO-HjmUJj0wLg 的形式。当声明它的连接关闭时,队列将被删除,因为它被声明为排他性。您可以了解有关独占标志和其他队列的更多信息。

绑定

我们已经创建了一个 fanout 类型的交换机和一个队列。现在我们需要告诉交换机将消息发送到我们的队列。这种关系,交换和队列之间称为绑定。

代码语言:javascript
复制
err = ch.QueueBind(
  q.Name, // queue name
  "",     // routing key
  "logs", // exchange
  false,
  nil,
)

从现在开始,日志交换机会将消息附加到我们的队列中。

列出绑定

你可以通过使用下面的命令,来罗列出所有的绑定关系。

代码语言:javascript
复制
rabbitmqctl list_bindings

完整代码

发出日志消息的生产者程序看起来不多。最重要的变化是,我们现在希望将消息发布到我们的日志交换机。我们需要在发送时提供一个路由密钥,但它对于 fanout 类型的交换机,将忽略值。下面是 emit_log.go 脚本的代码:

代码语言:javascript
复制
package main

import (
        "context"
        "log"
        "os"
        "strings"
        "time"

        amqp "github.com/rabbitmq/amqp091-go"
)
代码语言:javascript
复制
func failOnError(err error, msg string) {
        if err != nil {
                log.Panicf("%s: %s", msg, err)
        }
}
代码语言:javascript
复制
func main() {
        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()

        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()

        err = ch.ExchangeDeclare(
                "logs",   // name
                "fanout", // type
                true,     // durable
                false,    // auto-deleted
                false,    // internal
                false,    // no-wait
                nil,      // arguments
        )
        failOnError(err, "Failed to declare an exchange")

        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer cancel()

        body := bodyFrom(os.Args)
        err = ch.PublishWithContext(ctx,
                "logs", // exchange
                "",     // routing key
                false,  // mandatory
                false,  // immediate
                amqp.Publishing{
                        ContentType: "text/plain",
                        Body:        []byte(body),
                })
        failOnError(err, "Failed to publish a message")

        log.Printf(" [x] Sent %s", body)
}
代码语言:javascript
复制

func bodyFrom(args []string) string {
        var s string
        if (len(args) < 2) || os.Args[1] == "" {
                s = "hello"
        } else {
                s = strings.Join(args[1:], " ")
        }
        return s
}

如您所见,建立连接后,我们声明了交换机。此步骤是必需的,如果交换机不存在,则会被禁止发送消息。如果还没有队列绑定到交换机,消息将丢失。但这对我们来说没关系(当消息不重要时),如果没有消费者在监听,我们可以安全地丢弃该消息。下面是 receive_logs.go 的代码:

代码语言:javascript
复制
package main

import (
        "log"

        amqp "github.com/rabbitmq/amqp091-go"
)
代码语言:javascript
复制
func failOnError(err error, msg string) {
        if err != nil {
                log.Panicf("%s: %s", msg, err)
        }
}
代码语言:javascript
复制
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    err = ch.ExchangeDeclare(
            "logs",   // name
            "fanout", // type
            true,     // durable
            false,    // auto-deleted
            false,    // internal
            false,    // no-wait
            nil,      // arguments
        )
    failOnError(err, "Failed to declare an exchange")
代码语言:javascript
复制
q, err := ch.QueueDeclare(
    "",    // name
    false, // durable
    false, // delete when unused
    true,  // exclusive
    false, // no-wait
    nil,   // arguments
)
failOnError(err, "Failed to declare a queue")

err = ch.QueueBind(
        q.Name, // queue name
        "",     // routing key
        "logs", // exchange
        false,
        nil,
    )
failOnError(err, "Failed to bind a queue")
代码语言:javascript
复制
msgs, err := ch.Consume(
      q.Name, // queue
      "",     // consumer
      true,   // auto-ack
      false,  // exclusive
      false,  // no-local
      false,  // no-wait
      nil,    // args
)
failOnError(err, "Failed to register a consumer")

var forever chan struct{}

go func() {
    for d := range msgs {
        log.Printf(" [x] %s", d.Body)
    }
}()

log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
<-forever
}

如果要将日志保存到文件,只需打开控制台并键入:

代码语言:javascript
复制
go run receive_logs.go > logs_from_rabbit.log

如果您希望在屏幕上查看日志,请生成一个新终端并运行:

代码语言:javascript
复制
go run receive_logs.go

当然,也要运行日志的生产者脚本:

代码语言:javascript
复制
go run emit_log.go

使用 rabbitmqctl list_bindings 命令您可以验证代码是否确实如此,根据需要创建绑定和队列。运行两个 receive_logs.go 程序后,使用该命令,您应该看到类似以下内容:

代码语言:javascript
复制
sudo rabbitmqctl list_bindings
# => Listing bindings ...
# => logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
# => logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
# => ...done.

结果的解释很简单:数据来自交换机日志将转到具有服务器分配名称的两个队列。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2023-01-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 菜鸟成长学习笔记 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 交换机
  • 临时队列
  • 绑定
    • 列出绑定
    • 完整代码
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档