专栏首页赵KK日常技术记录死信队列的消息处理方案

死信队列的消息处理方案

昨天在处理死信队列消息时,发生了很多疑问,但是实际方案还未实现,一一记录解答。

1.死信队列出现的原因

跟预想的什么事务啊,重试啊,宕机啊没dei关系

Cannot display ObjectMessage body. Reason: Failed to build body from content. Serializable class not available to broker. Reason: java.lang.ClassNotFoundException: xxx

应该是处理此条消息的时候,实体类未序列化?然后我重试下,将实体类序列化去掉,这在运行时会直接异常的,目前原因不详。

2.如何处理死信队列中的消息?

这个监听的思路是对的,就是实施有点问题,总是监听不到

1:人工处理(太累)

2:定时任务(太耗性能)

3:监听死信队列

4:死信队列写库

另外处理消息时,会发生与预想结果不一致,业务是点赞/取消点赞,如果原本目的是取消点赞,但操作失败redis是有的,进入死信队列数据库是没数据的,我在此期间对这条数据进行了点赞,然后又取消了,那如果此时我处理这条消息,会进行点赞,与原本的目的不一致

3.监听+时间

创建一个监听器,监听死信队列ActiveMQ.DLQ队列是否有消息,有消息就进行消费。每次mq入队前标识一个时间戳,取出死信队列的消息,与当前库里的操作时间对比,如果最后一条记录的时间大于此条消息时间不予处理,否则进行消息补偿。redis+mq+mysql进行数据同步时同理

4.redis+mq并发1万会产生消息积压吗?

不会,产生积压的原因是业务系统不再监控某队列,即便是1万并发同事请求,肯定会发生队列排队消费,但不会发生积压,另外如出现此情况,需要短信报警,并手动删除或脚本删除此队列。

最高等待队列数

5.一个业务一个队列,无用队列怎么处理?

目前接触的业务,每个业务都需要自定义队列名,有的队列等待,有的始终没处理业务,此时可自定义关闭监测时间内不工作的队列,如需要时再开启,以此减少其他队列的压力。

配置可看下activemq.xml的47行

constantPendingMessageLimitStrategy用于防止
慢话题消费者阻碍生产者和影响其他消费者
通过限制保留的消息数
<destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" >
                    <!-- The constantPendingMessageLimitStrategy is used to prevent
                         slow topic consumers to block producers and affect other consumers
                         by limiting the number of messages that are retained
                         For more information, see:

                         http://activemq.apache.org/slow-consumer-handling.html

                    -->
                  <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="1000"/>
                  </pendingMessageLimitStrategy>
                </policyEntry>
              </policyEntries>
            </policyMap>
<!-- 在这里加上schedulePeriodForDestinationPurge属性。-->
<broker xmlns="http://activemq.apache.org/schema/core" schedulePeriodForDestinationPurge="10000"
    <destinationPolicy>
        <policyMap>
            <policyEntries>
                <!-- 在这里加上gcInactiveDestinations和inactiveTimoutBeforeGC两个属性 -->
                <policyEntry queue=">" gcInactiveDestinations="true" inactiveTimoutBeforeGC="30000"/>
            </policyEntries>
        </policyMap>
    </destinationPolicy>
</broker>

6.为什么预想3万次的任务执行,结果不一致?

为了测试业务是否会出现频繁取消确认出现不一致的情况,单接口一万次,测了3次,目前一共执行了3次,第一次告8552,第二次,第三次是成功的,按理说一共是28552次,但结果是28527,理想是3万次,在jmeter的结果树种分析无错误日志

原因不晓得。勾选Scroll无用。

这个队列加时间跟

如何解决redis的并发竞争key问题相似,处理方案也是相似

方案仅供参考。

本文分享自微信公众号 - 赵KK日常技术记录(gh_cc4c9f1a9521),作者:赵kk

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2020-09-08

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 消息中间件MQ之ActiveMQ学习总结(中)

    昨天简单总结了看资料了解的MQ基本知识点,实际操作以后,会有地方与预想不一致,多次试验后解决,为了加强记忆,需要根据官网文档继续整理,并尽可能实...

    疯狂的KK
  • mq监听死信队列后如何处理

    昨天试了半天为啥监听不到死信队列的消息,原因是打开方式不对,还有死信队列就一条消息,没意思。

    疯狂的KK
  • 彻底解决分布式系统一致性问题整理(下)

    其实个人理解的时候,更希望能够得到代码层面的实现,单纯的理论知识还是不够落地,总结容易,真正实现起来还是需要项目的积累。

    疯狂的KK
  • 解读Java阻塞队列BlockingQueue的实现

    上篇文章我们介绍了队列的基类接口Queue它定义了所有实现队列的类必须拥有的方法行为而BlockingQueue阻塞队列接口继承了Queue接口,此外Block...

    我是攻城师
  • 【数据结构(C语言版)系列三】 队列

    队列是一种先进先出的线性表,它只允许在表的一端进行插入,而在另一端删除元素。这和我们日常生活中的排队是一致的,最早进入队列的元素最早离开。在队列中,允许插入的一...

    闪电gogogo
  • AI_第一部分 数据结构与算法(8.队列)

    第四阶段我们进行深度学习(AI),本部分(第一部分)主要是对底层的数据结构与算法部分进行详尽的讲解,通过本部分的学习主要达到以下两方面的效果:

    还是牛6504957
  • 三分钟基础:什么是队列?

    像线程池、异步队列、消息队列等有限的资源容器中,往往存储大量的任务事件,这些大量的任务事件需要进行有条理的进行任务分发以及各种情况处理,为了能够使得资源容器的正...

    帅地
  • Java队列学习第一篇之列介绍

    队列大家都知道,但是在Java中队列分哪几种呢?清楚吗?都有哪些地方用到了队列呢?最常用的场景的就是消息中间件,比如各种MQ都是使用的队列来的。如果没有用过消息...

    凯哥Java
  • 算法与数据结构(二) 栈与队列的线性和链式表示(Swift版)

    数据结构中的栈与队列还是经常使用的,栈与队列其实就是线性表的一种应用。因为线性队列分为顺序存储和链式存储,所以栈可以分为链栈和顺序栈,队列也可分为顺序队列和链队...

    lizelu
  • 栈与队列:总结篇!

    相信不仅仅是C++中有这些问题,那么大家使用其他编程语言,也可以考虑一下这四个问题,栈和队列是如何实现的。

    代码随想录

扫码关注云+社区

领取腾讯云代金券