前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >(juc系列)phaser源码学习

(juc系列)phaser源码学习

作者头像
呼延十
发布2021-10-18 10:44:31
5150
发布2021-10-18 10:44:31
举报
文章被收录于专栏:呼延呼延

简介

老规矩,下面是官方注释的简单翻译版本,追求速度,都不一定通顺. 谨慎阅读.

一个可复用的同步屏障,功能上类似于CyclicBarrierCountDownLatch,但是支持更多灵活的用法.

登记

与其他同步屏障不同的是,Phaser的数量是可以各自不同的. 使用方应该使用register或者bulkRegister来进行注册.或者以构造方法的形式初始化数量。 然后在一些节点到达后可以进行取消注册.

与大多数基本的同步器构造方法一样,注册和取消注册仅影响内部计数. 他们不记录任何内部的名单, 任务无法查询他们是否已经登记了.

CountDownLatchCyclicBarrier,Semaphore等等都是指定数量后不能变化的,而Phaser的注册数量是可以随时变化的,因此更加灵活.

同步

CyclicBarrier一样,Phaser支持重复调用awaited. arriveAndAwaitAdvanceCyclicBarrier.await的作用类似.

Phaser的每一代拥有一个关联的编号. Phaser的阶段编号从零开始,所有的参与者到达后,阶段编号增加。到达int的最大值后,回归为0.

阶段编号可以独立的控制到达行为和等待行为,任何注册方可以调用以下两种方法:

  • arrival

arrivearriveAndDeregister两个方法记录到达. 这两个方法不阻塞,但是返回关联的到达阶段编号.

指定阶段编号的最后一个参与者到达,一个可选的行为会被执行,然后Phaser进行升级.

这两个操作由触发阶段升级的最后一个参与者触发,并由重写的onAdvance方法负责控制. 这个方法也负责控制终止, 重写这个方法和CyclicBarrier的屏障行为很相似,但是更加灵活一些.

  • waiting 等待

awaitAdvance要求一个参数,表示到达阶段的编号,或者当一个阶段升级到另一个不同的阶段时返回. 和CyclicBarrier的方法不一样,awaitAdvance方法继续等待,直到等待线程被中断. 可中断和带有超时的版本也是支持的. 但是超时或者中断了并不会影响Phaser的状态.

如果必要,你可以自己执行相关的恢复操作, 在调用forceTermination之后. 阶段还被用来执行ForkJoinPool.

终止

一个phaser可以进入终止状态, 使用isTerminated方法来检查. 如果终止了,所有的同步方法立即返回,不再等待. 返回一个负数值来表名这点.

相似的,在终止后尝试进行注册,也不会有反应. 当调用onAdvance返回true时, 终止被触发. 如果一个取消注册的行为,让注册数量为0了, 将会终止.

分层

Phasers可以分层以减少竞争(比如以树状结构初始化). 设置有较大数量的Phasers将会有比较严重的同步竞争,可以使用一组子Phaser共享一个公共的父节点, 来避免这种情况. 这将大大的提升吞吐量即使会导致每一个操作的浪费变大.

在一个分层phaser的树中, 子节点的注册和取消注册是自动管理的, 如果注册的数量变为非零值,子节点将注册至其父节点, 如果注册数量变为0. 子节点将从其父节点取消注册.

可以查看下方分层的示例来了解.

因为支持分层,因此一个Phaser有三种形态.

  • 非树形,单个节点

这是最简单的形态,只要自身的注册数等于到达数,就升级一次阶段编号即可.

  • 树形,叶子节点

只要自身的注册数量等于到达数量,就代表自己这个节点“到达”了,向父节点的到达数+1.

  • 树形,非叶子节点

自身到达数等于注册数,这里的到达数不是参与的任务数,而是自己的子节点的数量,自己的所有子节点全部到达,自己才算到达,向自己的父节点进行”到达”操作。 如果这个节点是根节点,那么整个Phaser树才算是全部到达,进行升级操作.

monitoring 监控

即使同步方法只能由注册方进行调用,一个phaser的当前状态可以被任何调用方监控. 在一个给定的时间,getRegisteredParties返回总数, getArriveParties返回到达的数量. getUnarrivedParties返回没有到达的数量. 这些方法返回值都是瞬态的,因此可能在同步控制中不是特别有用. toString方法返回这些状态的一个快照.

