前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >大话RabbitMQ 基础入门

大话RabbitMQ 基础入门

作者头像
码农阿宇
发布2018-07-09 16:42:23
4760
发布2018-07-09 16:42:23
举报
文章被收录于专栏:码农阿宇码农阿宇

----------写在前面----------

近些年微服务越来越火,让我也忍不住想去一窥微服务究竟,讲到微服务,就离不开分布式,而分布式,也离不开消息队列,在消息队列中,RabbitMQ可以说是比较具有代表性的一款。

这里是一篇介绍消息队列以及各种消息队列产品对比的文章,讲得很好,有兴趣的可以看一看。

https://cloud.tencent.com/developer/article/1006035

在讲RabbitMQ之前,首先需要在电脑上安装和配置RabbitMQ,网络上已经有很多这类文章,如果懒得去搜索,可以看看这篇介绍如何安装配置RabbitMQ的文章。

https://blog.csdn.net/weixin_39735923/article/details/79288578

其中,在安装RabbitMQ的过程中,遇到了一个坑,在启用RabbltMQ的管理界面执行

rabbitmq-plugins enable rabbitmq_management

命令时,出现了以下这样的报错

可以在该指令前加上 .\ 即

.\rabbitmq-plugins enable rabbitmq_management

祝安装顺利 !!

 -------正文------

基本概念

下面是在.Net中使用RabbitMQ要明白的一些名词概念。

综上所诉,他们之间的关系可以用我下面的 丑图 表示。

在图中,没有吧Routing key画出。Producer每一次发送消息,除了发出消息本身,还会随着消息带上一个routingKey,而且每一次将Exchange和Queue绑定,大体需要三个参数,

string queueName, string exchangeName, string routingKey

其中也有一个routingKey,但此RoutingKey非彼Routingkey。

大白话

对这个过程,我们可以理解为国家给灾区发送救灾物资,国家给当地政府划拨物资的时候,会规定,谁才能拿到这批物资,如(房子倒了的.家里有人受伤了的.家庭经济困难的)。

而当地政府在分配这批物资之前,为了方便物资的分配,会给每个家庭贴上一个标签,如

家庭A 经济困难

家庭B 房子倒了.经济困难

家庭C 家庭富有.房子倒了

家庭D 房子倒了的.家里有人受伤了的.家庭经济困难的

所以,发送消息时候的routingKey就是国家规定的那批物质分配规则。

而Exchange和Queue绑定时的RoutingKey可以理解为当地政府给每个家庭贴上的一个标签。

Exchange(交换机)转发消息的规则也有很多种:direct, topic, headers(不常用) 和 fanout,我们称之为交换类型。

我们可以把Exchange理解为分配这批物质的政府,现在国家规定了宏观的分配方向(发送消息时的routingKey),每个家庭也有了家庭情况的标签(绑定Exchange时的routingKey),但是这个物资具体怎么分,还是当地政府说了算。

Direct 严格按照国家规定来,只有房子倒了的,家里有人受伤了的而且家庭经济困难的才能分到救灾物资。    家庭D能分到

Fanout 只要是灾区的居民都能分到, 不管家庭情况如何。 家庭A\B\C\D都能分到

Topic 主题匹配: 只要家庭情况在国家规定分配规则内的,都能分到物资,但是家庭C分不到,因为他家太有钱了,这个条件不在国家的分配规则里。家庭A\B\D能分到

所以,我们在声明一个Exchange(交换机)的同时,还要指定该交换机的类型,即(当地政府怎么来分救灾物资)

其实,用这个例子,我是想说,生产者和消费者之间,就像国家与难民之间一样,国家只知道,我要帮助难民,但是难民有谁,物资能不能分到难民手里,还得当地政府说了算,你就说我这个例子恰不恰当吧!哈哈?

 好了,懂了概念,我们再来结合具体例子看看。

Fanout

Producer.cs的代码

using System;
using System.Text;
using RabbitMQ.Client;

namespace _2_Publish
{
    class Program
    {
        static void Main(string[] args)
        {
            //创建连接工厂
            var factory = new ConnectionFactory() { HostName = "localhost" };

            //创建连接
            using (var connection = factory.CreateConnection())
            {
                //创建会话
                using (var chancel = connection.CreateModel())
                {
                    //生命交换机
                    chancel.ExchangeDeclare(exchange: "FanoutDemo", type: ExchangeType.Fanout);

                    string readMsg = "helloWorld";
                    while (readMsg.ToLower() != "exit")
                    {
                        var body = Encoding.UTF8.GetBytes(readMsg);

                        //给交换机发送消息
                        chancel.BasicPublish(exchange: "FanoutDemo", routingKey: "", body: body);
                        Console.WriteLine($"成功发送消息{readMsg}");
                        Console.WriteLine("请输入要发送的内容!");
                        readMsg = Console.ReadLine();
                    }
                }
            }
        }
    }
}

Customer.cs代码

