前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布

RxJava

作者头像
spilledyear
发布2019-01-28 15:26:54
1.1K0
发布2019-01-28 15:26:54
举报
文章被收录于专栏:小白鼠

我不想说这些乱七八糟的概念了,实际上是我根本说不清,不过观察者模式和回调机制要知道

基本使用

代码语言:javascript
复制
@Test
public void test01() {
    Observable.create(new ObservableOnSubscribe<Object>() {
        @Override
        public void subscribe(ObservableEmitter<Object> observableEmitter) throws Exception {
            log.info("可以在这里触发消费者的方法");

            observableEmitter.onNext("onNext方法被调用");
            observableEmitter.onComplete();
        }
    }).subscribe(new Observer<Object>() {

        @Override
        public void onSubscribe(Disposable disposable) {
            log.info("Observable调用subscribe方法时会触发这个onSubscribe方法");
        }

        @Override
        public void onNext(Object o) {
            log.info(o.toString());
        }

        @Override
        public void onError(Throwable throwable) {
            log.info("onError");
        }

        @Override
        public void onComplete() {
            log.info("onComplete方法被调用");
        }
    });
}

输出结果如下

代码语言:javascript
复制
Observable调用subscribe方法时会触发这个onSubscribe方法
可以在这里触发消费者的方法
onNext方法被调用
onComplete方法被调用

源码分析

Observable#create
代码语言:javascript
复制
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
ObservableOnSubscribe

为订阅的每个观察者调用

代码语言:javascript
复制
public interface ObservableOnSubscribe<T> {

    /**
     * Called for each Observer that subscribes.
     * @param emitter the safe emitter instance, never null
     * @throws Exception on error
     */
    void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}
ObservableCreate

实现了Observable接口

代码语言:javascript
复制
public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    ......
}
RxJavaPlugins.onAssembly

这里的source就是ObservableCreate,先不分析 f != null的情况, 直接返回source

代码语言:javascript
复制
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    Function<? super Observable, ? extends Observable> f = onObservableAssembly;
    if (f != null) {
        return apply(f, source);
    }
    return source;
}

Observable的create方法,返回了一个ObservableCreate对象,ObservableCreate内部持有ObservableOnSubscribe实例

上面分析的是create方法,接下分析Observable的subscribe方法 接收一个Observer对象,这里指的是代码中的匿名内部类

代码语言:javascript
复制
public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        // 返回observer
        observer = RxJavaPlugins.onSubscribe(this, observer);

        ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

        // 调用ObservableCreate的subscribeActual方法
        subscribeActual(observer);
    } catch (NullPointerException e) {
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);

        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}
RxJavaPlugins.onSubscribe

不分享f != null情况,直接返回Observer

代码语言:javascript
复制
public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
    BiFunction<? super Observable, ? super Observer, ? extends Observer> f = onObservableSubscribe;
    if (f != null) {
        return apply(f, source, observer);
    }
    return observer;
}
ObservableCreate#subscribeActual
代码语言:javascript
复制
@Override
protected void subscribeActual(Observer<? super T> observer) {
    // 将 observer 包装成 CreateEmitter
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);

    // 调用 observer的 onSubscribe方法
    observer.onSubscribe(parent);

    try {
        // 调用ObservableOnSubscribe的subscribe方法,前面说过 ObservableCreate 持有一个ObservableOnSubscribe实例,就是create方法传进来的匿名类
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}
CreateEmitter

ObservableOnSubscribe的subscribe方法,调用CreateEmitter的相关方法,本质上是调用 Observer 的相关方法

代码语言:javascript
复制
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
    private static final long serialVersionUID = -3434801548987643227L;

    final Observer<? super T> observer;

    CreateEmitter(Observer<? super T> observer) {
        this.observer = observer;
    }

    @Override
    public void onNext(T t) {
        if (t == null) {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
            return;
        }
        if (!isDisposed()) {
            observer.onNext(t);
        }
    }

    @Override
    public void onError(Throwable t) {
        if (!tryOnError(t)) {
            RxJavaPlugins.onError(t);
        }
    }

    @Override
    public boolean tryOnError(Throwable t) {
        if (t == null) {
            t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
        }
        if (!isDisposed()) {
            try {
                observer.onError(t);
            } finally {
                dispose();
            }
            return true;
        }
        return false;
    }

    @Override
    public void onComplete() {
        if (!isDisposed()) {
            try {
                observer.onComplete();
            } finally {
                dispose();
            }
        }
    }
}

