专栏首页猿天地嘘!异步事件这样用真的好么?

嘘!异步事件这样用真的好么?

故事背景

今年年初的时候写了一篇文章 《围观:基于事件机制的内部解耦之心路历程》。这篇文章主要讲的是用 ES 数据异构的场景。程序订阅 Mysql Binlog 的变更,然后程序内部使用 Spring Event 来分发具体的事件,因为一个表的数据变更可能会需要更新多个 ES 索引。

为了方便大家理解我把之前方案的图片复制过来了,如下:

上图的方案存在一个问题,就是我们今天文章要聊的内容。

这个问题就是当 MQ Consumer 收到消息后,就直接发布 Event 了,如果是同步的,没有问题。如果某个 EventListener 中处理失败了,那么这条消息将不会 ACK。

如果是异步发布 Event 的场景,发布完消息马上就 ACK 了。就算某个 EventListener 中处理失败了,MQ 也感知不到,不会进行消息的重新投递,这就是存在的问题。

解决方案

方案一

既然消息已经 ACK 了,那就不利用 MQ 的重试功能了,使用方自己重试是不是也可以呢?

可肯定是可以的,内部处理是否成功肯定是可以知道的,如果处理失败了可以默认重试,或者有一定策略的重试。实在不行还可以落库,保存记录。

这样的问题在于太烦了呀,每个使用的地方都要去做这件事情,而且对于未来接手你代码的程序小哥哥来说,这很有可能让小哥哥头发慢慢脱落啊。。。。

脱落不要紧,关键他还不知道要做这个处理,说不定哪天就背锅了,惨兮兮。。。。

方案二

要保证消息和业务处理的一致性,就不能立马进行 ACK 操作。而是要等业务处理完成后再决定是否要 ACK。

如果有处理失败的就不应该 ACK,这样就能复用 MQ 的重试机制了。

分析下来,这就是一个典型的异步转同步的场景。像 Dubbo 中也有这个场景,所以我们可以借鉴 Dubbo 中的实现思路。

创建一个 DefaultFuture 用于同步等待获取任务执行结果。然后在 MQ 消费的地方使用 DefaultFuture。

@Service
@RocketMQMessageListener(topic = "${rocketmq.topic.data_change}", consumerGroup = "${rocketmq.group.data_change_consumer}")
public class DataChangeConsume implements RocketMQListener<DataChangeRequest> {
    @Autowired
    private ApplicationContext applicationContext;
    @Autowired
    private CustomApplicationContextAware customApplicationContextAware;
    @Override
    public void onMessage(DataChangeRequest dataChangeRequest) {
        log.info("received message {} , Thread {}", dataChangeRequest, Thread.currentThread().getName());
        DataChangeEvent event = new DataChangeEvent(this);
        event.setChangeType(dataChangeRequest.getChangeType());
        event.setTable(dataChangeRequest.getTable());
        event.setMessageId(dataChangeRequest.getMessageId());
        DefaultFuture defaultFuture = DefaultFuture.newFuture(dataChangeRequest, customApplicationContextAware.getTaskCount(), 6000 * 10);
        applicationContext.publishEvent(event);
        Boolean result = defaultFuture.get();
        log.info("MessageId {} 处理结果 {}", dataChangeRequest.getMessageId(), result);
        if (!result) {
            throw new RuntimeException("处理失败,不进行消息ACK,等待下次重试");
        }
    }
}

newFuture() 会传入事件参数,超时时间,任务数量几个参数。任务数量是用于判断所有 EventListener 是否全部执行完成。

defaultFuture.get(); 这不就会阻塞,等待所有任务执行完成才会返回结果,如果所有业务都处理成功了,那么会返回 true,流程结束,消息自动 ACK。

如果返回了 false 证明有处理失败的或者超时的,就不需要 ACK 了,抛出异常等待重试。

public Boolean get() {
    if (isDone()) {
        return true;
    }
    long start = System.currentTimeMillis();
    lock.lock();
    try {
        while (!isDone()) {
            done.await(timeout, TimeUnit.MILLISECONDS);
            // 有失败的任务反馈
            if (!isSuccessDone()) {
                return false;
            }
            // 全部执行成功
            if (isDone()) {
                return true;
            }
            // 超时
            if (System.currentTimeMillis() - start > timeout) {
                return false;
            }
        }
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    } finally {
        lock.unlock();
    }
    return true;
}

isDone() 会判断反馈结果了的任务数量跟总数量是否一致,如果一直就说明全部执行完成了。

public boolean isDone() {
    return feedbackResultCount.get() == taskCount;
}

那么任务执行完了怎么反馈呢? 不可能让每个使用的方法去关心,所以我们定义了一个切面来做这件事情。

