Spring Cloud Bus中的事件的订阅与发布(二)

在之前的文章Spring Cloud Bus中的事件的订阅与发布(一)介绍了消息总线的相关事件。本文主要介绍消息总线的事件监听器以及消息的订阅与发布。

事件监听器

Spring Cloud Bus中,事件监听器的定义可以是实现ApplicationListener接口,或者是使用@EventListener注解的形式。我们看一下事件监听器的类图。

listener

ApplicationListener接口实现有两个:刷新监听器RefreshListener和环境变更监听器EnvironmentChangeListener

RefreshListener

RefreshListener对应的事件是RefreshRemoteApplicationEvent

 1public class RefreshListener
 2        implements ApplicationListener<RefreshRemoteApplicationEvent> {
 3    private ContextRefresher contextRefresher;
 4
 5    public RefreshListener(ContextRefresher contextRefresher) {
 6        this.contextRefresher = contextRefresher;
 7    }
 8
 9    @Override
10    public void onApplicationEvent(RefreshRemoteApplicationEvent event) {
11        Set<String> keys = contextRefresher.refresh();
12        log.info("Received remote refresh request. Keys refreshed " + keys);
13    }
14}

对于刷新时间的处理,调用ContextRefresherrefresh()方法,而定义在Spring Cloud Context中的ContextRefresher用于提供上下文刷新的功能。我们具体看一下refresh()方法。

 1    public synchronized Set<String> refresh() {
 2        Map<String, Object> before = extract(
 3                this.context.getEnvironment().getPropertySources());
 4        addConfigFilesToEnvironment();
 5        Set<String> keys = changes(before,
 6                extract(this.context.getEnvironment().getPropertySources())).keySet();
 7        this.context.publishEvent(new EnvironmentChangeEvent(keys));
 8        this.scope.refreshAll();
 9        return keys;
10    }

实现很简单,先获取之前环境变量的key-value,然后重新加载新的配置环境文件,通过比对新旧环境变量的map集合,然后发布新的环境变更EnvironmentChangeEvent的事件。this.scope.refreshAll()销毁了在这个范围内,当前实例的所有bean并在下次方法的执行时强制刷新。

EnvironmentChangeListener

EnvironmentChangeListener对应的事件类是EnvironmentChangeRemoteApplicationEvent

 1public class EnvironmentChangeListener
 2        implements ApplicationListener<EnvironmentChangeRemoteApplicationEvent> {
 3    @Autowired
 4    private EnvironmentManager env;
 5
 6    @Override
 7    public void onApplicationEvent(EnvironmentChangeRemoteApplicationEvent event) {
 8        Map<String, String> values = event.getValues();
 9        for (Map.Entry<String, String> entry : values.entrySet()) {
10            env.setProperty(entry.getKey(), entry.getValue());
11        }
12    }
13}

RefreshListener的实现中,可以知道该事件的实现最终又发布了一个新的事件EnvironmentChangeListener。在刷新监听器中,构造了变更了的环境变量的map,交给环境变更监听器。上面对环境变更事件的处理,遍历变更了的配置环境属性,并在本地应用程序的环境中将新的属性值设置到对应的键。

TraceListener

TraceListener的实现是通过注解@EventListener的形式,监听的事件为:确认事件AckRemoteApplicationEvent和发送事件SentApplicationEvent

 1@EventListener
 2    public void onAck(AckRemoteApplicationEvent event) {
 3        this.repository.add(getReceivedTrace(event));
 4    }
 5
 6    @EventListener
 7    public void onSend(SentApplicationEvent event) {
 8        this.repository.add(getSentTrace(event));
 9    }
10
11    protected Map<String, Object> getSentTrace(SentApplicationEvent event) {
12        Map<String, Object> map = new LinkedHashMap<String, Object>();
13        map.put("signal", "spring.cloud.bus.sent");
14        map.put("type", event.getType().getSimpleName());
15        map.put("id", event.getId());
16        map.put("origin", event.getOriginService());
17        map.put("destination", event.getDestinationService());
18        if (log.isDebugEnabled()) {
19            log.debug(map);
20        }
21        return map;
22    }
23
24    protected Map<String, Object> getReceivedTrace(AckRemoteApplicationEvent event) {
25        Map<String, Object> map = new LinkedHashMap<String, Object>();
26        map.put("signal", "spring.cloud.bus.ack");
27        map.put("event", event.getEvent().getSimpleName());
28        map.put("id", event.getAckId());
29        map.put("origin", event.getOriginService());
30        map.put("destination", event.getAckDestinationService());
31        if (log.isDebugEnabled()) {
32            log.debug(map);
33        }
34        return map;
35    }

