本文作者JasonChen,原文地址: http://chblog.me/2018/12/19/rxjava2%20%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90(%E4%B8%80)/
前一篇文章我们讲述到RxJava2 的内部设计模式与原理机制,包括观察者模式和装饰者模式,其本质上都是RxJava2的事件驱动,那么本篇文章将会讲到RxJava2 的另外一个重要功能:异步。
依旧是从源码实现开始,带着疑惑去读,前一篇文章我们讲到subcribeOn方法内部的实现涉及线程池:Scheduler.Worker w = scheduler.createWorker()
这边涉及两个重要组件:
1public final class Schedulers {
2 @NonNull
3 static final Scheduler SINGLE;
4
5 @NonNull
6 static final Scheduler COMPUTATION;
7
8 @NonNull
9 static final Scheduler IO;
10
11 @NonNull
12 static final Scheduler TRAMPOLINE;
13
14 @NonNull
15 static final Scheduler NEW_THREAD;
一共有如下的五种调度器,分别对应不同的场景,当然企业可以针对自身的场景设置自己的调度器。
computation调度器针对大量计算场景,在后端并发场景会更多的用到,那么其是如何实现的呢?接下来带着疑惑进行源码分析。
1 public final class ComputationScheduler extends Scheduler implements SchedulerMultiWorkerSupport {
2 // 资源池
3 final AtomicReference<FixedSchedulerPool> pool;
4
5 // 这是computationScheduler类中实现的createWork()方法
6 public Worker createWorker() {
7 // 创建EventLoop工作者,入参是一个PoolWorker
8 return new EventLoopWorker(pool.get().getEventLoop());
9 }
10
11 static final class FixedSchedulerPool implements SchedulerMultiWorkerSupport {
12 final int cores;
13 // 资源池工作者,每个工作者其实都是一个定时线程池
14 final PoolWorker[] eventLoops;
15 long n;
16 // 对应前面的函数调用
17 public PoolWorker getEventLoop() {
18 int c = cores;
19 if (c == 0) {
20 return SHUTDOWN_WORKER;
21 }
22 // simple round robin, improvements to come
23 // 这里其实就是从工作者数组中轮询选出一个工作者
24 这里其实拥有提升和优化的空间,这里笔者可能会向开源社区提交一个pr
25 以此进行比较好的调度器调度
26 return eventLoops[(int)(n++ % c)];
27 }
28 // 此处是一个简单的封装
29 static final class PoolWorker extends NewThreadWorker {
30 PoolWorker(ThreadFactory threadFactory) {
31 super(threadFactory);
32 }
33 }
34
35 public class NewThreadWorker extends Scheduler.Worker implements Disposable {
36 private final ScheduledExecutorService executor;
37
38 volatile boolean disposed;
39
40 public NewThreadWorker(ThreadFactory threadFactory) {
41 // 进行定时线程池的初始化
42 executor = SchedulerPoolFactory.create(threadFactory);
43 }
44
45 public static ScheduledExecutorService create(ThreadFactory factory) {
46 final ScheduledExecutorService exec =
47 // 初始化一个定时线程池
48 Executors.newScheduledThreadPool(1, factory);
49 tryPutIntoPool(PURGE_ENABLED, exec);
50 return exec;
51 }
上述代码清晰的展示了computation调度器的实现细节,这里需要说明的是定时线程池的core设置为1,线程池的个数最多为cpu数量,这里涉及到ScheduledThreadPoolExecutor定时线程池的原理,简单的说起内部是一个可自动增长的数组(队列)类似于ArrayList,也就是说队列永远不会满,线程池中的线程数不会增加。
接下来结合订阅线程和发布线程分析其之间如何进行沟通的本质。
发布线程在上一篇的文章已经提到,内部是一个worker,那么订阅线程也是么,很显然必须是的,接下来我们来看下源代码:
1// 还是从subscribeActul开始(原因见上一篇文章)
2public void subscribeActual(Subscriber<? super T> s) {
3 Worker worker = scheduler.createWorker();
4
5 if (s instanceof ConditionalSubscriber) {
6 source.subscribe(new ObserveOnConditionalSubscriber<T>(
7 (ConditionalSubscriber<? super T>) s, worker, delayError, prefetch));
8 } else {
9 //
10 source.subscribe(new ObserveOnSubscriber<T>(s, worker, delayError, prefetch));
11 }
12}
其内部封装了一个ObserveOnsubcriber
,这是个对下流订阅者的封装,主要什么作用呢,为什么要这个呢?其实这个涉及订阅线程内部的机制,接着看源代码了解其内部机制。
1 // 基类
2 abstract static class BaseObserveOnSubscriber<T> extends BasicIntQueueSubscription<T>
3 implements FlowableSubscriber<T>, Runnable {
4 private static final long serialVersionUID = -8241002408341274697L;
5
6 final Worker worker;
7
8 final boolean delayError;
9
10 final int prefetch;
11
12 //...
13
14 @Override
15 public final void onNext(T t) {
16 if (done) {
17 return;
18 }
19 if (sourceMode == ASYNC) {
20 trySchedule();
21 return;
22 }
23
24 if (!queue.offer(t)) {
25 upstream.cancel();
26
27 error = new MissingBackpressureException("Queue is full?!");
28 done = true;
29 }
30 // 开启订阅者线程池模式的调度,具体实现在子类中实现
31 trySchedule();
32 }
33
34 @Override
35 public final void onError(Throwable t) {
36 if (done) {
37 RxJavaPlugins.onError(t);
38 return;
39 }
40 error = t;
41 done = true;
42 trySchedule();
43 }
44
45 @Override
46 public final void onComplete() {
47 if (!done) {
48 done = true;
49 trySchedule();
50 }
51 }
52
53 // 这里并没有向上传递request请求,而是把自己当做数据发射者进行request计数
54 @Override
55 public final void request(long n) {
56 if (SubscriptionHelper.validate(n)) {
57 BackpressureHelper.add(requested, n);
58 // 开启调度
59 trySchedule();
60 }
61 }
62
63 // 调度代码
64 final void trySchedule() {
65 // 上一篇文章讲过这个的用法
66 if (getAndIncrement() != 0) {
67 return;
68 }
69 // 启用一个work来进行任务的执行 this对象说明实现了runable接口
70 worker.schedule(this);
71 }
72
73 // 调度实现的代码
74 @Override
75 public final void run() {
76 if (outputFused) {
77 runBackfused();
78 } else if (sourceMode == SYNC) {
79 runSync();
80 } else {
81 // 一般会调用runAsync方法
82 runAsync();
83 }
84 }
85
86 abstract void runBackfused();
87
88 abstract void runSync();
89
90 abstract void runAsync();
91 //...
92 }
当上游的装饰者(上一篇提到的装饰者模式)调用onNext方法时,这时并没有类似的去调用下游的onNext方法,那这个时候其实就是订阅者线程模式的核心原理:采用queue队列进行数据的store,这里尝试将数据放进队列。
ObserveOnSubscriber的具体实现类部分实现如下。
1 static final class ObserveOnSubscriber<T> extends BaseObserveOnSubscriber<T>
2 implements FlowableSubscriber<T> {
3
4 private static final long serialVersionUID = -4547113800637756442L;
5
6 final Subscriber<? super T> downstream;
7
8 ObserveOnSubscriber(
9 Subscriber<? super T> actual,
10 Worker worker,
11 boolean delayError,
12 int prefetch) {
13 super(worker, delayError, prefetch);
14 this.downstream = actual;
15 }
16
17 //这是上游回调这个subscriber时调用的方法,详情见上一篇文章
18 @Override
19 public void onSubscribe(Subscription s) {
20 if (SubscriptionHelper.validate(this.upstream, s)) {
21 this.upstream = s;
22
23 if (s instanceof QueueSubscription) {
24 @SuppressWarnings("unchecked")
25 QueueSubscription<T> f = (QueueSubscription<T>) s;
26
27 int m = f.requestFusion(ANY | BOUNDARY);
28
29 if (m == SYNC) {
30 sourceMode = SYNC;
31 queue = f;
32 done = true;
33
34 downstream.onSubscribe(this);
35 return;
36 } else
37 if (m == ASYNC) {
38 sourceMode = ASYNC;
39 queue = f;
40
41 downstream.onSubscribe(this);
42
43 s.request(prefetch);
44
45 return;
46 }
47 }
48 // 设置缓存队列
49 // 这里涉及一个特别之处就是预获取(提前获取数据)
50 queue = new SpscArrayQueue<T>(prefetch);
51 // 触发下游subscriber 如果有request则会触发下游对上游数据的request
52 downstream.onSubscribe(this);
53 // 请求上游数据 上面的代码和这行代码就是起到承上启下的一个作用,也就是预获取,放在队列中
54 s.request(prefetch);
55 }
56 }
57
58 //...
下面看一下抽象方法runAsync()
的实现。
1 @Override
2 void runAsync() {
3 int missed = 1;
4
5 final Subscriber<? super T> a = downstream;
6 final SimpleQueue<T> q = queue;
7
8 long e = produced;
9
10 for (;;) {
11
12 long r = requested.get();
13
14 while (e != r) {
15 boolean d = done;
16 T v;
17
18 try {
19 // 获取数据
20 v = q.poll();
21 } catch (Throwable ex) {
22 Exceptions.throwIfFatal(ex);
23
24 cancelled = true;
25 upstream.cancel();
26 q.clear();
27
28 a.onError(ex);
29 worker.dispose();
30 return;
31 }
32
33 boolean empty = v == null;
34
35 if (checkTerminated(d, empty, a)) {
36 return;
37 }
38
39 if (empty) {
40 break;
41 }
42
43 a.onNext(v);
44
45 e++;
46 // limit = prefetch - (prefetch >> 2)
47 // prefetch = BUFFER_SIZE(上一篇文章提到的默认128)
48 if (e == limit) {
49 if (r != Long.MAX_VALUE) {
50 r = requested.addAndGet(-e);
51 }
52 upstream.request(e);
53 e = 0L;
54 }
55 }
56
57 if (e == r && checkTerminated(done, q.isEmpty(), a)) {
58 return;
59 }
60
61 // 下面的代码机制在上一篇讲过主要涉及异步编程技巧
62 int w = get();
63 if (missed == w) {
64 produced = e;
65 missed = addAndGet(-missed);
66 if (missed == 0) {
67 break;
68 }
69 } else {
70 missed = w;
71 }
72 }
73 }
74 //...
75 }
前面说过,订阅者把自己当成一个发射者,那数/据从哪里来呢,而且还要持续有数据,那么后面的代码说明了数据来源,当数据达到limit,开始新的数据的prefetch,每次preftch的数量是limit。
为何要将订阅者这样区别设置呢,其实原因很简单,订阅者和发布者需要不同的线程机制异步地执行,比如订阅者需要computation的线程机制来进行大量的耗时数据计算,但又要保持一致的装修者模式,所以源码的做法是订阅者这边打破回调的调用流,采用数据队列进行两个线程池之间的数据传送。
笔者喜欢总结,总结意味着我们反思和学习前面的知识点,应用点以及自身的不足。