Spring Cloud Bus中的事件的订阅与发布(一)

年前最后一篇文章,提前祝大家新年快乐!

下面进入正文。Spring Cloud Bus用轻量级的消息代理将分布式系统的节点连接起来。这可以用来广播状态的该表(比如配置的改变)或者其他关联的指令。一个关键的想法是,总线就像是一个分布式Actuator,用于Spring Boot应用程序的扩展,但它也可以用作应用程序之间的通信通道。Spring Cloud提供了AMQP 传输的代理和Kafka启动Starters,对具有相同的基本功能集的其他传输组件的支持,也在未来的规划中。

Spring Cloud Bus

Spring Cloud Bus是在Spring Cloud Stream的基础上进行的封装,对于指定主题的消息的发布与订阅是通过Spring Cloud Stream的具体binder实现。因此引入的依赖可以是spring-cloud-starter-bus-amqpspring-cloud-starter-bus-kafka其中的一种,分别对应于binder的两种实现。根据上一节的基础应用,我们总结出Spring Cloud Bus的主要功能如下两点:

  • 对指定主题springCloudBus的消息订阅与发布。
  • 事件监听,包括刷新事件、环境变更事件、远端应用的ack事件以及本地服务端发送事件等。

下面我们以这两方面作为主线,进行Spring Cloud Bus的源码分析。本文主要针对事件的订阅户发布。

事件的订阅与发布

事件驱动模型

这部分需要读者首先了解下Spring的事件驱动模型。我们在这边简单介绍下设计的主要概念,帮助大家易于理解后面的内容。

event-source

Spring的事件驱动模型由三部分组成:

  • 事件:ApplicationEvent,继承自JDK的EventObject,所有事件将继承它,并通过source得到事件源。
  • 事件发布者:ApplicationEventPublisher及ApplicationEventMulticaster接口,使用这个接口,我们的Service就拥有了发布事件的能力。
  • 事件订阅者:ApplicationListener,继承自JDK的EventListener,所有监听器将继承它。

事件的定义

Spring的事件驱动模型的事件定义均继承自ApplicationEventSpring Cloud Bus中有多个事件类,这些事件类都继承了一个重要的抽象类RemoteApplicationEvent,我们看一下事件类的类图:

bus-event

涉及的事件类有:代表了对特定事件确认的事件AckRemoteApplicationEvent、环境变更的事件EnvironmentChangeRemoteApplicationEvent、刷新事件RefreshRemoteApplicationEvent、发送事件 SentApplicationEvent、以及未知事件UnknownRemoteApplicationEvent。下面我们分别看一下这些事件的定义。

抽象基类:RemoteApplicationEvent

通过上面的类图,我们知道RemoteApplicationEvent是其他事件类的基类,定义了事件对象的公共属性。

 1@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") //序列化时使用子类的名称作为type
 2@JsonIgnoreProperties("source") //序列化时,忽略 source
 3public abstract class RemoteApplicationEvent extends ApplicationEvent {
 4    private static final Object TRANSIENT_SOURCE = new Object();
 5    private final String originService;
 6    private final String destinationService;
 7    private final String id;
 8
 9    protected RemoteApplicationEvent(Object source, String originService,
10            String destinationService) {
11        super(source);
12        this.originService = originService;
13        if (destinationService == null) {
14            destinationService = "**";
15        }
16
17        if (!"**".equals(destinationService)) {
18            if (StringUtils.countOccurrencesOf(destinationService, ":") <= 1
19                    && !StringUtils.endsWithIgnoreCase(destinationService, ":**")) {
20                //destination的所有实例
21                destinationService = destinationService + ":**";
22            }
23        }
24        this.destinationService = destinationService;
25        this.id = UUID.randomUUID().toString();
26    }
27    ...
28}

