专栏首页我是攻城师理解Java并发工具类Phaser

理解Java并发工具类Phaser

Phaser(移相器,一种电子元件)是JDK7中引入的新的并发工具辅助类,oralce官网文档描述Phaser是一个可重复使用的同步栅栏,功能上与 CountDownLatch 和 CyclicBarrier类似但支持的场景更加灵活,这个类可能是目前并发包里面实现最复杂的一个了。

Phaser的灵活性主要体现在在构造函数时不需要强制指定目前有多少参与协作的线程,可以在运行时动态改变。

下面看一下关于Phaser常见的方法;

Phaser() //默认的构造方法,初始化注册的线程数量为0
Phaser(int parties)//一个指定线程数量的构造方法

此外Phaser还支持Tiering类型具有父子关系的构造方法,主要是为了减少在注册者数量庞大的时候,通过分组的形式复用Phaser从而减少竞争,提高吞吐,这种形式一般不常见,所以这里不再提及,有兴趣的可以参考官网文档。

其他几个常见方法:

register()//添加一个新的注册者
bulkRegister(int parties)//添加指定数量的多个注册者
arrive()// 到达栅栏点直接执行,无须等待其他的线程
arriveAndAwaitAdvance()//到达栅栏点,必须等待其他所有注册者到达
arriveAndDeregister()//到达栅栏点,注销自己无须等待其他的注册者到达
onAdvance(int phase, int registeredParties)//多个线程达到注册点之后,会调用该方法。

(1)下面我们先看一个简单的替代CountDownLatch实现一次性的共享锁例子

void runTasks(List<Runnable> tasks) {
   final Phaser phaser = new Phaser(1); // "1" to register self
   // create and start threads
   for (final Runnable task : tasks) {
     phaser.register();
     new Thread() {
       public void run() {
         phaser.arriveAndAwaitAdvance(); // await all creation
         task.run();
       }
     }.start();
   }

   // allow threads to start and deregister self
   phaser.arriveAndDeregister();
 }

这个方法中,首先调用者线程注册了自己,然后接着分别注册并启动了多个线程,在每个线程中又调用了

phaser.arriveAndAwaitAdvance()

方法,这个方法会阻塞直到所有的线程都启动,然后继续执行,最后在方法的最后一行调用了到达时注销自己的方法,执行所有线程到达栅栏点,然后开始执行后续的task.run方法。

(2)接着我们在看一个模拟CyclicBarrier的例子。

这个例子我们以实际场景作为说明,假设小张,小李,小王,三个人约好共同去旅游,旅游路线是北京,上海,杭州,规则是他们都可以采用自己的路线去到达目的地,但是必须是所有人都到达某一个城市集合后,他们才能再次出发下一个城市。

这其实就是一个典型的循环栅栏的例子,我们直接来看如何使用Phaser来完成:

package concurrent.tools.phaser;

import java.util.Random;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

/**
 * Created by Administrator on 2018/8/27.
 */
public class PhaserDemo5 {

    public static void main(String[] args) throws InterruptedException {

        Phaser phaser=new Phaser(){
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                System.out.println("=================step-"+phase+"==================="+registeredParties);
                return super.onAdvance(phase, registeredParties);
            }
        };

        Bus bus1=new Bus(phaser,"小张");
        Bus bus2=new Bus(phaser,"小李");
        Bus bus3=new Bus(phaser,"小王");

        bus1.start();
        bus2.start();
        bus3.start();