上面是基本版的,下面来一个简便快捷版的

代码语言:javascript
复制
@Test
public void test02(){
    Observable.just("呵呵").subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {

        }

        @Override
        public void onNext(String s) {
            log.info(s);
        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onComplete() {

        }
    });
}

输出结果

代码语言:javascript
复制
呵呵
Observable#just

本质上并没有变化,只不过ObservableCreate换成了ObservableJust

代码语言:javascript
复制
public static <T> Observable<T> just(T item) {
    ObjectHelper.requireNonNull(item, "The item is null");
    return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}
ObservableJust

这个地方好像是有一些区别,之前是触发ObservableOnSubscribe的subscribe方法,然后调用CreateEmitter的相关方法,本质上是调用 Observer 的相关方法。不过这里 并没有ObservableOnSubscribe相关概念,而是多了一个ScalarDisposable

代码语言:javascript
复制
public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {

    private final T value;
    public ObservableJust(final T value) {
        this.value = value;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        ScalarDisposable<T> sd = new ScalarDisposable<T>(observer, value);
        // 先调用调用Observer的onSubscribe方法
        observer.onSubscribe(sd);
        sd.run();
    }
    ......
}

ScalarDisposable

继承了AtomicInteger,run方法中调用了observer的onNext和onComplete方法

代码语言:javascript
复制
public static final class ScalarDisposable<T> extends AtomicInteger implements QueueDisposable<T>, Runnable {
    static final int START = 0;
    static final int FUSED = 1;
    static final int ON_NEXT = 2;
    static final int ON_COMPLETE = 3;

    public ScalarDisposable(Observer<? super T> observer, T value) {
        this.observer = observer;
        this.value = value;
    }

    @Override
    public void run() {
        if (get() == START && compareAndSet(START, ON_NEXT)) {
            observer.onNext(value);
            if (get() == ON_NEXT) {
                lazySet(ON_COMPLETE);
                observer.onComplete();
            }
        }
    }
}

背压

代码语言:javascript
复制
@Test
public void test03() {
    Flowable.create(new FlowableOnSubscribe<String>() {
        @Override
        public void subscribe(FlowableEmitter<String> emitter) throws Exception {
            if (!emitter.isCancelled()) {
                emitter.onNext("onNext 1");
                emitter.onNext("onNext 2");
                emitter.onNext("onNext 3");
                emitter.onComplete();
            }
        }
    }, DROP).subscribe(new Subscriber<String>() {
        @Override
        public void onSubscribe(Subscription subscription) {
            subscription.request(2L);
            log.info("背压订阅");
        }

        @Override
        public void onNext(String s) {
            log.info(s);
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onComplete() {
        }
    });
}

输出结果

代码语言:javascript
复制
背压订阅
onNext 1
onNext 2

在subscribe方法里面调用了三次onNext方法,但是控制台只打印了两次,说明被限制了。注意onSubscribe方法中的subscription.request(2L)

原理分析

基本上和无背压版本的类似,不过这里的create方法传入了两个参数,一个是FlowableOnSubscribe,另一个是一个枚举类型,暂将它理解成背压策略

Flowable#create
代码语言:javascript
复制
public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) {
    ObjectHelper.requireNonNull(source, "source is null");
    ObjectHelper.requireNonNull(mode, "mode is null");
    return RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode));
}
FlowableCreate
代码语言:javascript
复制
public final class FlowableCreate<T> extends Flowable<T> {

    final FlowableOnSubscribe<T> source;

    final BackpressureStrategy backpressure;

    public FlowableCreate(FlowableOnSubscribe<T> source, BackpressureStrategy backpressure) {
        this.source = source;
        this.backpressure = backpressure;
    }

    @Override
    public void subscribeActual(Subscriber<? super T> t) {
        BaseEmitter<T> emitter;

        switch (backpressure) {
            case MISSING: {
                emitter = new MissingEmitter<T>(t);
                break;
            }
            case ERROR: {
                emitter = new ErrorAsyncEmitter<T>(t);
                break;
            }
            case DROP: {
                emitter = new DropAsyncEmitter<T>(t);
                break;
            }
            case LATEST: {
                emitter = new LatestAsyncEmitter<T>(t);
                break;
            }
            default: {
                emitter = new BufferAsyncEmitter<T>(t, bufferSize());
                break;
            }
        }

        t.onSubscribe(emitter);
        try {
            source.subscribe(emitter);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            emitter.onError(ex);
        }
    }
}

