前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >java并发之同步辅助类Phaser

java并发之同步辅助类Phaser

作者头像
BUG弄潮儿
发布2022-06-30 15:07:23
3040
发布2022-06-30 15:07:23
举报
文章被收录于专栏:JAVA乐园

Phaser含义:

更加复杂和强大的同步辅助类。它允许并发执行多阶段任务。当我们有并发任务并且需要分解成几步执行时,(CyclicBarrier是分成两步),就可以选择使用Phaser。Phaser类机制是在每一步结束的位置对线程进行同步,当所有的线程都完成了这一步,才允许执行下一步。

跟其他同步工具一样,必须对Phaser类中参与同步操作的任务数进行初始化,不同的是,可以动态的增加或者减少任务数。

函数:

arriveAndAwaitAdvance():类似于CyclicBarrier的await()方法,等待其它线程都到来之后同步继续执行。

arriveAndDeregister():把执行到此的线程从Phaser中注销掉。

isTerminated():判断Phaser是否终止。

register():将一个新的参与者注册到Phaser中,这个新的参与者将被当成没有执行完本阶段的线程。

forceTermination():强制Phaser进入终止态。

... ...

例子

使用Phaser类同步三个并发任务。这三个任务将在三个不同的文件夹及其子文件夹中查找过去24小时内修改过扩展为为.log的文件。这个任务分成以下三个步骤:

1、在执行的文件夹及其子文件夹中获取扩展名为.log的文件

2、对每一步的结果进行过滤,删除修改时间超过24小时的文件

3、将结果打印到控制台

在第一步和第二步结束的时候,都会检查所查找到的结果列表是不是有元素存在。如果结果列表是空的,对应的线程将结束执行,并从Phaser中删除。(也就是动态减少任务数)

文件查找类:

import java.io.File;

import java.util.ArrayList;

import java.util.Date;

import java.util.List;

import java.util.concurrent.Phaser;

import java.util.concurrent.TimeUnit;

public class FileSearch implements Runnable {

private String initPath;

private String end;

private List<String> results;

private Phaser phaser;

public FileSearch(String initPath, String end, Phaser phaser) {

this.initPath = initPath;

this.end = end;

this.phaser=phaser;

results=new ArrayList<>();

}

@Override

public void run() {

phaser.arriveAndAwaitAdvance();//等待所有的线程创建完成,确保在进行文件查找的时候所有的线程都已经创建完成了

System.out.printf("%s: Starting.\n",Thread.currentThread().getName());

// 1st Phase: 查找文件

File file = new File(initPath);

if (file.isDirectory()) {

directoryProcess(file);

}

// 如果查找结果为false,那么就把该线程从Phaser中移除掉并且结束该线程的运行

if (!checkResults()){

return;

}

// 2nd Phase: 过滤结果,过滤出符合条件的(一天内的)结果集

filterResults();

// 如果过滤结果集结果是空的,那么把该线程从Phaser中移除,不让它进入下一阶段的执行

if (!checkResults()){

return;

}

// 3rd Phase: 显示结果

showInfo();

phaser.arriveAndDeregister();//任务完成,注销掉所有的线程

System.out.printf("%s: Work completed.\n",Thread.currentThread().getName());

}

private void showInfo() {

for (int i=0; i<results.size(); i++){

File file=new File(results.get(i));

System.out.printf("%s: %s\n",Thread.currentThread().getName(),file.getAbsolutePath());

}

// Waits for the end of all the FileSearch threads that are registered in the phaser

phaser.arriveAndAwaitAdvance();

}

private boolean checkResults() {

if (results.isEmpty()) {

System.out.printf("%s: Phase %d: 0 results.\n",Thread.currentThread().getName(),phaser.getPhase());

System.out.printf("%s: Phase %d: End.\n",Thread.currentThread().getName(),phaser.getPhase());

//结果为空,Phaser完成并把该线程从Phaser中移除掉

phaser.arriveAndDeregister();

return false;

} else {

// 等待所有线程查找完成

System.out.printf("%s: Phase %d: %d results.\n",Thread.currentThread().getName(),phaser.getPhase(),results.size());

phaser.arriveAndAwaitAdvance();

return true;

}

}

private void filterResults() {

List<String> newResults=new ArrayList<>();

long actualDate=new Date().getTime();

for (int i=0; i<results.size(); i++){

File file=new File(results.get(i));

long fileDate=file.lastModified();

if (actualDate-fileDate<TimeUnit.MILLISECONDS.convert(1,TimeUnit.DAYS)){

newResults.add(results.get(i));

}

}

results=newResults;

}

private void directoryProcess(File file) {

// Get the content of the directory

File list[] = file.listFiles();

if (list != null) {

for (int i = 0; i < list.length; i++) {

if (list[i].isDirectory()) {

// If is a directory, process it

directoryProcess(list[i]);

} else {

// If is a file, process it

fileProcess(list[i]);

}

}

}

}

private void fileProcess(File file) {

if (file.getName().endsWith(end)) {

results.add(file.getAbsolutePath());

}

}

}

测试主类:

import java.util.concurrent.Phaser;

public class PhaserMain {

public static void main(String[] args) {

Phaser phaser = new Phaser(3);

FileSearch system = new FileSearch("C:\\Windows", "log", phaser);

FileSearch apps = new FileSearch("C:\\Program Files", "log", phaser);

FileSearch documents = new FileSearch("C:\\Documents And Settings", "log", phaser);

Thread systemThread = new Thread(system, "System");

systemThread.start();

Thread appsThread = new Thread(apps, "Apps");

appsThread.start();

Thread documentsThread = new Thread(documents, "Documents");

documentsThread.start();

try {

systemThread.join();

appsThread.join();

documentsThread.join();

} catch (InterruptedException e) {

e.printStackTrace();

}

System.out.printf("Terminated: %s\n", phaser.isTerminated());

}

}

备注:《java并发之同步辅助类CountDownLatch》文章有个代码错误PrintQueue类改成Participant 类,谢谢网友Go_away指正

import java.util.concurrent.TimeUnit;

public class Participant implements Runnable {

private VideoConference conference;

private String name;

public Participant(VideoConference conference, String name) {

this.conference=conference;

this.name=name;

}

@Override

public void run() {

Long duration=(long)(Math.random()*10);

try {

TimeUnit.SECONDS.sleep(duration);

} catch (InterruptedException e) {

e.printStackTrace();

}

conference.arrive(name);//每到一个人员,CountDownLatch计数器就减少1

}

}

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2018-03-19,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 BUG弄潮儿 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Phaser含义:
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档