在SentTrace中,主要记录了signal、事件类型type、id、源服务origin和目的服务destination的属性值。而在ReceivedTrace中,表示对事件的确认,主要记录了signal、事件类型event、id、源服务origin和目的服务destination的属性值。这些信息默认存储于内存中,可以通过/trace端点获取最近的事件信息,如下图所示:

 1{
 2    "timestamp": 1517229555629,
 3    "info": {
 4        "signal": "spring.cloud.bus.sent",
 5        "type": "RefreshRemoteApplicationEvent",
 6        "id": "c73a9792-9409-47af-993c-65526edf0070",
 7        "origin": "config-server:8888",
 8        "destination": "config-client:8000:**"
 9    }
10},
11{
12    "timestamp": 1517227659384,
13    "info": {
14        "signal": "spring.cloud.bus.ack",
15        "event": "RefreshRemoteApplicationEvent",
16        "id": "846f3a17-c344-4d29-93f3-01b73c5bf58f",
17        "origin": "config-client:8000",
18        "destination": "config-client:8000:**"
19    }
20}

至于事件的发起,我们将在下一节结合消息的订阅与发布一起讲解。

消息的订阅与发布

Spring Cloud Bus基于Spring Cloud Stream,对特定主题的消息进行订阅与发布,事件以消息的形式传递到其他服务实例。

通道定义

既然是基于stream,我们首先看一下input和output的通道定义。

 1public interface SpringCloudBusClient {
 2
 3String INPUT = "springCloudBusInput";
 4
 5String OUTPUT = "springCloudBusOutput";
 6
 7@Output(SpringCloudBusClient.OUTPUT)
 8MessageChannel springCloudBusOutput();
 9
10@Input(SpringCloudBusClient.INPUT)
11SubscribableChannel springCloudBusInput();
12}

可以看到,bus中定义了springCloudBusInputspringCloudBusOutput两个通道,分别用于定于订阅与发布springCloudBus的消息。

bus属性定义

其次,我们看一下bus中关于stream的属性定义。在基础应用中我们就知道bus订阅的话题是springCloudBus,下面看一下在bus中的其他属性的定义。

 1@ConfigurationProperties("spring.cloud.bus")
 2public class BusProperties {
 3
 4//环境变更相关的属性
 5private Env env = new Env();
 6// 刷新事件相关的属性
 7private Refresh refresh = new Refresh();
 8//与ack相关的属性
 9private Ack ack = new Ack();
10//与追踪ack相关的属性
11private Trace trace = new Trace();
12//Spring Cloud Stream消息的话题
13private String destination = "springCloudBus";
14
15//标志位,bus是否可用
16private boolean enabled = true;
17
18...
19}

上面的bus属性,设置了一些默认值,正好与事实也是相符的,我们没有进行任何spring.cloud.bus配置也能够进行正常运行。通过在配置文件中修改相应的属性,实现bus的更多功能扩展。env、refresh、ack和trace分别对应不同的事件,在配置文件中有一个开关属性,默认都是开启的,我们可以根据需要进行关闭。

消息的监听与发送

上面两部分讲了stream通道和基本属性的定义,最后我们看下bus中对指定主题的消息如何发送与监听处理。在META-INF/spring.factories配置了EnableAutoConfiguration配置项为BusAutoConfiguration,在服务启动时会自动加载到Spring容器中,其中对于指定主题的消息如何发送与监听处理如下:

 1@Configuration
 2@ConditionalOnBusEnabled //bus启用的开关
 3@EnableBinding(SpringCloudBusClient.class) //绑定通道
 4@EnableConfigurationProperties(BusProperties.class)
 5public class BusAutoConfiguration implements ApplicationEventPublisherAware {
 6
 7//注入source接口,用于发送消息
 8@Autowired
 9@Output(SpringCloudBusClient.OUTPUT)
10private MessageChannel cloudBusOutboundChannel;
11
12// 监听RemoteApplicationEvent事件
13@EventListener(classes = RemoteApplicationEvent.class)
14public void acceptLocal(RemoteApplicationEvent event) {
15    if (this.serviceMatcher.isFromSelf(event)
16            && !(event instanceof AckRemoteApplicationEvent)) {
17        //当事件是来自自己的并且不是ack事件,则发送消息
18    this.cloudBusOutboundChannel.send(MessageBuilder.withPayload(event).build());
19    }
20}
21//消息的消费,也是事件的发起
22@StreamListener(SpringCloudBusClient.INPUT)
23public void acceptRemote(RemoteApplicationEvent event) {
24    if (event instanceof AckRemoteApplicationEvent) {
25        //ack事件
26        if (this.bus.getTrace().isEnabled() && !this.serviceMatcher.isFromSelf(event)
27                && this.applicationEventPublisher != null) {
28            //当开启bus追踪且不是自己的ack事件,则通知所有的注册该事件的监听者,否则直接返回
29            this.applicationEventPublisher.publishEvent(event);
30        }
31        return;
32    }
33    //消费消息,该消息属于自己
34    if (this.serviceMatcher.isForSelf(event)
35            && this.applicationEventPublisher != null) {
36        //不是自己发布的事件,正常处理
37        if (!this.serviceMatcher.isFromSelf(event)) {
38            this.applicationEventPublisher.publishEvent(event);
39        }
40        //消费之后,需要发送ack确认事件
41        if (this.bus.getAck().isEnabled()) {
42            AckRemoteApplicationEvent ack = new AckRemoteApplicationEvent(this,
43                    this.serviceMatcher.getServiceId(),
44                    this.bus.getAck().getDestinationService(),
45                    event.getDestinationService(), event.getId(), event.getClass());
46            this.cloudBusOutboundChannel
47                    .send(MessageBuilder.withPayload(ack).build());
48            this.applicationEventPublisher.publishEvent(ack);
49        }
50    }
51    //事件追踪相关,若是开启追踪事件则执行
52    if (this.bus.getTrace().isEnabled() && this.applicationEventPublisher != null) {
53        // 不论其来源,准备发送事件,发布了之后供本地消费
54        this.applicationEventPublisher.publishEvent(new SentApplicationEvent(this,
55                event.getOriginService(), event.getDestinationService(),
56                event.getId(), event.getClass()));
57    }
58}
59
60//...
61}

