专栏首页Coder的技术之路借鉴AQS的CHL思路解决消息多线程消费顺序ACK问题

借鉴AQS的CHL思路解决消息多线程消费顺序ACK问题

文章出自本人公号,欢迎关注,后台提供高并发系列历史文章整理版下载

背景

我们的支付场景下,要求消费的业务消息绝不能丢失,且能充分利用高规格的服务器的性能,比如用线程池对业务消息进行快速处理。有同学可能没太理解这个问题有啥不好处理,让我一步步分析下。

MQ的优势和缺点

MQ是我们在应对高并发场景最常用的一种措施,它可以帮我们对业务解耦、对流程异步化以及削峰填谷的妙用。

但是,由于引入了这一额外的中间件,也增加了系统的复杂度和不稳定因素。

消息可靠性的应对

消息的可靠性保证需要从消息流转的每个环节进行保障,比如生产端的事务型消息,broker的实时刷盘持久化,消费端的手动ACK 。

这里,我们对生产端和存储端的保障措施不作讨论,重点关注消费端的手动ACK机制。

手动ACK的问题

手动ACK可以保证消息一定被消费,但是需要确保手动ACK的顺序和消息顺序一致,为什么?

消息队列之所以性能高处理快,是因为采用了文件顺序读写方式,系统在拉取消息进行消费时,是按顺序文件的offset进行拉取的,如果commit offset的顺序错乱,会使得服务端的消息状态错乱,比如消息重发。

因此,如果我们在本地启动了线程池,对消息进行拉取处理,由于各线程的处理速度不一定一致,所以无法保证各线程处理完之后对各自消息的ACK操作是顺序的,怎么办,难道只能同步拉消费取然后ACK么。

解决方案

最不济,可以提交一批任务,批量等待统一提交。不过总觉得不优雅。

某次看JUC中的AQS的时候,启发了我。

我们平时用的类似CountDownLauch这些并发工具类,不也是处理的多线程协作的问题么。

我们的场景完全没有AQS复杂,借鉴它的思路,应该是没有问题的。

  1. 创建双端队列,队列节点中需要维护自身处理状态state,和对应msg的offset。
  2. 服务从消息中心拉取消息,在提交本地线程池执行之前,先入队列。
  3. 消息消费完之后,通知队列中对应的节点,更新状态为完成。
  4. 队列头被更新后出队列,提交offset,并判断新的队列头的状态,直到遇到state是未完成的head时阻塞。 undefined

方案解析

该方案可以有效利用本地线程的资源,并行的处理,并通过队列和异步通知机制保证最终commit offset时有序。

在最差情况下(即head节点对应的msg最后一个被处理完),相当于等待一批线程处理完成后统一提交。除此之外等待性能都要更优。

异步通知的实现

public class MSGFuture {
    /*全局变量,存放msg对应的future对象*/
    private static final Map<Long, MSGFuture> FUTURES = new ConcurrentHashMap<Long, MSGFuture>();
    /*全局不变唯一标识*/
    private final long id;
    /*最长等待时间*/
    private final int timeout;
    /*并发锁*/
    private final Lock lock = new ReentrantLock();
    /*通知条件*/
    private final Condition done = lock.newCondition();
    /*开始时间*/
    private final long start = System.currentTimeMillis();
    /*业务结果*/
    private volatile Object response;
}
//构造函数
public MSGFuture(Request request, int timeout) {
    /*全局自增ID*/
    this.id = request.getrId();
    /*超时时间*/
    this.timeout = timeout > 0 ? timeout : 1000;
    /*放入全局变量*/
    FUTURES.put(id, this);
}
//业务处理结果更新
public static void received(long id, Object response) {

        MSGFuture future = FUTURES.remove(id);
        if (future != null) {
            future.doReceived(response);
        } else {
            logger.warn("response return timeout,id:"+id);
        }

    }
//结果更新,通知等待条件
private void doReceived(Object res) {
        lock.lock();
        try {
            response = res;
            done.signal();
        } finally {
            lock.unlock();
        }
    }
