前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RxJava源码剖析

RxJava源码剖析

原创
作者头像
程序员小何SS
发布2021-12-08 11:59:30
5550
发布2021-12-08 11:59:30
举报
文章被收录于专栏:Android理论Android理论

前言 本篇的文章是基于Rxjava 2.1.2。从下面的一段代码中,我们从源码的角度分析 RxJava 的实现原理:

代码语言:javascript
复制
ObservableOnSubscribe<Integer> oos = new ObservableOnSubscribe<Integer>() { 
    @Override 
    public void subscribe(ObservableEmitter<Integer> e) throws Exception { 
        final int max = 100; 
        for (int i = 1; i <= max; i++) { 
            e.onNext(max); 
        } 
        e.onComplete(); 
    } 
}; 
Observer<Integer> o = new Observer<Integer>() { 
    @Override 
    public void onSubscribe(Disposable d) { 
    } 
    @Override 
    public void onNext(Integer integer) { 
    } 
    @Override 
    public void onError(Throwable e) { 
    } 
    @Override 
    public void onComplete() { 
    } 
}; 
Observable.create(oos) 
        .observeOn(AndroidSchedulers.mainThread()) 
        .subscribeOn(Schedulers.computation()) 
        .subscribe(o);

分析源码之前,我们先定义一下名词,RxJava 是基于观察者模式的,这里将被观察者叫做主题(Source),观察者叫做观察者(Observer)。

上面的代码首先创建了一个主题对象,然后又创建了一个观察者对象,最后将两者关联起来,并且最重要的一点,指定了主题对象和观察者对象执行的线程。

正文 Observable.create(oos)

首先分析这行代码做了什么事情:

代码语言:javascript
复制
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { 
    ObjectHelper.requireNonNull(source, "source is null"); 
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); 
}

ObjectHelper 只是用来做非空判断,这里就不用管它了。看看 RxJavaPlugins 做了什么:

代码语言:javascript
复制
/** 
 * Calls the associated hook function. 
 * @param <T> the value type 
 * @param source the hook's input value 
 * @return the value returned by the hook 
 */ 
@SuppressWarnings({ "rawtypes", "unchecked" }) 
@NonNull public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) { 
    Function<? super Observable, ? extends Observable> f = onObservableAssembly; 
    if (f != null) { 
        return apply(f, source); 
    } 
    return source; 

注释上都说了,这是一个钩子函数,也就是说如果 onObservableAssembly 的值不为空,那么就调用这个钩子函数,onObservableAssembly 是一个静态变量,需要我们主动的去设置才会赋值,这里当做空来考虑,如果 onObservableAssembly 为空的话,也就是说这个方法啥都没做,直接返回 source 参数,也就是上面的 ObservableCreate 对象。

总结一下,Observable.create(oos) 只是创建了一个 ObservableCreate 对象。这个方法就暂时先分析到这里,至于这个对象内部有什么东西,我们后面会说到。

observeOn(AndroidSchedulers.mainThread()) 既然之前的 create 方法创建了一个 ObservableCreate 对象并返回,也就是说 observeOn(Schedulers.computation()) 这个方法是调用的 ObservableCreate 这个对象上的方法。

代码语言:javascript
复制
public final class ObservableCreate<T> extends Observable<T> {

ObservableCreate 是继承至 Observable 的。

代码语言:javascript
复制
public final Observable<T> observeOn(Scheduler scheduler) { 
    return observeOn(scheduler, false, bufferSize()); 
}

Observable 的 observeOn 是 final 的,所以走的父类的方法。继续跟踪 observeOn 调用的同名方法:

代码语言:javascript
复制
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { 
    ObjectHelper.requireNonNull(scheduler, "scheduler is null"); 
    ObjectHelper.verifyPositive(bufferSize, "bufferSize"); 
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize)); 
}

ObjectHelper 跳过。这里又是一个 onAssembly 方法的调用,我很好奇这个单词是什么意思。点进去看一下:

代码语言:javascript
复制
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) { 
    Function<? super Observable, ? extends Observable> f = onObservableAssembly; 
    if (f != null) { 
        return apply(f, source); 
    } 
    return source; 
}

