前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RxJava subscribeOn和observeOn源码介绍

RxJava subscribeOn和observeOn源码介绍

作者头像
103style
发布2022-12-19 13:24:35
3430
发布2022-12-19 13:24:35
举报

转载请以链接形式标明出处: 本文出自:103style的博客

Base on RxJava 2.X

简介

首先我们来看subscribeOnobserveOn这两个方法的实现:

subscribeOn

代码语言:javascript
复制
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

observeOn

代码语言:javascript
复制
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler) {
    return observeOn(scheduler, false, bufferSize());
}
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError) {
    return observeOn(scheduler, delayError, bufferSize());
}

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}

我们可以看到分别返回了ObservableSubscribeOnObservableObserveOn对象,下面对这两个类分别介绍。


ObservableSubscribeOn 源码解析
代码语言:javascript
复制
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
        observer.onSubscribe(parent);
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    ....
}

通过之前的 Rxjava之create操作符源码解析 的介绍,我们知道subscribe(observer)实际上是调用前一步返回对象的subscribeActual(observer);方法。

这里首先构造了一个 SubscribeOnObserver对象,然后执行 观察者onSubscribe 方法。

然后将在传入的Scheduler中执行任务完成返回的结果传入 SubscribeOnObserversetDisposable方法。

scheduler.scheduleDirect(new SubscribeTask(parent)),这里通过之前 RxJava之Schedulers源码介绍 我们知道,实际时候执行了 SubscribeTask(parent)run方法。通过下面的源代码source.subscribe(parent),我们知道 实际上 run 方法 就是 调用了subscribeOn前一步操作符返回对象的 subscribeActual(observer);方法。

代码语言:javascript
复制
final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver<T> parent;

    SubscribeTask(SubscribeOnObserver<T> parent) {
        this.parent = parent;
    }

    @Override
    public void run() {
        source.subscribe(parent);
    }
}

SubscribeOnObserver源码:

代码语言:javascript
复制
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
    private static final long serialVersionUID = 8094547886072529208L;
    final Observer<? super T> downstream;
    final AtomicReference<Disposable> upstream;

    SubscribeOnObserver(Observer<? super T> downstream) {
        this.downstream = downstream;
        this.upstream = new AtomicReference<Disposable>();
    }

    @Override
    public void onSubscribe(Disposable d) {
        DisposableHelper.setOnce(this.upstream, d);
    }

    @Override
    public void onNext(T t) {
        downstream.onNext(t);
    }

    @Override
    public void onError(Throwable t) {
        downstream.onError(t);
    }

    @Override
    public void onComplete() {
        downstream.onComplete();
    }
    ...
    void setDisposable(Disposable d) {
        DisposableHelper.setOnce(this, d);
    }
}

我们可以看到 onNextonErroronComplete 实际上还是调用了 观察者的 对应方法。

DisposableHelper.setOnce(this, d); 即为设置SubscribeOnObservervalue值为线程池执行的任务结果。

代码语言:javascript
复制
public static boolean setOnce(AtomicReference<Disposable> field, Disposable d) {
    ObjectHelper.requireNonNull(d, "d is null");
    if (!field.compareAndSet(null, d)) {
        d.dispose();
        if (field.get() != DISPOSED) {
            reportDisposableSet();
        }
        return false;
    }
    return true;
}

我们来个示例介绍下:

代码语言:javascript
复制
Observable
        .create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                System.out.println("subscribe = " + Thread.currentThread().getName());
                for (int i = 0; i < 3; i++) {
                    emitter.onNext(String.valueOf(i));
                }
                emitter.onComplete();
            }
        })
        .subscribeOn(Schedulers.single())
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe thread name = " + Thread.currentThread().getName());
            }

            @Override
            public void onNext(String s) {
                System.out.println("onNext s = " + s + " thread name = " + Thread.currentThread().getName());
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError thread name = " + Thread.currentThread().getName());
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete thread name = " + Thread.currentThread().getName());
            }
        });

输出结果:

代码语言:javascript
复制
onSubscribe thread name = main
subscribe = RxSingleScheduler-1
onNext s = 0 thread name = RxSingleScheduler-1
onNext s = 1 thread name = RxSingleScheduler-1
onNext s = 2 thread name = RxSingleScheduler-1
onComplete thread name = RxSingleScheduler-1

通过输出结果我们可以看到 任务处理都是在 Schedulers.single()构建的线程池中执行的。 现在来一步一步介绍,顺便复习一下: 流程图大致如下:

