Reactor

Flux.just

从以下两行简单代码入手

Flux<String> fewWords = Flux.just("Hello", "World");
fewWords.subscribe(System.out::println);

Flux.subscribe是一个final方法,如下,最终入参consumer被封装成一个 LambdaSubscriber

public final Disposable subscribe(Consumer<? super T> consumer) {
    Objects.requireNonNull(consumer, "consumer");
    return subscribe(consumer, null, null);
}

public final Disposable subscribe(
        @Nullable Consumer<? super T> consumer,
        @Nullable Consumer<? super Throwable> errorConsumer,
        @Nullable Runnable completeConsumer,
        @Nullable Consumer<? super Subscription> subscriptionConsumer) {
    return subscribeWith(new LambdaSubscriber<>(consumer, errorConsumer,
            completeConsumer,
            subscriptionConsumer));
}

LambdaSubscriber继承关系

内部流程

  1. Flux.just("Hello", "World") 返回一个 FluxArray 对象,两个参数被封装成一个数组,并作为 FluxArray 的属性 array
  2. fewWords.subscribe(System.out::println),入参 System.out::println 被封装成一个 LambdaSubscriber ,然后在 FluxArray.subscribe 方法中调用 LambdaSubscriber.onSubscribe 方法(以自己和array为参数,封装一个ArraySubscription对象作为onSubscribe方法的参数) ,即 LambdaSubscriber.onSubscribe(new ArraySubscription<>(LambdaSubscriber, array))
  3. LambdaSubscriber.onSubscribe 方法内部调用 ArraySubscription.request 方法(入参为Long.MAX_VALUE);
  4. ArraySubscription.request 方法内部调用 LambdaSubscriber.onNext/onError/onComplete 方法;
  5. LambdaSubscriber的onNext方法内部调用真正的逻辑;

Flux.map

Flux<String> fewWords = Flux.just("Hello", "World").map(v -> v.toUpperCase());
fewWords.subscribe(System.out::println);

内部流程

  1. Flux.just("Hello", "World") 返回一个 FluxArray 对象,两个参数被封装成一个数组,并作为 FluxArray 的属性 array
  2. Flux.map(v -> v.toUpperCase()) 返回一个 FluxMapFuseable 对象,创建 FluxMapFuseable 对象的时候以 FluxArrayFunction 作为参数,即 return onAssembly(new FluxMapFuseable<>(this, mapper)) //这里的this即代表 FluxArray
  3. fewWords.subscribe(System.out::println),即调用 FluxMapFuseable.subscribe 方法,在该方法内调用 source.subscribe 方法(这里的source 即指 FluxArray),此时会传入一个 MapFuseableSubscriber 对象,在创建 MapFuseableSubscriber 时候以 actual(消费者) 和 mapper(map对应的Function入参) 作为入参, 即 source.subscribe(new MapFuseableSubscriber<>(actual, mapper))
  4. FluxArray.subscribe 方法内部调用 MapFuseableSubscriber.onSubscribe 方法(入参为 ArraySubscription ,创建 ArraySubscription 的时候以 MapFuseableSubscriberarray(真实数据) 为入参),即 s.onSubscribe(new ArraySubscription<>(s, array)) //这里的s代表 MapFuseableSubscriber
  5. MapFuseableSubscriber.onSubscribe 方法内部, 赋值 入参 ArraySubscription 为自己的成员变量 s,然后以自己(MapFuseableSubscriber)为入参,调用 actual即LambdaSubscriber.onSubscribe 方法, 即 actual.onSubscribe(this)
  6. LambdaSubscriber.onSubscribe 方法内部调用 (入参)MapFuseableSubscriber.request(Long.MAX_VALUE) 方法;
  7. MapFuseableSubscriber.request 方法内部,调用成员变量s(s前面说过,即ArraySubscription) 即 ArraySubscription.request 方法;
  8. ArraySubscription.request 方法内部,调用成员变量actual的相关方法(onNext/onError/onComplete)。前面对 ArraySubscription 分析过,是在调用 FluxArray.subscribe 方法的时候创建的,创建的时候以 MapFuseableSubscriberarray(真实数据) 为入参,所以这里的 actual即代表MapFuseableSubscriber,所以这里也就是调用 MapFuseableSubscriber 的相关方法(onNext/onError/onComplete);
  9. MapFuseableSubscriber.onNext/onError/onComplete 方法内部,如果不是onNext方法,会直接调用成员变量actual(LambdaSubscriber).onError/onComplete 相关方法;如果是onNext方法,会先执行 mapper的相关逻辑,得到一个结果 v,然后再以 v 为入参,调用成员变量actual(LambdaSubscriber).onNext 方法;【调用成员变量actual(LambdaSubscriber)的相关方法也就是执行真正的消费者逻辑】

Flux.create

Flux.create(new Consumer<FluxSink<String>>() {
    @Override
    public void accept(FluxSink<String> fluxSink) {
        fluxSink.next("发送数据耶");
    }
}).subscribe(System.out::println);

FluxCreate