同样是一个钩子方法,现在也是有经验的人呢,再看到这个方法,就直接跳过,只关心它传递的参数和返回值就行了。它的返回值默认就是传递进来的参数。

所以,observeOn 方法就是创建并返回了一个 ObservableObserveOn 对象(大神教你起类名系列二),这里预警一下,之后像这样类似的类名差不多还有3个。

subscribeOn(Schedulers.computation()) 由于 observeOn 创建并返回了一个 ObservableObserveOn 对象,所以这里调用的是 ObservableObserveOn 对象上的方法。

代码语言:javascript
复制
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> { 

abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {

也是继承至 Observable 。

代码语言:javascript
复制
public final Observable<T> subscribeOn(Scheduler scheduler) { 
    ObjectHelper.requireNonNull(scheduler, "scheduler is null"); 
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler)); 
}

不出意料,也是 final 的,创建了一个 ObservableSubscribeOn 对象(大神教你起类名系列三)并返回。

subscribe(o) 前面 subscribeOn 创建了一个 ObservableSubscribeOn 对象并返回,所以这里调用的是 ObservableSubscribeOn 这个对象上面的方法。

代码语言:javascript
复制
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> { 

abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {

继承至 Observable 类。

代码语言:javascript
复制
public final void subscribe(Observer<? super T> observer) { 
    ObjectHelper.requireNonNull(observer, "observer is null"); 
    try { 
        observer = RxJavaPlugins.onSubscribe(this, observer); 
        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer"); 
        subscribeActual(observer); 
    } catch (NullPointerException e) { // NOPMD 
        throw e; 
    } catch (Throwable e) { 
        Exceptions.throwIfFatal(e); 
        // can't call onError because no way to know if a Disposable has been set or not 
        // can't call onSubscribe because the call might have set a Subscription already 
        RxJavaPlugins.onError(e); 
        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); 
        npe.initCause(e); 
        throw npe; 
    } 
}

这个方法也是 final 的,所以是调用的这个方法。

代码语言:javascript
复制
public final void subscribe(Observer<? super T> observer) { … }

这个方法是我们需要分析的重点,看看内部具体的代码吧先:

代码语言:javascript
复制
public final void subscribe(Observer<? super T> observer) { 
    ObjectHelper.requireNonNull(observer, "observer is null"); 
    try { 
        observer = RxJavaPlugins.onSubscribe(this, observer); 
        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer"); 
        subscribeActual(observer); 
    } catch (NullPointerException e) { // NOPMD 
        throw e; 
    } catch (Throwable e) { 
        Exceptions.throwIfFatal(e); 
        // can't call onError because no way to know if a Disposable has been set or not 
        // can't call onSubscribe because the call might have set a Subscription already 
        RxJavaPlugins.onError(e); 
        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); 
        npe.initCause(e); 
        throw npe; 
    } 
}

这里忽略 ObjectHelper 和异常处理的代码只有两行代码是关键。先看 observer = RxJavaPlugins.onSubscribe(this, observer):

代码语言:javascript
复制
public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) { 
    BiFunction<? super Observable, ? super Observer, ? extends Observer> f = onObservableSubscribe; 
    if (f != null) { 
        return apply(f, source, observer); 
    } 
    return observer; 
}

没想到啊没想到,你这浓眉大眼的家伙也是一个钩子方法。所以这行代码相当于 obsever = observer。接着看,subscribeActual(observer):

代码语言:javascript
复制
protected abstract void subscribeActual(Observer<? super T> observer);

这是一个抽象方法,没啥好分析的。接下来我们要进入正题了,根据我们编写的代码,是 ObservableSubscribeOn 这个对象调用了 subscribe 方法,所以我们看看这个类的 subscribeActual 方法。

代码语言:javascript
复制
@Override 
public void subscribeActual(final Observer<? super T> s) { 
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s); 
    s.onSubscribe(parent); 
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); 
}

这里需要注意的是 s 这个参数,后面会有很多地方看到这个参数,一定要搞清楚这个参数是谁传递过来的。比如说:

代码语言:javascript
复制
A.subscribe(B) 
那么,参数 s 就是 B。

