前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >AQS源码分析之Elasticsearch BaseFuture

AQS源码分析之Elasticsearch BaseFuture

作者头像
山行AI
发布2020-03-25 11:44:05
7150
发布2020-03-25 11:44:05
举报
文章被收录于专栏:山行AI

多线程模式

Java多线程编程中,常用的多线程设计模式包括:Future模式、Master-Worker模式、Guarded Suspeionsion模式、不变模式和生产者-消费者模式等

Future模式

Future模式对于多线程而言,如果线程A要等待线程B的结果,那么线程A没必要一直等待B,直到B有结果,可以先拿到一个未来的Future,等B有结果时再获取真实的结果。Future模式的核心在于:去除了主函数的等待时间,并使得原本需要等待的时间段可以用于处理其他业务逻辑。

Future模式有点类似于商品订单。在网上购物时,提交订单后,在收货的这段时间里无需一直在家里等候,可以先干别的事情。类推到程序设计中时,当提交请求时,期望得到答复时,如果这个答复可能很慢。传统的是一直持续等待直到这个答复收到之后再去做别的事情,但如果利用Future模式,其调用方式改为异步,而原先等待返回的时间段,在主调用函数中,则可以用于处理其他事务。

Future模式的应用

Future模式的应用场景非常多,比如在rxjava,guava,dubbo的client与server交互的response与request也是用future进行包装的,还有很多其他的场景中也都离不开Future的身影。下面我们顺便来看下Future在Elasticsearch中的应用。future的主要有以下几个方法:

  • cancel(boolean) :尝试取消任务的执行;
  • get():尝试获取任务执行的结果,是一个阻塞方法;
  • get(long,TimeUnit):获取任务执行结果的方法,阻塞到指定时间;
  • isDone():任务是否执行完成;
  • isCancelled():任务是否已经取消。

org.elasticsearch.common.util.concurrent.BaseFuture

