一、业务场景
最近做个小项目,基本单体应用就能满足要求。项目虽然小,但是需求可一点都不少,真是麻雀虽小,要求五脏俱全。这里面有个业务场景是需要给相应的人员发送消息通知。
之前做分布式应用都是引入第三方组件mq,单独部署一个消息服务用于接收mq消息并发送对应的通知。现在单体应用也想解耦处理,但是引入组件感觉还得部署mq,多少有些浪费,想着直接内部处理下。
于是乎想到了Spring本身的event。这不也是发个消息,然后监听者收到后进行逻辑处理。但是看了下直接发的话不做任何处理会是同步方式进行处理,而我其实需要得是异步,那就再加个注解@async。这样就达到了异步处理。
二、开搞
最终整体代码结构如下
最开始确实可以达到异步处理,但是在测试的时候发现,日志打印的时候,发送消息的日志还没打印,接收消息得日志都打印出来了,如下图。
于是乎想到能不能延迟下处理,就像组件mq提供的功能一样。
有问题找百度,看看是不是有人已经解决过这个问题了,结果找了半天,就找到类似的,但是做法就是发送的时候自己搞了一个延时队列,到时间后再执行发送,感觉实现也算实现了,但是感觉和我想要的不一样。因为消息处理本身是在线程池里处理的,有个任务队列,自己再搞一个多少有点别扭。还有一种是在监听的方法里线程sleep,这种感觉也不行。我总共开两个线程,那任务堆积不是很严重,影响处理效率。
好吧,既然没有能直接拿来主义的,那就自己研究研究。既然@async本身就是交给了线程池处理,而我使用的是ThreadPoolTaskExecutor,这个线程池不支持延时队列,解决办法就只能是sleep,于是乎我换成了有延时队列的线程池ScheduledThreadPoolExecutor。
那现在就是看@async注解是怎么把任务扔到任务队列里的,找了下源码,打上断点看看。
跟着走,发现最后调用ScheduledThreadPoolExecutor的Submit方法放进去的。如下图,那看到这个方法实际调用的是schedule方法,而传参里默认是不延迟。
找到这里就好说了,我直接继承下ScheduledThreadPoolExecutor类然后用子类覆盖下图里的submit方法,然后直接固定延迟5s。如下图。
调整完测试下,结果如图:
可以看到 真的实现了。真是今个要高兴,咱是老百姓啊,去个厕所放放水,搞这么长时间,都忘记上厕所了。回来接着干。。。
没有压力就是轻松,看着实现的功能准备再试试,结果又想到,这只能固定延迟多少秒,能不能和组件mq一样,交给消息本身自己设置,我这里取到消息内容里得延迟字段,直接设置延迟。
然后就在抽象基类里加了个延时枚举,重启后把断点打在了submit上,如图。
看到这个有点晕,找找有没有我的消息内容吧,然后就找到了。
看是看到了,怎么拿出来呢,看到arg$2有点蒙,这是什么东西,咋拿出来呢。然后看他是怎么构造出来的,一找找到了
是使用Lambda表达式创建的,这咋能拿到啊,继续百度,最后找了好长时间,硬是没找到,就是取不到。正想着放弃呀,反正固定延时目前也行,但是看着这个arg$2总感觉是既熟悉又陌生,实体要是断点出来好像这个就是对象的属性名称,要不直接反射拿下值试试?那就试试吧。结果就写代码试。
结果断点下取到了。
完美解决啦。看看日志。。
解决解决,完美解决。刚要高兴,定睛一看,怎么那个tlog追踪异步不一样啊,重新试了下,还真是不一样,重启后发现是第一次创建线程的时候传入的,后续不会改了。这要是查日志追踪,全是一样的就不好了,百度了下,有人解决了直接拿来。
结果改了这个,处理消息延迟取内容报错了,是被包装了,改了下。
最后再试试。