针对事件驱动架构的Spring Cloud Stream

今天我们要分享一个比较有意思的内容。就是如何通过spring cloud 的stream来改造一个微服务下事件驱动的框架。

为什么要改造?我们都知道事件驱动的微服务开发框架,一个非常重要的点就是每次的操作和状态转换都是一个事件。而现在的spring cloud stream对这样的频繁而不同类型的事件并不是很友好。本文希望通过改造让cloud stream变成一个对事件驱动的微服务开发更友好更方便的事件驱动框架。

准备工作

我们还是通过spring initializr来新建一个项目吧:

如上,我们引入了web、stream kafka依赖。

然后生成项目并下载,打开项目开始我们的改造之旅吧。

然后我们来看看现在的spring cloud的版本:

<dependencyManagement>
   <dependencies>
      <dependency>
         <groupId>org.springframework.cloud</groupId>
         <artifactId>spring-cloud-dependencies</artifactId>
         <version>Camden.SR6</version>
         <type>pom</type>
         <scope>import</scope>
      </dependency>
   </dependencies>
</dependencyManagement>

现在的版本是Camden.SR6。而我们今天要演示的stream是最新版本,所以我们得把cloud版本修改为Brixton.SR7:

<dependencyManagement>
   <dependencies>
      <dependency>
         <groupId>org.springframework.cloud</groupId>
         <artifactId>spring-cloud-dependencies</artifactId>
         <version>Brixton.SR7</version>
         <type>pom</type>
         <scope>import</scope>
      </dependency>
   </dependencies>
</dependencyManagement>

你也许会在一些事件源框架中,比如Axon中,看到以下类似代码:

public class MyEventHandler {
   @EventHandler
   public void handle(CustomerCreatedEvent event) {
   ...
   }

   @EventHandler
   public void handle(AccountCreatedEvent event) {
   ...
   }
}

没错,这其实就是事件源框架中最终所呈现出来的入口最核心的样子。

现在我们对spring cloud stream进行改造,让它变成一个真正的或者说像Axon那样的一个事件源框架。

Cloud Stream 现有处理事件的做法

在开始真正的改造之前,我们还是先看看spring cloud stream 1.1.2(也就是cloud版本为Camden.SR中的stream版本) 中的消息处理的基本样子:

@StreamListener(Sink.INPUT)
public void handle(Foo foo){
   ...
}

没错,就是一个通过@StreamListener注解的handle方法,就可监听到消息流了。

然后我们看看使用最新的Brixton.SR7版本spring cloud stream的样子:

@EnableBinding
class MyEventHandler {
    @StreamListener
    target=Sink.INPUT,condition="payload.eventType=='CustomerCreatedEvent'")

    public void handleCustomerEvent(@Payload Event event) {
        // handle the message</span>
    }

    @StreamListener
    target=Sink.INPUT,condition="payload.eventType=='AccountCreatedEvent'")

    public void handleAccountEvent(@Payload Event event) {
        // handle the message</span>
    }
}

通过上面的代码,我们知道spring cloud stream可以支持配置一个condition的属性来让不同的事件类型路由到不同的handle方法中来处理。其中condition里边的表达式叫做SpEL,就是spring 表达式,通过返回true或false来表示是否匹配。

另外上面的支持是4天前才发布的。也许就是为了支持最近炒得火热的CQRS+ES而发布的。

之前@StreamListener的源码是这样的:

@Target({ ElementType.METHOD, ElementType.ANNOTATION_TYPE })
@Retention(RetentionPolicy.RUNTIME)
@MessageMapping
@Documented
public @interface StreamListener {

   /**
    * The name of the binding target (e.g. channel) that the method subscribes to.
    */
   String value() default "";

}

发现没?只有一个value方法属性。

而最新的已经增加了condition条件。这显然是为了支持事件驱动的微服务开发而支持的。

我们点进去看看StreamListener新增加了什么:

发现新增了两个方法属性,一个是target,一个是condition。

而且描述也变成了含有“事件驱动”字样。

ok,现在我们已经知道了spring cloud stream的基本用法和代码样子。

最新版的做法已经算是一种不错的改进了。不过,从编程的语法上,它也许并没有我们想要的那么清晰。当然这只是一种个人的喜好,抑或是我们希望把改造成像Axon那样。