@ConditionalOnBusEnabled注解是bus的开关,默认开启。@EnableBinding绑定了SpringCloudBusClient中定义的通道。在应用服务启动时,自动化配置类加载了bus的API端点、刷新、ACK追踪以及bus环境变量的配置等beans。 @Output表示输出output绑定目标将由框架创建,由该通道发送消息。 还涉及到上面列出来的两个主要方法:acceptLocalacceptRemote

acceptLocal是一个基于注解实现的事件监听器,监听的事件类型是RemoteApplicationEvent,对于该事件的处理方法是,当事件是来自自己的并且不是ack事件,则发送消息。

@StreamListener注解是Spring Cloud Stream中提供的,用来标识一个方法作为@EnableBinding绑定的input通道的监听器。acceptRemote方法, 传递的参数RemoteApplicationEvent就是stream中的消息。如果是确认类事件,当开启了事件追踪且事件不是来自于自身,则发布该事件,对于确认类事件,处理已经完成; 如果自身需要处理该事件且该事件不是来自自身,则发布该事件。需要注意的是,当开启事件追踪时,构造一个确认事件并将该事件发布;最后,当开启了事件追踪,这边的处理是注册已发送的事件,以便发布供本地消费,而不论其来源。

总结

本文在上一篇介绍Spring Cloud Bus中的事件基础上,结合源码继续介绍事件的监听器以及事件的订阅与发布是如何在消息总线中实现的。 消息总线常用于传播状态的变更和管理指令的发布。而消息总线最常用的场景就是更新应用服务的配置信息,需要结合Config Server使用,当然消息总线的实现其实是基于Spring Cloud Stream,Stream封装了各种不同的MQ中间件,产生的消息实则是推送配置信息的变更。

参考

Spring Cloud Bus-v1.3.3

原文发布于微信公众号 - aoho求索(aohoBlog)

原文发表时间:2018-03-08

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Java帮帮-微信公众号-技术文章全总结

Web-第三十一天 WebService学习【悟空教程】

简单的网络应用使用单一语言写成,它的唯一外部程序就是它所依赖的数据库。大家想想是不是这样呢?

1884
来自专栏软件开发 -- 分享 互助 成长

linux下进程相关操作

一、定义和理解 狭义定义:进程是正在运行的程序的实例。 广义定义:进程是一个具有一定独立功能的程序关于某个数据集合的一次运行活动。 进程的概念主要有两点: 第一...

2225
来自专栏拂晓风起

Spring 获取web根目录 (Spring线程获取web目录/路径/根目录,普通类获取web目录)

1033
来自专栏闵开慧

tomcat6.0下找不到jasper-runtime.jar

今天有点需求,需要用jasper-runtime.jar包。但是我在我的\apache-tomcat-6.0.16\lib目录下,怎么也找不到这个jar包。结果...

3215
来自专栏流柯技术学院

Ant_build.xml的最完整解释

Ant的概念 Make命令是一个项目管理工具,而Ant所实现功能与此类似。像make,gnumake和nmake这些编译工具都有一定的缺陷,但是Ant却克服了这...

1272
来自专栏乐百川的学习频道

Python 日志输出

打印日志是很多程序的重要需求,良好的日志输出可以帮我们更方便的检测程序运行状态。Python标准库提供了logging模块,让我们也可以方便的在Python中打...

2779
来自专栏逸鹏说道

小解Redis 系列

官网:http://redis.io/ 推荐一个开源组件:StackExchange.Redis https://github.com/StackExchang...

2999
来自专栏博岩Java大讲堂

Java日志体系(log4j)

65411
来自专栏编码小白

ofbiz初级教程

本教程是ofbiz 基本应用,它涵盖了OFBiz应用程序开发过程的基本原理。目标是使开发人员熟悉最佳实践,编码惯例,基本控制流程以及开发人员对OFBiz定制所需...

1.5K3
来自专栏happyJared

Jmeter 压测 http(s)

  上一篇文章关于Jmeter介绍了Jmeter入门相关的知识。本文是实战篇,讲讲如何使用Jmeter对Http(s)进行压力测试。

4272

扫码关注云+社区

领取腾讯云代金券