我们先来分析一下它的内部类org.elasticsearch.common.util.concurrent.BaseFuture.Sync:
代码语言:javascript
复制
  1. /**
  2. 1. 遵循{@link AbstractQueuedSynchronizer}的约定,我们创建了一个私有子类来保存同步器。该同步器用于实现阻塞和等待调用以及以线程安全的方式处理状态更改。将来的当前状态保持为“同步”状态,并且只要状态更改为{@link #COMPLETED}或{@link #CANCELLED},就会释放锁定。
  3. 2. 为了避免进行释放和获取的线程之间的竞争,我们分两步过渡到最终状态。一个线程将成功地将CAS从RUNNING转换为COMPLETING,然后该线程将设置计算结果,然后才转换为COMPLETED或CANCELLED。
  4. 3. 我们不使用在aqs中acquire方法之间传递的整数参数,因此我们在各处传递-1来填充这个参数。
  5. */
  6. static final class Sync<V> extends AbstractQueuedSynchronizer {
  7. /* Valid states. */
  8. static final int RUNNING = 0;// 初始状态,AQS的state默认为0
  9. static final int COMPLETING = 1;
  10. static final int COMPLETED = 2;
  11. static final int CANCELLED = 4;
  12. private V value;
  13. private Throwable exception;
  14. /*
  15. * Acquisition succeeds if the future is done, otherwise it fails.
  16. */
  17. @Override
  18. protected int tryAcquireShared(int ignored) {//当future完成时获取成功,否则获取失败
  19. if (isDone()) {// future完成
  20. return 1;
  21. }
  22. return -1;
  23. }
  24. /*
  25. * We always allow a release to go through, this means the state has been
  26. * successfully changed and the result is available.
  27. */
  28. @Override
  29. protected boolean tryReleaseShared(int finalState) {// 释放
  30. // 设置最终状态
  31. setState(finalState);
  32. return true;
  33. }
  34. /**
  35. * Blocks until the task is complete or the timeout expires. Throws a
  36. * {@link TimeoutException} if the timer expires, otherwise behaves like
  37. * {@link #get()}.
  38. */
  39. V get(long nanos) throws TimeoutException, CancellationException,
  40. ExecutionException, InterruptedException {//阻塞直到任务完成或者时间超时
  41. // Attempt to acquire the shared lock with a timeout.
  42. /**
  43. 这里调用的是java.util.concurrent.locks.AbstractQueuedSynchronizer#tryAcquireSharedNanos方法
  44. public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
  45. throws InterruptedException {
  46. if (Thread.interrupted())
  47. throw new InterruptedException();
  48. return tryAcquireShared(arg) >= 0 ||
  49. doAcquireSharedNanos(arg, nanosTimeout);
  50. }
  51. 1. 这里调用的是上面的tryAcquireShared方法,在任务完成时返回1,否则返回-1,当返回1时会直接返回true,不会执行doAcquireSharedNanos方法;
  52. 2. 如果任务没有完成,则tryAcquireShared(arg) >= 0为false,那么会进入doAcquireSharedNanos方法,在该方法中会在未超时的时间内调用tryAcquireShared方法再次尝试,也是在任务执行完成时返回true,否则进行阻塞直到超时释放并返回false。当两个结果都为false时会进入下面这个判断。
  53. */
  54. if (!tryAcquireSharedNanos(-1, nanos)) {
  55. throw new TimeoutException("Timeout waiting for task.");
  56. }
  57. // 如果能进行到这里,证明任务已经执行完成,可以获取到结果
  58. return getValue();
  59. }
  60. /**
  61. * Blocks until {@link #complete(Object, Throwable, int)} has been
  62. * successfully called. Throws a {@link CancellationException} if the task
  63. * was cancelled, or a {@link ExecutionException} if the task completed with
  64. * an error.
  65. */
  66. V get() throws CancellationException, ExecutionException,
  67. InterruptedException {
  68. /**
  69. 这里我们看下方法:
  70. public final void acquireSharedInterruptibly(int arg)
  71. throws InterruptedException {
  72. if (Thread.interrupted())
  73. throw new InterruptedException();
  74. if (tryAcquireShared(arg) < 0)
  75. doAcquireSharedInterruptibly(arg);
  76. }
  77. 1. tryAcquireShared方法在任务未完成时会返回-1,然后进入doAcquireSharedInterruptibly方法;
  78. 2. 再来看doAcquireSharedInterruptibly方法,里面会调用tryAcquireShared方法进行判断,如果任务完成则返回值为1,会直接返回,否则代表任务未完成,线程会被park,进行等待状态,直到complete方法被调用。
  79. */
  80. // Acquire the shared lock allowing interruption.
  81. acquireSharedInterruptibly(-1);
  82. return getValue();
  83. }
  84. /**
  85. * Implementation of the actual value retrieval. Will return the value
  86. * on success, an exception on failure, a cancellation on cancellation, or
  87. * an illegal state if the synchronizer is in an invalid state.
  88. */
  89. private V getValue() throws CancellationException, ExecutionException {
  90. int state = getState();
  91. switch (state) {
  92. case COMPLETED:
  93. if (exception != null) {
  94. throw new ExecutionException(exception);
  95. } else {
  96. // 返回执行结果
  97. return value;
  98. }
  99. case CANCELLED:
  100. throw new CancellationException("Task was cancelled.");
  101. default:
  102. throw new IllegalStateException(
  103. "Error, synchronizer in invalid state: " + state);
  104. }
  105. }
  106. /**
  107. * Checks if the state is {@link #COMPLETED} or {@link #CANCELLED}.
  108. */
  109. boolean isDone() {
  110. // 是否执行完成
  111. return (getState() & (COMPLETED | CANCELLED)) != 0;
  112. }
  113. /**
  114. * Checks if the state is {@link #CANCELLED}.
  115. */
  116. boolean isCancelled() {
  117. // 是否处于取消状态
  118. return getState() == CANCELLED;
  119. }
  120. /**
  121. * Transition to the COMPLETED state and set the value.
  122. */
  123. boolean set(@Nullable V v) {//过渡到COMPLETED状态并设置值
  124. // 设置状态
  125. return complete(v, null, COMPLETED);
  126. }
  127. /**
  128. * Transition to the COMPLETED state and set the exception.
  129. */
  130. boolean setException(Throwable t) {// 过渡到COMPLETED状态并设置异常
  131. return complete(null, t, COMPLETED);
  132. }
  133. /**
  134. * Transition to the CANCELLED state.
  135. */
  136. boolean cancel() {//过渡到CANCELLED状态
  137. return complete(null, null, CANCELLED);
  138. }
  139. /**
  140. * Implementation of completing a task. Either {@code v} or {@code t} will
  141. * be set but not both. The {@code finalState} is the state to change to
  142. * from {@link #RUNNING}. If the state is not in the RUNNING state we
  143. * return {@code false} after waiting for the state to be set to a valid
  144. * final state ({@link #COMPLETED} or {@link #CANCELLED}).
  145. *
  146. * @param v the value to set as the result of the computation.
  147. * @param t the exception to set as the result of the computation.
  148. * @param finalState the state to transition to.
  149. */
  150. private boolean complete(@Nullable V v, @Nullable Throwable t,
  151. int finalState) {
  152. // cas AQS的state从初始的RUNNING到COMPLETING
  153. boolean doCompletion = compareAndSetState(RUNNING, COMPLETING);
  154. if (doCompletion) {//如果cas成功了
  155. // If this thread successfully transitioned to COMPLETING, set the value
  156. // and exception and then release to the final state.
  157. this.value = v;//设置结果
  158. this.exception = t;// 设置异常
  159. /**
  160. public final boolean releaseShared(int arg) {
  161. if (tryReleaseShared(arg)) {
  162. doReleaseShared();// 在里面执行线程唤醒的任务,unparkSuccessor
  163. return true;
  164. }
  165. return false;
  166. }
  167. */
  168. releaseShared(finalState);// 设置最终状态
  169. } else if (getState() == COMPLETING) {// 如果是完成中
  170. // If some other thread is currently completing the future, block until
  171. // they are done so we can guarantee completion.
  172. acquireShared(-1);// 如果有线程处于完成中,阻塞直到他们全部完成
  173. }
  174. // 返回结果
  175. return doCompletion;
  176. }
  177. }
  • 遵循{@link AbstractQueuedSynchronizer}的约定,我们创建了一个私有子类来保存同步器。该同步器用于实现阻塞和等待调用以及以线程安全的方式处理状态更改。将来的当前状态保持为“同步”状态,并且只要状态更改为{@link #COMPLETED}或{@link #CANCELLED},就会释放锁定。
  • 为了避免进行释放和获取的线程之间的竞争,我们分两步过渡到最终状态。一个线程将成功地将CAS从RUNNING转换为COMPLETING,然后该线程将设置计算结果,然后才转换为COMPLETED或CANCELLED。
  • 我们不使用在aqs中acquire方法之间传递的整数参数,因此我们在各处传递-1来填充这个参数。
  • get(long nanos)方法,get()方法和complete方法都是可能会阻塞的,这些可以理解成都是属于client方法,即外层的方法,具体细节参考代码注释内容。
  • 唤醒操作依赖于 set(@Nullable V v)、setException(Throwable t)、cancel()等方法的调用,因为在这些方法中都调用了complete()方法并进行了state的更新。这些可以理解成处理任务的server端的方法,即内层通知外层任务执行完成的方法。
  • 有一点需要注意,这里可能会存在外层多个线程同时get的情况,如果此时任务已经完成,则结果立即返回,否则多个线程都会进入阻塞,然后在complete方法的releaseShared方法中进行unpark。
BaseFuture中的属性与方法一览:
代码语言:javascript
复制
  1. public abstract class BaseFuture<V> implements Future<V> {
  2. private static final String BLOCKING_OP_REASON = "Blocking operation";
  3. /**
  4. * Synchronization control for AbstractFutures.
  5. */
  6. private final Sync<V> sync = new Sync<>();
  7. @Override
  8. public V get(long timeout, TimeUnit unit) throws InterruptedException,
  9. TimeoutException, ExecutionException {
  10. assert timeout <= 0 || blockingAllowed();
  11. // 这里调用的是sync的get(timeout)方法
  12. return sync.get(unit.toNanos(timeout));
  13. }
  14. /**
  15. * {@inheritDoc}
  16. * <p>
  17. * The default {@link BaseFuture} implementation throws {@code
  18. * InterruptedException} if the current thread is interrupted before or during
  19. * the call, even if the value is already available.
  20. *
  21. * @throws InterruptedException if the current thread was interrupted before
  22. * or during the call (optional but recommended).
  23. * @throws CancellationException {@inheritDoc}
  24. */
  25. @Override
  26. public V get(long timeout, TimeUnit unit) throws InterruptedException,
  27. TimeoutException, ExecutionException {
  28. assert timeout <= 0 || blockingAllowed();
  29. // 调用的是sync的get(timeout)方法
  30. return sync.get(unit.toNanos(timeout));
  31. }
  32. /*
  33. * Improve the documentation of when InterruptedException is thrown. Our
  34. * behavior matches the JDK's, but the JDK's documentation is misleading.
  35. */
  36. /**
  37. * {@inheritDoc}
  38. * <p>
  39. * The default {@link BaseFuture} implementation throws {@code
  40. * InterruptedException} if the current thread is interrupted before or during
  41. * the call, even if the value is already available.
  42. *
  43. * @throws InterruptedException if the current thread was interrupted before
  44. * or during the call (optional but recommended).
  45. * @throws CancellationException {@inheritDoc}
  46. */
  47. @Override
  48. public V get() throws InterruptedException, ExecutionException {
  49. assert blockingAllowed();
  50. return sync.get();
  51. }
  52. // protected so that it can be overridden in specific instances
  53. protected boolean blockingAllowed() {// 用于es集群状态、处理线程等的校验,可以被使用的地方重写
  54. return Transports.assertNotTransportThread(BLOCKING_OP_REASON) &&
  55. ThreadPool.assertNotScheduleThread(BLOCKING_OP_REASON) &&
  56. ClusterApplierService.assertNotClusterStateUpdateThread(BLOCKING_OP_REASON) &&
  57. MasterService.assertNotMasterUpdateThread(BLOCKING_OP_REASON);
  58. }
  59. @Override
  60. public boolean isDone() {
  61. return sync.isDone();
  62. }
  63. @Override
  64. public boolean isCancelled() {
  65. return sync.isCancelled();
  66. }
  67. @Override
  68. public boolean cancel(boolean mayInterruptIfRunning) {
  69. if (!sync.cancel()) {
  70. return false;
  71. }
  72. done();
  73. if (mayInterruptIfRunning) {
  74. interruptTask();
  75. }
  76. return true;
  77. }
  78. // 模板方法
  79. protected void interruptTask() {
  80. }
  81. /**
  82. * Subclasses should invoke this method to set the result of the computation
  83. * to {@code value}. This will set the state of the future to
  84. * {@link BaseFuture.Sync#COMPLETED} and call {@link #done()} if the
  85. * state was successfully changed.
  86. *
  87. * @param value the value that was the result of the task.
  88. * @return true if the state was successfully changed.
  89. */
  90. protected boolean set(@Nullable V value) {
  91. boolean result = sync.set(value);
  92. if (result) {
  93. done();
  94. }
  95. return result;
  96. }

可以看到BaseFuture中的方法大都依赖于Sync来实现的。

总结

全篇设计的最妙的地方是用Sync包装了Future的方法,然后对tryAcquireShared方法和tryReleaseShared方法的重写也是整个设计的核心。以上纯属个人观点,不当之处还请指正。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-03-24,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 开发架构二三事 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 多线程模式
  • Future模式
  • Future模式的应用
  • org.elasticsearch.common.util.concurrent.BaseFuture
  • 总结
相关产品与服务
Elasticsearch Service
腾讯云 Elasticsearch Service(ES)是云端全托管海量数据检索分析服务,拥有高性能自研内核,集成X-Pack。ES 支持通过自治索引、存算分离、集群巡检等特性轻松管理集群,也支持免运维、自动弹性、按需使用的 Serverless 模式。使用 ES 您可以高效构建信息检索、日志分析、运维监控等服务,它独特的向量检索还可助您构建基于语义、图像的AI深度应用。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档