using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace _2_Receiver
{
    class Program
    {
        static void Main(string[] args)
        {
            //创建连接工厂
            var factory = new ConnectionFactory() { HostName = "localhost" };

            //创建连接
            using (var connection = factory.CreateConnection())
            {
                //创建会话
                using (var channel = connection.CreateModel())
                {
                    //声明一个Fanout类型的交换机
                    channel.ExchangeDeclare(exchange: "FanoutDemo", type: ExchangeType.Fanout);

                    //声明一个消息队列并获取它的名字
                    var queueName = channel.QueueDeclare().QueueName;

                    //把消息队列和交换机绑定
                    channel.QueueBind(exchange: "FanoutDemo", queue: queueName, routingKey: "");

                    //创建消费者
                    var consume = new EventingBasicConsumer(channel);

                    //把消费者和队列绑定
                    channel.BasicConsume(queue: queueName, autoAck: true,consumer: consume);

                    consume.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        Console.WriteLine($"收到消息{message}");
                    };

                    Console.ReadLine();
                }
            }
        }
    }
}

在上面的代码中,无论是在生产者的发送消息里

                         //给交换机发送消息

                        chancel.BasicPublish(exchange: "FanoutDemo", routingKey: "", body: body);

还是消费者所在的Queue的绑定里

                 //把消息队列和交换机绑定

                    channel.QueueBind(exchange: "FanoutDemo", queue: queueName, routingKey: "");

我们都没有制定routingKey,因为没个人都能获取消息,所以此处,声明routingKey就没有意义了。

我们看看运行效果。

运行了三个消费者,当生产者发出消息时,三个消费者都收到了相同的消息。可以理解为广播模式。(Customer单词拼写错了,图片修改不方便,就不改了,大家将就一下)

Direct

Direct时严格匹配的,只有队列绑定的RoutingKey与生产者发送消息时指定的RoutingKey完全相同,才能接收成功。

Producer.cs

using System;
using System.Text;
using RabbitMQ.Client;

namespace _2_Publish
{
    class Program
    {
        static void Main(string[] args)
        {
            //创建连接工厂
            var factory = new ConnectionFactory() { HostName = "localhost" };

            //创建连接
            using (var connection = factory.CreateConnection())
            {
                //创建会话
                using (var chancel = connection.CreateModel())
                {
                    //生命交换机
                    chancel.ExchangeDeclare(exchange: "DirectDemo", type: ExchangeType.Direct);

                    string readMsg = "helloWorld";
                    while (readMsg.ToLower() != "exit")
                    {
                        var body = Encoding.UTF8.GetBytes(readMsg);

                        //给交换机发送消息
                        chancel.BasicPublish(exchange: "DirectDemo", routingKey: "Direct.Key", body: body);
                        Console.WriteLine($"成功发送消息{readMsg}");
                        Console.WriteLine("请输入要发送的内容!");
                        readMsg = Console.ReadLine();
                    }
                }
            }
        }
    }
}

我把Exchange的类型更改为Direct类型,并且发送消息的routingKey设置为Direct.Key。

然后我们来定义消费者

Customer.CS

using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace _2_Receiver
{
    class Program
    {
        static void Main(string[] args)
        {
            //创建连接工厂
            var factory = new ConnectionFactory() { HostName = "localhost" };

            //创建连接
            using (var connection = factory.CreateConnection())
            {
                //创建会话
                using (var channel = connection.CreateModel())
                {
                    //声明一个Fanout类型的交换机
                    channel.ExchangeDeclare(exchange: "DirectDemo", type: ExchangeType.Direct);

                    //声明一个消息队列并获取它的名字
                    var queueName = channel.QueueDeclare().QueueName;

                    Console.WriteLine("请输入RoutingKey!");
                    var routingKey = Console.ReadLine();
                    //把消息队列和交换机绑定
                    channel.QueueBind(exchange: "DirectDemo", queue: queueName, routingKey: routingKey);

                    //创建消费者
                    var consume = new EventingBasicConsumer(channel);

                    //把消费者和队列绑定
                    channel.BasicConsume(queue: queueName, autoAck: true,consumer: consume);

                    Console.WriteLine("开始监听消息");

                    consume.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        Console.WriteLine($"收到消息{message}");
                    };

                    Console.ReadLine();
                }
            }
        }
    }
}

消费者的RoutingKey再控制台输入,

运行效果如下:

可以看到,只有RoutingKey为Direct.Key的消费者才收到了生产者发出的消息。

Topic

RabbitMQ 中的 RouteKey 支持绑定键表达式写法,有两种主要的绑定键:

*(星号)可以代替一个单词.

# (井号) 可以代替0个或多个单词.

比如在下面这个图中(P为发送者,X为RabbitMQ中的Exchange,C为消费者,Q为队列)

在这个示例中,我们将发送一条关于动物描述的消息,也就是说 Name(routeKey) 字段中的内容包含 3 个单词。第一个单词是描述速度的(celerity),第二个单词是描述颜色的(colour),第三个是描述哪种动物的(species),它们组合起来类似:“..”。

