Rxjava 2.x 源码系列 - 线程切换 (上)

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/gdutxiaoxu/article/details/80577389

Rxjava 2.x 源码系列 - 基础框架分析

Rxjava 2.x 源码系列 - 线程切换 (上)

Rxjava 2.x 源码系列 - 线程切换 (下)

Rxjava 2.x 源码系列 - 变换操作符 Map(上)

前言

在上一篇博客 Rxjava 源码系列 - 基础框架分析,我们分析了 Rxjava 的基础框架。

Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer,并且回调 Observer 的相应的方法。

用一张简单的流程图描述如下:


Observable#subscribeOn(Scheduler)

在 Android 中,我们知道默认都是执行在主线程的,那么 Rxjava 是如何实现线程切换的。

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("1");
                emitter.onNext("2");
                emitter.onNext("3");
                emitter.onComplete();
            }
        })
        .subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("TAG", "onSubscribe():  ");
            }

            @Override
            public void onNext(String s) {
                Log.e("TAG", "onNext():  " + s);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {
                Log.e("TAG", "onComplete():  ");
            }
        });

我们先来看一下 subscribeOn 方法,可以看到

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
    // scheduler 判空
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    // 用 ObservableSubscribeOn 将 scheduler 包装 起来
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

而我们从上一篇博客中知道,当我们调用 observable.subscibe(observable) 的时候,最终会调用到具体的 observable 的实例的 subscribActual 方法。而这里具体的 observable 的实例为 ObservableSubscribeOn。

接下来,我们来看一下 ObservableSubscribeOn 这个类,可以看到继承 AbstractObservableWithUpstream ,而 AbstractObservableWithUpstream 继承 Observable,实现 HasUpstreamObservableSource 这个接口。

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

    ---
}


abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {

    /** The source consumable Observable. */
    protected final ObservableSource<T> source;

    /**
     * Constructs the ObservableSource with the given consumable.
     * @param source the consumable Observable
     */
    AbstractObservableWithUpstream(ObservableSource<T> source) {
        this.source = source;
    }

    @Override
    public final ObservableSource<T> source() {
        return source;
    }

}

public interface HasUpstreamObservableSource<T> {
    /**
     * Returns the upstream source of this Observable.
     * <p>Allows discovering the chain of observables.
     * @return the source ObservableSource
     */
    ObservableSource<T> source();
}

observableSubscribeOn 的 subscribeActual 方法,跟 ObservableCreate 的 subscribeActual 的套路差不多,它也是 Observable 的一个子类。只不过比 ObservableCreate 多实现了一个接口HasUpstreamObservableSource,这个接口很有意思,他的 source() 方法返回类型是 ObservableSource(还记得这个类的角色吗?)。也就是说 ObservableSubscribeOn 这个 Observable 是一个拥有上游的 Observable 。他有一个非常关键的属性 source,这个 source 就代表了他的上游。

接下来我们一起来看一下 ObservableSubscribeOn 的具体实现

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
}

首先先来看他的构造函数 ,有两个参数 source ,scheduler。

  • source 代表上游的引用,是 Observable 的一个实例
  • scheduler 可以通过 Schedulers.newThread() 或者 Schedulers.io() 创建相应的实例

这里我们先大概了解一下 Scheduler 是个什么东东,Scheduler 里面封装了 Worker 和 DisposeTask,下面会详细讲到。

Schedulers.newThread()

@NonNull
public static Scheduler newThread() {
    return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
}


NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
static final class NewThreadTask implements Callable<Scheduler> {
    @Override
    public Scheduler call() throws Exception {
        return NewThreadHolder.DEFAULT;
    }
}
static final class NewThreadHolder {
    static final Scheduler DEFAULT = new NewThreadScheduler();
}
public static Scheduler io() {
    return RxJavaPlugins.onIoScheduler(IO);
}

IO = RxJavaPlugins.initIoScheduler(new IOTask());

static final class IOTask implements Callable<Scheduler> {
    @Override
    public Scheduler call() throws Exception {
        return IoHolder.DEFAULT;
    }
}
static final class IoHolder {
    static final Scheduler DEFAULT = new IoScheduler();
}
static final class IoHolder {
    static final Scheduler DEFAULT = new IoScheduler();
}

我们再回到 ObservableSubscribeOn 的 subscribeActual 方法,在上一篇博客的时候已经讲解 Observable 和 Observer 之间是怎样实现订阅关系的,这里就不再具体展开了。

接下来,我们重点关注这一行代码

 parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));

我们先来看一下 SubscribeTask 这个类,他是 ObservableSubscribeOn 的一个非静态内部类,可以看到 其实也比较简单,他实现了 Runnable 接口,并且持有 parent 引用。

