rabbitmq消息队列——"topic型交换器"

在之前的章节中我们改进了我们的日志系统,我们使用direct型交换器代替了只能盲目广播消息的fanout型交换器,这使得我们可以有选择性地接收日志。

尽管使用direct型交换器改进了我们的日志系统,但它仍然有缺陷——它不能基于多个规则或标准进行路由。

在我们的系统中,我呢也许希望订阅的不仅仅是严重级别的日志,而且基于日志发送方。你可能了解过systool这个unix工具,该工具不仅能路由严重级别的日志(info、warn、crit等),并且具有各种能力(授权、定时等)。

这将会给我们很多灵活性——我们可能希望不但接收那些来自定时器的错误日志而且接收来自核心模块的。

为在我们的日志系统实现这个,我们还需要再学习一个更加复杂的交换器类型——Topic型交换器。

发送到Topic型交换器的消息不能包含任意路由键——它必须是一串字符并且以圆点符号隔开。这些字符可以是任意的,但它们通常都会指定成链接的消息的某些功能。一些有效的路由键如:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"等,路由键可以包括

任意多个字符,上限是255个字节长度。

绑定键也必须使用类似的形式。topic型交换器的逻辑和direct型很相像——消息发送时会指定一个特别的路由键,并且会被路由到所有与绑定键相匹配的队列。不过对绑定键有两种特殊类型:

  • *符号用来代替任意单词
  • #符号可以代替0个或多个单词

用一个例子简单地解释下:

这个例子中,我们准备发送一些描述动物的消息,这些消息被发送的时候路由键都是包含三个单词并且以圆点符号分开(总共两个圆点符),路由键的订单第一个单词用来描述速度,第二个是颜色,第三个是物种:

"<speed>.<colour>.<species>"。

我们创建3种绑定:Q1和绑定键"*.orange.*"绑定,Q2和"*.*.rabbit" 、"lazy.#"分别绑定。这些绑定描述如下:

  • Q1只对所有橙色的动物感兴趣
  • Q2希望收到所有兔子的消息及一切懒惰的动物的

使用路由键"quick.orange.rabbit"的消息将发送至Q1和Q2,"lazy.orange.elephant"的也是一样。另一方面,使用路由键"quick.orange.fox"的消息将仅被路由Q1,"lazy.brown.fox"仅被路由到Q2。"lazy.pink.rabbit"的消息仅仅会被路由到Q2一次,尽管它匹配了

两种绑定关系,"quick.brown.fox""quick.brown.fox"没有绑定任何关系所有该消息将会被丢弃。

可是如果我们背弃”约定”直接发送一个单词或者四个单词会怎么样?比如:"orange" or "quick.orange.male.rabbit",当然了,第一个不会匹配任何绑定,直接被丢弃。

另一方面四个单词的会匹配第二种,所以它会被路由到第二个队列中。

备注:

Topic型交换器比较强大跟其它交换器很相似。

当一个队列以”#”作为绑定键时,它将接收所有消息,而不管路由键如何,类似于fanout型交换器。

当特殊字符”*”、”#”没有用到绑定时,topic型交换器就好比direct型交换器了。

揉在一起

我们将在我们的日志系统中使用topic型交换器。我们假设日志的路由键仅由两部分单词组成:"<facility>.<severity>"。

emit_log_topic.go源代码如下:

package main

import (
	"fmt"
	"github.com/streadway/amqp"
	"log"
	"os"
	"strings"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s:%s", msg, err)
		panic(fmt.Sprintf("%s:%s", msg, err))
	}
}

func bodyFrom(args []string) string {

	if len(args) < 3 || os.Args[2] == "" {
		return "hello"
	} else {
		return strings.Join(os.Args[2:], " ")
	}
}

func severityFrom(args []string) string {
	if len(os.Args) < 2 || os.Args[1] == "" {
		return "anonymous.info"
	} else {
		return os.Args[1]
	}
}

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	err = ch.ExchangeDeclare("logs_topic", "topic", true, false, false, false, nil)
	failOnError(err, "Failed to declare an exchange")

	body := bodyFrom(os.Args) //请求参数
	ch.Publish("logs_topic", severityFrom(os.Args), false, false, amqp.Publishing{
		ContentType: "text/plain",
		Body:        []byte(body),
	})
	failOnError(err, "Failed to publish a message")

	log.Printf(" [x] Sent %s", body)
}

