rabbitmq消息队列——"发布订阅"

三、”发布订阅”

上一节的练习中我们创建了一个工作队列。队列中的每条消息都会被发送至一个工作进程。这节,我们将做些完全不同的事情——我们将发送单个消息发送至多个消费者。这种模式就是广为人知的“发布订阅”模式。

为了说明这种模式,我们将构建一个简单的日志系统。包括2个应用程序,一个传送日志消息另一个接收并打印这些消息。

我们的日志系统中每一个运作的接收端程序都会收到这些消息。这种方式下,我们就可以运行一个接收端发送日志消息至硬盘,同时可以运行另一个接收端将日志打印到屏幕上。

理论上讲,已发布的日志消息将会被广播到所有的接收者。

交换器(Exchange)

之前的几节练习中我们发送接收消息都是在队列中进行,是时候介绍下RabbitMQ完整的消息传递模式了。

先来迅速的回顾下我们之前章节:

  • 一个生产者就是一个用来发送消息的应用程序
  • 一个 队列好比存储消息的缓存buffer
  • 一个消费者就是一个用户应用程序用来接收消息

RabbitMQ消息传递模型的核心思想是生产者从来不会直接发送消息至队列。事实上,生产者经常都不知道消息会被分发至哪个队列。

相反的是,生产者仅仅发送消息至交换器。交换器是非常简单的东西:一边从生产者那边接收消息一边发送这些消息至队列。交换器必须准确的知道这些被接收的消息该如何处理。它应该被添加到某个特定队列?或者添加到多个队列?甚至直接放弃。具体的传输规则就是通过交换器类型来定义的。

交换器类型有四种:direct、topic、headers、fanout。这节我们主要关注最后一种——fanout。让我们来创建一个fanout类型的交换器,命名为logs:

err = ch.ExchangeDeclare(
  "logs",   // name
  "fanout", // type
  true,     // durable
  false,    // auto-deleted
  false,    // internal
  false,    // no-wait
  nil,      // arguments
)

正如你从名字中猜测的一样,它仅仅广播所有消息到所有已知的接收队列。实际上这正是我们需要的日志系统。

备注:之前的几节练习中我们并不知道交换器,但我们依然能够将消息发送至队列中,之所以可以实现是因为我们使用了默认的交换器,使用空字符串表示。

回顾下之前我们发送消息是这样子的:

err = ch.Publish(
  "",     // exchange
  q.Name, // routing key
  false,  // mandatory
  false,  // immediate
  amqp.Publishing{
    ContentType: "text/plain",
    Body:        []byte(body),
  })

这里我们可以使用默认也可以自己命名交换器:如果路由键存在的话,消息会被路由到加上路由键参数的地址,注意fanout类型会直接忽略路由键的存在。

以下是修改后的代码:

err = ch.ExchangeDeclare(
  "logs",   // name    定义一个名为logs的交换器    
  "fanout", // type    交换器类型为fanout即广播类型
  true,     // durable    持久化
  false,    // auto-deleted        无队列绑定时是否自动删除
  false,    // internal
  false,    // no-wait
  nil,      // arguments
)
failOnError(err, "Failed to declare an exchange")

body := bodyFrom(os.Args)
err = ch.Publish(
  "logs", // exchange    指定消息发送的交换器名称
  "",     // routing key    因为fanout类型会自动忽略路由键,所以这里的路由键参数任意,一般不填
  false,  // mandatory
  false,  // immediate
  amqp.Publishing{
          ContentType: "text/plain",
          Body:        []byte(body),
  })

临时队列

你可能记得之前我们声明队列的时候都会指定一个队列名称(记得hello和task_queue?)。队列的命名对我们来说至关重要——我们需要将工作进程指向同一个队列。当你需要在消费者和生产者之间共享队列的话声明队列就显得很重要。

但这对我们的日志系统来说无关重要。我们需要监听的是所有的日志消息,而不是他们中的某一类。我们只关注当前流中的消息而不关注旧的那些。解决这个我们需要做两件事。

首先,每当链接RabbitMQ的时候我们需要创建一个新的、空的队列。为做到这点,我们必须创建一个名称随机的队列,甚至更好的实现方式是——让服务端给我们自动生成一个随机的队列。

其次,一旦消费者链接断开,该队列便会自动删除。

在amqp客户端中,当我们给一个队列名称设定为空字符串时,我们就创建了一个非持久化的生成队列:

q, err := ch.QueueDeclare(
  "",    // name    满足第一点:服务端自动产生随机队列
  false, // durable
  false, // delete when usused
  true,  // exclusive   满足第二点:连接断开立即删除
  false, // no-wait
  nil,   // arguments
)

当该方法返回的时候,声明好的队列便包含一个由RabbitMQ生成的随机队列名称。举例来说,队列名称形如:amq.gen-JzTY20BRgKO-HjmUJj0wLg这种的。

当消费者的链接宣布关闭后,队列便像exclusive参数设置的那样,自动删除。

绑定

我们已经创建了一个fanout类型的交换器和一个队列,现在我们需要告诉交换器将消息发送至我们的队列。这种交换器和队列中的关联关系就叫做绑定。

err = ch.QueueBind(
  q.Name, // queue name    绑定的队列名称
  "",     // routing key    绑定的路由键
  "logs", // exchange    绑定的交换器名称
  false,
  nil
)

从现在起,logs交换器便能发送消息至我们的队列。

糅合在一起

生产者的程序,也就是发送消息端,跟之前几节的发送代码差不多。最重要的是我们现在要发送消息到logs交换器而非默认的交换器。发送的时候我们可以设置一个路由键,但是对于fanout类型的交换器来说它将被忽略。下面就是发送日志方的代码:

// rabbitmq_3_emit_log.go project main.go
package main