然后在使用 CapSubscribe 绑定的时候,Q1绑定为 CapSubscribe["*.orange.*"], Q2 绑定为CapSubscribe["*.*.rabbit"] 和 [CapSubscribe["lazy.#]

那么,当发送一个名为 "quick.orange.rabbit" 消息的时候,这两个队列将会同时收到该消息。同样名为 lazy.orange.elephant的消息也会被同时收到。另外,名为 "quick.orange.fox" 的消息将仅会被发送到Q1队列,名为 "lazy.brown.fox" 的消息仅会被发送到Q2。"lazy.pink.rabbit" 仅会被发送到Q2一次,即使它被绑定了2次。"quick.brown.fox" 没有匹配到任何绑定的队列,所以它将会被丢弃。

另外一种情况,如果你违反约定,比如使用 4个单词进行组合,例如 "quick.orange.male.rabbit",那么它将匹配不到任何的队列,消息将会被丢弃。

但是,假如你的消息名为 "lazy.orange.male.rabbit",那么他们将会被发送到Q2,因为 #(井号)可以匹配 0 或者多个单词。

我们结合代码来看一看。

Producer.cs

using System;
using System.Text;
using RabbitMQ.Client;

namespace _2_Publish
{
    class Program
    {
        static void Main(string[] args)
        {
            //创建连接工厂
            var factory = new ConnectionFactory() { HostName = "localhost" };

            //创建连接
            using (var connection = factory.CreateConnection())
            {
                //创建会话
                using (var chancel = connection.CreateModel())
                {
                    //生命交换机
                    chancel.ExchangeDeclare(exchange: "TopicDemo", type: ExchangeType.Topic);

                    string readMsg = "helloWorld";
                    while (readMsg.ToLower() != "exit")
                    {
                        var body = Encoding.UTF8.GetBytes(readMsg);

                        //给交换机发送消息
                        chancel.BasicPublish(exchange: "TopicDemo", routingKey: "Topic.Demo.Key", body: body);
                        Console.WriteLine($"成功发送消息{readMsg}");
                        Console.WriteLine("请输入要发送的内容!");
                        readMsg = Console.ReadLine();
                    }
                }
            }
        }
    }
}

我给发送消息的routingKey指定为Topic.Demo.Key

再来看看消费者

Cuustomer.cs

using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace _2_Receiver
{
    class Program
    {
        static void Main(string[] args)
        {
            //创建连接工厂
            var factory = new ConnectionFactory() { HostName = "localhost" };

            //创建连接
            using (var connection = factory.CreateConnection())
            {
                //创建会话
                using (var channel = connection.CreateModel())
                {
                    //声明一个Fanout类型的交换机
                    channel.ExchangeDeclare(exchange: "TopicDemo", type: ExchangeType.Topic);

                    //声明一个消息队列并获取它的名字
                    var queueName = channel.QueueDeclare().QueueName;

                    Console.WriteLine("请输入RoutingKey!");
                    var routingKey = Console.ReadLine();
                    //把消息队列和交换机绑定
                    channel.QueueBind(exchange: "TopicDemo", queue: queueName, routingKey: routingKey);

                    //创建消费者
                    var consume = new EventingBasicConsumer(channel);

                    //把消费者和队列绑定
                    channel.BasicConsume(queue: queueName, autoAck: true,consumer: consume);

                    Console.WriteLine("开始监听消息");

                    consume.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        Console.WriteLine($"收到消息{message}");
                    };

                    Console.ReadLine();
                }
            }
        }
    }
}

其RoutingKey也是在外部输入。

我们看看运行效果

因为Producer发布消息的RoutingKey是Topic.Demo.Key

又因为#可以代表0个或者多个单词 ,*能代表一个单词

所以*.*.Key    Topic.#与Topic.Demo.Key匹配,而其他两个*.Key和test.1.2当然是不匹配的,所以没有收到消息。

总结

对于上面的例子,我们可以总结出,编写一个生产者的过程如下:

创建连接工厂-》创建连接-》创建会话(Chanel)-》创建交换机(Exchange)-》发送消息

编写一个生产者的过程如下:

创建连接工厂-》创建连接-》创建会话(Chanel)-》创建交换机(Exchange)-》创建队列-》绑定队列和交换机-》创建消费者-》把消费者和队列绑定-》监听消息

掌握这个大的方向,不管交换机怎么分配,代码应该都会写了。

为什么在生产者中和消费者中都要创建交换机呢? 因为我们不确定是生产者先执行还是消费者先执行,所以提前创建一下,避免连接时发现没有创建交换机,出现错误,如果交换机已经创建了,那么默认不会再次创建的。

另外,交换机创建后,同一名称的交换机使用完不会自动删除,但是第二次如果创建的名称和上次一样,但是交换机类型不一样了,那么便会出现报错。

这里总结的是一些RabbitMQ的基础知识,后面还会继续写一些更深入的使用技巧,如果不想错过精彩信息,点击关注一下吧(๑¯◡¯๑)!

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018-05-22 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • ----------写在前面----------
  •  -------正文------
  • 基本概念
  • 大白话
  • Fanout
  • Direct
  • Topic
  • 总结
相关产品与服务
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档