专栏首页后端技术KafkaConsumer RequestFuture异步流程控制

KafkaConsumer RequestFuture异步流程控制

org.apache.kafka.clients.consumer.internals包内的RequestFuture类可用来定义异步流程,常用的addListener、compose作用如图所示:

addListener

addListener可以使一个流程添加到RequestFuture后

compose

compose利用addListener,使其挂在RequestFuture<T>完成后的流程上。同时返回一个新创建的RequestFuture<S>。

  • 用户需要实现RequestFutureAdapter的接口onSucess/onFailure,进一步加工这个异步流程。当onSucess/onFailure被调用时,上游流程已经完成,那么在你实现的方法中:
    • 既可以选择完成RequestFuture<S>
    • 又可以在RequestFuture<S>前增添其它的异步流程。
    • 我们将在下文举例说明这两种用法。
    public <S> RequestFuture<S> compose(final RequestFutureAdapter<T, S> adapter) {
        // 创建了一个RequestFuture<S>并在方法结束时返回,但并没有调用其complete或raise方法。
        final RequestFuture<S> adapted = new RequestFuture<>();
        addListener(new RequestFutureListener<T>() {
            @Override
            public void onSuccess(T value) {
                adapter.onSuccess(value, adapted);  // 在用户实现的onSuccess中,可以完成adapted,也可以为它添加前置流程
            }

            @Override
            public void onFailure(RuntimeException e) {
                adapter.onFailure(e, adapted);
            }
        });
        return adapted;
    }

compose的效果如下图所示,加了listener2是出于严谨考虑,因为compose调用了addListener方法。


什么叫"完成RequestFuture<S>"? 比如下面的实现,在onSuccess中可以调用future.complete


什么叫"也可以在RequestFuture<S>前增添其它的异步流程"?这是第二种用法。 我们先看CoordinatorResponseHandler,onSuccess会调用handle接口。

再看它的一个实现类JoinGroupResponseHandler,调用onJoinLeader新创建了一个RequestFuture,并调用chain,将handle方法参数中的future接在了新建RequestFuture的流程后面。这样,我们就为future添加了前置流程

以上两种用法图示就相当于下图:

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • spring 后置处理器回调

    PostProcessorRegistrationDelegate.invokeBeanFactoryPostProcessors 代码太长就不贴出来了,请读...

    平凡的学生族
  • 总结io (nio、InputStream等)

    蓝色线和红色线的含义都是 "相当于",比如,在InputStream中,抽象构件指的是InputStream类,而装饰角色指的是FileInputStream角...

    平凡的学生族
  • springboot mybatis druid 多数据源

    先阅读基于SpirngBoot2.0+ 的 SpringBoot+Mybatis 多数据源配置,主要是理解DataSource1Config和DataSourc...

    平凡的学生族
  • Linux命令(64)——strings命令

    strings命令是二进制工具集GNU Binutils的一员,用于打印文件中可打印字符串,文件可以是文本文件(test.c),但一般用于打印二进制目标文件、库...

    Dabelv
  • centos编译qemu

    战神伽罗
  • 为什么亚洲企业更青睐混合云?

    众所周知,目前云在亚洲已成为最广泛采用的技术之一。就像任意其它消费模式一样,每个国家对于技术消费有着不同的具体需求和偏好,云也不例外。本区域的经济因素,全球和本...

    静一
  • 通过重建Hosting系统理解HTTP请求在ASP.NET Core管道中的处理流程[上]:采用管道处理请求

    之所以称ASP.NET Core是一个Web开发平台,而不是一个单纯的开发框架,源于它具有一个极具扩展性的请求处理管道,我们可以通过对这个管道的定制来满足各种场...

    蒋金楠
  • 嗨,IT小哥哥,我想问问你,我们有爱一点好不好?

    IT男,普遍会被贴上“智商极高,情商却极低”的标签,其实小宁同学也觉得IT小哥哥这个特殊的群体有些超乎寻常的高智商。

    用户1257393
  • asp.net web api 文件上传

    首先分别介绍正确的做法和错误的做法,然后分析他们的不同和错误之处,以便读者在实现此功能时可避开误区 1正确的做法 public class AvaterCont...

    甜橙很酸
  • MySQL主主同步环境出现1236错误

    环境: MySQL 5.7.25 主主架构 故障现象: 发现互相之间的同步均发生异常,两端均出现1236错误,在两个主节点上分别执行show slave st...

    Alfred Zhao

扫码关注云+社区

领取腾讯云代金券