        System.out.println(phaser.getRegisteredParties());



    }


    static public class Bus extends Thread{

        private Phaser phaser;
        private Random random;

        public Bus(Phaser phaser,String name){
            this.phaser=phaser;
            setName(name);
            random=new Random();
            phaser.register();
        }


        private void trip(int sleepRange,String cityName){
            System.out.println(this.getName()+" 准备去"+cityName+"....");
            int sleep=random.nextInt(sleepRange);
            try {
                TimeUnit.SECONDS.sleep(sleep);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(this.getName()+" 达到"+cityName+"...... ");
            if(this.getName().equals("小王1")){ //  测试掉队的情况
                try {
                    TimeUnit.SECONDS.sleep(7);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                phaser.arriveAndDeregister();
            }else {
                phaser.arriveAndAwaitAdvance();
            }
        }





        @Override
        public void run() {

            try {
                int s=random.nextInt(3);
                TimeUnit.SECONDS.sleep(s);
                System.out.println(this.getName()+"  准备好了,旅行路线=北京=>上海=>杭州 ");
                phaser.arriveAndAwaitAdvance();// 等待所有的汽车准备好
            } catch (InterruptedException e) {
                e.printStackTrace();
            }


            trip(5,"北京");
            trip(5,"上海");
            trip(3,"杭州");

        }
    }

}

输出结果如下:

小王  准备好了,旅行路线=北京=>上海=>杭州 
小李  准备好了,旅行路线=北京=>上海=>杭州 
小张  准备好了,旅行路线=北京=>上海=>杭州 
=================step-0===================3
小王 准备去北京....
小张 准备去北京....
小李 准备去北京....
小张 达到北京...... 
小李 达到北京...... 
小王 达到北京...... 
=================step-1===================3
小王 准备去上海....
小张 准备去上海....
小李 准备去上海....
小李 达到上海...... 
小王 达到上海...... 
小张 达到上海...... 
=================step-2===================3
小张 准备去杭州....
小张 达到杭州...... 
小李 准备去杭州....
小王 准备去杭州....
小王 达到杭州...... 
小李 达到杭州...... 
=================step-3===================3

结果符合预期,在这例中Phaser的构造函数我们并没有指定数量,而是在运行时动态注册就去的,然后里面又使用了onAdvance方法,可以在每次到达栅栏点时输出当前的step阶段序号,这个值最大是Integer.MAX_VALUE,超过之后会重新从0开始。

此外Phaser还提供了一个arriveAndDeregister方法,如果中间小张在到达某个城市之后下起了暴雨行程无期,默认情况下其他的线程都需要等待小张,如果调用了arriveAndDeregister方法,可以在任务列表注销自己,然后自己单独运行,这样不会影响其他的正常运行线程。

本文主要了介绍了JDK7引入的并发工具类Phaser,这个类的功能与CountDownLatch 和 CyclicBarrier类似但更灵活,这个类底层相对比较复杂并没有采用AQS同步框架实现,而是单独定义了相关功能api,其中state采用64位的long类型表示,然后64bit又分成4个定义分别代表没有到达栅栏的数量(0-15bit),注册的数量(16-31bit),栅栏的代数量(32-62bit),最后一位(63bit)代表当前的Phaser是否是终止状态,这也意味着我们能够注册的最大数量不能超过65535,否则会抛出不合法参数异常,这一点在使用时需要注意。

本文分享自微信公众号 - 我是攻城师(woshigcs)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2018-08-27

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 多个线程如何轮流打印ABC特定的次数?

    这类问题其实并不难,只要掌握了Java里面线程协作和锁的知识,就可以轻而易举的搞定:

    我是攻城师
  • Java线程的基本知识总结

    关于yield方法,是指当前线程可能运行不太重要的任务,可以通过这个方法暗示操作系统线程调度我可以晚点执行,先把CPU资源让给优先级给我一样或者大于我的任务,如...

    我是攻城师
  • Java并发之高级自旋锁CLH锁和MCS锁

    自旋锁(spin lock)是一个典型的对临界资源的互斥手段,自旋锁是基于CAS原语的,所以它是轻量级的同步操作,它的名称来源于它的特性。自旋锁是指当一个线程尝...

    我是攻城师
  • 2019 南昌网络赛 H The Nth Item

    版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

    用户2965768
  • 学好 MP4,让直播更给力

    villainhr
  • LWC 70: 778. Swim in Rising Water

    思路: N很小,只有50,可以暴力解决。对于每一个时刻t,grid变成max(t, grid[i][j]),对grid更新后,所有<t的值都变成了t,在这些...

    用户1147447
  • 从一个问题来解释下什么是mysql的可重复读

    补充解释下这个问题,mysql环境,innodb引擎,事务的隔离级别是可重复读,一个表只有两个字段,然后插入4条数据,希望你构造上图中的一种情况,就是明明upd...

    用户7634691
  • 为 DevOps 构建新的运营模型

    我一直在撰写有关企业面临的 DevOps 挑战的文章。从根本上讲,它是关于规模的:当企业尝试将其扩展到大型企业通常拥有的 800 多个应用程序中时,它们正努力实...

    LinuxSuRen
  • WordPress - 基于 Ubuntu 16.04 搭建个人博客

    购买了腾讯云服务器后, 准备开始搭建个人博客了, 这里根据网上的各种资源整理一下.(终端下进行) 如果是学习的话强烈推荐 基于 Ubuntu 搭建 Word...

    AIHGF
  • DevOps 帮助数字化转型的10个最佳实践

    从模式识别到新财源的发现,DevOps 在数字化转型过程中总是重要的角色。事实上,专家们总是说必不可少。

    DevOps持续交付

扫码关注云+社区

领取腾讯云代金券