在我们的代码中是 ObservableSubscribeOn.subscribe(o); 理解了这一点,我们详细分析代码里面的内容:首先创建了一个 SubscribeOnObserver (大神教你起类名系列四)。然后调用了我们创建的对象 o 的 onSubscribe 方法:

代码语言:javascript
复制
@Override 
public void onSubscribe(Disposable d) { 
}

我们的 onSubscribe 方法里面啥都没做。不过一般来说,你应该调用一下 onStart 方法。

接下来是调用 parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));,setDisposable 的方法不影响流程分析,这里就先跳过了,有兴趣的可以点进去看一下。然后就是 scheduler 变量,这个变量就是我们使用 subscribeOn 传递的参数:

代码语言:javascript
复制
public final Observable<T> subscribeOn(Scheduler scheduler) { 
    ObjectHelper.requireNonNull(scheduler, "scheduler is null"); 
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler)); 
} 

public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) { 
    super(source); 
    this.scheduler = scheduler; 
}

这个 scheduler 就是 Schedulers.computation()。然后调用了它的 scheduleDirect 方法:

代码语言:javascript
复制
public Disposable scheduleDirect(@NonNull Runnable run) { 
    return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS); 
} 

public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { 
    final Worker w = createWorker(); 
    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); 
    DisposeTask task = new DisposeTask(decoratedRun, w); 
    w.schedule(task, delay, unit); 
    return task; 
}

这两个方法是父类的,Schedulers.computation() 返回的是一个 ComputationScheduler 对象,这里找具体的实现类由于调用链比较长,就不给出了,自己点着点着就能找到了。看看 ComputationScheduler 有没有复写这两个方法:

代码语言:javascript
复制
@NonNull 
@Override 
public Disposable scheduleDirect(@NonNull Runnable run, long delay, TimeUnit unit) { 
    PoolWorker w = pool.get().getEventLoop(); 
    return w.scheduleDirect(run, delay, unit); 
}

它覆盖了父类的第2个 scheduleDirect 方法。这里就不深入分析里面的池了。看 w.scheduleDirect(run, delay, unit);:

代码语言:javascript
复制
public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit unit) { 
    ScheduledDirectTask task = new ScheduledDirectTask(RxJavaPlugins.onSchedule(run)); 
    try { 
        Future<?> f; 
        if (delayTime <= 0L) { 
            f = executor.submit(task); 
        } else { 
            f = executor.schedule(task, delayTime, unit); 
        } 
        task.setFuture(f); 
        return task; 
    } catch (RejectedExecutionException ex) { 
        RxJavaPlugins.onError(ex); 
        return EmptyDisposable.INSTANCE; 
    } 
}

熟悉的线程池使用代码。希望看到这里你还没有忘记我们要分析的是什么。简单的归纳一下,其实就是向我们创建的 scheduler 里面提交了一个 runnable。最终这个 Runnable 肯定会执行,那么看看这个 Runnable 里面有什么代码:

代码语言:javascript
复制
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); 

final class SubscribeTask implements Runnable { 
    private final SubscribeOnObserver<T> parent; 
    SubscribeTask(SubscribeOnObserver<T> parent) { 
        this.parent = parent; 
    } 
    @Override 
    public void run() { 
        source.subscribe(parent); 
    } 
}

run 方法里面就只有一句代码,但是我们需要搞清楚这里的 source 和 parent 分别是哪个对象。

parent 可以直接看到是 SubscribeOnObserver 对象。

source 是使用的外部类的变量。

代码语言:javascript
复制
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) { 
    super(source); 
    this.scheduler = scheduler; 
}

这里调用了 super 方法,所以构造函数里传递的变量就是 source。

代码语言:javascript
复制
public final Observable<T> subscribeOn(Scheduler scheduler) { 
    ObjectHelper.requireNonNull(scheduler, "scheduler is null"); 
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler)); 
}

这里是我们之前分析过的创建 ObservableSubscribeOn 的代码,这里的 this 指的是 observeOn 创建的 ObservableObserveOn 对象。希望看到这里你没有搞晕,如果你是使用手机看的,并且看懂了,我是真的佩服。