SubscribeOnObserver示例流程图
SubscribeOnObserver示例流程图
  • (1.0) create 操作符 返回的是 ObservableCreate对象。
  • (2.0) 然后 ObservableCreate.subscribeOn(Schedulers.single())返回 sourceObservableCreateschedulerSingleSchedulerObservableSubscribeOn对象。
  • (3.0) 然后 ObservableSubscribeOn.subscribe(new Observer<T>(){}),即调用 ObservableSubscribeOnsubscribeActual(observer)
  • (4.0) 然后执行 observer.onSubscribe(parent);,即执行观察者的 onSubscribe(...)方法。
  • (5.0) 接着在SingleScheduler构建的线程池中执行 SubscribeTaskrun方法(source.subscribe(parent))。 即执行 ObservableCreate.subscribe(new SubscribeOnObserver<T>(observer))。 即为 ObservableCreate.subscribeActual(new SubscribeOnObserver<T>(observer))
  • (6.0) 然后执行 SubscribeOnObserveronSubscribe(...)
  • (7.0) 然后执行create操作符传进来的ObservableOnSubscribesubscribe(ObservableEmitter<String> emitter)方法。
  • (8.0) 接着我们在subscribe(...)中依次执行了 三次onNext和 一次onComplete。 即调用 new SubscribeOnObserver<T>(observer)的三次onNext和 一次onComplete。 即为subscribe传入的observer的三次onNext和 一次onComplete

ObservableObserveOn 源码解析
  • observeOn函数中的bufferSize,在2.X中默认为 128.
代码语言:javascript
复制
public static int bufferSize() {
    return Flowable.bufferSize();
}
public static int bufferSize() {
    return BUFFER_SIZE;
}
static final int BUFFER_SIZE;
static {
    BUFFER_SIZE = Math.max(1, Integer.getInteger("rx2.buffer-size", 128));
}

observeOn 方法:

代码语言:javascript
复制
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}

ObservableObserveOn主要的方法:

代码语言:javascript
复制
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
    ...
}
  • 赋值source为链式调用上一步返回的对象。
  • 保存传进来的 SchedulerdelayErrorbufferSize的值。
  • 然后在 subscribe 的时候 调用 subscribeActual, 先判断 scheduler是否是 TrampolineScheduler的子类:
    • 是的话直接把 observer 传给 链式调用上一步返回的对象的 subscribeActual方法。
    • 不是的话 就把observer 包装成一个ObserveOnObserver 对象传给 链式调用上一步返回的对象的 subscribeActual方法。
  • 通过上面 subscribeOn 的介绍, 我们知道接下来就是调用 观察者的 onSubscribe 方法,以及后续的调用逻辑 onNextonComplete以及onError,即ObserveOnObserver 对象对应的方法。

接下来我们看看 ObserveOnObserver 的源码:

代码语言:javascript
复制
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
    final Observer<? super T> downstream;
    final Scheduler.Worker worker;
    final boolean delayError;
    final int bufferSize;
    ...
    ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
        this.downstream = actual;
        this.worker = worker;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    public void onSubscribe(Disposable d) {
        if (DisposableHelper.validate(this.upstream, d)) {
            ...
            queue = new SpscLinkedArrayQueue<T>(bufferSize);
            downstream.onSubscribe(this);
        }
    }
    @Override
    public void onNext(T t) {
        if (done) {
            return;
        }
        if (sourceMode != QueueDisposable.ASYNC) {
            queue.offer(t);
        }
        schedule();
    }

    @Override
    public void onError(Throwable t) {
        if (done) {
            RxJavaPlugins.onError(t);
            return;
        }
        error = t;
        done = true;
        schedule();
    }

    @Override
    public void onComplete() {
        if (done) {
            return;
        }
        done = true;
        schedule();
    }
    ...
}

重写的onSubscribe 即调用观察者的 onSubscribe

onNextonErroronComplete都是调用 schedule()

我们来看看schedule()的实现:即在传进来的 Scheduler 对象构建的线程池里执行当前类的 run()

代码语言:javascript
复制
void schedule() {
    if (getAndIncrement() == 0) {
        worker.schedule(this);
    }
}

run()的代码实现:

代码语言:javascript
复制
public void run() {
    if (outputFused) {
        drainFused();
    } else {
        drainNormal();
    }
}

outputFused 默认是 false,我们看看 drainNormal()的代码实现: 当outputFusedtrue是,则下面调用的onNext 改成 onComplete