final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver<T> parent;

    SubscribeTask(SubscribeOnObserver<T> parent) {
        this.parent = parent;
    }

    @Override
    public void run() {
        source.subscribe(parent);
    }
}

然后在 run 方法中,通过 source.subscribe(parent) 建立联系。因而,当我们的 SubscribeTask 的 run 方法运行在哪个线程,相应的 observer 的 subscribe 方法就运行在哪个线程。

这里可能会有人有疑问,SubscribeTask 没有 source 属性,它是怎么访问到 ObservableSubscribeOn 的属性的。

我们知道 java 中,非静态内部类默认持有外部类的引用,因而他可以正常访问外部类 ObservableSubscribeOn 的 source 属性。


接着,我们再来看一下 scheduler.scheduleDirect 这个方法

@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Worker w = createWorker();

    // 判断 run 是否为 null
    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    DisposeTask task = new DisposeTask(decoratedRun, w);

    w.schedule(task, delay, unit);

    return task;
}
  • 首先,创建一个 Worker w
  • 第二步,DisposeTask 将 decoratedRun 包装起来
  • 第三步:w 去调度 task

这里我们以 NewThreadScheduler 为例,来看看这个 Worker 到底是什么?

public Worker createWorker() {
    return new NewThreadWorker(threadFactory);
}



public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    private final ScheduledExecutorService executor;

    volatile boolean disposed;

    public NewThreadWorker(ThreadFactory threadFactory) {
        executor = SchedulerPoolFactory.create(threadFactory);
    }

    --- 
}


public static ScheduledExecutorService create(ThreadFactory factory) {
    final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
    if (PURGE_ENABLED && exec instanceof ScheduledThreadPoolExecutor) {
        ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
        POOLS.put(e, exec);
    }
    return exec;
}

从上面可以看到,其实 worker 里面封装了 executor(线程池),看到这里,相信你也基本明白 Rxjava 线程切换的原理了,其实很简单。

在 ObservableSubscribeOn subscribeActual 方法中, SubscribeTask 包装 parent(SubscribeOnObserver ,包装了 Observer),SubscribeTask 实现了 Runnable 接口,在 run 方法里面调用了 source.subscribe(parent),因而 run 方法所执行的线程将由 worker 决定。这就是 下游决定上游 observable 执行线程的原理。

接下来我们再来看一下:DisposeTask

static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection {
        final Runnable decoratedRun;
        final Worker w;

        Thread runner;

        DisposeTask(Runnable decoratedRun, Worker w) {
            this.decoratedRun = decoratedRun;
            this.w = w;
        }

        @Override
        public void run() {
            runner = Thread.currentThread();
            try {
                decoratedRun.run();
            } finally {
                dispose();
                runner = null;
            }
        }

        @Override
        public void dispose() {
            if (runner == Thread.currentThread() && w instanceof NewThreadWorker) {
                ((NewThreadWorker)w).shutdown();
            } else {
                w.dispose();
            }
        }

        @Override
        public boolean isDisposed() {
            return w.isDisposed();
        }

        @Override
        public Runnable getWrappedRunnable() {
            return this.decoratedRun;
        }
    }
}
// 将 新的 Disposable 设置给 parent ,方便取消订阅关系,
//(因为我们对  Observer 进行相应的包装,原来的 parent 的 Disposable 已经不能代表最新的 Disposable)
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));

DisposeTask 实现了 Disposable,Runnable ,SchedulerRunnableIntrospection 接口,Disposable 接口主要是用来取消订阅关系的 Disposable。


Observable#subscribeOn(Scheduler) 第一次有限原理

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                Log.i(TAG, "subscribe: getName=" +Thread.currentThread().getName());
                emitter.onNext("1");
                emitter.onNext("2");
                emitter.onNext("3");
                emitter.onComplete();
            }
        }) // 进行两次 subscribeOn
        .subscribeOn(Schedulers.io()).subscribeOn(Schedulers.computation()).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("TAG", "onSubscribe():  ");
            }

            @Override
            public void onNext(String s) {
                Log.e("TAG", "onNext():  " + s);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {
                Log.e("TAG", "onComplete():  ");
            }
        });

subscribe: getName=RxCachedThreadScheduler-1

如果将上述的 subscribeOn 的顺序置换

subscribeOn(Schedulers.computation()).subscribeOn(Schedulers.io())

那么将打印出

subscribe: getName=RxComputationThreadPool-1

为什么是第一次 Observable#subscribeOn(Scheduler) 才有效呢?

前面我们分析到,Observable#subscribeOn(Scheduler) 实际上是将 Observable#subscribe(Observer) 的操作放在了指定线程,当我们调用 subcribe 的时候,它的过程是从下往上的,即下面的 Observable 调用上面的 Observanle。