receive_logs_topic.go端代码如下:

package main

import (
	"fmt"
	"log"
	"os"

	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
		panic(fmt.Sprintf("%s: %s", msg, err))
	}
}

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	err = ch.ExchangeDeclare("logs_topic", "topic", true, false, false, false, nil)
	failOnError(err, "Failed to declare an exchange")

	q, err := ch.QueueDeclare("", false, false, true, false, nil)
	failOnError(err, "Failed to declare a queue")
	if len(os.Args) < 2 {
		log.Printf("Usage: %s [binding_key]...", os.Args[0])
		os.Exit(0)
	}

	//为每条消息设置单独绑定
	for _, s := range os.Args[1:] {
		log.Printf("Binding queue %s to exchange %s with routing key %s",
			q.Name, "logs_topic", s)
		err = ch.QueueBind(q.Name, s, "logs_topic", false, nil)
		failOnError(err, "Failed to bind a queue")
	}
	msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil)
	failOnError(err, "Failed to register a consumer")

	forever := make(chan bool)

	go func() {
		for m := range msgs {
			log.Printf(" [x] %s", m.Body)
		}
	}()

	log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
	<-forever
}

接收所有log:

go run receive_logs_topic.go "#"

接收facility为kern的消息:

go run receive_logs_topic.go "kern.*"

仅接收所有critical的消息:

go run receive_logs_topic.go "*.critical"

同时创建多个绑定:

go run receive_logs_topic.go "kern.*" "*.critical"

发送路由键为kern.critical的消息:

go run emit_log_topic.go "kern.critical" "A critical kernel error"

具体效果如下:

接收所有:

接收kern:

接收critical:

同时接收kern、critical:

无效绑定(这里接收不到消息):

发送端:

至此,topic型交换器就over了,topic型交换器相比direct的单对单,fanout型的广播式,topic型灵活性更大,稍作修改绑定键及路由键即可实现以上两种交换器类型,简直了!

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Golang语言社区

Golang语言 之网络

Go语言标准库里提供的net包,支持基于IP层、TCP/UDP层及更高层面(如HTTP、FTP、SMTP)的网络操作,其中用于IP层的称为Raw Socket。...

3429
来自专栏奔跑的蛙牛技术博客

java 读写二进制数据与java序列化

zip文档以压缩格式存储一个和多个文件,每个ZIP文件都有一个头,包含每个文件的名字和压缩方法等信息

582
来自专栏闻道于事

Hibternate框架笔记

Hibernate框架 配置 配置文件: 1 <?xml version="1.0" encoding="UTF-8"?> 2 <!DOCTYPE hibe...

2616
来自专栏Spring相关

nodejs的事件处理机制以及事件环机制

ES6标准发布后,module成为标准,标准的使用是以export指令导出接口,以import引入模块,但是在我们一贯的node模块中,我们采用的是Common...

371
来自专栏二进制文集

Google Protocol Buffers 序列化算法分析

分析一下 Google Protocol Buffers 的序列化原理。介绍参考 Google Protocol Buffers 数据交换协议

503
来自专栏MasiMaro 的技术博文

PE文件详解(三)

在执行一个PE文件的时候,windows 并不在一开始就将整个文件读入内存的,二十采用与内存映射文件类似的机制。 也就是说,windows 装载器在装载的时...

763
来自专栏转载gongluck的CSDN博客

UNPv13:#第1章#简介

概述 ? TCP本身并不提供记录结束标志:如果应用程序需要确定记录的边界,它就要自己去实现,已有一些常用的方法可供选择。从TCP套接字读取数据时,我们总...

2949
来自专栏PHP技术

良好的书写规范提高PHP代码执行效率

用单引号代替双引号来包含字符串,这样做会更快一些。因为 php 会在双引号包围的字符串中搜寻变量,单引号则不会,注意:只有 echo 能这么做,它是一种可以把多...

2545
来自专栏程序猿DD

Spring Batch:文件的批量读写Flatfile(XML,CSV,TXT)

继杨小强童鞋的《Spring Batch入门篇》之后,继续为大家分享第二篇关于Spring Batch的系列教程。 更多内容请持续关注:spring4all.c...

2577
来自专栏沈唁志

提高PHP编程效率的53个要点

1646

扫码关注云+社区