自定义注解

这里我们希望把spring cloud stream改造成一个像Axon那样的风格。因为这也许对于CQRS + ES 框架来说是一种比较理想的开发入口。

像下面这样:

@EnableEventHandling
class MyEventHandler {
    @EventHandler("CustomerCreatedEvent")
            public void handleCustomerEvent(@Payload Event event) {
        // handle the message
    }

    @EventHandler("AccountCreatedEvent")
            public void handleAccountEvent(@Payload Event event) {
        // handle the message
    }
}

我们既然想要上面的样子,那么就得新定义上面的这两个注解。

我们首先来封装一个@EventHandler注解吧:

@StreamListener 
@Target({ElementType.METHOD}) 
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface EventHandler {
    @AliasFor(annotation = StreamListener.class, attribute = "target")
    String value() default "";

    @AliasFor(annotation = StreamListener.class, attribute = "target")
    String target() default Sink.INPUT;

    @AliasFor(annotation = StreamListener.class, attribute = "condition")
    String condition() default "";
}

我们把@StreamListener封装上面的注解内。

现在已经很接近了我们上面想要的样子了:

@EnableBinding
class MyEventHandler{
    @EventHandler(condition="payload.eventType=='CustomerCreatedEvent'")
    public void handleCustomerEvent(@Payload Event event) {
        // handle the message
    }

    @EventHandler(condition="payload.eventType=='AccountCreatedEvent'")
    public void handleAccountEvent(@Payload Event event) {
        // handle the message
    }
}

但,@EnableEventHandling这个注解还没有定义。现在我们来定义这个注解:

我们先来搞一个配置类(可横屏观看,排版效果更好):

@Configuration
public class EventHandlerConfig {
 
    /*
     * 用于允许spring cloud stream binder把事件路由到匹配的方法上的SpEL表达式
     */
    private String eventHandlerSpelPattern = "payload.eventType=='%s'";

    /**
     *  在此bean中自定义processor,然后把eventType属性转成condition表达式
     * @return
     */
    @Bean(name = STREAM_LISTENER_ANNOTATION_BEAN_POST_PROCESSOR_NAME)
    public BeanPostProcessor streamListenerAnnotationBeanPostProcessor() {
        return new StreamListenerAnnotationBeanPostProcessor() {
            @Override
            protected StreamListener postProcessAnnotation(StreamListener originalAnnotation, Method annotatedMethod) {
                Map<String, Object> attributes = new HashMap<>(
                        AnnotationUtils.getAnnotationAttributes(originalAnnotation));
                if (StringUtils.hasText(originalAnnotation.condition())) {
                    String spelExpression = String.format(eventHandlerSpelPattern, originalAnnotation.condition());
                    attributes.put("condition", spelExpression);
                }
                return AnnotationUtils.synthesizeAnnotation(attributes, StreamListener.class, annotatedMethod);
            }
        };
    }
}

然后我们,再新建@EnableEventHandling注解:

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@EnableBinding
@Import({EventHandlerConfig.class})
public @interface EnableEventHandling {

}

上面的注解我们只是把刚才的那个配置类import即可。

你也许发现了,其实spring boot中的很多类似@EnableXXXX的注解其实都是一个框架预定义好的配置类,然后在@EnableXXXX的中通过@Import注解导入就好了。本质上是一个配置类。

最后我们再把@EventHandler注解修改一下,把condition修改成eventType作为condition的一个别名:

@StreamListener
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface EventHandler {
    /**
     * 方法所订阅的channel的名称
     * @return 绑定目标的名称
     */
    @AliasFor(annotation=StreamListener.class, attribute="condition")
    String value() default "";

    /**
     * 方法所订阅对的channel的名称
     * @return 绑定目标的名称
     */
    @AliasFor(annotation=StreamListener.class, attribute="target")
    String target()  default Sink.INPUT;

    /**
     * 对 condition的封装
     * @return SpEL 的表达式
     */
    @AliasFor(annotation=StreamListener.class, attribute="condition")
    String eventType() default "";
}

总结