内部流程

  1. Flux.create 方法发返回一个 FluxCreate 对象,即 return onAssembly(new FluxCreate<>(emitter, backpressure, FluxCreate.CreateMode.PUSH_PULL))
  2. FluxCreate.subscribe 方法中,先以 actual(消费者)被压策略 创建 BaseSink 对象,这里返回一个 BufferAsyncSink 对象;然后调用 actual.onSubscribe(sink) 方法(这里的sink就是BaseSink对象);然后再 调用 source.accept(createMode == CreateMode.PUSH_PULL ? new SerializedSink<>(sink) :sink) 方法(这里的source指的是create方法中传入的Function,即 匿名内部类)
public void subscribe(CoreSubscriber<? super T> actual) {
    // (1). 创建BaseSink对象,这里返回一个 BufferAsyncSink 对象
    BaseSink<T> sink = createSink(actual, backpressure);
    
    // (2). 调用消费者的onSubscribe方法
    actual.onSubscribe(sink);

    try {
        // (3). source指的是create方法中传入的 Function,即 匿名内部类
        source.accept(createMode == CreateMode.PUSH_PULL ? new SerializedSink<>(sink) :sink);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        sink.error(Operators.onOperatorError(ex, actual.currentContext()));
    }
}
  1. actual.onSubscribe(sink) 方法中调用 sink即BufferAsyncSink .request 方法(BufferAsyncSink 继承了 BaseSink 对象,request 方法在 BaseSink 中);
  2. BaseSink.request 方法中调用子类 BufferAsyncSink.onRequestedFromDownstream 方法;
  3. BufferAsyncSink.onRequestedFromDownstream 方法中 调用 BufferAsyncSink.drain 方法;
  4. BufferAsyncSink.drain 方法中,会判断队列中是有有数据,如果有就会执行 actual.onNext 方法,因为此时队列中没有数据,所以会直接返回;
  5. 先以 BufferAsyncSink 为入参创建 SerializedSink 对象,然后以 SerializedSink 为入参,调用 Function(create方法的入参).accept 方法(即执行我们自己写的逻辑),然后调用 入参 SerializedSink.next 方法,即 fluxSink.next("发送数据耶")
  6. SerializedSink.next 方法中调用 BufferAsyncSink.next 方法;
  7. BufferAsyncSink.next 方法中,先将数据放入队列,然后再执行 BufferAsyncSink.drain 方法;注意,因为此时队列中有数据了,所以在接下来的 BufferAsyncSink.drain 方法调用中,会调用 actual.onNext(o) 方法(即执行我们自定义的消费者onNext方法逻辑);

FluxSink

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Ionic3 高德Web定位

    高德提供了Web平台定位的JS API,同样需要用到 APP_Key,并且需要注意是使用Web端的Key,如下图所示。必须是Web端的,其它平台的无效。 ht...

    spilledyear
  • Ionic3 拍照上传

    本文主要介绍使用cordova实现拍照上传,走通 “拍照 》预览 》上传 》 下载 ”这个流程。为了方便查看测试结果,需要了解Ionic应用调试的基本方法,有关...

    spilledyear
  • Cat消息存储

    即数据文件中的存储结构为: 【blockSize(4byte)->blockData】=>【blockSize(4byte)->blockData】

    spilledyear
  • 使用 VisualVM 和 JProfiler 进行性能分析及调优

    在我们开发大型 Java 应用程序的过程中,难免遇到内存泄露、性能瓶颈等问题,比如文件、网络、数据库的连接未释放,未优化的算法等。随着应用程序的持续运行,可能会...

    CG国斌
  • DjangoRestFramework,restful规范、APIview、解析器组件、Postman等

      大家还记得CBV的这个视图函数,为什么get请求就能找到类的get方法,post请求就能找到post方法,其内部有个dispatch方法来进行分发,这又怎么...

    changxin7
  • mapreduce项目调优

    一、调优的目的 充分的利用机器的性能,更快的完成mr程序的计算任务。甚至是在有限的机器条件下,能够支持运行足够多的mr程序。 二、调优的总体概述 从mr程...

    Albert陈凯
  • 订单下单

    某天准备出远门时,想到没有充电宝,就打开京东或天猫超市,选择一个心仪的充电宝,“哎哟,居然还有一个10元的优惠券”,下单付款,下午快递员敲门,充电宝就到家了。

    普通程序员
  • 大数据投融资周报(8月6日——8月12日 共11起)

    1大数据公司MapR完成5000万美元融资,预计明年挂牌上市 基于 Hadoop 的开源大数据技术公司MapR日前宣布,公司已经完成了新一轮的D轮融资,融资金...

    数据猿
  • 今日头条近10亿美元融资入账,Adobe推出体验云平台Experience Cloud | 大数据周周看

    数据猿导读 为整合公司各类云计算服务,Adobe推出Experience Cloud平台;数之联完成A轮数千万元融资,大数据分析挖掘领跑者获资本青睐;软件技术服...

    数据猿
  • ceph分布式存储-集群容量评估

    cpu型号: Intel(R) Xeon(R) CPU E5-2630 v4 @ 2.20GHz cpu核数: 40 硬盘: hdd

    Lucien168

扫码关注云+社区

领取腾讯云代金券