前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMQ学习之Topics(5)

RabbitMQ学习之Topics(5)

作者头像
Vincent-yuan
发布2019-09-10 18:20:33
3380
发布2019-09-10 18:20:33
举报
文章被收录于专栏:Vincent-yuanVincent-yuan

上一节,我们讲了direct exchange,这节我们讲下topic exchange

Topic exchange

发送到topic exchange的messages不可以有一个随意的routing_key, 它必须是使用.分隔的一些词的集合。例如: "stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit" .

这些binding key 必须有同样的样式(in the same form). topic exchange背后的逻辑与direct exchange相似:带有特定的routing key的message会发送到所有有匹配的binding key 的所有queues。

然而,对于binding keys有两个重要的特例:

  • *(star) 能替代一个词
  • #(hash) 能替代0个或者多个词

下面是一个简单的例子

这个例子中,我们将会发送描述动物的message. 这些message将会带有一个routing key . 这个routing key 由三个词组成. 第一个词描述速度(speed),第二个词描述肤色(colour),第三个词描述物种。 <speed>.<colour>.<species>

我们创建了3条bindings. Q1 绑定”*.orange.*”并且Q2绑定“*.*.rabbit”和“lazy.#”

这些bindings总结下就是:

  • Q1对所有的orange的动物感兴趣
  • Q2想要了解所有的rabbit,和所有的lazy的动物
Topic exchange

Topic exchange是很强大的,并且可以表现的像其他exchange.

当一个queue绑定带“#”的binding key时,它可以接受所有的messages而不用管routing key.就像fanout exchange.

当特殊字符* 和#没有在bindings中使用时,topic exchange表现的像direct exchange.

代码

我们将使用topic exchange.对于routing key ,我们将使用这两个词:<facility>.<severity>.

EmitLogTopic.cs

using System;using System.Linq;using RabbitMQ.Client;using System.Text;
class EmitLogTopic
{
    public static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "topic_logs",  //topic类型的exchange
                                    type: "topic");

            var routingKey = (args.Length > 0) ? args[0] : "anonymous.info";
            var message = (args.Length > 1)
                          ? string.Join(" ", args.Skip( 1 ).ToArray())
                          : "Hello World!";
            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchange: "topic_logs",  //发送特定routing key的message
                                 routingKey: routingKey,
                                 basicProperties: null,
                                 body: body);
            Console.WriteLine(" [x] Sent '{0}':'{1}'", routingKey, message);
        }
    }
}

使用示例:

cd EmitLogTopic
dotnet run "kern.critical" "A critical kernel error"

ReceiveLogsTopic.cs

using System;using RabbitMQ.Client;using RabbitMQ.Client.Events;using System.Text;
class ReceiveLogsTopic
{
    public static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "topic_logs", type: "topic"); //声明topic exchange
            var queueName = channel.QueueDeclare().QueueName; //声明随机生成的queue name

            if(args.Length < 1)
            {
                Console.Error.WriteLine("Usage: {0} [binding_key...]",
                                        Environment.GetCommandLineArgs()[0]);
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
                Environment.ExitCode = 1;
                return;
            }

            foreach(var bindingKey in args)
            {
                channel.QueueBind(queue: queueName,  //绑定bindingkey
                                  exchange: "topic_logs",
                                  routingKey: bindingKey);
            }

            Console.WriteLine(" [*] Waiting for messages. To exit press CTRL+C");

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                var routingKey = ea.RoutingKey; //message的routingkey
                Console.WriteLine(" [x] Received '{0}':'{1}'",
                                  routingKey,
                                  message);
            };
            channel.BasicConsume(queue: queueName,
                                 autoAck: true,
                                 consumer: consumer);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

使用示例:

接收所有的logs

cd ReceiveLogsTopic
dotnet run "#"

接收所有facility:kern的logs

cd ReceiveLogsTopic
dotnet run "kern.*"

仅仅critical的logs

cd ReceiveLogsTopic
dotnet run "*.critical"

创建多个bindings

cd ReceiveLogsTopic
dotnet run "kern.*" "*.critical"

参考网址:

https://www.rabbitmq.com/tutorials/tutorial-five-dotnet.html

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019-05-29 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Topic exchange
  • 代码
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档