前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >深入RxJava2 源码解析(二)

深入RxJava2 源码解析(二)

作者头像
aoho求索
发布2019-03-07 11:18:00
6890
发布2019-03-07 11:18:00
举报
文章被收录于专栏:aoho求索aoho求索aoho求索

本文作者JasonChen,原文地址: http://chblog.me/2018/12/19/rxjava2%20%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90(%E4%B8%80)/

前一篇文章我们讲述到RxJava2 的内部设计模式与原理机制,包括观察者模式和装饰者模式,其本质上都是RxJava2的事件驱动,那么本篇文章将会讲到RxJava2 的另外一个重要功能:异步。

RxJava2 深入解析

依旧是从源码实现开始,带着疑惑去读,前一篇文章我们讲到subcribeOn方法内部的实现涉及线程池:Scheduler.Worker w = scheduler.createWorker() 这边涉及两个重要组件:

  1. scheduler调度器
  2. 自定义线程池

scheduler调度器源码解析

 1public final class Schedulers {
 2    @NonNull
 3    static final Scheduler SINGLE;
 4
 5    @NonNull
 6    static final Scheduler COMPUTATION;
 7
 8    @NonNull
 9    static final Scheduler IO;
10
11    @NonNull
12    static final Scheduler TRAMPOLINE;
13
14    @NonNull
15    static final Scheduler NEW_THREAD;

一共有如下的五种调度器,分别对应不同的场景,当然企业可以针对自身的场景设置自己的调度器。

  • SINGLE,针对单一任务设置的单个定时线程池
  • COMPUTATION,针对计算任务设置的定时线程池的资源池(数组)
  • IO,针对IO任务设置的单个可复用的定时线程池
  • TRAMPOLINE,trampoline翻译是蹦床(佩服作者的脑洞)。这个调度器的源码注释是:任务在当前线程工作(不是线程池)但是不会立即执行,任务会被放入队列并在当前的任务完成之后执行。简单点说其实就是入队然后慢慢线性执行(这里巧妙的方法其实和前面我们所讲的回压实现机制基本是一致的,值得借鉴)
  • NEW_THREAD,单个的周期线程池和single基本一致唯一不同的是single对thread进行了一个简单的NonBlocking封装,这个封装从源码来看基本没有作用,只是一个marker interface标志接口
computation调度器源码分析

computation调度器针对大量计算场景,在后端并发场景会更多的用到,那么其是如何实现的呢?接下来带着疑惑进行源码分析。

 1  public final class ComputationScheduler extends Scheduler implements SchedulerMultiWorkerSupport {
 2    // 资源池
 3    final AtomicReference<FixedSchedulerPool> pool;
 4
 5    // 这是computationScheduler类中实现的createWork()方法
 6    public Worker createWorker() {
 7      // 创建EventLoop工作者,入参是一个PoolWorker
 8        return new EventLoopWorker(pool.get().getEventLoop());
 9    }
10
11  static final class FixedSchedulerPool implements SchedulerMultiWorkerSupport {
12          final int cores;
13          // 资源池工作者,每个工作者其实都是一个定时线程池
14          final PoolWorker[] eventLoops;
15          long n;
16          // 对应前面的函数调用
17          public PoolWorker getEventLoop() {
18            int c = cores;
19            if (c == 0) {
20                return SHUTDOWN_WORKER;
21            }
22            // simple round robin, improvements to come
23            // 这里其实就是从工作者数组中轮询选出一个工作者
24            这里其实拥有提升和优化的空间,这里笔者可能会向开源社区提交一个pr
25            以此进行比较好的调度器调度
26            return eventLoops[(int)(n++ % c)];
27          }
28  // 此处是一个简单的封装        
29  static final class PoolWorker extends NewThreadWorker {
30          PoolWorker(ThreadFactory threadFactory) {
31              super(threadFactory);
32          }
33      }
34
35  public class NewThreadWorker extends Scheduler.Worker implements Disposable {
36    private final ScheduledExecutorService executor;
37
38    volatile boolean disposed;
39
40    public NewThreadWorker(ThreadFactory threadFactory) {
41        // 进行定时线程池的初始化
42        executor = SchedulerPoolFactory.create(threadFactory);
43    }
44
45    public static ScheduledExecutorService create(ThreadFactory factory) {
46      final ScheduledExecutorService exec =
47      // 初始化一个定时线程池
48      Executors.newScheduledThreadPool(1, factory);
49      tryPutIntoPool(PURGE_ENABLED, exec);
50      return exec;
51    }

上述代码清晰的展示了computation调度器的实现细节,这里需要说明的是定时线程池的core设置为1,线程池的个数最多为cpu数量,这里涉及到ScheduledThreadPoolExecutor定时线程池的原理,简单的说起内部是一个可自动增长的数组(队列)类似于ArrayList,也就是说队列永远不会满,线程池中的线程数不会增加。

接下来结合订阅线程和发布线程分析其之间如何进行沟通的本质。

发布线程在上一篇的文章已经提到,内部是一个worker,那么订阅线程也是么,很显然必须是的,接下来我们来看下源代码:

 1// 还是从subscribeActul开始(原因见上一篇文章)
 2public void subscribeActual(Subscriber<? super T> s) {
 3    Worker worker = scheduler.createWorker();
 4
 5    if (s instanceof ConditionalSubscriber) {
 6        source.subscribe(new ObserveOnConditionalSubscriber<T>(
 7                (ConditionalSubscriber<? super T>) s, worker, delayError, prefetch));
 8    } else {
 9        // 
10        source.subscribe(new ObserveOnSubscriber<T>(s, worker, delayError, prefetch));
11    }
12}

其内部封装了一个ObserveOnsubcriber,这是个对下流订阅者的封装,主要什么作用呢,为什么要这个呢?其实这个涉及订阅线程内部的机制,接着看源代码了解其内部机制。

 1  // 基类
 2  abstract static class BaseObserveOnSubscriber<T> extends BasicIntQueueSubscription<T>
 3  implements FlowableSubscriber<T>, Runnable {
 4      private static final long serialVersionUID = -8241002408341274697L;
 5
 6      final Worker worker;
 7
 8      final boolean delayError;
 9
10      final int prefetch;
11
12      //...
13
14      @Override
15      public final void onNext(T t) {
16          if (done) {
17              return;
18          }
19          if (sourceMode == ASYNC) {
20              trySchedule();
21              return;
22          }
23
24          if (!queue.offer(t)) {
25              upstream.cancel();
26
27              error = new MissingBackpressureException("Queue is full?!");
28              done = true;
29          }
30          // 开启订阅者线程池模式的调度,具体实现在子类中实现
31          trySchedule();
32      }
33
34      @Override
35      public final void onError(Throwable t) {
36          if (done) {
37              RxJavaPlugins.onError(t);
38              return;
39          }
40          error = t;
41          done = true;
42          trySchedule();
43      }
44
45      @Override
46      public final void onComplete() {
47          if (!done) {
48              done = true;
49              trySchedule();
50          }
51      }
52
53      // 这里并没有向上传递request请求,而是把自己当做数据发射者进行request计数
54      @Override
55      public final void request(long n) {
56          if (SubscriptionHelper.validate(n)) {
57              BackpressureHelper.add(requested, n);
58              // 开启调度
59              trySchedule();
60          }
61      }
62
63      // 调度代码
64      final void trySchedule() {
65          // 上一篇文章讲过这个的用法
66          if (getAndIncrement() != 0) {
67              return;
68          }
69          // 启用一个work来进行任务的执行 this对象说明实现了runable接口
70          worker.schedule(this);
71      }
72
73      // 调度实现的代码
74      @Override
75      public final void run() {
76          if (outputFused) {
77              runBackfused();
78          } else if (sourceMode == SYNC) {
79              runSync();
80          } else {
81              // 一般会调用runAsync方法
82              runAsync();
83          }
84      }
85
86      abstract void runBackfused();
87
88      abstract void runSync();
89
90      abstract void runAsync();
91      //...
92  }

当上游的装饰者(上一篇提到的装饰者模式)调用onNext方法时,这时并没有类似的去调用下游的onNext方法,那这个时候其实就是订阅者线程模式的核心原理:采用queue队列进行数据的store,这里尝试将数据放进队列。

ObserveOnSubscriber的具体实现类部分实现如下。

 1  static final class ObserveOnSubscriber<T> extends BaseObserveOnSubscriber<T>
 2  implements FlowableSubscriber<T> {
 3
 4      private static final long serialVersionUID = -4547113800637756442L;
 5
 6      final Subscriber<? super T> downstream;
 7
 8      ObserveOnSubscriber(
 9              Subscriber<? super T> actual,
10              Worker worker,
11              boolean delayError,
12              int prefetch) {
13          super(worker, delayError, prefetch);
14          this.downstream = actual;
15      }
16
17      //这是上游回调这个subscriber时调用的方法,详情见上一篇文章
18      @Override
19      public void onSubscribe(Subscription s) {
20          if (SubscriptionHelper.validate(this.upstream, s)) {
21              this.upstream = s;
22
23              if (s instanceof QueueSubscription) {
24                  @SuppressWarnings("unchecked")
25                  QueueSubscription<T> f = (QueueSubscription<T>) s;
26
27                  int m = f.requestFusion(ANY | BOUNDARY);
28
29                  if (m == SYNC) {
30                      sourceMode = SYNC;
31                      queue = f;
32                      done = true;
33
34                      downstream.onSubscribe(this);
35                      return;
36                  } else
37                  if (m == ASYNC) {
38                      sourceMode = ASYNC;
39                      queue = f;
40
41                      downstream.onSubscribe(this);
42
43                      s.request(prefetch);
44
45                      return;
46                  }
47              }
48              // 设置缓存队列
49              // 这里涉及一个特别之处就是预获取(提前获取数据)
50              queue = new SpscArrayQueue<T>(prefetch);
51              // 触发下游subscriber 如果有request则会触发下游对上游数据的request
52              downstream.onSubscribe(this);
53              // 请求上游数据 上面的代码和这行代码就是起到承上启下的一个作用,也就是预获取,放在队列中
54              s.request(prefetch);
55          }
56      }
57
58      //...

下面看一下抽象方法runAsync()的实现。

 1      @Override
 2      void runAsync() {
 3          int missed = 1;
 4
 5          final Subscriber<? super T> a = downstream;
 6          final SimpleQueue<T> q = queue;
 7
 8          long e = produced;
 9
10          for (;;) {
11
12              long r = requested.get();
13
14              while (e != r) {
15                  boolean d = done;
16                  T v;
17
18                  try {
19                      // 获取数据
20                      v = q.poll();
21                  } catch (Throwable ex) {
22                      Exceptions.throwIfFatal(ex);
23
24                      cancelled = true;
25                      upstream.cancel();
26                      q.clear();
27
28                      a.onError(ex);
29                      worker.dispose();
30                      return;
31                  }
32
33                  boolean empty = v == null;
34
35                  if (checkTerminated(d, empty, a)) {
36                      return;
37                  }
38
39                  if (empty) {
40                      break;
41                  }
42
43                  a.onNext(v);
44
45                  e++;
46                  // limit = prefetch - (prefetch >> 2)
47                  // prefetch  = BUFFER_SIZE(上一篇文章提到的默认128)
48                  if (e == limit) {
49                      if (r != Long.MAX_VALUE) {
50                          r = requested.addAndGet(-e);
51                      }
52                      upstream.request(e);
53                      e = 0L;
54                  }
55              }
56
57              if (e == r && checkTerminated(done, q.isEmpty(), a)) {
58                  return;
59              }
60
61              // 下面的代码机制在上一篇讲过主要涉及异步编程技巧
62              int w = get();
63              if (missed == w) {
64                  produced = e;
65                  missed = addAndGet(-missed);
66                  if (missed == 0) {
67                      break;
68                  }
69              } else {
70                  missed = w;
71              }
72          }
73      }
74    //...
75  }

前面说过,订阅者把自己当成一个发射者,那数/据从哪里来呢,而且还要持续有数据,那么后面的代码说明了数据来源,当数据达到limit,开始新的数据的prefetch,每次preftch的数量是limit。

为何要将订阅者这样区别设置呢,其实原因很简单,订阅者和发布者需要不同的线程机制异步地执行,比如订阅者需要computation的线程机制来进行大量的耗时数据计算,但又要保持一致的装修者模式,所以源码的做法是订阅者这边打破回调的调用流,采用数据队列进行两个线程池之间的数据传送

本文总结

笔者喜欢总结,总结意味着我们反思和学习前面的知识点,应用点以及自身的不足。

  1. rxjava2线程调度的原理机制,不同场景下线程机制需要进行定制
  2. rxjava2生产和消费的异步原理和实现方式
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-01-18,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 aoho求索 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • RxJava2 深入解析
    • scheduler调度器源码解析
      • computation调度器源码分析
  • 本文总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档