也就是说,run 里面的代码就是调用了 ObservableObserveOn 对象的 subscribe 方法。之前我们分析过了,subscribe 方法实际上没有做什么,只是调用了 subscribeActual 方法,所以我们进入这个类内部看看:

代码语言:javascript
复制
@Override 
protected void subscribeActual(Observer<? super T> observer) { 
    if (scheduler instanceof TrampolineScheduler) { 
        source.subscribe(observer); 
    } else { 
        Scheduler.Worker w = scheduler.createWorker(); 
        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); 
    } 
}

我们在 observeOn 传递的 scheduler 不是 TrampolineScheduler 类型的,所以只需要看 else 的代码。这里是先创建了一个工作线程(由于我们使用的是 AndroidScheduler,所以这里是指的主线程),然后调用了 source 的 subscribe 方法。需要注意的是这里最后创建了一个 ObserveOnObserver 对象(大神教你起类名系列五)。

先看看 createWork,在 HandlerScheduler 中:

代码语言:javascript
复制
@Override 
public Worker createWorker() { 
    return new HandlerWorker(handler); 
}

返回了一个 HandlerWorker 对象。再看 source.subscribe(),首先这里的 source 指的是 create 方法创建的 ObservableCreate 对象,调用 subscribe 传递的是 ObserveOnObserver。看看这个对象的 subscribeActual 方法:

代码语言:javascript
复制
@Override 
protected void subscribeActual(Observer<? super T> observer) { 
    CreateEmitter<T> parent = new CreateEmitter<T>(observer); 
    observer.onSubscribe(parent); 
    try { 
        source.subscribe(parent); 
    } catch (Throwable ex) { 
        Exceptions.throwIfFatal(ex); 
        parent.onError(ex); 
    } 
}

这里的参数 observer 是 ObserveOnObserver,source 是我们代码中创建的 oos 对象。

首先创建了一个 CreateEmitter 对象。接着看 ObserveOnObserver 的 onSubscribe 方法做了啥:

代码语言:javascript
复制
@Override 
public void onSubscribe(Disposable s) { 
    if (DisposableHelper.validate(this.s, s)) { 
        this.s = s; 
        if (s instanceof QueueDisposable) { 
            @SuppressWarnings("unchecked") 
            QueueDisposable<T> qd = (QueueDisposable<T>) s; 
            int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY); 
            if (m == QueueDisposable.SYNC) { 
                sourceMode = m; 
                queue = qd; 
                done = true; 
                actual.onSubscribe(this); 
                schedule(); 
                return; 
            } 
            if (m == QueueDisposable.ASYNC) { 
                sourceMode = m; 
                queue = qd; 
                actual.onSubscribe(this); 
                return; 
            } 
        } 
        queue = new SpscLinkedArrayQueue<T>(bufferSize); 
        actual.onSubscribe(this); 
    } 
}

这里代码比较长,只分析重要的代码,就是 actual.onSubscribe 这句。actual 是构造函数中赋值的,所以我们回到创建 ObserveOnObserver 的地方,actual 指的是 SubscribeOnObserver 对象。所以它调用了 SubscribeOnObserver 的 onSubscribe 方法。接下来分析一下它的 onSubscribe 方法做了什么,这里不看也不会影响流程。

代码语言:javascript
复制
SubscribeOnObserver(Observer<? super T> actual) { 
    this.actual = actual; 
    this.s = new AtomicReference<Disposable>(); 
} 
@Override 
public void onSubscribe(Disposable s) { 
    DisposableHelper.setOnce(this.s, s); 
}

这个方法调用了 setOnce 方法:

代码语言:javascript
复制
public static boolean setOnce(AtomicReference<Disposable> field, Disposable d) { 
    ObjectHelper.requireNonNull(d, "d is null"); 
    if (!field.compareAndSet(null, d)) { 
        d.dispose(); 
        if (field.get() != DISPOSED) { 
            reportDisposableSet(); 
        } 
        return false; 
    } 
    return true; 
}

