前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >针对事件驱动架构的Spring Cloud Stream

针对事件驱动架构的Spring Cloud Stream

作者头像
ImportSource
发布2018-04-03 13:11:52
1.6K0
发布2018-04-03 13:11:52
举报
文章被收录于专栏:ImportSourceImportSource

今天我们要分享一个比较有意思的内容。就是如何通过spring cloud 的stream来改造一个微服务下事件驱动的框架。

为什么要改造?我们都知道事件驱动的微服务开发框架,一个非常重要的点就是每次的操作和状态转换都是一个事件。而现在的spring cloud stream对这样的频繁而不同类型的事件并不是很友好。本文希望通过改造让cloud stream变成一个对事件驱动的微服务开发更友好更方便的事件驱动框架。

准备工作

我们还是通过spring initializr来新建一个项目吧:

如上,我们引入了web、stream kafka依赖。

然后生成项目并下载,打开项目开始我们的改造之旅吧。

然后我们来看看现在的spring cloud的版本:

代码语言:javascript
复制
<dependencyManagement>
   <dependencies>
      <dependency>
         <groupId>org.springframework.cloud</groupId>
         <artifactId>spring-cloud-dependencies</artifactId>
         <version>Camden.SR6</version>
         <type>pom</type>
         <scope>import</scope>
      </dependency>
   </dependencies>
</dependencyManagement>

现在的版本是Camden.SR6。而我们今天要演示的stream是最新版本,所以我们得把cloud版本修改为Brixton.SR7:

代码语言:javascript
复制
<dependencyManagement>
   <dependencies>
      <dependency>
         <groupId>org.springframework.cloud</groupId>
         <artifactId>spring-cloud-dependencies</artifactId>
         <version>Brixton.SR7</version>
         <type>pom</type>
         <scope>import</scope>
      </dependency>
   </dependencies>
</dependencyManagement>

你也许会在一些事件源框架中,比如Axon中,看到以下类似代码:

代码语言:javascript
复制
public class MyEventHandler {
   @EventHandler
   public void handle(CustomerCreatedEvent event) {
   ...
   }

   @EventHandler
   public void handle(AccountCreatedEvent event) {
   ...
   }
}

没错,这其实就是事件源框架中最终所呈现出来的入口最核心的样子。

现在我们对spring cloud stream进行改造,让它变成一个真正的或者说像Axon那样的一个事件源框架。

Cloud Stream 现有处理事件的做法

在开始真正的改造之前,我们还是先看看spring cloud stream 1.1.2(也就是cloud版本为Camden.SR中的stream版本) 中的消息处理的基本样子:

代码语言:javascript
复制
@StreamListener(Sink.INPUT)
public void handle(Foo foo){
   ...
}

没错,就是一个通过@StreamListener注解的handle方法,就可监听到消息流了。

然后我们看看使用最新的Brixton.SR7版本spring cloud stream的样子:

代码语言:javascript
复制
@EnableBinding
class MyEventHandler {
    @StreamListener
    target=Sink.INPUT,condition="payload.eventType=='CustomerCreatedEvent'")

    public void handleCustomerEvent(@Payload Event event) {
        // handle the message</span>
    }

    @StreamListener
    target=Sink.INPUT,condition="payload.eventType=='AccountCreatedEvent'")

    public void handleAccountEvent(@Payload Event event) {
        // handle the message</span>
    }
}

通过上面的代码,我们知道spring cloud stream可以支持配置一个condition的属性来让不同的事件类型路由到不同的handle方法中来处理。其中condition里边的表达式叫做SpEL,就是spring 表达式,通过返回true或false来表示是否匹配。

另外上面的支持是4天前才发布的。也许就是为了支持最近炒得火热的CQRS+ES而发布的。

之前@StreamListener的源码是这样的:

代码语言:javascript
复制
@Target({ ElementType.METHOD, ElementType.ANNOTATION_TYPE })
@Retention(RetentionPolicy.RUNTIME)
@MessageMapping
@Documented
public @interface StreamListener {

   /**
    * The name of the binding target (e.g. channel) that the method subscribes to.
    */
   String value() default "";

}

发现没?只有一个value方法属性。

而最新的已经增加了condition条件。这显然是为了支持事件驱动的微服务开发而支持的。

我们点进去看看StreamListener新增加了什么:

发现新增了两个方法属性,一个是target,一个是condition。

而且描述也变成了含有“事件驱动”字样。

ok,现在我们已经知道了spring cloud stream的基本用法和代码样子。

最新版的做法已经算是一种不错的改进了。不过,从编程的语法上,它也许并没有我们想要的那么清晰。当然这只是一种个人的喜好,抑或是我们希望把改造成像Axon那样。

自定义注解

这里我们希望把spring cloud stream改造成一个像Axon那样的风格。因为这也许对于CQRS + ES 框架来说是一种比较理想的开发入口。

像下面这样:

代码语言:javascript
复制
@EnableEventHandling
class MyEventHandler {
    @EventHandler("CustomerCreatedEvent")
            public void handleCustomerEvent(@Payload Event event) {
        // handle the message
    }

    @EventHandler("AccountCreatedEvent")
            public void handleAccountEvent(@Payload Event event) {
        // handle the message
    }
}

