专栏首页androidBlogRxjava 2.x 源码系列 - 线程切换 (下)

Rxjava 2.x 源码系列 - 线程切换 (下)

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/gdutxiaoxu/article/details/80599799

Rxjava 2.x 源码系列 - 基础框架分析

Rxjava 2.x 源码系列 - 线程切换 (上)

Rxjava 2.x 源码系列 - 线程切换 (下)

Rxjava 2.x 源码系列 - 变换操作符 Map(上)

前言

在上一篇博客 Rxjava 2.x 源码系列 - 线程切换 (上) 我们讲解到,Observable#subscribeOn 是如何控制上游 Observable 的执行线程的,他的实质是将 Observable#subscribe(Observer) 的操作放在了指定线程,当我们调用 subcribe 的时候,它的过程是从下往上的,即下面的 Observable 调用上面的 Observable。用下面的流程图表示如下。

接下来,我们先来回顾一下,Observable 与 Observer 之间是如何订阅的

简单来说就是,当我们调用 Observable 的 subsribe 方法的时候,会调用当前对应 observbale 的 subscribeActual 方法,在该方法里面,会调用 observer 的 onSubeciber 方法,并调用对应 ObservableOnSubscirbe 的 subcribe 的方法,并将 ObservableEmitter 作为方法参数暴露出去。而 ObservableEmitter 持有我们的 Observer 的引用,当我们调用 ObservableEmitter 的 onNext,onErrot,onComplete 方法的时候,会调用他持有的 Observer 的相应的方法。

这篇博客主要讲解以下问题:

  • observeOn 是如何控制 Observer 的回调线程的

Observable#observeOn 方法

@CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {

        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

observeOn 的套路跟 Observable.create 方法的套路基本一样,都是先判断是否为空,不为 null,用一个新的类包装起来,并持有上游的引用 source。这里我们的包装类是 ObservableObserveOn。

这里我们来看一下 ObservableObserveOn

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        // 如果是当前线程,直接低啊用
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            // 否则,通过 worker 的形式调用
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
}

从第一篇博客 Rxjava 2.x 源码系列 - 基础框架分析,我们知道,当我们调用 Observable.subscibe(observer) 方法的时候,会调用到 对应的 Observable 实例的 subscribeActual 方法,而这里我们的 Observable 为 ObservableObserveOn 。

在 ObservableObserveOn.subscribeActual 方法中,首先会判断 scheduler instanceof TrampolineScheduler (是否是当前线程),true 的话,会直接调用 source.subscribe(observer)。否则,先用 ObserveOnObserver 包装 observer,再调用 source.subscribe 方法

接下来,我们一起来看一下 ObserveOnObserver 类

static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {

     final Observer<? super T> actual;


    }

public abstract class BasicIntQueueDisposable<T>
extends AtomicInteger
implements QueueDisposable<T> {

ObserveOnObserver 继承于 BasicIntQueueDisposable,实现 Observer, Runnable 接口,而 BasicIntQueueDisposable extends AtomicInteger ,是原子操作类。

其中,还有一个很重要的属性 actual ,即是实际的 observer。

接下来,我们来看一下几个重要的方法:

onNext,onError,onComplete,onSubscribition

public void onSubscribe(Disposable s) {
    if (DisposableHelper.validate(this.s, s)) {

            -------

            if (m == QueueDisposable.SYNC) {
                sourceMode = m;
                queue = qd;
                done = true;
                actual.onSubscribe(this);
                schedule();
                return;
            }
            if (m == QueueDisposable.ASYNC) {
                sourceMode = m;
                queue = qd;
                actual.onSubscribe(this);
                return;
            }
        }

        queue = new SpscLinkedArrayQueue<T>(bufferSize);

        actual.onSubscribe(this);
    }
}

@Override
public void onNext(T t) {
    if (done) {
        return;
    }

     ------
    schedule();
}

@Override
public void onError(Throwable t) {
    if (done) {
        RxJavaPlugins.onError(t);
        return;
    }
    ----

    schedule();
}

@Override
public void onComplete() {
    if (done) {
        return;
    }
     ----

    schedule();
}

在 onNext,onError,onComplete 方法中首先都会先判断是否 done,如果没有的话,会调用 schedule() 方法。

void schedule() {
    if (getAndIncrement() == 0) {
        worker.schedule(this);
    }
}