这里涉及到了乐观锁等玩意,简单来说就是先判断 field 的值是否为空,如果为空则设置为 d,不为空则将 d dispose。然后判断 field 的值,由于 field 的值只能设定一次非 DISPOSED 值,所以如果不为 DISPOSED,说明已经被设置过了,再报出异常,如果为 DISPOSED 是可以再次设置的。按照正常的流程,这里只是将 field 的值设置为 d,然后返回true。这个方法可以先不用管。

回到主线流程上,source.subscribe(parent); 这是最重要的一句代码。source 是在构造函数赋值的,看看构造方法:

代码语言:javascript
复制
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { 
    ObjectHelper.requireNonNull(source, "source is null"); 
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); 
}

也就是说这里的 source 是我们代码中创建的 oos 对象。parent 是新创建的 CreateEmitter 对象。看看我们 oos 的 subscribe 方法:

代码语言:javascript
复制
@Override 
public void subscribe(ObservableEmitter<Integer> e) throws Exception { 
    Log.e("aprz", Thread.currentThread().getName()); 
    final int max = 100; 
    for (int i = 1; i <= max; i++) { 
        e.onNext(i); 
    } 
    e.onComplete(); 
}

这里就是事件开始的起点。所有的事件都由 ObservableEmitter 开始发送,看看它的代码,它是一个接口,在我们的例子中,它的实现类是 CreateEmitter,所有我们分析这个类的 onNext 方法:

代码语言:javascript
复制
@Override 
public void onNext(T t) { 
    if (t == null) { 
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); 
        return; 
    } 
    if (!isDisposed()) { 
        observer.onNext(t); 
    } 
}

isDisposed 方法返回 false 才会去调用 observer 的 onNext 方法,这个 observer 是谁呢? 看到这里我们就要从后往前推一遍之前的代码了,不管你绝不绝望,反正我是很绝望。这的 observer 是 ObserveOnObserver 对象。

接下来我们就进入 ObserveOnObserver 里面,看看它接受事件之后做了什么,上面的参数 e 就是:

代码语言:javascript
复制
@Override 
public void onNext(T t) { 
    if (done) { 
        return; 
    } 
    if (sourceMode != QueueDisposable.ASYNC) { 
        queue.offer(t); 
    } 
    schedule(); 
}

调用了 schedule 方法:

代码语言:javascript
复制
void schedule() { 
    if (getAndIncrement() == 0) { 
        worker.schedule(this); 
    } 
}

向 work 中提交了一个 Runnable,这里传递的是 this。说明它自己肯定实现了这个接口,我们看看它的 run 方法做了啥:

代码语言:javascript
复制
@Override 
public void run() { 
    if (outputFused) { 
        drainFused(); 
    } else { 
        drainNormal(); 
    } 
}

这里一般是走 drainNormal 吧,我猜的,我们分析这个方法吧。

代码语言:javascript
复制
void drainNormal() { 
    int missed = 1; 
    final SimpleQueue<T> q = queue; 
    final Observer<? super T> a = actual; 
    for (;;) { 
        if (checkTerminated(done, q.isEmpty(), a)) { 
            return; 
        } 
        for (;;) { 
            boolean d = done; 
            T v; 
            try { 
                v = q.poll(); 
            } catch (Throwable ex) { 
                Exceptions.throwIfFatal(ex); 
                s.dispose(); 
                q.clear(); 
                a.onError(ex); 
                worker.dispose(); 
                return; 
            } 
            boolean empty = v == null; 
            if (checkTerminated(d, empty, a)) { 
                return; 
            } 
            if (empty) { 
                break; 
            } 
            a.onNext(v); 
        } 
        missed = addAndGet(-missed); 
        if (missed == 0) { 
            break; 
        } 
    } 
}

代码很长,具体做了啥我们暂时不用关心,只需要注意到 a.onNexe(v) 这行代码,这个 a 是 actual 变量,actual 又是 SubscribeOnObserver 对象,我们看看它的 onNext 方法:

代码语言:javascript
复制
@Override 
public void onNext(T t) { 
    actual.onNext(t); 
}

很简单,这里的 actual 就是我们创建的 o 了,所以最终调用到了我们的代码里面。

好了,到这里一个完整的流程就整理出来了,但是还有一个问题没有解决,就是线程切换是发生在哪里。因为为了不影响整体流程的分析,所以上面并没有去分析线程切换的东西,下面开始分析。