通过上面一系列的spring算是“奇技淫巧”我们愣是把spring cloud stream改造成了一个CQRS和EventSourcing那样的事件驱动的全新框架。

@EnableEventHandling
class MyEventHandler{
    @EventHandler("CustomerCreatedEvent")
    public void handleCustomerEvent(@Payload Event event) {
        // handle the message
    }

    @EventHandler("AccountCreatedEvent")
    public void handleAccountEvent(@Payload Event event) {
        // handle the message
    }
}

上面改造的技术核心其实就是利用@EnableXxx的一贯做法,自定义注解。然后import一个configuration。然后configuration类中则实例化并注册一个

自定义BeanPostProcessor到context中。而这个自定义的BeanPostProcessor则是在postProcessAnnotation方法中拦截到使用@Import的当前注解@StreamListener,然后动态把要设置到转化后设置进去,从而实现了改造。

为什么要改造?我们都知道事件驱动的微服务开发框架,一个非常重要的点就是每次都操作和状态转换都是一个事件。而现在的spring cloud stream对这样的频繁而不同类型的事件并不是很友好。通过改造后,开发事件驱动的微服务就变得更加的方便和友好。

本文只是对Spring Cloud Stream的入口做了一个简单的封装,并没有大动任何内部代码。也许你并不喜欢这样的风格。你完全可以使用最新的那种基于SpEL的默认做法。

另外有关CQRS以及Event Sourcing的内容,你可以移步:微服务业务开发三个难题-拆分、事务、查询(上)微服务业务开发三个难题-拆分、事务、查询(下)

原文发布于微信公众号 - ImportSource(importsource)

原文发表时间:2017-04-09

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏coolblog.xyz技术专栏

Spring IOC 容器源码分析系列文章导读

Spring 是一个轻量级的企业级应用开发框架,于 2004 年由 Rod Johnson 发布了 1.0 版本。经过十几年的迭代,现在的 Spring 框架已...

29510
来自专栏Java3y

JDK10都发布了,nio你了解多少?

1909
来自专栏前端儿

Webpack编译结果浅析

如今Webpack已经是一个不可或缺的前端构建工具,借助这个构建工具,我们可以使用比较新的技术(浏览器不能直接支持)来开发。

1192
来自专栏无题

Spring Boot核心原理-自动配置

为什么spring boot能够如此简单的让我们迅速上手。 之前在公司内部推行spring boot时,有同事跟我提到过,感觉换到spring boot这个框...

5574
来自专栏Java工程师日常干货

透彻理解MyBatis设计思想之手写实现

MyBatis,曾经给我的感觉是一个很神奇的东西,我们只需要按照规范写好XXXMapper.xml以及XXXMapper.java接口。要知道我们并没有提供XX...

801
来自专栏battcn

一起来学SpringBoot | 第二十二篇:轻松搞定重复提交(一)

在平时开发中,如果网速比较慢的情况下,用户提交表单后,发现服务器半天都没有响应,那么用户可能会以为是自己没有提交表单,就会再点击提交按钮重复提交表单,我们在开发...

3922
来自专栏郭霖

巧用Android网络通信技术,在网络上直接传输对象

要做一个优秀的Android应用,使用到网络通信技术是必不可少的,很难想象一款没有网络交互的软件最终能发展得多成功。那么我们来看一下,一般Android应用程序...

2306
来自专栏熊二哥

JavaNIO快速入门

NIO是Jdk中非常重要的一个组成部分,基于它的Netty开源框架可以很方便的开发高性能、高可靠性的网络服务器和客户端程序。本文将就其核心基础类型Channel...

1K9
来自专栏Android 研究

APK安装流程详解12——PMS中的新安装流程上(拷贝)

从上面一片文章我们知道InstallAppProgress里面最后更新的代码是调用到PackageManager#installPackageWithVerif...

1531
来自专栏JavaEdge

Redis实践(八)-Sentinal12 主从复制高可用?3 Redis Sentinel 架构4 安装与配置5 安装与演示6 客户端11 三个定时任务12 主观下线和客观下线13 领导者选举14

由于Redis Sentinel只会对主节点进行故障转移,对从节点采取主观的下线,所以需要自定义一个客户端来监控对应的事件

2311

扫码关注云+社区

领取腾讯云代金券