【RabbitMQ系列】Exchange模式

之前已经介绍过Exchange的作用,这里先回顾一下,方便开展今天的内容。

Exchange:消息交换机。他是和Producer直接进行打交道的,有点类似于路由器的功能,主要就是进行转发操作。那么Producer到底用哪个Exchange进行路由呢?这个取决于Routing Key,每个消息都有这个键,我们也可以自己设定,其实就是一字符串。

消息到达Exchange后,Exchange按照特定的策略转发到Queue进行存储。消息可能进入一个Queue也可能进入对应多个Queue,至于进入哪个Queue或者哪个都不进入,这要依据Exchange的ExchangeType和Exchange所绑定的路由规则,实现AMQP0.9.1协议的RabbitMQ Broker提供了四种ExChangeType。分别为:Direct exchange,Fanout exchange,Topic exchange和Headers exchange。

一、Direct exchange

1、原理图。

2、说明。

所有发送到Direct Exchange的消息被转发到RouteKey中指定的Queue。

Direct模式,可以使用RabbitMQ自带的Exchange:default Exchange 。所以不需要将Exchange进行任何绑定(binding)操作 。消息传递时,RouteKey必须完全匹配,才会被队列接收,否则该消息会被抛弃。

3、核心示例代码。

生产者:

channel.ExchangeDeclare(EXCHANGE_NAME, "direct");

channel.BasicPublish(EXCHANGE_NAME, "routingKey", null, message.getBytes());

消费者:

channel.ExchangeDeclare(EXCHANGE_NAME, "direct");

String queueName = channel.QueueDeclare().QueueName;

channel.QueueBind(queueName,EXCHANGE_NAME, "routingKey");

注:EXCHANGE_NAME为常量字符串,可为空。

二、Fanout exchange

1、原理图。

2、说明。

所有发送到Fanout Exchange的消息,将忽略Routing Key的存在,直接被广播到与该Exchange 绑定(Binding)的所有Queue上。

3、核心示例代码。

生产者:

channel.ExchangeDeclare(EXCHANGE_NAME, "fanout");

channel.BasicPublish(EXCHANGE_NAME, "", null, message.GetBytes());

消费者:

channel.ExchangeDeclare(EXCHANGE_NAME, "fanout");

String queueName = channel.QueueDeclare().QueueName;

channel.QueueBind(queueName,EXCHANGE_NAME, "");

注:EXCHANGE_NAME不可为空。BasicPublish第二个参数为空,表示

没有RoutingKey 只要把队列绑定到这个Exchange上就能接收到消息。

三、Topic exchange

1、原理图。

2、说明。

Topic Exchange 使用非常灵活,也是最常用的Exchange模式。经常用来实现各种publish/subscribe,即发布订阅。

3、核心示例代码。

生产者:

channel.ExchangeDeclare(EXCHANGE_NAME, "topic");

String[] keys = { "USA.weather.1", "China.weather.1", "USA.people.1", "China.people.1" };

foreach (String key in keys)

{

String msg = key + ":消息";

channel.BasicPublish(EXCHANGE_NAME, key, null, msg.GetBytes());

}

消费者:

channel.ExchangeDeclare(EXCHANGE_NAME, "topic");

String queue = channel.QueueDeclare().QueueName;

String key0 = "*.weather.*";

//String key1 = "#.1";

channel.QueueBind(queue,EXCHANGE_NAME, key);

注:key0可以接收RoutingKey为"USA.weather.1"、"China.weather.1"的消息,不会接收RoutingKey为"USA.people.1", "China.people.1"的消息。

key1可以接收所有RoutingKey数组中的消息。

四、Headers exchange

1、原理图。

2、说明。

所有发送到Headers Exchange的消息,将根据消息的一些头部信息来分发过滤。

3、核心示例代码。

生产者:

channel.ExchangeDeclare(ExchangeName,ExchangeType.Headers, false, true, null);

byte[] message = Encoding.UTF8.GetBytes("Hello, World!");

var properties = new BasicProperties();

properties.Headers = new Dictionary();

properties.Headers.Add("key1", "12345");

channel.BasicPublish(ExchangeName, "", properties, message);

消费者:

const string QueueName = "header-exchange-example";

const string ExchangeName = "header-exchange-example";

static void Main(string[] args)

{

var connectionFactory = new ConnectionFactory();

connectionFactory.HostName = "localhost";

IConnection connection = connectionFactory.CreateConnection();

IModel channel = connection.CreateModel();

channel.ExchangeDeclare(ExchangeName,ExchangeType.Headers, false, true, null);

channel.QueueDeclare(QueueName, false, false, true, null);

System.Collections.Generic.IDictionary specs = new Dictionary();

specs.Add("x-match", "any");

specs.Add("key1", "12345");

specs.Add("key2", "123455");

channel.QueueBind(QueueName,ExchangeName, string.Empty, specs);

// 注意,这个StartConsume是扩展函数,在一个循环内反复调用CallBack函数。

channel.StartConsume(QueueName, MessageHandler);

connection.Close();

}

public static void MessageHandler(IModel channel, DefaultBasicConsumer consumer, BasicDeliverEventArgs eventArgs)

{

string message = Encoding.UTF8.GetString(eventArgs.Body);

Console.WriteLine("Message received: " + message);

foreach (string headerKey in eventArgs.BasicProperties.Headers.Keys)

{

Console.WriteLine(headerKey + ": " + eventArgs.BasicProperties.Headers[headerKey]);

}

if (message == "quit")

channel.BasicCancel(consumer.ConsumerTag);

}

扩展函数:

public static class ChannelExtensions

{

public static void StartConsume(this IModel channel, string queueName, Action callback)

{

var consumer = new QueueingBasicConsumer(channel);

channel.BasicConsume(queueName, true, consumer);

while (true)

{

try

{

var eventArgs = (BasicDeliverEventArgs)consumer.Queue.Dequeue();

new Thread(() => callback(channel, consumer, eventArgs)).Start();

}

catch (EndOfStreamException)

{

// The consumer was cancelled, the model closed, or the connection went away.

break;

}

}

}

}

注:Headers exchange在实际使用中较少。

参考链接:

http://blog.csdn.net/dandanzmc/article/details/52262850

https://lostechies.com/derekgreer/2012/05/29/rabbitmq-for-windows-headers-exchanges/

Exchange的四种模式就简单介绍到这里。

关于RabbitMQ的入门知识介绍,就到此为止,至于更高深的RPC远程调用以及集群的内容,以后工作中遇到时再深入研究介绍。

  • 发表于:
  • 原文链接http://kuaibao.qq.com/s/20180316G1VJBR00?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码关注云+社区

领取腾讯云代金券