简单示例

代替CountDownLatch

Phaser可以用来替换掉CountDownLatch. 控制一个行为, 服务于一些部分. 通常的操作是, 设置当前线程为第一个注册者, 然后启动所有的行为,之后取消注册当前线程.

  1. 注册当前线程(此时注册数量为1)
  2. 启动所有线程,首先注册一次(全部完成后,此时注册数量为tasks.size() + 1) ,之后让他们arriveAndAwaitAdvance. 到达并且等待升级(此时到达数量为tasks.size()`.
  3. 取消注册当前线程(注册数量变成tasks.size()), Phaser的注册数量等于到达数量。因此进行升级,所有等待的线程唤醒,继续执行任务.
重复执行一组任务指定次数

让一组线程,重复执行某些行为一定的次数,可以重写onAdvance.

  1. 初始化一个Phaser,并重写onAdvance. 让阶段编号大于给定次数时,Phaser进行终止.
  2. 当前线程注册. (此时注册数量为1)
  3. 每个任务线程,注册一次. (此时注册数量为tasks.size() + 1)
  4. 如果Phaser没有终止,其他所有线程执行任务, 然后等待 (此时到达数量为tasks.size().
  5. 当前线程取消注册,让注册数等于等待数,其他线程等待结束,进行升级或者终止.

让所有任务互相等待,以完成一组任务,整体完成给定次数后,Phaser终止,程序结束.

等待终止

如果主任务必须在终止后发生,他可以注册然后执行一个相似的循环.

首先进行注册,然后在Phaser没有终止前,不断的到达,等待升级.知道Phaser终止了,再进行主任务的执行.

等待特定的阶段编号

如果你确定在你的上下文中,Phaser的数量不会超过int的最大值,你可以使用这些相关的构造器来等待特定的某个阶段编号.

分层的示例

上面讲到Phaser支持分层以获得更好的并发性,这是一个简单的例子.

创建一组任务,使用一个树形的Phasers. 假设一个Task的类,他的构造参数接受一个Phaser. 在调用下方代码的build之后,这些任务会开始.

TASKS_PER_PHASER 的最佳值取决于你期望的同步效率. 越小的值,会让每个阶段的执行块变小,因此速率高. 如果需要更大的执行快,可以设置为高达几百.

注意事项

实现控制最大的参与者数量为65535. 如果尝试去注册更多,会导致错误. 但是你可以通过使用树形的Phasers来实现更多的参与者.

源码阅读

构造方法

共提供了4个构造方法,本质上都是调用最后一个. 详情看注释.

主要是区分是否是树形,然后对State,父节点,等待队列等进行初始化赋值.

变量

这是一些变量和常量.

  • State 状态定义
  • parent 父节点
  • root 根节点
  • evenQ 等待线程栈的偶数版本
  • addQ 等待线程栈的奇数版本

其他还有一些常量,主要是用来辅助对于State的定义的,比较常见的一些shift,one等等,不再介绍.

QNode

内部的等待节点. 定义结构比较简单, 主要是保存了当前的Phaser信息和对应的线程信息,以及一个指向下一个节点的next指针.

提供了两个方法.

isReleasable 是否可释放

如果内部的信息有一些不对劲, 比如线程为空,或者被中断了, 或者Phaser被别人改了,等等, 都返回true. 否则返回false. 支持中断和超时.

block 阻塞等待

根据是否超时, 阻塞当前线程一段时间.

register系列

用于向Phaser注册参与者.

register系列提供了两个方法,registerbulkRegister两个方法,本质上都是调用doRegister方法.

这是核心的注册方法,主要有三个分支

  • 没有父节点,且第一次注册.

这是最简单的,直接将注册后的State更新进去即可.

  • 有父节点,但是不是第一次注册

检查下注册后,当前节点是否全部到达了,如果是, 当前节点升级,并且告诉父节点.

  • 有父节点,是第一次注册.

首先将当前节点注册到父节点,之后更新当前节点的参与者信息等.

arrive系列

arrivearriveAndDeregister 到达,不等待其他参与者

这两个方法都实现到达相关逻辑, 调用doArrive来实现.

doArrive

主要作用是对State中的未到达数量进行递减, 如果递减完,还有未到达的参与者,直接返回当前阶段,如果递减完,当前所有参与者都到达了. 有三个分支:

  • 非树形结构

直接计算下一个状态,进行写入.

  • 树形结构,且因为注销,没有参与者了.

向父节点注销当前节点, 当前节点置为空.

  • 树形结构,且还有参与者

向父节点传递当前节点完全到达的消息.

arriveAndAwaitAdvance 到达然后等待其他参与者

到达一个参与者,且阻塞等待Phaser的升级行为. 首先将当前Phaser的状态进行递减,之后主要有三个分支:

  • 不是最后一个到达的.

从跟进点进行等待升级

  • 是最后一个到达的,且有父节点

调用父节点的arriveAndAwaitAdvance,向父节点报告当前节点已经完全到达,开始等待升级.

  • 是最后一个到达的,且是根节点

计算新的状态,设置状态然后唤醒等待者.

await系列

awaitAdvance, awaitAdvanceInterruptibly, awaitAdvanceInterruptibly 三个await系列的方法,本质上都是调用的 父节点的internalAwaitAdvance. 只是支持了中断和超时而已.

让当前线程自旋或者进入队列等待Phaser的升级,也就是等待其他所有参与者的到达.

强行终止

比较简单, 只要根节点的状态不为0,就强行设置为终止了. 释放所有的等待节点.

onAdvance

这是预留给子类的一个方法, 可以定义Phaser升级时执行的动作,还可以定义锁是否要升级. 默认实现是注册的参与者为0, 就终止整个Phaser.

监控方法

还有很多负责监控当前Phaser状态 的方法,这里简单记录一下 .

  • getPhase 拿到阶段编号
  • getRegisteredParites 拿到当前的参与者数量
  • getArrivedParties 拿到当前到达的参与者数量
  • getUnarrivedParties 未到达的参与者数量
  • getParent 返回当前节点的父节点
  • getRoot 获取根节点
  • isTerminated 是否被终止

总结

Phaser是一个用于多阶段任务的同步器,没有使用AQS框架来实现,而是自己实现的。

内部的核心还是State的定义.

高32位记录当前的阶段编号,16-32为记录共有多少个参与者, 低16位记录还有多少个参与者没有到达.

提供三类方法:

  • 注册

修改16-32位,与其他同步器相比,提供了更多的灵活性,可以修改参与者的数量

  • 到达

修改低16位,当全部到达后,进行升级,升级通过修改高32位来记录阶段编号

  • 等待

让先到达的线程,阻塞等待所有参与者的到达,也就是升级行为完成后,被唤醒.

为了支持更大的并发度,Phaser支持以树结构创建,叶子节点接受所有参与者的到达,控制所有注册到自己的参与者. 父节点控制自己的子节点. 根节点控制所有是否进行放行,唤醒所有等待线程.

完.

完。

联系我

最后,欢迎关注我的个人公众号【 呼延十 】,会不定期更新很多后端工程师的学习笔记。 也欢迎直接公众号私信或者邮箱联系我,一定知无不言,言无不尽。

以上皆为个人所思所得,如有错误欢迎评论区指正。

欢迎转载,烦请署名并保留原文链接。

联系邮箱:huyanshi2580@gmail.com

更多学习笔记见个人博客或关注微信公众号 <呼延十 >——>呼延十

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 简介
    • 登记
      • 同步
        • 终止
          • 分层
            • monitoring 监控
              • 简单示例
                • 代替CountDownLatch
                • 重复执行一组任务指定次数
                • 等待终止
                • 等待特定的阶段编号
                • 分层的示例
              • 注意事项
              • 源码阅读
                • 构造方法
                  • 变量
                    • QNode
                      • isReleasable 是否可释放
                      • block 阻塞等待
                    • register系列
                      • arrive系列
                        • arrive 和 arriveAndDeregister 到达,不等待其他参与者
                        • doArrive
                        • arriveAndAwaitAdvance 到达然后等待其他参与者
                      • await系列
                        • 强行终止
                          • onAdvance
                            • 监控方法
                            • 总结
                            • 联系我
                            领券
                            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档