Reactor

Flux.just

从以下两行简单代码入手

Flux<String> fewWords = Flux.just("Hello", "World");
fewWords.subscribe(System.out::println);

Flux.subscribe是一个final方法,如下,最终入参consumer被封装成一个 LambdaSubscriber

public final Disposable subscribe(Consumer<? super T> consumer) {
    Objects.requireNonNull(consumer, "consumer");
    return subscribe(consumer, null, null);
}

public final Disposable subscribe(
        @Nullable Consumer<? super T> consumer,
        @Nullable Consumer<? super Throwable> errorConsumer,
        @Nullable Runnable completeConsumer,
        @Nullable Consumer<? super Subscription> subscriptionConsumer) {
    return subscribeWith(new LambdaSubscriber<>(consumer, errorConsumer,
            completeConsumer,
            subscriptionConsumer));
}

LambdaSubscriber继承关系

内部流程

  1. Flux.just("Hello", "World") 返回一个 FluxArray 对象,两个参数被封装成一个数组,并作为 FluxArray 的属性 array
  2. fewWords.subscribe(System.out::println),入参 System.out::println 被封装成一个 LambdaSubscriber ,然后在 FluxArray.subscribe 方法中调用 LambdaSubscriber.onSubscribe 方法(以自己和array为参数,封装一个ArraySubscription对象作为onSubscribe方法的参数) ,即 LambdaSubscriber.onSubscribe(new ArraySubscription<>(LambdaSubscriber, array))
  3. LambdaSubscriber.onSubscribe 方法内部调用 ArraySubscription.request 方法(入参为Long.MAX_VALUE);
  4. ArraySubscription.request 方法内部调用 LambdaSubscriber.onNext/onError/onComplete 方法;
  5. LambdaSubscriber的onNext方法内部调用真正的逻辑;

Flux.map

Flux<String> fewWords = Flux.just("Hello", "World").map(v -> v.toUpperCase());
fewWords.subscribe(System.out::println);

内部流程

  1. Flux.just("Hello", "World") 返回一个 FluxArray 对象,两个参数被封装成一个数组,并作为 FluxArray 的属性 array
  2. Flux.map(v -> v.toUpperCase()) 返回一个 FluxMapFuseable 对象,创建 FluxMapFuseable 对象的时候以 FluxArrayFunction 作为参数,即 return onAssembly(new FluxMapFuseable<>(this, mapper)) //这里的this即代表 FluxArray
  3. fewWords.subscribe(System.out::println),即调用 FluxMapFuseable.subscribe 方法,在该方法内调用 source.subscribe 方法(这里的source 即指 FluxArray),此时会传入一个 MapFuseableSubscriber 对象,在创建 MapFuseableSubscriber 时候以 actual(消费者) 和 mapper(map对应的Function入参) 作为入参, 即 source.subscribe(new MapFuseableSubscriber<>(actual, mapper))
  4. FluxArray.subscribe 方法内部调用 MapFuseableSubscriber.onSubscribe 方法(入参为 ArraySubscription ,创建 ArraySubscription 的时候以 MapFuseableSubscriberarray(真实数据) 为入参),即 s.onSubscribe(new ArraySubscription<>(s, array)) //这里的s代表 MapFuseableSubscriber
  5. MapFuseableSubscriber.onSubscribe 方法内部, 赋值 入参 ArraySubscription 为自己的成员变量 s,然后以自己(MapFuseableSubscriber)为入参,调用 actual即LambdaSubscriber.onSubscribe 方法, 即 actual.onSubscribe(this)
  6. LambdaSubscriber.onSubscribe 方法内部调用 (入参)MapFuseableSubscriber.request(Long.MAX_VALUE) 方法;
  7. MapFuseableSubscriber.request 方法内部,调用成员变量s(s前面说过,即ArraySubscription) 即 ArraySubscription.request 方法;
  8. ArraySubscription.request 方法内部,调用成员变量actual的相关方法(onNext/onError/onComplete)。前面对 ArraySubscription 分析过,是在调用 FluxArray.subscribe 方法的时候创建的,创建的时候以 MapFuseableSubscriberarray(真实数据) 为入参,所以这里的 actual即代表MapFuseableSubscriber,所以这里也就是调用 MapFuseableSubscriber 的相关方法(onNext/onError/onComplete);
  9. MapFuseableSubscriber.onNext/onError/onComplete 方法内部,如果不是onNext方法,会直接调用成员变量actual(LambdaSubscriber).onError/onComplete 相关方法;如果是onNext方法,会先执行 mapper的相关逻辑,得到一个结果 v,然后再以 v 为入参,调用成员变量actual(LambdaSubscriber).onNext 方法;【调用成员变量actual(LambdaSubscriber)的相关方法也就是执行真正的消费者逻辑】

Flux.create

Flux.create(new Consumer<FluxSink<String>>() {
    @Override
    public void accept(FluxSink<String> fluxSink) {
        fluxSink.next("发送数据耶");
    }
}).subscribe(System.out::println);

FluxCreate

内部流程

  1. Flux.create 方法发返回一个 FluxCreate 对象,即 return onAssembly(new FluxCreate<>(emitter, backpressure, FluxCreate.CreateMode.PUSH_PULL))
  2. FluxCreate.subscribe 方法中,先以 actual(消费者)被压策略 创建 BaseSink 对象,这里返回一个 BufferAsyncSink 对象;然后调用 actual.onSubscribe(sink) 方法(这里的sink就是BaseSink对象);然后再 调用 source.accept(createMode == CreateMode.PUSH_PULL ? new SerializedSink<>(sink) :sink) 方法(这里的source指的是create方法中传入的Function,即 匿名内部类)
public void subscribe(CoreSubscriber<? super T> actual) {
    // (1). 创建BaseSink对象,这里返回一个 BufferAsyncSink 对象
    BaseSink<T> sink = createSink(actual, backpressure);
    
    // (2). 调用消费者的onSubscribe方法
    actual.onSubscribe(sink);

    try {
        // (3). source指的是create方法中传入的 Function,即 匿名内部类
        source.accept(createMode == CreateMode.PUSH_PULL ? new SerializedSink<>(sink) :sink);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        sink.error(Operators.onOperatorError(ex, actual.currentContext()));
    }
}
  1. actual.onSubscribe(sink) 方法中调用 sink即BufferAsyncSink .request 方法(BufferAsyncSink 继承了 BaseSink 对象,request 方法在 BaseSink 中);
  2. BaseSink.request 方法中调用子类 BufferAsyncSink.onRequestedFromDownstream 方法;
  3. BufferAsyncSink.onRequestedFromDownstream 方法中 调用 BufferAsyncSink.drain 方法;
  4. BufferAsyncSink.drain 方法中,会判断队列中是有有数据,如果有就会执行 actual.onNext 方法,因为此时队列中没有数据,所以会直接返回;
  5. 先以 BufferAsyncSink 为入参创建 SerializedSink 对象,然后以 SerializedSink 为入参,调用 Function(create方法的入参).accept 方法(即执行我们自己写的逻辑),然后调用 入参 SerializedSink.next 方法,即 fluxSink.next("发送数据耶")
  6. SerializedSink.next 方法中调用 BufferAsyncSink.next 方法;
  7. BufferAsyncSink.next 方法中,先将数据放入队列,然后再执行 BufferAsyncSink.drain 方法;注意,因为此时队列中有数据了,所以在接下来的 BufferAsyncSink.drain 方法调用中,会调用 actual.onNext(o) 方法(即执行我们自定义的消费者onNext方法逻辑);

FluxSink

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

扫码关注云+社区

领取腾讯云代金券