前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RxJava源码浅析(三): subscribeOn线程切换和多次切换

RxJava源码浅析(三): subscribeOn线程切换和多次切换

原创
作者头像
笔头
发布2022-03-20 12:59:16
1.7K0
发布2022-03-20 12:59:16
举报
文章被收录于专栏:Android记忆Android记忆

一、subscribeOn

这篇不仅看下subscribeOn线程切换本身,我们还要研究下多次subscribeOn为啥只有第一次有效。

代码语言:javascript
复制
//上游-被观察者
Observable<Integer> myobservable=Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
        Log.e("subscribe",Thread.currentThread().getId()+"");
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onNext(3);
        emitter.onComplete();

    }
});
//下游-观察者
Observer myobserver=new Observer<Integer>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
        Disposable dd=d;

        Log.e("onSubscribe",Thread.currentThread().getId()+"");
    }
    @Override
    public void onNext(Integer integer) {
        Log.e("onNext",Thread.currentThread().getId()+"--"+integer+"");
    }
    @Override
    public void onError(Throwable t) {
    }

    @Override
    public void onComplete() {
    }
};

//关联上游和下游
myobservable.subscribeOn(Schedulers.newThread()).subscribeOn(AndroidSchedulers.mainThread()).subscribe(myobserver);

先看下Schedulers.newThread()这个到底是做了什么。

通过查看,我们得知Schedulers.newThread()最终创建了NewThreadScheduler类,看名称和newThread很对应。

(顺便说下,如果切换其他线程,比如subscribeOn(Schedulers.io()),那他最终创建的是IoScheduler类,是不是很好记。)

NewThreadScheduler这个类先放在这,记住就好。

接下来我们看下subscribeOn方法,这个方法最终是创建了ObservableSubscribeOn类,他继承了Observable。

代码语言:javascript
复制
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);
        observer.onSubscribe(parent);
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
        private static final long serialVersionUID = 8094547886072529208L;
        final Observer<? super T> downstream;
        final AtomicReference<Disposable> upstream;
        SubscribeOnObserver(Observer<? super T> downstream) {
            this.downstream = downstream;
            this.upstream = new AtomicReference<>();
        }
        @Override
        public void onSubscribe(Disposable d) {
            DisposableHelper.setOnce(this.upstream, d);
        }
        @Override
        public void onNext(T t) {
            downstream.onNext(t);
        }

        ..........
        @Override
        public void dispose() {
            DisposableHelper.dispose(upstream);
            DisposableHelper.dispose(this);
        }
        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }
        void setDisposable(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }
    }

    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;
        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }
        @Override
        public void run() {
            source.subscribe(parent);
        }
    }
}

有了前面文章的基础,我们应该清楚基本流程了。我们调用subscribe(myobserver)方法,实际调用ObservableSubscribeOn的subscribeActual方法,里面创建了SubscribeOnObserver,包装了myobserver。我们主要看下scheduleDirect方法,看下源码,最终调用的是接口Schedule的scheduleDirect:

代码语言:javascript
复制
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Worker w = createWorker();

    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    DisposeTask task = new DisposeTask(decoratedRun, w);

    w.schedule(task, delay, unit);

    return task;
}

上面我们知道,当前Schedule的具体实现类是NewThreadScheduler,createWorker方法具体实现在NewThreadScheduler中,我们看下createWorker

代码语言:javascript
复制
public Worker createWorker() {
    return new NewThreadWorker(threadFactory);
}
代码语言:javascript
复制
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    private final ScheduledExecutorService executor;

    volatile boolean disposed;

    public NewThreadWorker(ThreadFactory threadFactory) {
        executor = SchedulerPoolFactory.create(threadFactory);
    }

createWorker是创建了一个线程池。当前的Runnable是谁?慢慢调试,我们发现是SubscribeTask【我们还发现有一个DisposeTask,这个包装了SubscribeTask,添加了控制方法,控制了SubscribeTask生命周期。具体可以看下DisposeTask类,这里就不细看了】,其run方法执行了source.subscribe(parent);这句话,有了千年两篇文章的基础,我们知道他直接执行了

此时的myobserver是SubscribeOnObserver,接下来我们应该都知道发生了什么。但是在哪个线程执行的呢?我们知道scheduler.scheduleDirect(new SubscribeTask(parent))这句就是创建了一个线程池,里面只有一个线程,执行了SubscribeTask这个Runnable,这个线程中执行了source.subscribe(parent);

