上一节,我们讲了direct exchange,这节我们讲下topic exchange
发送到topic exchange的messages不可以有一个随意的routing_key, 它必须是使用.分隔的一些词的集合。例如: "stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit" .
这些binding key 必须有同样的样式(in the same form). topic exchange背后的逻辑与direct exchange相似:带有特定的routing key的message会发送到所有有匹配的binding key 的所有queues。
然而,对于binding keys有两个重要的特例:
下面是一个简单的例子
这个例子中,我们将会发送描述动物的message. 这些message将会带有一个routing key . 这个routing key 由三个词组成. 第一个词描述速度(speed),第二个词描述肤色(colour),第三个词描述物种。 <speed>.<colour>.<species>
我们创建了3条bindings. Q1 绑定”*.orange.*”并且Q2绑定“*.*.rabbit”和“lazy.#”
这些bindings总结下就是:
Topic exchange是很强大的,并且可以表现的像其他exchange.
当一个queue绑定带“#”的binding key时,它可以接受所有的messages而不用管routing key.就像fanout exchange.
当特殊字符* 和#没有在bindings中使用时,topic exchange表现的像direct exchange.
我们将使用topic exchange.对于routing key ,我们将使用这两个词:<facility>.<severity>.
EmitLogTopic.cs
using System;using System.Linq;using RabbitMQ.Client;using System.Text;
class EmitLogTopic
{
public static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "topic_logs", //topic类型的exchange
type: "topic");
var routingKey = (args.Length > 0) ? args[0] : "anonymous.info";
var message = (args.Length > 1)
? string.Join(" ", args.Skip( 1 ).ToArray())
: "Hello World!";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "topic_logs", //发送特定routing key的message
routingKey: routingKey,
basicProperties: null,
body: body);
Console.WriteLine(" [x] Sent '{0}':'{1}'", routingKey, message);
}
}
}
使用示例:
cd EmitLogTopic
dotnet run "kern.critical" "A critical kernel error"
ReceiveLogsTopic.cs
using System;using RabbitMQ.Client;using RabbitMQ.Client.Events;using System.Text;
class ReceiveLogsTopic
{
public static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "topic_logs", type: "topic"); //声明topic exchange
var queueName = channel.QueueDeclare().QueueName; //声明随机生成的queue name
if(args.Length < 1)
{
Console.Error.WriteLine("Usage: {0} [binding_key...]",
Environment.GetCommandLineArgs()[0]);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
Environment.ExitCode = 1;
return;
}
foreach(var bindingKey in args)
{
channel.QueueBind(queue: queueName, //绑定bindingkey
exchange: "topic_logs",
routingKey: bindingKey);
}
Console.WriteLine(" [*] Waiting for messages. To exit press CTRL+C");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
var routingKey = ea.RoutingKey; //message的routingkey
Console.WriteLine(" [x] Received '{0}':'{1}'",
routingKey,
message);
};
channel.BasicConsume(queue: queueName,
autoAck: true,
consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
使用示例:
接收所有的logs
cd ReceiveLogsTopic
dotnet run "#"
接收所有facility:kern的logs
cd ReceiveLogsTopic
dotnet run "kern.*"
仅仅critical的logs
cd ReceiveLogsTopic
dotnet run "*.critical"
创建多个bindings
cd ReceiveLogsTopic
dotnet run "kern.*" "*.critical"
参考网址:
https://www.rabbitmq.com/tutorials/tutorial-five-dotnet.html