前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RxJava源码浅析(一): 基础流程

RxJava源码浅析(一): 基础流程

原创
作者头像
笔头
修改2022-03-16 10:52:13
4510
修改2022-03-16 10:52:13
举报
文章被收录于专栏:Android记忆Android记忆

一、RxJava是什么

RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences 翻译下来就是 “是一个使用可观测序列来组建异步、基于事件的程序的库”。说人话,看了这句话,初学者一脸懵逼。

RxJava如何使用,我这边就不细说了,网上有很多文章写的不错。

二、基础订阅流程浅析

RxJava源码有点庞大,我先从基础订阅流程下手。注:RxJava源码是 io.reactivex.rxjava3:rxjava:3.1.3版本。

demo示例来看看吧

代码语言:javascript
复制
//上游-被观察者
Observable<Integer> myobservable=Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onNext(3);
        emitter.onComplete();
    }
});
//下游-观察者
Observer myobserver=new Observer<Integer>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
    }
    @Override
    public void onNext(Integer integer) {
    }
    @Override
    public void onError(Throwable t) {
    }

    @Override
    public void onComplete() {
    }
};
//关联上游和下游
myobservable.subscribe(myobserver);

这个很基础,没有切换线程,没有其他操作符。后面文章会不断增加操作符来学习源码。

我们先从myobservable.subscribe(myobserver)开始吧。

1.myobservable

我们可以看到,myobservable是个抽象类,具体实例是通过Observable.create()获得,具体我们通过源码看下

代码语言:javascript
复制
public static <@NonNull T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
    Objects.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
}

我们看下,最终调用RxJavaPlugins.onAssembly(new ObservableCreate<>(source))

代码语言:javascript
复制
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    Function<? super Observable, ? extends Observable> f = onObservableAssembly;
    return f != null ? (Observable)apply(f, source) : source;
}

onObservableAssembly默认是null,所以RxJavaPlugins.onAssembly(new ObservableCreate<>(source))就是返回ObservableCreate对象。

好了,myobservable具体实现类就是ObservableCreate对象,接下来看下subscribe方法,subscribe在Observable有具体的实现,我们来看下

2.subscribe方法

代码语言:javascript
复制
public final void subscribe(@NonNull Observer<? super T> observer) {
    Objects.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer); //步骤1
        Objects.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.");
        subscribeActual(observer);//步骤二
    } 
    ..............................
}

先看步骤1

代码语言:javascript
复制
@NonNull
public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
    BiFunction<? super Observable, ? super Observer, ? extends Observer> f = onObservableSubscribe;
    return f != null ? (Observer)apply(f, source, observer) : observer;
}

这里主要判断 f 也就是onObservableSubscribe是不是null,这里默认是null,我们也没在其他地方设置onObservableSubscribe,那就直接返回observer,也就是下游myobserver。

接下来步骤2,调用当前subscribeActual方法,当前类是Observable,subscribeActual方法没有具体的实现,具体的实现在他的实现类中现实。那我们直接来看ObservableCreate的subscribeActual方法

代码语言:javascript
复制
public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

1.这个里面创建了CreateEmitter,是一个分发器,封装了当前的myobserver对象。具体结构后面看。

然后调用了observer.onSubscribe(parent)接口,这里的observer具体现实类是myobserver,具体实现如下

2.接下来执行source.subscribe(parent);这个source是哪里来的呢?我们仔细排查,source就是ObservableOnSubscribe

这里source.subscribe(parent),就是调用上图的subscribe方法,这里parent看下代码,就是CreateEmitter对象。这里ObservableEmitter是个接口,具体实现就是CreateEmitter。

这里调用了emitter.onNext(1);这样的方法,现在我们看下CreateEmitter源码。

代码语言:javascript
复制
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
    final Observer<? super T> observer;
    CreateEmitter(Observer<? super T> observer) {
        this.observer = observer;
    }
    @Override
    public void onNext(T t) {
        if (t == null) {
            onError(ExceptionHelper.createNullPointerException("onNext called with a null value."));
            return;
        }
        if (!isDisposed()) {
            observer.onNext(t);
        }
    }
    @Override
    public void onError(Throwable t) {
        if (!tryOnError(t)) {
            RxJavaPlugins.onError(t);
        }
    }
    @Override
    public boolean tryOnError(Throwable t) {
        if (t == null) {
            t = ExceptionHelper.createNullPointerException("onError called with a null Throwable.");
        }
        if (!isDisposed()) {
            try {
                observer.onError(t);
            } finally {
                dispose();
            }
            return true;
        }
        return false;
    }
    @Override
    public void onComplete() {
        if (!isDisposed()) {
            try {
                observer.onComplete();
            } finally {
                dispose();
            }
        }
    }
   ..............................
}

CreateEmitter 利用装饰模式封装了observer,增加了一系列控制Disposable。

我们看下emitter.onNext(1);源码,最终是调用了 observer.onNext(t);这里的observer就是myobserver对象,那就是调用

同理,emitter.onComplete();可以得知,就是调用myobserver的onComplete回调。

在此,Rxjava基本流程完成。

三、总结

主要有四个主体

ObservableCreate、ObservableOnSubscribe、ObservableEmitter、Observer

个人理解打个比喻,ObservableCreate就是一个包裹发货地,ObservableOnSubscribe是收发员,ObservableEmitter就是包裹,Observer就是收件人地址。

ObservableCreate创建一个场地的同时创建ObservableOnSubscribe(发货员),在subscribeActual中创建(ObservableEmitter)把收件人和包裹绑定后,source.subscribe(parent)发货员把包裹onNext发货,收件人在myobserver中onNext接收。

当然RxJava里面还有其他类型,比如FlowableCreate、ObservableJust等等,但是原理都差不多。

这个就是个人对RxJava订阅流程大概理解,欢迎拍板!

下一篇文章我们加一些操作符,个人理解就是发货员把包裹经过包装后发给收件人。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、RxJava是什么
  • 二、基础订阅流程浅析
    • 1.myobservable
      • 2.subscribe方法
      • 三、总结
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档