RemoteApplicationEvent中定义了主要的三个通用属性事件的来源originService、事件的目的服务destinationService和随机生成的全局id。通过其构造方法可知,destinationService可以使用通配符的形式{serviceId}:{appContextId},两个变量都省略的话,则通知到所有服务的所有实例。只省略appContextId时,则对应的destinationService为相应serviceId的所有实例。另外,注解@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")对应于序列化时,使用子类的名称作为type;而@JsonIgnoreProperties("source")表示序列化时,忽略source属性,source定义在JDK中的EventObject

EnvironmentChangeRemoteApplicationEvent

用于动态更新服务实例的环境属性,我们在基础应用中更新cloud.version属性时,关联到该事件。

 1public class EnvironmentChangeRemoteApplicationEvent extends RemoteApplicationEvent {
 2
 3    private final Map<String, String> values;
 4
 5    public EnvironmentChangeRemoteApplicationEvent(Object source, String originService,
 6            String destinationService, Map<String, String> values) {
 7        super(source, originService, destinationService);
 8        this.values = values;
 9    }
10    ...
11}

可以看到,EnvironmentChangeRemoteApplicationEvent事件类的实现很简单。定义了Map类型的成员变量,key对应于环境变量名,而value对应更新后的值。

RefreshRemoteApplicationEvent

刷新远端应用配置的事件,用于接收远端刷新的请求。

1public class RefreshRemoteApplicationEvent extends RemoteApplicationEvent {
2    public RefreshRemoteApplicationEvent(Object source, String originService,
3            String destinationService) {
4        super(source, originService, destinationService);
5    }
6}

继承自抽象事件类RemoteApplicationEvent,没有特别的成员属性。

AckRemoteApplicationEvent

确认远端应用事件,该事件表示一个特定的RemoteApplicationEvent事件被确认。

 1public class AckRemoteApplicationEvent extends RemoteApplicationEvent {
 2
 3    private final String ackId;
 4    private final String ackDestinationService;
 5    private Class<? extends RemoteApplicationEvent> event;
 6
 7    public AckRemoteApplicationEvent(Object source, String originService,
 8            String destinationService, String ackDestinationService, String ackId,
 9            Class<? extends RemoteApplicationEvent> type) {
10        super(source, originService, destinationService);
11        this.ackDestinationService = ackDestinationService;
12        this.ackId = ackId;
13        this.event = type;
14    }
15    ...
16
17    public void setEventName(String eventName) {
18        try {
19            event = (Class<? extends RemoteApplicationEvent>) Class.forName(eventName);
20        } catch (ClassNotFoundException e) {
21            event = UnknownRemoteApplicationEvent.class;
22        }
23    }
24}

该事件类在RemoteApplicationEvent基础上,定义了成员属性ackId、ackDestinationService和event。 ackId和ackDestinationService,分别表示确认的时间的id和对应的目标服务。event对应事件类型,确认事件能够确认的必然是RemoteApplicationEvent的子类,因此event属性设值时需要进行检查,如果转换出现异常,则定义为未知的事件类型。这些事件可以被任何需要统计总线事件响应的应用程序来监听。 它们的行为与普通的远程应用程序事件相似,即如果目标服务与本地服务ID匹配,则应用程序会在其上下文中触发该事件。

SentApplicationEvent

发送应用事件,表示系统中的某个地方发送了一个远端事件。

 1@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
 2@JsonIgnoreProperties("source")
 3public class SentApplicationEvent extends ApplicationEvent {
 4
 5    private static final Object TRANSIENT_SOURCE = new Object();
 6    private final String originService;
 7    private final String destinationService;
 8    private final String id;
 9    private Class<? extends RemoteApplicationEvent> type;
10
11    protected SentApplicationEvent() {
12        // for serialization libs like jackson
13        this(TRANSIENT_SOURCE, null, null, null, RemoteApplicationEvent.class);
14    }
15
16    public SentApplicationEvent(Object source, String originService,
17            String destinationService, String id,
18            Class<? extends RemoteApplicationEvent> type) {
19        super(source);
20        this.originService = originService;
21        this.type = type;
22        if (destinationService == null) {
23            destinationService = "*";
24        }
25        if (!destinationService.contains(":")) {
26            // All instances of the destination unless specifically requested
27            destinationService = destinationService + ":**";
28        }
29        this.destinationService = destinationService;
30        this.id = id;
31    }
32    ...
33}