@Aspect
@Component
public class EventListenerAspect {
    @Around(value = "@annotation(eventListener)")
    public Object aroundAdvice(ProceedingJoinPoint joinpoint, EventListener eventListener) throws Throwable {
      DataChangeEvent event = null;
      boolean executeResult = true;
       try {
         event = (DataChangeEvent)joinpoint.getArgs()[0];
         Object result = joinpoint.proceed();
         return result;
      } catch (Exception e) {
         executeResult = false;
          throw e;
      } finally {
         DefaultFuture.received(event.getMessageId(), executeResult);
      }
    }
}

通过 DefaultFuture.received() 反馈执行结果。

public static void received(String id, boolean result) {
    DefaultFuture future = FUTURES.get(id);
    if (future != null) {
        // 累加失败任务数量
        if (!result) {
            future.feedbackFailResultCount.incrementAndGet();
        }
        // 累加执行完成任务数量
        future.feedbackResultCount.incrementAndGet();
        if (future.isDone()) {
            FUTURES.remove(id);
            future.doReceived();
        }
    }
}
private void doReceived() {
    lock.lock();
    try {
        if (done != null) {
            // 唤醒阻塞的线程
            done.signal();
        }
    } finally {
        lock.unlock();
    }
}

下面我们来总结整个流程:

  • 收到 MQ 消息,组装成 DefaultFuture,通过 get 方法获取执行结果,未执行完的时候此方法阻塞。
  • 通过切面切入加了 EventListener 的方法,判断是否有异常来判断任务的执行结果。
  • 通过 DefaultFuture.received() 反馈结果。
  • 反馈时计算是否全部完成,全部完成则唤醒阻塞的线程。DefaultFuture.get()就能获取到结果。
  • 是否要进行 ACK 操作。

需要注意的是每个 EventListener 内部消费的逻辑都要做幂等控制。

源码地址:https://github.com/yinjihuan/kitty-cloud/tree/master/kitty-cloud-mqconsume[1]

关于作者:尹吉欢,简单的技术爱好者,《Spring Cloud 微服务-全栈技术与案例解析》, 《Spring Cloud 微服务 入门 实战与进阶》作者, 公众号 猿天地 发起人。个人微信 jihuan900,欢迎勾搭。

参考资料

[1]

kitty-cloud-mqconsume: https://github.com/yinjihuan/kitty-cloud/tree/master/kitty-cloud-mqconsume

本文分享自微信公众号 - 猿天地(cxytiandi),作者:尹吉欢

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2020-06-29

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Spring Cloud Gateway重试机制

    重试,我相信大家并不陌生。在我们调用Http接口的时候,总会因为某种原因调用失败,这个时候我们可以通过重试的方式,来重新请求接口。

    猿天地
  • 从Java小白到收获BAT等offer,分享我这两年的经验和感悟

    常想,人生最有趣莫过于前路未知。于是我常常羡慕那些个“金梁古温”笔下随遇而安、随性而为、随缘而爱的浪子们。比如陆小凤,比如叶开。

    猿天地
  • 老大难的GC原理及调优,这下全说清楚了

    本文介绍 GC 基础原理和理论,GC 调优方法思路和方法,基于 Hotspot jdk1.8,学习之后你将了解如何对生产系统出现的 GC 问题进行排查解决。

    猿天地
  • GB2312、GBK、GB18030 这几种字符集的主要区别是什么?

    1 GB2312-80 GB 2312 或 GB 2312-80 是中国国家标准简体中文字符集,全称《信息交换用汉字编码字符集·基本集》,又称 GB 0,...

    henrylee2cn
  • 机器学习入门 7-5 高维数据映射为低维数据

    我们此时有一个m行n列的样本矩阵X,此时的X样本矩阵代表有m个样本n个特征。通过前面的关于主成分的学习,此时假设我们已经求出针对X样本矩阵来说前k个主成分,每一...

    触摸壹缕阳光
  • python + selenium + unittest实现简单的UI自动化

    使用的版本是python 3,其中HTMLTestRunner是修改版本,参考以下博客并下载 https://blog.csdn.net/zhanin123/a...

    未来sky
  • 设置 java -jar 的进程显示名称

    有时候我们会用 nohup java -jar xxx.jar来将一些可执行的java application挂在后台,类似windows服务一样来运行。但是有...

    菩提树下的杨过
  • 10 分钟带你了解最常见的 DevOps 工具

    DevOps工具越来越多,了解它们以及知道在什么时候使用他们越来越重要。因此,我尝试做一些研究,以便我们可以将DevOps产品分类为大家都熟悉的类别或用途。

    DevOps时代
  • 领域驱动设计(DDD)实践之路(三):如何设计聚合

    这是“领域驱动设计实践之路”系列的第三篇文章,分析了如何设计聚合。聚合这个概念看似很简单,实际上有很多因素导致我们建立不正确的聚合模型。本文对这些问题逐一进行剖...

    2020labs小助手
  • 百道Python入门级练习题(新手友好)

    【输入形式】 一行,供24个整数。以先行后列顺序输入第一个矩阵,而后输入第二个矩阵。

    py3study

扫码关注云+社区

领取腾讯云代金券