我们既然想要上面的样子,那么就得新定义上面的这两个注解。

我们首先来封装一个@EventHandler注解吧:

代码语言:javascript
复制
@StreamListener 
@Target({ElementType.METHOD}) 
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface EventHandler {
    @AliasFor(annotation = StreamListener.class, attribute = "target")
    String value() default "";

    @AliasFor(annotation = StreamListener.class, attribute = "target")
    String target() default Sink.INPUT;

    @AliasFor(annotation = StreamListener.class, attribute = "condition")
    String condition() default "";
}

我们把@StreamListener封装上面的注解内。

现在已经很接近了我们上面想要的样子了:

代码语言:javascript
复制
@EnableBinding
class MyEventHandler{
    @EventHandler(condition="payload.eventType=='CustomerCreatedEvent'")
    public void handleCustomerEvent(@Payload Event event) {
        // handle the message
    }

    @EventHandler(condition="payload.eventType=='AccountCreatedEvent'")
    public void handleAccountEvent(@Payload Event event) {
        // handle the message
    }
}

但,@EnableEventHandling这个注解还没有定义。现在我们来定义这个注解:

我们先来搞一个配置类(可横屏观看,排版效果更好):

代码语言:javascript
复制
@Configuration
public class EventHandlerConfig {
 
    /*
     * 用于允许spring cloud stream binder把事件路由到匹配的方法上的SpEL表达式
     */
    private String eventHandlerSpelPattern = "payload.eventType=='%s'";

    /**
     *  在此bean中自定义processor,然后把eventType属性转成condition表达式
     * @return
     */
    @Bean(name = STREAM_LISTENER_ANNOTATION_BEAN_POST_PROCESSOR_NAME)
    public BeanPostProcessor streamListenerAnnotationBeanPostProcessor() {
        return new StreamListenerAnnotationBeanPostProcessor() {
            @Override
            protected StreamListener postProcessAnnotation(StreamListener originalAnnotation, Method annotatedMethod) {
                Map<String, Object> attributes = new HashMap<>(
                        AnnotationUtils.getAnnotationAttributes(originalAnnotation));
                if (StringUtils.hasText(originalAnnotation.condition())) {
                    String spelExpression = String.format(eventHandlerSpelPattern, originalAnnotation.condition());
                    attributes.put("condition", spelExpression);
                }
                return AnnotationUtils.synthesizeAnnotation(attributes, StreamListener.class, annotatedMethod);
            }
        };
    }
}

然后我们,再新建@EnableEventHandling注解:

代码语言:javascript
复制
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@EnableBinding
@Import({EventHandlerConfig.class})
public @interface EnableEventHandling {

}

上面的注解我们只是把刚才的那个配置类import即可。

你也许发现了,其实spring boot中的很多类似@EnableXXXX的注解其实都是一个框架预定义好的配置类,然后在@EnableXXXX的中通过@Import注解导入就好了。本质上是一个配置类。

最后我们再把@EventHandler注解修改一下,把condition修改成eventType作为condition的一个别名:

代码语言:javascript
复制
@StreamListener
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface EventHandler {
    /**
     * 方法所订阅的channel的名称
     * @return 绑定目标的名称
     */
    @AliasFor(annotation=StreamListener.class, attribute="condition")
    String value() default "";

    /**
     * 方法所订阅对的channel的名称
     * @return 绑定目标的名称
     */
    @AliasFor(annotation=StreamListener.class, attribute="target")
    String target()  default Sink.INPUT;

    /**
     * 对 condition的封装
     * @return SpEL 的表达式
     */
    @AliasFor(annotation=StreamListener.class, attribute="condition")
    String eventType() default "";
}

总结

通过上面一系列的spring算是“奇技淫巧”我们愣是把spring cloud stream改造成了一个CQRS和EventSourcing那样的事件驱动的全新框架。

代码语言:javascript
复制
@EnableEventHandling
class MyEventHandler{
    @EventHandler("CustomerCreatedEvent")
    public void handleCustomerEvent(@Payload Event event) {
        // handle the message
    }

    @EventHandler("AccountCreatedEvent")
    public void handleAccountEvent(@Payload Event event) {
        // handle the message
    }
}

上面改造的技术核心其实就是利用@EnableXxx的一贯做法,自定义注解。然后import一个configuration。然后configuration类中则实例化并注册一个

自定义BeanPostProcessor到context中。而这个自定义的BeanPostProcessor则是在postProcessAnnotation方法中拦截到使用@Import的当前注解@StreamListener,然后动态把要设置到转化后设置进去,从而实现了改造。

为什么要改造?我们都知道事件驱动的微服务开发框架,一个非常重要的点就是每次都操作和状态转换都是一个事件。而现在的spring cloud stream对这样的频繁而不同类型的事件并不是很友好。通过改造后,开发事件驱动的微服务就变得更加的方便和友好。

本文只是对Spring Cloud Stream的入口做了一个简单的封装,并没有大动任何内部代码。也许你并不喜欢这样的风格。你完全可以使用最新的那种基于SpEL的默认做法。

另外有关CQRS以及Event Sourcing的内容,你可以移步:微服务业务开发三个难题-拆分、事务、查询(上)微服务业务开发三个难题-拆分、事务、查询(下)

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2017-04-09,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 ImportSource 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档