之前已经介绍过Exchange的作用,这里先回顾一下,方便开展今天的内容。
Exchange:消息交换机。他是和Producer直接进行打交道的,有点类似于路由器的功能,主要就是进行转发操作。那么Producer到底用哪个Exchange进行路由呢?这个取决于Routing Key,每个消息都有这个键,我们也可以自己设定,其实就是一字符串。
消息到达Exchange后,Exchange按照特定的策略转发到Queue进行存储。消息可能进入一个Queue也可能进入对应多个Queue,至于进入哪个Queue或者哪个都不进入,这要依据Exchange的ExchangeType和Exchange所绑定的路由规则,实现AMQP0.9.1协议的RabbitMQ Broker提供了四种ExChangeType。分别为:Direct exchange,Fanout exchange,Topic exchange和Headers exchange。
一、Direct exchange
1、原理图。
2、说明。
所有发送到Direct Exchange的消息被转发到RouteKey中指定的Queue。
Direct模式,可以使用RabbitMQ自带的Exchange:default Exchange 。所以不需要将Exchange进行任何绑定(binding)操作 。消息传递时,RouteKey必须完全匹配,才会被队列接收,否则该消息会被抛弃。
3、核心示例代码。
生产者:
channel.ExchangeDeclare(EXCHANGE_NAME, "direct");
channel.BasicPublish(EXCHANGE_NAME, "routingKey", null, message.getBytes());
消费者:
channel.ExchangeDeclare(EXCHANGE_NAME, "direct");
String queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queueName,EXCHANGE_NAME, "routingKey");
注:EXCHANGE_NAME为常量字符串,可为空。
二、Fanout exchange
1、原理图。
2、说明。
所有发送到Fanout Exchange的消息,将忽略Routing Key的存在,直接被广播到与该Exchange 绑定(Binding)的所有Queue上。
3、核心示例代码。
生产者:
channel.ExchangeDeclare(EXCHANGE_NAME, "fanout");
channel.BasicPublish(EXCHANGE_NAME, "", null, message.GetBytes());
消费者:
channel.ExchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queueName,EXCHANGE_NAME, "");
注:EXCHANGE_NAME不可为空。BasicPublish第二个参数为空,表示
没有RoutingKey 只要把队列绑定到这个Exchange上就能接收到消息。
三、Topic exchange
1、原理图。
2、说明。
Topic Exchange 使用非常灵活,也是最常用的Exchange模式。经常用来实现各种publish/subscribe,即发布订阅。
3、核心示例代码。
生产者:
channel.ExchangeDeclare(EXCHANGE_NAME, "topic");
String[] keys = { "USA.weather.1", "China.weather.1", "USA.people.1", "China.people.1" };
foreach (String key in keys)
{
String msg = key + ":消息";
channel.BasicPublish(EXCHANGE_NAME, key, null, msg.GetBytes());
}
消费者:
channel.ExchangeDeclare(EXCHANGE_NAME, "topic");
String queue = channel.QueueDeclare().QueueName;
String key0 = "*.weather.*";
//String key1 = "#.1";
channel.QueueBind(queue,EXCHANGE_NAME, key);
注:key0可以接收RoutingKey为"USA.weather.1"、"China.weather.1"的消息,不会接收RoutingKey为"USA.people.1", "China.people.1"的消息。
key1可以接收所有RoutingKey数组中的消息。
四、Headers exchange
1、原理图。
2、说明。
所有发送到Headers Exchange的消息,将根据消息的一些头部信息来分发过滤。
3、核心示例代码。
生产者:
channel.ExchangeDeclare(ExchangeName,ExchangeType.Headers, false, true, null);
byte[] message = Encoding.UTF8.GetBytes("Hello, World!");
var properties = new BasicProperties();
properties.Headers = new Dictionary();
properties.Headers.Add("key1", "12345");
channel.BasicPublish(ExchangeName, "", properties, message);
消费者:
const string QueueName = "header-exchange-example";
const string ExchangeName = "header-exchange-example";
static void Main(string[] args)
{
var connectionFactory = new ConnectionFactory();
connectionFactory.HostName = "localhost";
IConnection connection = connectionFactory.CreateConnection();
IModel channel = connection.CreateModel();
channel.ExchangeDeclare(ExchangeName,ExchangeType.Headers, false, true, null);
channel.QueueDeclare(QueueName, false, false, true, null);
System.Collections.Generic.IDictionary specs = new Dictionary();
specs.Add("x-match", "any");
specs.Add("key1", "12345");
specs.Add("key2", "123455");
channel.QueueBind(QueueName,ExchangeName, string.Empty, specs);
// 注意,这个StartConsume是扩展函数,在一个循环内反复调用CallBack函数。
channel.StartConsume(QueueName, MessageHandler);
connection.Close();
}
public static void MessageHandler(IModel channel, DefaultBasicConsumer consumer, BasicDeliverEventArgs eventArgs)
{
string message = Encoding.UTF8.GetString(eventArgs.Body);
Console.WriteLine("Message received: " + message);
foreach (string headerKey in eventArgs.BasicProperties.Headers.Keys)
{
Console.WriteLine(headerKey + ": " + eventArgs.BasicProperties.Headers[headerKey]);
}
if (message == "quit")
channel.BasicCancel(consumer.ConsumerTag);
}
扩展函数:
public static class ChannelExtensions
{
public static void StartConsume(this IModel channel, string queueName, Action callback)
{
var consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(queueName, true, consumer);
while (true)
{
try
{
var eventArgs = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
new Thread(() => callback(channel, consumer, eventArgs)).Start();
}
catch (EndOfStreamException)
{
// The consumer was cancelled, the model closed, or the connection went away.
break;
}
}
}
}
注:Headers exchange在实际使用中较少。
参考链接:
http://blog.csdn.net/dandanzmc/article/details/52262850
https://lostechies.com/derekgreer/2012/05/29/rabbitmq-for-windows-headers-exchanges/
Exchange的四种模式就简单介绍到这里。
关于RabbitMQ的入门知识介绍,就到此为止,至于更高深的RPC远程调用以及集群的内容,以后工作中遇到时再深入研究介绍。
领取专属 10元无门槛券
私享最新 技术干货