代码语言:javascript
复制
void drainNormal() {
    int missed = 1;
    final SimpleQueue<T> q = queue; //1.0
    final Observer<? super T> a = downstream;
    for (;;) {
        if (checkTerminated(done, q.isEmpty(), a)) {
            return;
        }
        for (;;) {
            boolean d = done;
            T v;
            try {
                v = q.poll();//2.0
            } catch (Throwable ex) {
                ...
                a.onError(ex);
                worker.dispose();
                return;
            }
            boolean empty = v == null;
            if (checkTerminated(d, empty, a)) {//2.1
                return;
            }
            if (empty) {//2.2
                break;
            }
            a.onNext(v);//2.3
        }
        missed = addAndGet(-missed);//3.0
        if (missed == 0) {//3.1
            break;
        }
    }
}
  • (1.0): 我们在上面的 onNext() 中看到,每次调用都会把传入的对象存入queue中。
  • (2.0): 在循环中依次获取存入的对象,(2.1)如果 已经是done状态 或者 disposed则直接结束。(2.2)如果 队列中没有对象了,即终止循环。(2.3)否则调用 观察者onNext 方法。
  • (3.0): addAndGet(-missed);即通过原子操作把·missed·的值置为0(3.1)然后结束onNext

来我们继续举个例子:给subscribeOn例子加上observeOn 方法:

代码语言:javascript
复制
Observable
        .create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                System.out.println("subscribe = " + Thread.currentThread().getName());
                for (int i = 0; i < 5; i++) {
                    emitter.onNext(String.valueOf(i));
                }
                emitter.onComplete();
            }
        })
        .subscribeOn(Schedulers.single())
        .observeOn(Schedulers.io())
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("d.classname = " + d.getClass().getSimpleName());
                System.out.println("onSubscribe thread name = " + Thread.currentThread().getName());
            }

            @Override
            public void onNext(String s) {
                System.out.println("onNext s = " + s + " thread name = " + Thread.currentThread().getName());
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError thread name = " + Thread.currentThread().getName());
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete thread name = " + Thread.currentThread().getName());
            }
        });

输出结果:

代码语言:javascript
复制
System.out: d.classname = ObserveOnObserver
System.out: onSubscribe thread name = main
System.out: subscribe = RxSingleScheduler-1
System.out: onNext s = 0 thread name = RxCachedThreadScheduler-1
System.out: onNext s = 1 thread name = RxCachedThreadScheduler-1
System.out: onNext s = 2 thread name = RxCachedThreadScheduler-1
System.out: onComplete thread name = RxCachedThreadScheduler-1

通过输出结果我们可以看到 :

  • create操作符 传入的ObservableOnSubscribesubscribe方法是在Schedulers.single()构建的线程池中执行的。
  • onNextonComplete 则是在Schedulers.io()构建的线程池中执行的 。

继续来看下subscribeOn流程图:

SubscribeOnObserver示例流程图
SubscribeOnObserver示例流程图

上述示例相对于 subscribeOn来说只是 把 subscribe(observer) 里得参数改成了 ObserveOnObserver对象。

(4.0:) 执行ObserveOnObserveronSubscribe方法。即observer.onSubscribe(ObserveOnObserver) 即下面方法的 Disposable对象为ObserveOnObserver对象。

代码语言:javascript
复制
new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
    }
     ...
});

(5.0:)SingleScheduler构建的线程池中执行source.subscribe(parent);,即运行如下代码:

代码语言:javascript
复制
ObservableCreate.subscribeActual(
        new ObserveOnObserver<T>(
                observer,
                new EventLoopWorker(new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory)),
                delayError,
                bufferSize)
);

我们再来回顾下ObservableCreate.subscribeActual(observer)

代码语言:javascript
复制
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);
    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
    ...
    @Override
    public void onNext(T t) {
        if (t == null) {
            onError(new NullPointerException(...));
            return;
        }
        if (!isDisposed()) {
            observer.onNext(t);
        }
    }
    @Override
    public void onComplete() {
        if (!isDisposed()) {
            try {
                observer.onComplete();
            } finally {
                dispose();
            }
        }
    }
    ...
}

(8.0:) 所以调用 onNext(T t)onComplete()即调用 ObserveOnObserver对象的 onNext(T t)onComplete()。 即切换到Schedulers.io()构建的线程池执行onNext(T t)onComplete()


小结

subscribeOn返回得即ObservableSubscribeOn对象。 ObservableSubscribeOnsubscribeActual即为在 传入的 XXXScheduler中 执行 上一步返回对象的 subscribeActual方法。

observeOn返回得即ObservableObserveOn对象。 ObservableObserveOnsubscribeActual即为把 传入的 XXXSchedulerobserver包装成一个 Observer 传给上一步返回对象的 subscribeActual方法,让 onNextonCompleteonNext都在传入的 XXXScheduler 构建的线程池中执行。

所以,你知道RxJava是如何完成线程切换的了吗?

以上

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019-06-03,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 简介
  • ObservableSubscribeOn 源码解析
  • ObservableObserveOn 源码解析
  • 小结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档