直接从 subscribeOn 开始,看 ObservableSubscribeOn 的代码:

代码语言:javascript
复制
@Override 
public void subscribeActual(final Observer<? super T> s) { 
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s); 
    s.onSubscribe(parent); 
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); 
}

从这里开始就进行了线程的切换,根据上面的分析我们知道这里是将 SubscribeTask 作为一个 Runnable 对象给提交进了我们指定的 scheduler (subscribeOn 传递的)中。所以后面的流程都是在 scheduler 所在的线程在运行。

再看 observeOn,看 ObservableObserveOn 的代码。

代码语言:javascript
复制
@Override 
protected void subscribeActual(Observer<? super T> observer) { 
    if (scheduler instanceof TrampolineScheduler) { 
        source.subscribe(observer); 
    } else { 
        Scheduler.Worker w = scheduler.createWorker(); 
        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); 
    } 
}

这里的线程切换是发生在 ObserveOnObserver 这个对象的里面。

代码语言:javascript
复制
void schedule() { 
    if (getAndIncrement() == 0) { 
        worker.schedule(this); 
    } 
}

schedule 的流程,我们上面分析过,worker.schedule(this) 这行代码就发生了线程切换,是将 this 作为 Runnable 对象提交到了我们指定的(observerOn 传递的)scheduler 中。具体分析,由于之前的流程是在别的线程中,所以想要进行线程切换,最先想到的肯定是 Handler。由于我们传递的是 AndroidSchedulers.mainThread(),所以我们就分析这个吧。

AndroidSchedulers.mainThread() 的实现是 HandlerScheduler。看看它的 schedule 方法:

代码语言:javascript
复制
@Override 
public Disposable schedule(Runnable run, long delay, TimeUnit unit) { 
    if (run == null) throw new NullPointerException("run == null"); 
    if (unit == null) throw new NullPointerException("unit == null"); 
    if (disposed) { 
        return Disposables.disposed(); 
    } 
    run = RxJavaPlugins.onSchedule(run); 
    ScheduledRunnable scheduled = new ScheduledRunnable(handler, run); 
    Message message = Message.obtain(handler, scheduled); 
    message.obj = this; // Used as token for batch disposal of this worker's runnables. 
    handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay))); 
    // Re-check disposed state for removing in case we were racing a call to dispose(). 
    if (disposed) { 
        handler.removeCallbacks(scheduled); 
        return Disposables.disposed(); 
    } 
    return scheduled; 
}

post 了一个 msg,这样就完成了线程的切换。下面上一张图,有助于理解和记忆:

347bfa9d1676021741f5778bcf33b0e1.png
347bfa9d1676021741f5778bcf33b0e1.png

只需要理解,每次 observerOn 和 subscribeOn 的时候,内部都会创建一个新的 observable 和 observer。

。新创建的 observable 会引用前面的 observable,就是代码中我们分析的 source 变量。

。新创建的 observer 会引用前面的 observer,就是代码中我们分析的 actual 变量。

最后我们 subscribe 的时候,是调用的最后创建的 observable 的方法。而每个 observable 内部又调用了 source 的 subscribe 方法,这样就形成了一层一层往前传递的调用链。当调用到最前面的一个 observable 的时候,就是我们自己创建的 observable,在这里我们需要手动触发该与该 observable 对应的 observer 对象的 onNext 方法。而 observer 的 onNext 方法的内部又调用了 actual 的 onNext 方法,这样就形成了一层一层往后传递的调用链。

总结 虽然在我们的例子中,CreateEmitter 并不是一个 observer ,但是它也有 onNext 等方法,可以把它看做一个 observer。

如此,RxJava 的一个流程就理清楚了。这货的流程和 OkHttp 怎么有点像,只是稍微有点不一样。

补充一下关于背压的知识:在异步订阅的时候,使用 Observable,默认的缓冲大小是 128,超过 这个数量之后会 resize,也就是说会缓冲所有的事件,这样就会导致内存占用一直增加。 相关视频 【Android进阶】Rxjava与low响应式编程

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档