//异步等待获取结果
public Object get(int timeout) throws TimeoutException {
        if (!isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                while (!isDone()) {
                    done.await(timeout, TimeUnit.MILLISECONDS);
                    if (isDone() || System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            if (!isDone()) {
                throw new TimeoutException();
            }
        }
        return returnFromResponse();
    }

总结

看到这里,有同学会说,这个和AQS有啥关系呀~

其实,只是处理思路的一种借鉴,比如state状态,比如锁机制和通知等待。既然都是多线程任务协调,那总有相似之处。

总之一句话,别说背八股文没用,多多了解会有大帮助~

本文分享自微信公众号 - Coder的技术之路(gh_1b3189982966),作者:Coder的技术之路

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2021-04-08

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 嘘!异步事件这样用真的好么?

    今年年初的时候写了一篇文章 《围观:基于事件机制的内部解耦之心路历程》。这篇文章主要讲的是用 ES 数据异构的场景。程序订阅 Mysql Binlog 的变更,...

    猿天地
  • Java并发编程:什么是JDK内置并发框架AQS

    1995年sun公司发布了第一个Java语言版本,可以说从JDK1.1到JDK1.4期间Java的使用主要是在移动应用和中小型企业应用中。在此类领域中基本不会涉...

    码农架构
  • 理解IM消息“可靠性”和“一致性”问题,以及解决方案探讨

    即时通讯网整理的大量IM技术文章中(见本文末“参考资料”一节),有关消息可靠性和一致性问题的文章占了很大比重,原因是IM这类系统抛开各种眼花缭乱的产品功能和技术...

    JackJiang
  • 理解IM消息“可靠性”和“一致性”问题,以及解决方案探讨

    即时通讯网整理的大量IM技术文章中(见本文末“参考资料”一节),有关消息可靠性和一致性问题的文章占了很大比重,原因是IM这类系统抛开各种眼花缭乱的产品功能和技术...

    JackJiang
  • 使用canal增量订阅MySQL binlog

    【转载请注明出处】:https://cloud.tencent.com/developer/article/1634327

    后端老鸟
  • 随笔——消息队列线程池模型如何保证重启时消息不丢

    这个帖子的意思是:在使用Kafka的时候,我们已经设置了多个分区,如何去提升消费能力?如果使用线程池的方式去提升如何保证重启时消息不丢。

    用户5397975
  • Java程序员“硬闯”阿里之路,已收获offer(附超详细面经)

    本人3年开发经验、18年年底开始跑路找工作,在互联网寒冬下成功拿到阿里巴巴、今日头条、滴滴等公司offer,岗位是Java后端开发,最终选择去了阿里巴巴。

    秃顶的Java程序员
  • Redis实现消息队列的4种方案

    Redis作为内存中的数据结构存储,常用作数据库、缓存和消息代理。它支持数据结构,如 字符串,散列,列表,集合,带有范围查询的排序集(sorted sets),...

    allsmallpig
  • 提升RabbitMQ消费速度的一些实践

    https://blog.bossma.cn/rabbitmq/practices-on-improving-the-speed-of-rabbitmq-con...

    java进阶架构师
  • 一套亿级用户的IM架构技术干货(下篇):可靠性、有序性、弱网优化等

    本文内容和编写思路是基于邓昀泽的“大规模并发IM服务架构设计”、“IM的弱网场景优化”两文的提纲进行的,感谢邓昀泽的无私分享。

    JackJiang
  • 一套亿级用户的IM架构技术干货(下篇):可靠性、有序性、弱网优化等

    本文内容和编写思路是基于邓昀泽的“大规模并发IM服务架构设计”、“IM的弱网场景优化”两文的提纲进行的,感谢邓昀泽的无私分享。

    JackJiang
  • 消息队列的可靠性

    生产者: rabbitMQ支持事务,可以在发送中进行捕获异常,如果出现未接受异常进行回滚操作。

    黑白格
  • 如果面试官再问你消息队列,就把这篇甩给他!

    A 系统产生了一个比较关键的数据,很多系统需要 A 系统将数据发过来,强耦合(B,C,D,E 系统可能参数不一样、一会需要一会不需要数据,A 系统要不断修改代码...

    Bug开发工程师
  • 千亿级金融场景下,基于Pulsar的云原生消息队列有怎样的表现?

    导语 :云原生场景,多语言、多种协议兼容,任意多的消息 Topic、任意多的消费者,性能的按需快速扩展成为消息队列基本的要求。本文是对腾讯TEG技术委员会专家工...

    腾讯云中间件团队
  • 千亿级金融场景下,基于Pulsar的云原生消息队列有怎样的表现?

    腾讯计费是孵化于支撑腾讯内部业务千亿级营收的互联网计费平台,其核心是帮助用户与产品,安全、便捷的完成支付和收款,在交易过程中帮助产品盈收实现最大化。

    腾小云
  • 一篇并不起眼的Kafka面试题

    为保证producer发送的数据,能可靠的发送到指定的topic,topic的每个partition收到producer发送的数据后,都需要向producer发...

    王知无-import_bigdata
  • 爬虫架构|利用Kafka处理数据推送问题(2)

    黄小怪
  • 分布式消息队列

    ? 作者:vincentchma,腾讯 IEG 后台开发工程师 一、消息队列的演进 分布式消息队列中间件是是大型分布式系统中常见的中间件。消息队列主要解决应用...

    腾讯技术工程官方号
  • 消息中间件系列第3讲:使用消息队列需要考虑的几个问题

    放在消息队列中,消息幂等性的意思是:一条完全一样的消息,它消息一次和消费无数次的结果是一样的。

    陈树义

扫码关注云+社区

领取腾讯云代金券