前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RxJava Observable 使用和源码阅读

RxJava Observable 使用和源码阅读

作者头像
三流之路
发布2018-09-11 15:55:02
7420
发布2018-09-11 15:55:02
举报
文章被收录于专栏:三流程序员的挣扎
代码语言:javascript
复制
implementation "io.reactivex.rxjava2:rxjava:2.1.9"

Observable/Observer 的使用

过去的 Observer 观察者回调有 onNext()、onComplete()、onError(),现在多了一个 onSubscribe(),刚开始调用,相当于 1.x 的 onStart(),参数是 Disposable,相当于 1.x 中的 Subscription,用于解除订阅。

代码语言:javascript
复制
// 被观察者
var observable = Observable.create(ObservableOnSubscribe<String> { emitter ->
    emitter.onNext("create message") // 通知观察者,调用其 onNext 方法
    emitter.onComplete()
})

// 观察者,和 1.x 相比多了个方法
observerStr = object : Observer<String> {
  override fun onNext(t: String) {
      textView.text = "${textView.text} onNext $t\n"
  }

  override fun onError(e: Throwable) {
      textView.text = "${textView.text} onError\n"
  }

  override fun onComplete() {
      textView.text = "${textView.text} onComplete\n"
      disposable?.dispose() // 解除订阅
  }

  override fun onSubscribe(d: Disposable) {
      disposable = d
  }

// 订阅
observable.subscribe(observerStr)

create 方法的参数和 1.x 不同,是

代码语言:javascript
复制
public interface ObservableOnSubscribe<T> {
    void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}

Consumer

代码语言:javascript
复制
public interface Consumer<T> {
    /**
     * Consume the given value.
     * @param t the value
     * @throws Exception on error
     */
    void accept(T t) throws Exception;
}

Consumer 是更简单的观察者,只有一个 accept 方法,方法只有一个参数。比如

代码语言:javascript
复制
// accept 依次收到被观察者发过来的 a 和 b
val disposable: Disposable = Observable.fromArray("a","b").subscribe({
    textView.text = "${textView.text}\n $it "
})

此时方法返回值是 Disposable 对象,可用于解除订阅。

源码分析

Observable 实现了 ObservableSource,只有一个 subscribe 方法。

先看如何创建一个被观察者的

代码语言:javascript
复制
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
  // source 为 null,抛出异常信息
  ObjectHelper.requireNonNull(source, "source is null");
  // 用参数 source 构造 ObservableCreate 对象
  return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

通过 RxJavaPlugins 的 onAssembly 返回最后的 Observable。

代码语言:javascript
复制
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    // 现在这情况,f 是 null,于是直接返回参数传进来的 source
    Function<? super Observable, ? extends Observable> f = onObservableAssembly;
    if (f != null) {
        return apply(f, source);
    }
    return source;
}

在调用 create 时,最终返回的对象是 ObservableCreate,它内部有一个 source 属性,就是 create 的参数 ObservableOnSubscribe 对象,代表发射数据的源头。

当有观察者订阅时,调用 subscribe 方法,重载方法有几个,Consumer 最后也是封装成一个 LambdaObserver,最终都是调到了下面的方法

代码语言:javascript
复制
public final void subscribe(Observer<? super T> observer) {
    ...
    try {
        ...
        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        ...
    }
}

主要的方法其实就是一句话 subscribeActual(observer),这是一个抽象方法,由不同的被观察者实现。在这里显示是 ObservableCreate,看它的 subscribeActual 方法。

代码语言:javascript
复制
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);

    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

CreateEmitter 是一个静态内部类,持有观察者 Observer 的引用,它实现了 Disposable,可用于解除订阅,然后立刻调用 observer.onSubscribe,这样外面的观察者第一个执行到的回调就是 onSubscribe,并且拿到了 Disposable 对象。

然后就是 source.subscribe(parent),这个 source 是 ObservableOnSubscribe 对象,只有一个 subscribe 方法,现在调用这个 subscribe 方法,并且把 parent 传进去,返回去看 create 的参数。

代码语言:javascript
复制
ObservableOnSubscribe<String> { emitter ->
   emitter.onNext("字符串消息")
   emitter.onComplete()
}

这个参数 emitter 就是 parent,subscribe 方法内部调用 onNext 之类的方法,看下 CreateEmitter 的实现。

代码语言:javascript
复制
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {

    ...

    final Observer<? super T> observer;

    CreateEmitter(Observer<? super T> observer) {
        this.observer = observer;
    }

    @Override
    public void onNext(T t) {
        ...
        if (!isDisposed()) {
            observer.onNext(t);
        }
    }

    @Override
    public void onError(Throwable t) {
        if (!tryOnError(t)) {
            RxJavaPlugins.onError(t);
        }
    }

    @Override
    public boolean tryOnError(Throwable t) {
        ...
        if (!isDisposed()) {
            try {
                observer.onError(t);
            } finally {
                dispose();
            }
            return true;
        }
        return false;
    }

    @Override
    public void onComplete() {
        if (!isDisposed()) {
            try {
                observer.onComplete();
            } finally {
                dispose();
            }
        }
    }

    @Override
    public void setDisposable(Disposable d) {
        DisposableHelper.set(this, d);
    }

    ....

    @Override
    public void dispose() {
        DisposableHelper.dispose(this);
    }

    @Override
    public boolean isDisposed() {
        return DisposableHelper.isDisposed(get());
    }
}

可见各种方法,最后都是调用了 Observer 里对应的方法,在 complete 和 error 之后都会执行 dispose 方法

总体看下来,就是一个普通的观察者模式,被观察者里持有观察者,然后调用观察者的方法使其收到回调,其实就和自己平时写监听一个意思,只是做了一些封装便于流式调用。

  1. Observable 的方法,创建了一个具体的 Observable 的实现类,其内部有一个属性 source,表示上游 Observable。
  2. Observer 订阅后,Observable 内部创建一个实现了 Dispoable 的对象,持有 Observer 的引用,然后让这个对象开始发射数据或事件。
  3. 发射的数据或事件最终都传递到 Observer 的对应的方法。
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2018.05.24 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Observable/Observer 的使用
  • Consumer
  • 源码分析
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档