前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >借鉴AQS的CHL思路解决消息多线程消费顺序ACK问题

借鉴AQS的CHL思路解决消息多线程消费顺序ACK问题

作者头像
Coder的技术之路
修改2021-05-17 17:24:23
4340
修改2021-05-17 17:24:23
举报
文章被收录于专栏:Coder的技术之路Coder的技术之路
文章出自本人公号,欢迎关注,后台提供高并发系列历史文章整理版下载
文章出自本人公号,欢迎关注,后台提供高并发系列历史文章整理版下载

背景

我们的支付场景下,要求消费的业务消息绝不能丢失,且能充分利用高规格的服务器的性能,比如用线程池对业务消息进行快速处理。有同学可能没太理解这个问题有啥不好处理,让我一步步分析下。

MQ的优势和缺点

MQ是我们在应对高并发场景最常用的一种措施,它可以帮我们对业务解耦、对流程异步化以及削峰填谷的妙用。

但是,由于引入了这一额外的中间件,也增加了系统的复杂度和不稳定因素。

消息可靠性的应对

消息的可靠性保证需要从消息流转的每个环节进行保障,比如生产端的事务型消息,broker的实时刷盘持久化,消费端的手动ACK 。

这里,我们对生产端和存储端的保障措施不作讨论,重点关注消费端的手动ACK机制。

手动ACK的问题

手动ACK可以保证消息一定被消费,但是需要确保手动ACK的顺序和消息顺序一致,为什么?

消息队列之所以性能高处理快,是因为采用了文件顺序读写方式,系统在拉取消息进行消费时,是按顺序文件的offset进行拉取的,如果commit offset的顺序错乱,会使得服务端的消息状态错乱,比如消息重发。

因此,如果我们在本地启动了线程池,对消息进行拉取处理,由于各线程的处理速度不一定一致,所以无法保证各线程处理完之后对各自消息的ACK操作是顺序的,怎么办,难道只能同步拉消费取然后ACK么。

解决方案

最不济,可以提交一批任务,批量等待统一提交。不过总觉得不优雅。

某次看JUC中的AQS的时候,启发了我。

我们平时用的类似CountDownLauch这些并发工具类,不也是处理的多线程协作的问题么。

我们的场景完全没有AQS复杂,借鉴它的思路,应该是没有问题的。

  1. 创建双端队列,队列节点中需要维护自身处理状态state,和对应msg的offset。
  2. 服务从消息中心拉取消息,在提交本地线程池执行之前,先入队列。
  3. 消息消费完之后,通知队列中对应的节点,更新状态为完成。
  4. 队列头被更新后出队列,提交offset,并判断新的队列头的状态,直到遇到state是未完成的head时阻塞。 undefined

方案解析

该方案可以有效利用本地线程的资源,并行的处理,并通过队列和异步通知机制保证最终commit offset时有序。

在最差情况下(即head节点对应的msg最后一个被处理完),相当于等待一批线程处理完成后统一提交。除此之外等待性能都要更优。

异步通知的实现

代码语言:javascript
复制
public class MSGFuture {
    /*全局变量,存放msg对应的future对象*/
    private static final Map<Long, MSGFuture> FUTURES = new ConcurrentHashMap<Long, MSGFuture>();
    /*全局不变唯一标识*/
    private final long id;
    /*最长等待时间*/
    private final int timeout;
    /*并发锁*/
    private final Lock lock = new ReentrantLock();
    /*通知条件*/
    private final Condition done = lock.newCondition();
    /*开始时间*/
    private final long start = System.currentTimeMillis();
    /*业务结果*/
    private volatile Object response;
}
代码语言:javascript
复制
//构造函数
public MSGFuture(Request request, int timeout) {
    /*全局自增ID*/
    this.id = request.getrId();
    /*超时时间*/
    this.timeout = timeout > 0 ? timeout : 1000;
    /*放入全局变量*/
    FUTURES.put(id, this);
}
代码语言:javascript
复制
//业务处理结果更新
public static void received(long id, Object response) {

        MSGFuture future = FUTURES.remove(id);
        if (future != null) {
            future.doReceived(response);
        } else {
            logger.warn("response return timeout,id:"+id);
        }

    }
代码语言:javascript
复制
//结果更新,通知等待条件
private void doReceived(Object res) {
        lock.lock();
        try {
            response = res;
            done.signal();
        } finally {
            lock.unlock();
        }
    }
代码语言:javascript
复制
//异步等待获取结果
public Object get(int timeout) throws TimeoutException {
        if (!isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                while (!isDone()) {
                    done.await(timeout, TimeUnit.MILLISECONDS);
                    if (isDone() || System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            if (!isDone()) {
                throw new TimeoutException();
            }
        }
        return returnFromResponse();
    }

总结

看到这里,有同学会说,这个和AQS有啥关系呀~

其实,只是处理思路的一种借鉴,比如state状态,比如锁机制和通知等待。既然都是多线程任务协调,那总有相似之处。

总之一句话,别说背八股文没用,多多了解会有大帮助~

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-04-08,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Coder的技术之路 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 背景
  • MQ的优势和缺点
  • 消息可靠性的应对
  • 手动ACK的问题
  • 解决方案
  • 方案解析
  • 异步通知的实现
  • 总结
相关产品与服务
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档