可以看到该事件类继承自ApplicationEvent,它本身并不是一个RemoteApplicationEvent事件,所以不会通过总线发送,而是在本地生成(多为响应远端事件)。想要审计远端事件的应用可以监听该事件,并且所有的AckRemoteApplicationEvent事件中的id来源于相应的SentApplicationEvent中定义的id。在其定义的成员属性中,相比于远端应用事件多了一个事件类型type,该类型限定于RemoteApplicationEvent的子类。

UnknownRemoteApplicationEvent

未知的远端应用事件,也是RemoteApplicationEvent事件类的子类。该事件类与之前的SentApplicationEventAckRemoteApplicationEvent有关,当序列化时遇到事件的类型转换异常,则自动构造成一个未知的远端应用事件。

事件监听器以及消息的订阅与发布待后续更新。。

参考

Spring Cloud Bus-v1.3.3

原文发布于微信公众号 - aoho求索(aohoBlog)

原文发表时间:2018-02-13

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏GreenLeaves

WebService 之 身份验证

  在项目开发,我们经常会使用WebService,但在使用WebService时我们经常会考虑到了WebService是安全问题,很容易想到通过一组用户名与密...

3487
来自专栏Jack-Cui

Linux应用层系统时间写入RTC时钟的方法

Linux内核版本:linux-3.0.35 开发板:i.MX6S MY-IMX6-EK200 系统:Ubuntu12 前言:之前写过一篇关于如...

2150
来自专栏扎心了老铁

springboot mybatis 事务管理

本文主要讲述springboot提供的声明式的事务管理机制。 一、一些概念 声明式的事务管理是基于AOP的,在springboot中可以通过@Transacti...

5007
来自专栏芋道源码1024

注册中心 Eureka 源码解析 —— EndPoint 与 解析器

目前有多种 Eureka-Server 访问地址的配置方式,本文只分享 Eureka 1.x 的配置,不包含 Eureka 1.x 对 Eureka 2.x 的...

1210
来自专栏java学习

关于Spring面试题讲解2

依赖注入,是IOC的一个方面,是个通常的概念,它有多种解释。这概念是说你不用创建对象,而只需要描述它如何被创建。你不在代码里直接组装你的组件和服务,但是要在配置...

842
来自专栏芋道源码1024

面试问烂的 Spring AOP 原理

来源:https://www.jianshu.com/p/e18fd44964eb

2094
来自专栏Android相关

处理器结构--ReorderBuffer

Reorder Buffer用来保存在乱序执行之前的(OOOE)指令执行顺序,当指令集合在乱序执行后按照原有指令顺序将结果提交。

1384
来自专栏xdecode

使用Dagger2做静态注入, 对比Guice.

Dagger 依赖注入的诉求, 这边就不重复描述了, 在上文Spring以及Guice的IOC文档中都有提及, 既然有了Guice, Google为啥还要搞个D...

4877
来自专栏开源优测

[接口测试 - http.client篇] 16 基于http.client之POM实战一下

概述 关注公众号回复: http.client_pom_demo 获取本文示例源码 你需要了解以下知识和技术,以便掌握后续的实例代码: http.client常...

3578
来自专栏Java帮帮-微信公众号-技术文章全总结

Web-第十一天 JSP学习

JSP全名是Java Server Pages,它是建立在Servlet规范之上的动态网页开发技术。在JSP文件中,HTML代码与Java代码共同存在,其中,H...

1513

扫码关注云+社区

领取腾讯云代金券