Spring Cloud 基于Bus 的AB-TEST组件

一、前情提要:

>因剧情需要,所以准备在基础开发平台中进行AB-TEST组件开发。目前主要使用了Spring Cloud E-SR2 版本,其中使用了kafka作为内置bus总线,另一组kafka用于监控trace推送(如zipkin、自定义监控)。AB-TEST大家都应该了解过,如不了解请参考 https://www.itcodemonkey.com/article/10398.html ,这里就不多讲了。其实简单来说就是根据配置好的分桶模式如A 、B、C ,在进行逻辑处理时根据某一类条件(如uid)计算出当前用户分桶进行动态的逻辑处理(简单来说就是多个if else)。

二、方案选型:

如要做成组件化那必然要对上层开发透明,最好时无感知的进行逻辑切换,当然我们第一步不需要那么完美,先实现功能组件。在进行技术选型的时候参考了两种模式: 1、zookeeper 优点:技术简单,有定义好的工具 缺点:增加应用依赖的组件,当zk出现问题时造成ab-test失效 2、bus总线 优点:实现简单,耦合度低 缺点:需要详解cloud bus 机制 当然我们选择了后者,因为在基础平台中,组件的侵入性越低,对上层开发越友好,而且就越稳定。

三、Spring CLoud Bus 事件机制

因为目前使用的是Spring Cloud E SR2全家桶,所以在技术处理上也遵循其原则,尽量使用内置技术栈实现。内部主要以cloud bus机制进行了简单的扩展实现,下面我们先来简单了解下BUS EVENT机制。

Bus 机制主要建立在 stream的基础之上,在cloud的下 我们可以选择rabbitmq 或者kafka,其特点主要是针对spring event消息的订阅与发布。

Spring Event 事件驱动模型

可以看出模型由三部分构成:事件:ApplicationEvent,继承自JDK的EventObject,所有事件将继承它,并通过source得到事件源发布者:ApplicationEventPublisher及ApplicationEventMulticaster接口,使用这个接口,我们的Service就拥有了发布事件的能力。订阅者:在spring bus 中可以实现 ApplicationListener(继承自JDK的EventListener),所有监听器将继承它或添加@EventListener

Bus 事件众所周知,在bus使用中,最多的场景就是配置中心的动态配置刷新。也就是说我们通过/bus/refresh 接口调用就可以进行针对性的配置刷新了,根据这条线,我们来看下内部的源码结构。

1、通过rest 接口进行外部请求此处cloud 借助其端点监控机制实现,主要看 RefreshBusEndpoint,当然Cloud E 和 F版本有些许不一样,但其实不难理解Cloud E SR2

Cloud F SR2

通过上面的代码可以看到,请求进来的话都调用了 AbstractBusEndpoint 的publish进行了事件发布

其中ApplicationEventPublisher 在哪定义的呢,这就不得不说BusAutoConfiguration 这里了,这是bus的核心加载器,在通过外部接口调用发布事件后内部对事件进行了监听和处理就在BusAutoConfiguration中,如下:

观看完内部事件消费,和stream消息订阅,那bus 的stream又是怎么进行初始化和工作的呢,答案依然在BusAutoConfiguration 中,如下:

对于cloud stream就不多说了,这里很简单的进行了初始化,所以对于发布和订阅消息就很清晰了。其实在BusAutoConfiguration中所以的事件消费、发布、订阅都是为了集群内部的通信,而真正的事件处理却不此处。那么对于配置的刷新行为到底在哪呢,经过查看对于刷新的操作要看RefreshListener 了。如下图 RefreshListener 针对事件进行了监听,其事件使用的是RefreshRemoteApplicationEvent,其继承RemoteApplicationEvent。

通过源码可以看出,事件的刷新行为contextRefresher.refresh(); (当然contextRefresher 定义也是在BusAutoConfiguration 有兴趣可以查看下),对于刷新到底怎么实现的,也就接近了bus config刷新的核心:ContextRefresher

经过以上的源码,脉络就很清晰了:1、外部POST /bus/refresh 进行了刷新行为,发布内部事件RefreshRemoteApplicationEvent2、通过@EventListener 进行内部事件消费,如果是自己内部发布的事件就通过stream进行广播3、通过@StreamListener 对stream进行监听,如果是给自己的事件,就进行内部转发,具体需不需要ack trace则根据配置进行。4、内部通过RefreshListener 消费事件,通过ContextRefresher.refresh 进行配置刷新这下一个完整的bus机制就展现在我们眼前,我们只需要简单的进行改造,就能实现自己的动态事件推送了。

四、AB-TEST机制实现:

1、应用启动load分桶数据看过了bus的模式,这里就简单咯。在这里我们通过继承 PropertyResourceConfigurer 来实现配置的初始化,而配置的来源就是cloud 配置中心的 application.properties,因为使用了配置中心后此配置会自动加载不要额外的处理。然后在对加载的配置进行归类即可(因我为test配置定义了前缀,所以只需过滤其即可),模仿bus配置筛选即可。

2、在请求来临时进行动态计算分桶定义自定义注解@NoveTest用于标注需要进行测试的方法,定义NoveParamInterceptor 对入参进行解析,定义NoveTestInterceptor内部拦截器进行注解切入 ,为增加了@NoveTest进行动态分桶计算。

其中分桶计算的策略很简单,通过uid + 因子进行hash 计算index 获取数据

好了,定义完成,上层应用开发使用方式:

3、修改配置进行分桶策略的动态刷新分桶策略不可能一直不变,而且变化的时候也不应该重新发版,那真是太恶心人了。所以动态配置推送就至关重要了,当然大家也可以使用自带的bus总线进行数据推送,但是其destroy的问题真是恶心到家了,而且有可能造成服务大面积瘫痪,慎用。基于种种问题,就需要自定义bus event,结合其现有的bus进行数据推送。

其实很简单,分为几步:自定义端点,进行自定义事件发送。(事件广播不需要特殊处理)自定义Listener进行本地事件消费,进行数据刷新注意几点,在Cloud E 和F 版本有一些不同之处。主要在于端点的暴露的策略上,在F版本上使用如下

SO 到目前为止一个可用于生产的AB-TEST 机制就实现了,有的同学就会说了,你这根据数据在进行if else 的逻辑判断调用真是恶心到人了。确实,因为第一版目前是最简单的实现,在第二版将进行动态调用的优化。其实动态调用说来也很简单,无非通过定义的接口实现各个不同的逻辑,然后针对其进行简单的动态代理即可。后期源码会同步上传。

总结:自定义AB-TEST组件的过程1、自定义内部端点暴露动态配置刷新,发送刷新事件2、接收事件,定义刷新策略,只刷新需要的配置即可3、定义启动初始化方式4、通过动态代理实现动态逻辑调用(待完成)

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20190105G0U72S00?refer=cp_1026
  • 腾讯「云+社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。

同媒体快讯

扫码关注云+社区

领取腾讯云代金券