RxJava

我不想说这些乱七八糟的概念了,实际上是我根本说不清,不过观察者模式和回调机制要知道

基本使用

@Test
public void test01() {
    Observable.create(new ObservableOnSubscribe<Object>() {
        @Override
        public void subscribe(ObservableEmitter<Object> observableEmitter) throws Exception {
            log.info("可以在这里触发消费者的方法");

            observableEmitter.onNext("onNext方法被调用");
            observableEmitter.onComplete();
        }
    }).subscribe(new Observer<Object>() {

        @Override
        public void onSubscribe(Disposable disposable) {
            log.info("Observable调用subscribe方法时会触发这个onSubscribe方法");
        }

        @Override
        public void onNext(Object o) {
            log.info(o.toString());
        }

        @Override
        public void onError(Throwable throwable) {
            log.info("onError");
        }

        @Override
        public void onComplete() {
            log.info("onComplete方法被调用");
        }
    });
}

输出结果如下

Observable调用subscribe方法时会触发这个onSubscribe方法
可以在这里触发消费者的方法
onNext方法被调用
onComplete方法被调用

源码分析

Observable#create
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
ObservableOnSubscribe

为订阅的每个观察者调用

public interface ObservableOnSubscribe<T> {

    /**
     * Called for each Observer that subscribes.
     * @param emitter the safe emitter instance, never null
     * @throws Exception on error
     */
    void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}
ObservableCreate

实现了Observable接口

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    ......
}
RxJavaPlugins.onAssembly

这里的source就是ObservableCreate,先不分析 f != null的情况, 直接返回source

public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    Function<? super Observable, ? extends Observable> f = onObservableAssembly;
    if (f != null) {
        return apply(f, source);
    }
    return source;
}

Observable的create方法,返回了一个ObservableCreate对象,ObservableCreate内部持有ObservableOnSubscribe实例

上面分析的是create方法,接下分析Observable的subscribe方法 接收一个Observer对象,这里指的是代码中的匿名内部类

public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        // 返回observer
        observer = RxJavaPlugins.onSubscribe(this, observer);

        ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

        // 调用ObservableCreate的subscribeActual方法
        subscribeActual(observer);
    } catch (NullPointerException e) {
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);

        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}
RxJavaPlugins.onSubscribe

不分享f != null情况,直接返回Observer

public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
    BiFunction<? super Observable, ? super Observer, ? extends Observer> f = onObservableSubscribe;
    if (f != null) {
        return apply(f, source, observer);
    }
    return observer;
}
ObservableCreate#subscribeActual
@Override
protected void subscribeActual(Observer<? super T> observer) {
    // 将 observer 包装成 CreateEmitter
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);

    // 调用 observer的 onSubscribe方法
    observer.onSubscribe(parent);

    try {
        // 调用ObservableOnSubscribe的subscribe方法,前面说过 ObservableCreate 持有一个ObservableOnSubscribe实例,就是create方法传进来的匿名类
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}
CreateEmitter

ObservableOnSubscribe的subscribe方法,调用CreateEmitter的相关方法,本质上是调用 Observer 的相关方法

static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
    private static final long serialVersionUID = -3434801548987643227L;

    final Observer<? super T> observer;

    CreateEmitter(Observer<? super T> observer) {
        this.observer = observer;
    }

    @Override
    public void onNext(T t) {
        if (t == null) {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
            return;
        }
        if (!isDisposed()) {
            observer.onNext(t);
        }
    }

    @Override
    public void onError(Throwable t) {
        if (!tryOnError(t)) {
            RxJavaPlugins.onError(t);
        }
    }

    @Override
    public boolean tryOnError(Throwable t) {
        if (t == null) {
            t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
        }
        if (!isDisposed()) {
            try {
                observer.onError(t);
            } finally {
                dispose();
            }
            return true;
        }
        return false;
    }

    @Override
    public void onComplete() {
        if (!isDisposed()) {
            try {
                observer.onComplete();
            } finally {
                dispose();
            }
        }
    }
}

上面是基本版的,下面来一个简便快捷版的