import (
    "fmt"
    "log"
    "os"
    "strings"

    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s:%s", err, msg)
        panic(fmt.Sprintf("%s:%s", err, msg))
    }
}

func bodyForm(args []string) string {
    var s string
    if len(args) < 2 || os.Args[1] == "" {
        s = "Hello World! This is a test!"
    } else {
        s = strings.Join(args[1:], " ")
    }
    return s
}

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "failed to dial rabbitmq server")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "failed to declare the channel")
    defer ch.Close()

    //声明一个交换器,交换器名称logs,类型fanout
    err = ch.ExchangeDeclare("logs", "fanout", true, false, false, false, nil)
    failOnError(err, "failed to declare the exchange")

    body := bodyForm(os.Args)
    //发送消息到交换器
    err = ch.Publish("logs", "", false, false, amqp.Publishing{
        Body:        []byte(body),
        ContentType: "text/plain",
    })
    failOnError(err, "failed to publish the message")
}

备注:这里发送方并不需要声明队列之类的,不像之前的代码需要声明,这里的发送方唯一关联的是交换器,所以只需声明交换器并发送消息至交换器即可。

正如你想的那样,链接建立后我们声明交换器,这一步是必须的因为发送消息到一个不存在的交换器是完全禁止的。

如果该交换器上面没有队列绑定的话那么发送至该交换器的消息将全部丢失,但这对我们来时ok;如果没有消费者我们会安全地丢弃这些消息。

下面是日志接收方的代码:

// rabbitmq_3_receive_logs.go project main.go
package main

import (
    "fmt"
    "log"
    "os"
    "strings"

    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s:%s", err, msg)
        panic(fmt.Sprintf("%s:%s", err, msg))
    }
}

func bodyForm(args []string) string {
    var s string
    if len(args) < 2 || os.Args[1] == "" {
        s = "Hello World! This is a test!"
    } else {
        s = strings.Join(args[1:], " ")
    }
    return s
}

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "failed to dial rabbitmq server")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "failed to declare the channel")
    defer ch.Close()

    //声明一个交换器,交换器名称logs,类型fanout
    err = ch.ExchangeDeclare("logs", "fanout", true, false, false, false, nil)
    failOnError(err, "failed to declare the exchange")

    //声明一个队列
    q, err := ch.QueueDeclare("", false, false, true, false, nil)
    failOnError(err, "failed to declare the queue")

    //设置绑定(第二个参数为路由键,这里为空)
    err = ch.QueueBind(q.Name, "", "logs", false, nil)
    failOnError(err, "failed to bind the queue")

    //注册一个消费者
    msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil)
    failOnError(err, "Failed to register a consumer")

    forever := make(<-chan bool)

    go func() {
        for d := range msgs {
            log.Printf(" [x] %s", d.Body)
        }
    }()
    log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
    <-forever
}

如果你想将日志保存到文件,执行如下命令:

go run receive_logs.go > logs_from_rabbit.log

如果你仅仅想在屏幕上查看日志,开启一个新的控制台执行如下命令:

go run receive_logs.go

当然了,你最后还要发出日志才行:

go run emit_log.go

使用rabbitmqctl list_bindings命令可以直接查看所有的绑定,如运行2个receive_logs.go程序你就会看到如下输出:

rabbitmqctl list_bindings
Listing bindings ...
logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
...done.

实际效果:

分别开启两个控制台,均监听相同队列,同时收到消息并打印了,说明两个随机的队列均收到了logs交换器发来的消息,发送方略。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏木头编程 - moTzxx

ThinkPHP5.1 配置Nginx/Apache下的 URL重写

1242
来自专栏生信技能树

blast2go本地化-2017教程

通常我们上游分析得到的蛋白序列需要和主流的数据库进行比对,完成功能注释。常用数据库一共有以几种:

4722
来自专栏信安之路

审计 tinyshop 中风险

审计该 CMS 中的内容只涉及到前台,后台中有存安全问题但对我来说没什么意义,所以没有过多的关注,感兴趣的朋友可以自己动动手。

530
来自专栏知识分享

ESP8266使用详解--基于Lua脚本语言ESP8266刷AT固件与nodemcu固件轻松使用8266

这些天,,,,今天终于看到了希望,,,天道酬勤 先说实现的功能...让ESP8266连接无线网,然后让它建立服务器,,我的客户端连接上以后,发给客户端发数据模块...

2.7K4
来自专栏杂烩

kafka-eagle1.1.9安装 原

源码地址https://github.com/smartloli/kafka-eagle

781
来自专栏pangguoming

centos7安装redis

方法一:使用命令安装(前提是已经安装了EPEL)。 安装redis: yum -y install redis 启动/停止/重启 Redis  启动...

4028
来自专栏FreeBuf

初探伪装在Office宏里的反弹Shell

通常的钓鱼邮件场景中office的安全问题一直都受到关注,恶意宏文档制作简单,兼容性强,并且攻击成本较小,所以整体占比较大。但是使用恶意宏进行攻击,往往需要用户...

752
来自专栏程序生活

Laravel-博客实战+踩坑laravel-blog最终的效果踩的坑

最近在学习Laravel,参考的课程是后盾网地Laravel5.2博客项目实战 下面整个项目的开发过程: laravel-blog 基于laravel5....

6885
来自专栏漫漫前端路

前端安全知识

xss: 跨站脚本攻击(Cross Site Scripting)是最常见和基本的攻击 WEB 网站方法,攻击者通过注入非法的 html 标签或者 javasc...

892
来自专栏C/C++基础

offload error: cannot find offload entry解决办法

linux环境下,使用MIC架构的Xeon Phi(至强融核)协处理器进行进行host+mic编程时,源程序运行的毫无问题,但将其通过ar命令生成静态连接库供其...

902

扫码关注云+社区