前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >基于spring event实现消息异步延时队列

基于spring event实现消息异步延时队列

作者头像
木左
发布2024-03-26 10:20:01
1980
发布2024-03-26 10:20:01
举报

一、业务场景

最近做个小项目,基本单体应用就能满足要求。项目虽然小,但是需求可一点都不少,真是麻雀虽小,要求五脏俱全。这里面有个业务场景是需要给相应的人员发送消息通知。

之前做分布式应用都是引入第三方组件mq,单独部署一个消息服务用于接收mq消息并发送对应的通知。现在单体应用也想解耦处理,但是引入组件感觉还得部署mq,多少有些浪费,想着直接内部处理下。

于是乎想到了Spring本身的event。这不也是发个消息,然后监听者收到后进行逻辑处理。但是看了下直接发的话不做任何处理会是同步方式进行处理,而我其实需要得是异步,那就再加个注解@async。这样就达到了异步处理。

二、开搞

最终整体代码结构如下

最开始确实可以达到异步处理,但是在测试的时候发现,日志打印的时候,发送消息的日志还没打印,接收消息得日志都打印出来了,如下图。

于是乎想到能不能延迟下处理,就像组件mq提供的功能一样。

有问题找百度,看看是不是有人已经解决过这个问题了,结果找了半天,就找到类似的,但是做法就是发送的时候自己搞了一个延时队列,到时间后再执行发送,感觉实现也算实现了,但是感觉和我想要的不一样。因为消息处理本身是在线程池里处理的,有个任务队列,自己再搞一个多少有点别扭。还有一种是在监听的方法里线程sleep,这种感觉也不行。我总共开两个线程,那任务堆积不是很严重,影响处理效率。

好吧,既然没有能直接拿来主义的,那就自己研究研究。既然@async本身就是交给了线程池处理,而我使用的是ThreadPoolTaskExecutor,这个线程池不支持延时队列,解决办法就只能是sleep,于是乎我换成了有延时队列的线程池ScheduledThreadPoolExecutor。

那现在就是看@async注解是怎么把任务扔到任务队列里的,找了下源码,打上断点看看。

跟着走,发现最后调用ScheduledThreadPoolExecutor的Submit方法放进去的。如下图,那看到这个方法实际调用的是schedule方法,而传参里默认是不延迟。

找到这里就好说了,我直接继承下ScheduledThreadPoolExecutor类然后用子类覆盖下图里的submit方法,然后直接固定延迟5s。如下图。

调整完测试下,结果如图:

可以看到 真的实现了。真是今个要高兴,咱是老百姓啊,去个厕所放放水,搞这么长时间,都忘记上厕所了。回来接着干。。。

没有压力就是轻松,看着实现的功能准备再试试,结果又想到,这只能固定延迟多少秒,能不能和组件mq一样,交给消息本身自己设置,我这里取到消息内容里得延迟字段,直接设置延迟。

然后就在抽象基类里加了个延时枚举,重启后把断点打在了submit上,如图。

看到这个有点晕,找找有没有我的消息内容吧,然后就找到了。

看是看到了,怎么拿出来呢,看到arg$2有点蒙,这是什么东西,咋拿出来呢。然后看他是怎么构造出来的,一找找到了

是使用Lambda表达式创建的,这咋能拿到啊,继续百度,最后找了好长时间,硬是没找到,就是取不到。正想着放弃呀,反正固定延时目前也行,但是看着这个arg$2总感觉是既熟悉又陌生,实体要是断点出来好像这个就是对象的属性名称,要不直接反射拿下值试试?那就试试吧。结果就写代码试。

结果断点下取到了。

完美解决啦。看看日志。。

解决解决,完美解决。刚要高兴,定睛一看,怎么那个tlog追踪异步不一样啊,重新试了下,还真是不一样,重启后发现是第一次创建线程的时候传入的,后续不会改了。这要是查日志追踪,全是一样的就不好了,百度了下,有人解决了直接拿来。

结果改了这个,处理消息延迟取内容报错了,是被包装了,改了下。

最后再试试。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2024-03-16,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 木左侃技术人生 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档