@Test
public void test02(){
    Observable.just("呵呵").subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {

        }

        @Override
        public void onNext(String s) {
            log.info(s);
        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onComplete() {

        }
    });
}

输出结果

呵呵
Observable#just

本质上并没有变化,只不过ObservableCreate换成了ObservableJust

public static <T> Observable<T> just(T item) {
    ObjectHelper.requireNonNull(item, "The item is null");
    return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}
ObservableJust

这个地方好像是有一些区别,之前是触发ObservableOnSubscribe的subscribe方法,然后调用CreateEmitter的相关方法,本质上是调用 Observer 的相关方法。不过这里 并没有ObservableOnSubscribe相关概念,而是多了一个ScalarDisposable

public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {

    private final T value;
    public ObservableJust(final T value) {
        this.value = value;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        ScalarDisposable<T> sd = new ScalarDisposable<T>(observer, value);
        // 先调用调用Observer的onSubscribe方法
        observer.onSubscribe(sd);
        sd.run();
    }
    ......
}

ScalarDisposable

继承了AtomicInteger,run方法中调用了observer的onNext和onComplete方法

public static final class ScalarDisposable<T> extends AtomicInteger implements QueueDisposable<T>, Runnable {
    static final int START = 0;
    static final int FUSED = 1;
    static final int ON_NEXT = 2;
    static final int ON_COMPLETE = 3;

    public ScalarDisposable(Observer<? super T> observer, T value) {
        this.observer = observer;
        this.value = value;
    }

    @Override
    public void run() {
        if (get() == START && compareAndSet(START, ON_NEXT)) {
            observer.onNext(value);
            if (get() == ON_NEXT) {
                lazySet(ON_COMPLETE);
                observer.onComplete();
            }
        }
    }
}

背压

@Test
public void test03() {
    Flowable.create(new FlowableOnSubscribe<String>() {
        @Override
        public void subscribe(FlowableEmitter<String> emitter) throws Exception {
            if (!emitter.isCancelled()) {
                emitter.onNext("onNext 1");
                emitter.onNext("onNext 2");
                emitter.onNext("onNext 3");
                emitter.onComplete();
            }
        }
    }, DROP).subscribe(new Subscriber<String>() {
        @Override
        public void onSubscribe(Subscription subscription) {
            subscription.request(2L);
            log.info("背压订阅");
        }

        @Override
        public void onNext(String s) {
            log.info(s);
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onComplete() {
        }
    });
}

输出结果

背压订阅
onNext 1
onNext 2

在subscribe方法里面调用了三次onNext方法,但是控制台只打印了两次,说明被限制了。注意onSubscribe方法中的subscription.request(2L)

原理分析

基本上和无背压版本的类似,不过这里的create方法传入了两个参数,一个是FlowableOnSubscribe,另一个是一个枚举类型,暂将它理解成背压策略

Flowable#create
public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) {
    ObjectHelper.requireNonNull(source, "source is null");
    ObjectHelper.requireNonNull(mode, "mode is null");
    return RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode));
}
FlowableCreate
public final class FlowableCreate<T> extends Flowable<T> {

    final FlowableOnSubscribe<T> source;

    final BackpressureStrategy backpressure;

    public FlowableCreate(FlowableOnSubscribe<T> source, BackpressureStrategy backpressure) {
        this.source = source;
        this.backpressure = backpressure;
    }

    @Override
    public void subscribeActual(Subscriber<? super T> t) {
        BaseEmitter<T> emitter;

        switch (backpressure) {
            case MISSING: {
                emitter = new MissingEmitter<T>(t);
                break;
            }
            case ERROR: {
                emitter = new ErrorAsyncEmitter<T>(t);
                break;
            }
            case DROP: {
                emitter = new DropAsyncEmitter<T>(t);
                break;
            }
            case LATEST: {
                emitter = new LatestAsyncEmitter<T>(t);
                break;
            }
            default: {
                emitter = new BufferAsyncEmitter<T>(t, bufferSize());
                break;
            }
        }

        t.onSubscribe(emitter);
        try {
            source.subscribe(emitter);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            emitter.onError(ex);
        }
    }
}