所以myobservable.subscribeOn(Schedulers.newThread()).subscribe(myobserver);这句是myobservable和myobserver都在新线程中运行

上面截图我们知道,onSubscribe不在新线程中执行。

我们简单总结下subscribeOn(Schedulers.newThread()),就是在创建新线程中执行订阅分发。

二、多次subscribeOn

我们来个调皮的操作,我们现在多次调用subscribeOn

代码语言:javascript
复制
myobservable.subscribeOn(AndroidSchedulers.mainThread()).subscribeOn(Schedulers.newThread()).subscribe(myobserver);

那你说订阅在哪个线程?主线程还是新线程?

没事,我们来看下源码,主要来看ObservableSubscribeOn

我们知道,subscribeOn这个操作符就是将上一层的ObservableSource(就是上一层的Observable)放到一个新的线程去发射元素。上面执行了两次subscribeOn,第一次会把订阅放在新线程中,第二次会把订阅放在主线程中,最终订阅是在主线程中执行。

这里我们先得出一个结论,多次subscribeOn,以第一个subscribeOn为准。

我们现在知道RxJava是逆向向上调用的,那我们就一步一步的调代码看看。

第一次.subscribe(myobserver)的时候

第一个上游

上游是subscribeOn(Schedulers.newThread()),直接看ObservableSubscribeOn的subscribeActual方法

代码语言:javascript
复制
@Override
public void subscribeActual(final Observer<? super T> observer) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);

    observer.onSubscribe(parent);

    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

此时的observer是myobserver,observer.onSubscribe(parent);直接调用myobserver的onSubscribe实现

然后我们知道subscribeOn(Schedulers.newThread()) 是在新线程中执行一个runnble,也就是SubscribeTask。这个里面会调用source.subscribe(parent);,也就是上游的subscribe,此时上游是subscribeOn(AndroidSchedulers.mainThread()),

第二个上游

现在开始走第二个上游,他也是ObservableSubscribeOn。 同样,我们也重点看subscribeActual方法。

此时的observer是下游,也就是subscribeOn(Schedulers.newThread())创建的SubscribeOnObserver(命名为AObserver),那observer.onSubscribe(parent);直接调用AObserver的onSubscribe实现,但是此时SubscribeOnObserver中的onSubscribe

有具体实现,那就执行它。

代码语言:javascript
复制
@Override
public void onSubscribe(Disposable d) {
    DisposableHelper.setOnce(this.upstream, d);
}

同样,我们知道subscribeOn(AndroidSchedulers.mainThread()) 是在主线程中执行一个runnble,也就是SubscribeTask。这个里面会调用source.subscribe(parent);,此时上游source是ObservableCreate,那就是ObservableCreate在AndroidSchedulers.mainThread()线程中执行任务,有了前篇讲解,我们以已经了解了Rxjava基础订阅流程,知道了ObservableCreate如何执行任务,只不过我们现在是在指定线程中执行。

最上流ObservableCreate

那此时的Observer是谁?是myobserver吗?不是!是subscribeOn(AndroidSchedulers.mainThread())中创建的SubscribeOnObserver(命名为BObserver)。

代码语言:javascript
复制
public ObservableCreate(ObservableOnSubscribe<T> source) {
    this.source = source;
}

@Override
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<>(observer);
    observer.onSubscribe(parent);

    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

此时的observer是BObserver,observer.onSubscribe(parent);也就是调用

代码语言:javascript
复制
@Override
public void onSubscribe(Disposable d) {
    DisposableHelper.setOnce(this.upstream, d);
}

source.subscribe(parent);就是调用ObservableOnSubscribe的具体实现。执行ObservableOnSubscribe中的onNext方法,继而调用BObserver的onNext方法。BObserver就是SubscribeOnObserver中的SubscribeOnObserver中的myobserver结构。

三、总结

对于OnSubscribe方法而言,不管subscribeOn怎么切换线程,他都不受影响,他是最先开始执行的且只执行一次,只针对最下游有效,对于订阅而言,线程切换只是改变当前observer的所属线程,最后一个更改才算数(写法是第一个,执行流程是最后一个)。

这篇文章一些结论属于个人猜想,有说的不对的我及时纠正。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、subscribeOn
  • 二、多次subscribeOn
  • 三、总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档