在J.U.C包中,提供了几个非常有用的并发工具类,CountDownLatch、CyclicBarrier和Semaphore工具类提供了一种并发流程控制的手段,Exchanger工具类则提供了在线程间交换数据的一种手段。
通过使用这些工具类,可以有效提高并发编程中,并发流程的控制,以提升效率和代码质量。
CountDownLatch允许一个或多个线程等待其他线程完成操作。
CountDownLatch的构造函数接收一个int类型的参数作为计数器,如果你想等待N个点完成,这里就传入N。可以是N个线程,也可以是1个线程里的N个执行步骤。
CountDownLatch countDown=new CountDownLatch(N);
当我们调用CountDownLatch的countDown
方法时,N就会减1,CountDownLatch的await
方法会阻塞当前线程,直到N变成零。
如果有某个线程处理得比较慢,可以使用另外一个带指定时间的await方法——await(long time,TimeUnit unit)
,这个方法等待特定时间后,就会不再阻塞当前线程。join也有类似的方法。
注意:计数器必须大于等于0,等于0时相当于不会阻塞当前线程。**同时CountDownLatch不能重新初始化或者修改CountDownLatch对象的内部计数器。**一个线程调用countDown方法happen-before,另外一个线程调用await方法。
在代码实现中,我们也可以使用join()方法,让当前执行线程等待join线程执行结束。join的实现原理是不断的去判断join的线程是否存活,如果存活,则让当前线程一直等待。
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
public class CountDownLatchTest {
private static Random sr=new Random(47);
private static AtomicInteger result=new AtomicInteger(0);
private static int threadCount=10;//线程数量
private static CountDownLatch countDown=new CountDownLatch(threadCount);//CountDownLatch
private static class Parser implements Runnable{
String name;
public Parser(String name){
this.name=name;
}
@Override
public void run() {
int sum=0;
int seed=Math.abs(sr.nextInt()) ;
Random r=new Random(47);
for(int i=0;i<100;i++){
sum+=r.nextInt(seed);
}
result.addAndGet(sum);
System.out.println(name+"线程的解析结果:"+sum);
countDown.countDown();//注意这里
}
}
public static void main(String[] args) throws InterruptedException {
Thread[] threads=new Thread[threadCount];
for(int i=0;i<threadCount;i++){
threads[i]=new Thread(new Parser("Parser-"+i));
}
for(int i=0;i<threadCount;i++){
threads[i].start();
}
/*
for(int i=0;i<threadCount;i++){
threads[i].join();
}*/
countDown.await();//将join改为使用CountDownLatch
System.out.println("所有线程解析结束!");
System.out.println("所有线程的解析结果:"+result);
}
}
输出:
Parser-0线程的解析结果:1336321192
Parser-1线程的解析结果:-2013585201
Parser-2线程的解析结果:-1675827227
Parser-4线程的解析结果:1638121055
Parser-3线程的解析结果:908136818
Parser-5线程的解析结果:1513365118
Parser-7线程的解析结果:489607354
Parser-6线程的解析结果:1513365118
Parser-8线程的解析结果:-1191966831
Parser-9线程的解析结果:-912399159
所有线程解析结束!
所有线程的解析结果:1605138237
public class CountDownLatch {
/**Synchronization control For CountDownLatch. Uses AQS state to represent count.*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);//初始化同步状态
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
private final Sync sync;//组合一个同步器(AQS)
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);//初始化同步状态
}
/*Causes the current thread to wait until the latch has counted down to
* zero, unless the thread is {@linkplain Thread#interrupt interrupted}.*/
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);//
}
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public void countDown() {
sync.releaseShared(1);//释放同步状态
}
public long getCount() {
return sync.getCount();
}
public String toString() {
return super.toString() + "[Count = " + sync.getCount() + "]";
}
}
CyclicBarrier意思是可循环使用(Cyclic)的屏障(Barrier)。可以让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。
CyclicBarrier默认的构造方法是CyclicBarrier(int parties)
,其参数表示屏障拦截的线程数量,每个线程通过调用CyclicBarrier的await()
方法,通知CyclicBarrier当前线程已经到达屏障,然后当前线程被阻塞。
CyclicBarrier的计数器可以使用reset()进行重置,而CountDownLatch的计数器不可重置只能使用一次。如果计算发生错误,可以重置计数器,并让线程重新执行一次。
CyclicBarrier提供了比CountDownLatch更丰富的方法。isBroken()
用于了解阻塞的线程是否被中断。getNumberWaiting()
方法可以获取被CyclicBarrier阻塞的线程数。
import java.util.Random;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;
public class CyclicBarrierTest {
private static Random sr=new Random(47);
private static AtomicInteger result=new AtomicInteger(0);
private static int threadCount=10;
//屏障后面执行汇总
private static CyclicBarrier barrier=new CyclicBarrier(threadCount,new Accumulate());
private static class Parser implements Runnable{
String name;
public Parser(String name){
this.name=name;
}
@Override
public void run() {
int sum=0;
int seed=Math.abs(sr.nextInt()) ;
Random r=new Random(47);
for(int i=0;i<(seed%100*100000);i++){
sum+=r.nextInt(seed);
}
result.addAndGet(sum);
System.out.println(System.currentTimeMillis()+"-"+name+"线程的解析结果:"+sum);
try {
barrier.await();
System.out.println(System.currentTimeMillis()+"-"+name+"线程越过屏障!");
} catch (Exception e) {
e.printStackTrace();
}
}
}
static class Accumulate implements Runnable{
@Override
public void run() {
System.out.println("所有线程解析结束!");
System.out.println("所有线程的解析结果:"+result);
}
}
public static void main(String[] args) throws InterruptedException {
Thread[] threads=new Thread[threadCount];
for(int i=0;i<threadCount;i++){
threads[i]=new Thread(new Parser("Parser-"+i));
}
for(int i=0;i<threadCount;i++){
threads[i].start();
}
}
}
输出:
1471866228774-Parser-4线程的解析结果:631026992
1471866228930-Parser-3线程的解析结果:-372785277
1471866228961-Parser-1线程的解析结果:-938473891
1471866229008-Parser-7线程的解析结果:-396620018
1471866229008-Parser-2线程的解析结果:-1159985406
1471866229024-Parser-5线程的解析结果:-664234808
1471866229070-Parser-6线程的解析结果:556534377
1471866229117-Parser-9线程的解析结果:-844558478
1471866229383-Parser-0线程的解析结果:919864023
1471866229430-Parser-8线程的解析结果:-2104111089
所有线程解析结束!
所有线程的解析结果:-78376279
1471866229430-Parser-8线程越过屏障!
1471866229430-Parser-2线程越过屏障!
1471866229430-Parser-9线程越过屏障!
1471866229430-Parser-7线程越过屏障!
1471866229430-Parser-1线程越过屏障!
1471866229430-Parser-3线程越过屏障!
1471866229430-Parser-0线程越过屏障!
1471866229430-Parser-6线程越过屏障!
1471866229430-Parser-4线程越过屏障!
1471866229430-Parser-5线程越过屏障!
// 各个线程解析完成的时间不一致,但是越过屏障的时间却是一致的。
Semaphore(信号量)用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。
Semaphore可以用于做流量控制,特别是公用资源有限的应用场景,比如数据库连接。
当需要批量处理大量的数据文件时,可以开启几十个线程并发读取文件。将文件读取到内存中之后,还需要将解析后的数据存储的数据库中,但是数据库有允许的最大连接数,比如是10个,就必须要控制只有10个线程可以获取到数据库连接。这个时候就可以用Semaphore来做流量的控制。
在构造函数中,可以传入两个参数:
permits
: 可用的许可证数量fair
: 是否公平获取许可证比如Semaphore(10,true)
,就表示允许10个线程获取许可证,也就是最大的并发量为10,线程可以通过公平竞争(即先进先出的顺序)的方式获取许可证。
线程使用Semaphore的acquire()
方法获取一个许可证,使用完之后调用release()
方法归还许可证。还可以用tryAcquire()
方法尝试获取许可证。
其他的API
public final boolean hasQueuedThreads()
: 是否还有线程正在等待获取许可证public final int getQueueLength()
:判断还有多少个等待获取许可证的线程public int availablePermits()
:返回此信号量中当前可用的许可证数量public void acquire()
: 从此信号量中请求一个许可证public void release()
: 从此信号量中释放一个许可证public boolean tryAcquire()
: 试图从信号量中请求一个许可证,无可用的许可证时,直接返回不阻塞;代码中有30个线程在执行,但是只允许10个并发执行。Semaphore(10)表示允许10个线程获取许可证,也就是最大并发数是10。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class SemaphoreTest {
private static final int THREAD_COUNT = 30;
private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
private static Semaphore s = new Semaphore(10);
public static void main(String[] args) {
for (int i = 0; i < THREAD_COUNT; i++) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
s.acquire();
System.out.println("save data");
s.release();
} catch (InterruptedException e) {
}
}
});
}
threadPool.shutdown();
}
}
Exchanger(交换者)是一个用于线程间协作的工具类。Exchanger用于进行线程间的数据交换。它提供一个同步点,供两个线程交换数据。
如果第一个线程先执行exchange()方法,它会一直等待第二个线程也执行exchange方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。
如果两个线程有一个没有执行exchange()方法,则会一直等待,如果担心有特殊情况发生,避免一直等待,可以使用exchange(V x,longtimeout,TimeUnit unit)
设置最大等待时长。
下面来看一下Exchanger的应用场景。
1、Exchanger可以用于遗传算法,遗传算法里需要选出两个人作为交配对象,这时候会交换两人的数据,并使用交叉规则得出2个交配结果。
2、Exchanger也可以用于校对工作,比如我们需要将纸制银行流水通过人工的方式录入成电子银行流水,为了避免错误,采用AB岗两人进行录入,录入到Excel之后,系统需要加载这两个Excel,并对两个Excel数据进行校对,看看是否录入一致。
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ExchangerTest {
private static final Exchanger<String> exgr = new Exchanger<String>();
private static ExecutorService threadPool = Executors.newFixedThreadPool(2);
public static void main(String[] args) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
String A = "银行流水100";// A录入银行流水数据
String B=exgr.exchange(A);
System.out.println("A的视角:A和B数据是否一致:" + A.equals(B) +
",A录入的是:" + A + ",B录入是:" + B);
} catch (InterruptedException e) {
}
}
});
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
String B = "银行流水200";// B录入银行流水数据
String A = exgr.exchange(B);
System.out.println("B的视角:A和B数据是否一致:" + A.equals(B) +
",A录入的是:" + A + ",B录入是:" + B);
} catch (InterruptedException e) {
}
}
});
threadPool.shutdown();
}
}
输出:
B的视角:A和B数据是否一致:false,A录入的是:银行流水100,B录入是:银行流水200
A的视角:A和B数据是否一致:false,A录入的是:银行流水100,B录入是:银行流水200
LockSupport类,是JUC包中的一个工具类,是用来创建锁和其他同步类的基本线程阻塞原语,可以让线程在任意位置阻塞,也可以在任意位置唤醒。(Basic thread blocking primitives for creating locks and other synchronization classes)
核心方法有两个:park()
和unpark()
,其中park()
方法用来阻塞当前调用线程,unpark()
方法用于唤醒指定线程。
ReentrantLock,ReentReadWriteLocks在线程间等待/通知机制使用的Condition时都会调用LockSupport.park()方法和LockSupport.unpark()方法。
这和Object类的wait()和notify()方法有些类似,这里主要有两点区别:
(1)wait和notify都是Object中的方法,在调用这两个方法前必须先获得锁对象,但是park不需要获取某个对象的锁就可以锁住线程。
(2)notify只能随机选择一个线程唤醒,无法唤醒指定的线程,unpark却可以唤醒一个指定的线程。
public class WaitNotifyTest {
private static Object obj = new Object();
public static void main(String[] args) {
new Thread(new WaitThread()).start();
new Thread(new NotifyThread()).start();
}
static class WaitThread implements Runnable {
@Override
public void run() {
synchronized (obj) {
System.out.println("start wait!");
try {
obj.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end wait!");
}
}
}
static class NotifyThread implements Runnable {
@Override
public void run() {
synchronized (obj) {
System.out.println("start notify!");
obj.notify();
System.out.println("end notify!");
}
}
}
}
输出:
start wait!
start notify!
end notify!
end wait!
public class LockSupportTest {
public static void main(String[] args) {
Thread parkThread = new Thread(new ParkThread());
parkThread.start();
System.out.println("开始线程唤醒");
LockSupport.unpark(parkThread);
System.out.println("结束线程唤醒");
}
static class ParkThread implements Runnable{
@Override
public void run() {
System.out.println("开始线程阻塞");
LockSupport.park();
System.out.println("结束线程阻塞");
}
}
}
输出:
开始线程阻塞
开始线程唤醒
结束线程阻塞
结束线程唤醒
经过测试我们发现,先唤醒线程,在阻塞线程,线程不会真的阻塞;但是先唤醒线程两次再阻塞两次时就会导致线程真的阻塞。
LockSupport使用了一种名为Permit(许可)的概念来做到阻塞和唤醒线程的功能,通过控制变量_counter
进行控制的,类似于(0,1)信号量。
park()
方法时,会将_counter置为0,同时判断前值,小于1说明前面被unpark
过,则直接退出,否则将使该线程阻塞。unpark()
方法时,会将_counter置为1,同时判断前值,小于1会进行线程唤醒,否则直接退出。参考:
《Java并发编程的艺术》
Java并发工具类:https://blog.csdn.net/weixin_36208314/article/details/115077591
Java并发工具类详解:https://blog.csdn.net/sunxianghuang/article/details/52277394
LockSupport功能简介及原理浅析:https://www.cnblogs.com/takumicx/p/9328459.html