根据传进来的参数,这里会使用的DropAsyncEmitter

DropAsyncEmitter

继承关系如下

t.onSubscribe(emitter)也就是匿名内部类的onSubscribe方法

public void onSubscribe(Subscription subscription) {
   subscription.request(2L);
   log.info("背压订阅");
}

然后调用BaseEmitter的request方法,BaseEmitter实现了Subscription接口

BaseEmitter#request
@Override
public final void request(long n) {
    // 校验n是否大于0,大于0返回true,小于0返回false
    if (SubscriptionHelper.validate(n)) {
        // 设置 Emitter的值 = Emitter的值 + n
        BackpressureHelper.add(this, n);
        // 空实现
        onRequested();
    }
}

接下来调用FlowableOnSubscribe的subscrib方法,FlowableOnSubscribe中的subscribe即匿名内部类中的subscribe方法会, 先调用调用BaseEmitter中的相关方法,BaseEmitter会根据value值选择是否调用Subscriber的相关方法(onNext、onComplete、onError)

操作符原理

操作符的核心原理就是包一层,类似于代理,这里以map为例

@Test
public void test04() {
    Flowable.create(emitter -> emitter.onNext("onNext 1"), DROP)
            .map(v -> v + " MAP")
            .subscribe(System.out::println);
}
Flowable#map

这里返回的是FlowableMap, 创建FlowableMap时以 当前Flowable实例 和 map操作符对应的逻辑函数Function 为参数。即 FlowableMap 持有上一层的Flowable实例

public final <R> Flowable<R> map(Function<? super T, ? extends R> mapper) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    return RxJavaPlugins.onAssembly(new FlowableMap<T, R>(this, mapper));
}
FlowableMap
public final class FlowableMap<T, U> extends AbstractFlowableWithUpstream<T, U> {
    final Function<? super T, ? extends U> mapper;
    public FlowableMap(Flowable<T> source, Function<? super T, ? extends U> mapper) {
        super(source);
        this.mapper = mapper;
    }

    @Override
    protected void subscribeActual(Subscriber<? super U> s) {
        if (s instanceof ConditionalSubscriber) {
            source.subscribe(new MapConditionalSubscriber<T, U>((ConditionalSubscriber<? super U>)s, mapper));
        } else {
            source.subscribe(new MapSubscriber<T, U>(s, mapper));
        }
    }

    static final class MapSubscriber<T, U> extends BasicFuseableSubscriber<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapSubscriber(Subscriber<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                downstream.onNext(null);
                return;
            }
            U v;
            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }

            // 调用上一层的 onNext 方法, 这里指的是 匿名函数 Subscriber 的onNext方法,如果连接了多个操作符,就是指向上一个操作符的onNext方法
            downstream.onNext(v);
        }

        @Override
        public int requestFusion(int mode) {
            return transitiveBoundaryFusion(mode);
        }

        @Nullable
        @Override
        public U poll() throws Exception {
            T t = qs.poll();
            return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
        }
    }
}

总结一下

Flowable.create  =>     传入 FlowableOnSubscribe, 返回 FlowableCreate
.map =>       以create返回的FlowableCreate为参数,构建一个FlowableMap并返回
.subscribe =>     以原生的Subscriber作为参数调用Flowable的subscribe方法,然后再对原生的Subscriber做一层包装作为参数,调用FlowableMap的subscribeActual,
然后再调用FlowableCreate的subscribe方法(即lowable的subscribe方法),然后再以上一层包装的Subscriber作为参数调用FlowableCreate的subscribeActual方法,
更加背压策略,以包装的Subscriber的作为参数创建BaseEmitter对象,调用包装的Subscriber的onSubscribe方法。以BaseEmitter为参数调用FlowableOnSubscribe的subscribe
方法,即调用BaseEmitter的相关方法(onNext......),但这个其实本质上还是调用包装的Subscriber的相关方法(onNext......)。在包装的Subscriber内部,执行map中的相关
逻辑修改值,然后再以新值作为参数,调用原生的Subscriber的相关方法。

也就是说,有多个个操作符,就会包装多少层

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

扫码关注云+社区

领取腾讯云代金券