专栏首页领域驱动设计DDD实战进阶微服务实战(三):落地微服务架构到直销系统(构建基于RabbitMq的消息总线)

微服务实战(三):落地微服务架构到直销系统(构建基于RabbitMq的消息总线)

从前面文章可以看出,消息总线是EDA(事件驱动架构)与微服务架构的核心部件,没有消息总线,就无法很好的实现微服务之间的解耦与通讯。通常我们可以利用现有成熟的消息代理产品或云平台提供的消息服务来构建自己的消息总线;也可以自己完全写一个消息代理产品,然后基于它构建自己的消息总线。通常我们不用重复造轮子(除非公司有特殊的要求,比如一些大型互联网公司考虑到自主可控的白盒子),可以利用比如像RabbitMq这样成熟的消息代理产品作为消息总线的底层支持。

RabbitMq核心组件解释:

Connection:消息的发送方或订阅方通过它连接到RabbitMq服务器。

Channel:消息的发送方或订阅方通过Connection连接到RabbitMq服务器后,通过Channel建立会话通道。

Exchange:消息的发送方向Exchange发送消息,通过RabbitMq服务器中Exchange与Queue的绑定关系,Exchange会将消息路由到匹配的Queue中。

Queue:消息的承载者,消息的发送者的消息最终通过Exchange路由到匹配的Queue,消息的接收者从Queue接收消息并进行处理。

Exchange模式:在消息发送到Exchange时,需要路由到匹配的Queue中,至于如何路由,则是由Exchange模式决定的。

1.Direct模式:特定的路由键(消息类型)转发到该Exchange的指定Queue中。

2.Fanout模式:发送到该Exchange的消息,被同时发送到Exchange下绑定的所有Queue中。

3.Topic模式:具有某种特征的消息转发到该Exchange的指定Queue中。

我们最常见的使用是Direct模式,如果消息要被多个消费者消费,则可以使用Fanout模式。

实现基于RabbitMq的消息总线: 我们首先需要安装Erlang与RabbitMq到服务器上,然后就可以进行基于RabbitMq的消息总线的开发了,开发的总体思路与步骤如下:

1.首先建立一个项目作为消息总线,然后引入Rabbitmq.Client 这个nuget包,这样就有了RabbitMq开发的支持。

2.前面实现了基本的消息总线,所有基于RabbitMq的消息总线是从它继承下来的,并需要传入特定的参数到消息总线的构造函数中:

 public RabbitMqEB(IConnectionFactory connectionFactory,IEventHandlerExecutionContext context,
            string exchangeName,string exchangeType,string queueName,int publisherorconsumer,
            bool autoAck = true) : base(context)
        {
            this.connectionFactory = connectionFactory;
            this.connection = this.connectionFactory.CreateConnection();
            this.exchangeName = exchangeName;
            this.exchangeType = exchangeType;
            this.autoAck = autoAck;
            this.queueName = queueName;
            if (publisherorconsumer == 2)
            {
                this.channel = CreateComsumerChannel();
            }
        }

connectionFactory:RabbitMq.Client中的类型,用于与RabbitMq服务器建立连接时需要使用的对象。

context:消息与消息处理器之间的关联关系的对象。

exchangeName:生产者或消费者需要连接到的Exchange的名字。

exchangeType:前面所描述的Exchange模式。

queueName:生产者或消费者发送或接收消息时的Queue的名字。

publisherorconsumer:指定连接到消息总线的组件是消息总线的生产者还是消费者,消费者和生产者会有不同,消费者(publisherorconsumer==2)会构建一个消费通道,用于从Queue接收消息并调用父类的ieventHandlerExecutionContext的HandleAsync方法来处理消息。

3.建立到RabbitMq的连接: 

//判断是否已经建立了连接
public bool IsConnected
        {
            get { return this.connection != null && this.connection.IsOpen; }
        }
        public bool TryConnect()
        {
           //出现连接异常时的重试策略,通常通过第三方nuget包实现重试功能,这里出现连接异常时,每个1秒重试一次,共重试5次
            var policy = RetryPolicy.Handle<SocketException>().Or<BrokerUnreachableException>()
                .WaitAndRetry(5, p => TimeSpan.FromSeconds(1),(ex,time)=> {
                    //记录错误日志
                });
            policy.Execute(() =>
            {
                //建立RabbitMq Server的连接
                this.connection = this.connectionFactory.CreateConnection();
            });
            if (IsConnected)
            {
                return true;
            }
            return false;
        }

 4.创建消费者通道:

private IModel CreateComsumerChannel()
        {
            if (!IsConnected)
            {
                TryConnect();
            }
            var channel = this.connection.CreateModel();            
            channel.ExchangeDeclare(exchange: exchangeName, type: exchangeType, durable: true);
            channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false,
                arguments: null);
            var consumer = new EventingBasicConsumer(channel);
          //消费者接收到消息的处理
            consumer.Received += async (model, ea) =>
                {
                    var eventbody = ea.Body;
                    var json = Encoding.UTF8.GetString(eventbody);
                    var @event = (IEvent)JsonConvert.DeserializeObject(json);
                   //调用关联对象中消息对应的处理器的处理方法
                    await this.eventHandlerExecutionContext.HandleAsync(@event);
                   //向会话通道确认此消息已被处理
                    channel.BasicAck(ea.DeliveryTag, multiple: false);
                };
            channel.BasicConsume(queue: this.queueName, autoAck: false, consumer: consumer);
            
            channel.CallbackException += (sender, ea) =>
            {
                this.channel.Dispose();
                this.channel = CreateComsumerChannel();
             };
            return channel;
        }

5.对生产者发布消息到交换机队列的支持:

 public override void Publish<TEvent>(TEvent @event)
        {
            if (!IsConnected)
            {
                TryConnect();
            }
            using(var channel = this.connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: exchangeName, type: exchangeType, durable: true);
                var message = JsonConvert.SerializeObject(@event);
                var body = Encoding.UTF8.GetBytes(message);
              //发布到交换机,根据交换机与队列的绑定以及交换机模式,最终发布到指定的队列中
                channel.BasicPublish(this.exchangeName, @event.GetType().FullName,null, body);
            }
        }

6.对订阅者从交换机队列中订阅消息的支持:

 public override void Subscribe<TEvent, TEventHandler>()
        {
           //注册接收到的消息类型到订阅方的处理器之间的关系
            if (!this.eventHandlerExecutionContext.IsRegisterEventHandler < TEvent,TEventHandler>()){
                this.eventHandlerExecutionContext.RegisterEventHandler<TEvent, TEventHandler>();
              //消费者进行队列绑定
                this.channel.QueueBind(this.queueName, this.exchangeName, typeof(TEvent).FullName);
            }
        }

从上面的6个步骤,我们基本上就完成了基于RabbitMq消息总线的基本功能,这里需要说明的是,上述代码只是演示,在实际生产环境中,不能直接使用以上代码,还需要小心的重构此代码以保证可靠性与性能。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • DDD实战进阶第一波(九):开发一般业务的大健康行业直销系统(实现经销商上下文仓储与领域逻辑)

    用户1910585
  • DDD实战进阶第一波(九):开发一般业务的大健康行业直销系统(实现经销商上下文仓储与领域逻辑)

    用户1910585
  • DDD实战进阶第一波(十三):开发一般业务的大健康行业直销系统(订单上下文领域逻辑)

    用户1910585
  • Spark Scala当中reduceByKey(_+_) reduceByKey((x,y) => x+y)的用法

    reduceByKey(_+_)是reduceByKey((x,y) => x+y)的一个 简洁的形式 */ val rdd08 = sc.paral...

    马克java社区
  • 36条常用Excel技巧 收藏备用!

    1、两列数据查找相同值对应的位置 =MATCH(B1,A:A,0) 2、已知公式得结果 定义名称=EVALUATE(Sheet1!C1) 已知结果得公式 定义名...

    CDA数据分析师
  • 【Augustzhang 张元龙】知根知底,方能游刃有余

    小编语:据江湖传闻,龙哥从初中就开始写代码,高中通过计算机竞赛免试上了大学,大学里则是ACM大神。2010年毕业加入腾讯,先后从事密保、验证码等后台研发工作,...

    TEG云端专业号
  • Go语言源码笔记 --- netpoller

    总览:Go中网络交互采用多路复用的技术,具体到各个平台,即Kqueue、Epoll、Select、Poll等,下面以Linux下的Epoll实现为例进行分析。

    后台搬砖鹅
  • 搜索结果质量评估(上)

    【废话少说—文章思路】 ? 1.引言 如果说以前的传统报刊、搜索引擎、门户网站等媒介解决的是信息不对称的矛盾,那么现在我们面临的矛盾是信息过载的问题。 无疑,解...

    企鹅号小编
  • 证明费马最后定理的英国数学家,终获2016阿贝尔奖

    大数据文摘
  • ASP.NET MVC 页面校验和区域

    通常来说,web项目通常使用前后端混合校验,使用诸如:Bootstrap Validator,jquery.validate.js,配合 MVC框架来做校验则。

    李郑

扫码关注云+社区

领取腾讯云代金券