根据传进来的参数,这里会使用的DropAsyncEmitter

DropAsyncEmitter

继承关系如下

t.onSubscribe(emitter)也就是匿名内部类的onSubscribe方法

代码语言:javascript
复制
public void onSubscribe(Subscription subscription) {
   subscription.request(2L);
   log.info("背压订阅");
}

然后调用BaseEmitter的request方法,BaseEmitter实现了Subscription接口

BaseEmitter#request
代码语言:javascript
复制
@Override
public final void request(long n) {
    // 校验n是否大于0,大于0返回true,小于0返回false
    if (SubscriptionHelper.validate(n)) {
        // 设置 Emitter的值 = Emitter的值 + n
        BackpressureHelper.add(this, n);
        // 空实现
        onRequested();
    }
}

接下来调用FlowableOnSubscribe的subscrib方法,FlowableOnSubscribe中的subscribe即匿名内部类中的subscribe方法会, 先调用调用BaseEmitter中的相关方法,BaseEmitter会根据value值选择是否调用Subscriber的相关方法(onNext、onComplete、onError)

操作符原理

操作符的核心原理就是包一层,类似于代理,这里以map为例

代码语言:javascript
复制
@Test
public void test04() {
    Flowable.create(emitter -> emitter.onNext("onNext 1"), DROP)
            .map(v -> v + " MAP")
            .subscribe(System.out::println);
}
Flowable#map

这里返回的是FlowableMap, 创建FlowableMap时以 当前Flowable实例 和 map操作符对应的逻辑函数Function 为参数。即 FlowableMap 持有上一层的Flowable实例

代码语言:javascript
复制
public final <R> Flowable<R> map(Function<? super T, ? extends R> mapper) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    return RxJavaPlugins.onAssembly(new FlowableMap<T, R>(this, mapper));
}
FlowableMap
代码语言:javascript
复制
public final class FlowableMap<T, U> extends AbstractFlowableWithUpstream<T, U> {
    final Function<? super T, ? extends U> mapper;
    public FlowableMap(Flowable<T> source, Function<? super T, ? extends U> mapper) {
        super(source);
        this.mapper = mapper;
    }

    @Override
    protected void subscribeActual(Subscriber<? super U> s) {
        if (s instanceof ConditionalSubscriber) {
            source.subscribe(new MapConditionalSubscriber<T, U>((ConditionalSubscriber<? super U>)s, mapper));
        } else {
            source.subscribe(new MapSubscriber<T, U>(s, mapper));
        }
    }

    static final class MapSubscriber<T, U> extends BasicFuseableSubscriber<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapSubscriber(Subscriber<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                downstream.onNext(null);
                return;
            }
            U v;
            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }

            // 调用上一层的 onNext 方法, 这里指的是 匿名函数 Subscriber 的onNext方法,如果连接了多个操作符,就是指向上一个操作符的onNext方法
            downstream.onNext(v);
        }

        @Override
        public int requestFusion(int mode) {
            return transitiveBoundaryFusion(mode);
        }

        @Nullable
        @Override
        public U poll() throws Exception {
            T t = qs.poll();
            return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
        }
    }
}

总结一下

代码语言:javascript
复制
Flowable.create  =>     传入 FlowableOnSubscribe, 返回 FlowableCreate
.map =>       以create返回的FlowableCreate为参数,构建一个FlowableMap并返回
.subscribe =>     以原生的Subscriber作为参数调用Flowable的subscribe方法,然后再对原生的Subscriber做一层包装作为参数,调用FlowableMap的subscribeActual,
然后再调用FlowableCreate的subscribe方法(即lowable的subscribe方法),然后再以上一层包装的Subscriber作为参数调用FlowableCreate的subscribeActual方法,
更加背压策略,以包装的Subscriber的作为参数创建BaseEmitter对象,调用包装的Subscriber的onSubscribe方法。以BaseEmitter为参数调用FlowableOnSubscribe的subscribe
方法,即调用BaseEmitter的相关方法(onNext......),但这个其实本质上还是调用包装的Subscriber的相关方法(onNext......)。在包装的Subscriber内部,执行map中的相关
逻辑修改值,然后再以新值作为参数,调用原生的Subscriber的相关方法。

也就是说,有多个个操作符,就会包装多少层
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019.01.19 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 基本使用
  • 源码分析
  • ScalarDisposable
  • 背压
  • 原理分析
  • 操作符原理
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档