而在 schedule() 方法中,直接调用 Worker 的 schedule 方法,这样就会执行我们当前 ObserveOnObserver 的 run 方法,

public void run() {
    if (outputFused) {
        drainFused();
    } else {
        drainNormal();
    }
}

在 drainFused 和 drainNormal 方法中,会根据状态去调用 actual(外部传入的 observer) 的 onNext、onError、onComplete 方法。因此 observer 的回调所在的线程将取决于外部传入的 scheduler 的 schedule 方法所在的线程。

假设我们传入的是 observeOn(AndroidSchedulers.mainThread())

public final class AndroidSchedulers {

    private static final class MainHolder {

        static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
    }

    private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
            new Callable<Scheduler>() {
                @Override public Scheduler call() throws Exception {
                    return MainHolder.DEFAULT;
                }
            });

    /** A {@link Scheduler} which executes actions on the Android main thread. */
    public static Scheduler mainThread() {
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }

    ---
}
private static final class HandlerWorker extends Worker {
    private final Handler handler;

    private volatile boolean disposed;

    HandlerWorker(Handler handler) {
        this.handler = handler;
    }

    @Override
    public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
        if (run == null) throw new NullPointerException("run == null");
        if (unit == null) throw new NullPointerException("unit == null");

        if (disposed) {
            return Disposables.disposed();
        }

        run = RxJavaPlugins.onSchedule(run);

        ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

        Message message = Message.obtain(handler, scheduled);
        message.obj = this; // Used as token for batch disposal of this worker's runnables.

        handler.sendMessageDelayed(message, unit.toMillis(delay));

        // Re-check disposed state for removing in case we were racing a call to dispose().
        if (disposed) {
            handler.removeCallbacks(scheduled);
            return Disposables.disposed();
        }

        return scheduled;
    }

}

从上面的分析我们知道 observer 的回调所在的线程将取决于外部传入的 scheduler 的 schedule 方法所在的线程。即 指定 observeOn(AndroidSchedulers.mainThread()) 之后,将取决于 HandlerWorker 的 schedule 方法执行的线程,在该方法中,很明显执行于主线程。


总结

控制 Observer 的回调实际是放到 ObservableObserveOn 的 run 方法中,即 ObservableObserveOn 的 run 执行在主线程, Observer 的回调也发生在主线程,而 ObservableObserveOn 的 run 执行在哪个线程,取决于 外部传入的 scheduler。因此, 当外部传入的 scheduler 的 schedule 方法在主线程,那么 observer 也在主线程回调。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Android 面试必备 - 线程

    java thread的运行周期中, 有几种状态, 在 java.lang.Thread.State 中有详细定义和说明:

    用户2965908
  • Git 配置别名 —— 让命令变得更简单

    版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/gdutxiaoxu/article/de...

    用户2965908
  • Android 常用正则表达式

    版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/gdutxiaoxu/article/de...

    用户2965908
  • 跳表(skiplist)的原理及concurrentskiplistmap的源码学习

    本文分为两个部分,第一个是对跳表(SKipList)这种数据结构的介绍,第二部分则是对Java中ConcurrentSkilListMap的源码解读.

    呼延十
  • 编写优雅代码的最佳实践

    Robert Martin曾说过"在代码阅读中说脏话的频率是衡量代码质量额唯一标准"。同时,代码的写法应当使别人理解它所需的时间最小化,也就是说我们写的代码是给...

    木可大大
  • 编写优雅代码的最佳实践

    Robert Martin曾说过"在代码阅读中说脏话的频率是衡量代码质量额唯一标准"。同时,代码的写法应当使别人理解它所需的时间最小化,也就是说我们写的代码是给...

    木可大大
  • P2P接口Booth乘法器设计描述原理代码实现

    月见樽
  • C++ CreateThread的使用

    HANDLE WINAPI CreateThread( In_opt LPSECURITY_ATTRIBUTES lpThreadAttributes...

    包子388321
  • Mybatis 文档(二)

    这条语句提供了一种可选的查找文本功能。如果没有传入“title”,那么所有处于“ACTIVE”状态的BLOG都会返回;反之若传入了“title”,那么就会对“t...

    Remember_Ray
  • Mybatis 动态 SQL

    Mybatis 动态SQL,通过 ●if ●choose (when, otherwise) ●trim (where, set) ●foreach ...

    赵哥窟

扫码关注云+社区

领取腾讯云代金券