所以对于我们上面的第一个例子,他的调用流程是这样的:第三个 Observable 调用 Observable#subscribe(Observer) 启动订阅,在其内部会激活第二个 Observable 的 Observable#subscribe(Observer) 方法,但是此时该方法外部被套入了一个 Schedulers.computation() 线程

于是这个订阅的过程就被运行在了该线程中。用伪代码演示如下

public class Observable {
    // 第「二」个 Observable
    Observable source;
    Observer observer;

    public Observable(Observable source, Observer observer) {
        this.source = source;
        this.observer = observer;
    }

    public void subscribe(Observer Observer) {
        new Thread("computation") {
            @Override
            public void run() {
                // 第「二」个 Observable 订阅
                source.subscribe(observer);
            }
        }
    }
}

再往上走,第二个 Observable 订阅内部会激活第一个 Observable 的 Observable#subscribe(Observer) 方法,同样的,该方法被套在了 Schedulers.io() 线程中,用伪代码演示

public class Observable {
    // 第「一」个 Observable
    Observable source;
    Observer observer;

    public Observable(Observable source, Observer observer) {
        this.source = source;
        this.observer = observer;
    }

    public void subscribe(Observer Observer) {
        new Thread("io") {
            @Override
            public void run() {
                // 第「一」个 Observable 订阅
                source.subscribe(observer);
            }
        }
    }
}

此时到达第一个 Observable 了之后就要开始发射事件了,此时的执行线程很明显是 io 线程。还可以换成 Thread 伪代码来表示。

new Thread("computation") {
    @Override
    public void run() {
        // 第二个 Observable.subscribe(Observer) 的实质
        // 就是切换线程,效果类似如下
        new Thread("io") {
            @Override
            public void run() {
                // 第一个 Observable.subscribe(Observer) 的实质
                // 就是发射事件
                System.out.println("onNext(T)/onError(Throwable)/onComplete() 的执行线程是: " + Thread
                                   .currentThread().getName());
            }
        } .start();
    }
} .start();

总结

用流程图描述如下:


参考博客:

友好 RxJava2.x 源码解析(二)线程切换

下一篇我们将讲解到 observeOn(AndroidSchedulers.mainThread()) 的原理。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏技术博文

PHP的几个常用加密函数

在php的开发过程中,常常需要对部分数据(如用户密码)进行加密 一、加密类型: 1.单向散列加密   就是把任意长度的信息进行散列计算,得到固定长度的输出,这个...

4248
来自专栏Java与Android技术栈

RxJava1 升级到 RxJava2 所踩过的坑

RxJava2 发布已经有一段时间了,是对 RxJava 的一次重大的升级,由于我的一个库cv4j使用了 RxJava2 来尝鲜,但是 RxJava2 跟 Rx...

1493
来自专栏杨逸轩 ' sBlog

Java实现微信跳一跳抓包修改分数

3826
来自专栏小樱的经验随笔

XOR算法的原理和实现

XOR算法的原理和实现 XOR算法这种方法的原理 当一个数A和另一个数B进行异或运算会生成另一个数C,如果再将C和B进行异或运算则C又会还原为A。 相对于其他的...

4227
来自专栏Java与Android技术栈

Cold Observable 和 Hot Observable

Hot Observable 无论有没有 Subscriber 订阅,事件始终都会发生。当 Hot Observable 有多个订阅者时,Hot Observa...

1602
来自专栏三流程序员的挣扎

RxJava 连接操作符

看注释意思是将所有数据按原来的顺序缓存起来,就是不知道观察者什么时候订阅,什么时候解除订阅,所以缓存起来,以后直接用。

2122
来自专栏Python疯子

python hashlib模块

hashlib主要提供字符加密功能,将md5和sha模块整合到了一起,支持md5,sha1, sha224, sha256, sha384, sha512等算法

1002
来自专栏FreeBuf

非对称算法之RSA的签名剖析

数字签名,就是只有信息的发送者才能产生的别人无法伪造的一段数字串,这段数字串同时也是对信息的发送者发送信息真实性的一个有效证明。 不清楚的请自行科普数字签名。本...

2083
来自专栏三流程序员的挣扎

RxJava 错误处理操作符

在源 Observable 遇到错误时,立即停止源 Observable 的数据发送,并用新的 Observable 对象进行新的数据发送。

3032
来自专栏python3

python hashlib模块

hashlib模块:用于加密相关的操作,3.x里代替了md5模块和sha模块,主要提供 SHA1, SHA224, SHA256, SHA384, SHA512...

751

扫码关注云+社区

领取腾讯云代金券