前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Java并发工具类(JUC)

Java并发工具类(JUC)

作者头像
chenchenchen
发布2022-01-05 14:13:23
7970
发布2022-01-05 14:13:23
举报
文章被收录于专栏:chenchenchenchenchenchen
  • 等待多线程完成CountDownLatch
  • 同步屏障CyclicBarrier
  • 控制并发线程数Semaphore
  • 线程间交换数据Exchanger

在J.U.C包中,提供了几个非常有用的并发工具类,CountDownLatch、CyclicBarrier和Semaphore工具类提供了一种并发流程控制的手段,Exchanger工具类则提供了在线程间交换数据的一种手段。

通过使用这些工具类,可以有效提高并发编程中,并发流程的控制,以提升效率和代码质量。

等待多线程完成的CountDownLatch

CountDownLatch允许一个或多个线程等待其他线程完成操作。

使用

CountDownLatch的构造函数接收一个int类型的参数作为计数器,如果你想等待N个点完成,这里就传入N。可以是N个线程,也可以是1个线程里的N个执行步骤。

代码语言:javascript
复制
CountDownLatch countDown=new CountDownLatch(N);

当我们调用CountDownLatch的countDown方法时,N就会减1,CountDownLatch的await方法会阻塞当前线程,直到N变成零。

如果有某个线程处理得比较慢,可以使用另外一个带指定时间的await方法——await(long time,TimeUnit unit),这个方法等待特定时间后,就会不再阻塞当前线程。join也有类似的方法。

注意:计数器必须大于等于0,等于0时相当于不会阻塞当前线程。**同时CountDownLatch不能重新初始化或者修改CountDownLatch对象的内部计数器。**一个线程调用countDown方法happen-before,另外一个线程调用await方法。

跟join()方法对比

在代码实现中,我们也可以使用join()方法,让当前执行线程等待join线程执行结束。join的实现原理是不断的去判断join的线程是否存活,如果存活,则让当前线程一直等待。

测试类

代码语言:javascript
复制
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
 
public class CountDownLatchTest {
	private static Random sr=new Random(47); 
	private static AtomicInteger result=new AtomicInteger(0);
	private static int threadCount=10;//线程数量
	private static CountDownLatch countDown=new CountDownLatch(threadCount);//CountDownLatch
	private static class Parser implements Runnable{ 
		String name;
		public Parser(String name){
			this.name=name;
		}
		@Override
		public void run() {
			int sum=0;
			int seed=Math.abs(sr.nextInt()) ;
			Random r=new Random(47); 
			for(int i=0;i<100;i++){  
				sum+=r.nextInt(seed);
			}  
			result.addAndGet(sum);
			System.out.println(name+"线程的解析结果:"+sum);
			countDown.countDown();//注意这里
		} 
	}
	public static void main(String[] args) throws InterruptedException {
		Thread[] threads=new Thread[threadCount];
		for(int i=0;i<threadCount;i++){
			threads[i]=new Thread(new Parser("Parser-"+i));
		} 
		for(int i=0;i<threadCount;i++){
			threads[i].start();
		} 
		/*
		for(int i=0;i<threadCount;i++){
			threads[i].join();
		}*/
		countDown.await();//将join改为使用CountDownLatch
		System.out.println("所有线程解析结束!");
		System.out.println("所有线程的解析结果:"+result);
	} 
}

输出:
Parser-0线程的解析结果:1336321192
Parser-1线程的解析结果:-2013585201
Parser-2线程的解析结果:-1675827227
Parser-4线程的解析结果:1638121055
Parser-3线程的解析结果:908136818
Parser-5线程的解析结果:1513365118
Parser-7线程的解析结果:489607354
Parser-6线程的解析结果:1513365118
Parser-8线程的解析结果:-1191966831
Parser-9线程的解析结果:-912399159
所有线程解析结束!
所有线程的解析结果:1605138237

源码

代码语言:javascript
复制
public class CountDownLatch {
    /**Synchronization control For CountDownLatch. Uses AQS state to represent count.*/
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;
 
        Sync(int count) {
            setState(count);//初始化同步状态
        }
 
        int getCount() {
            return getState();
        }
 
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
 
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }
 
    private final Sync sync;//组合一个同步器(AQS)
 
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);//初始化同步状态
    }
    /*Causes the current thread to wait until the latch has counted down to
     * zero, unless the thread is {@linkplain Thread#interrupt interrupted}.*/
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);//
    }
 
    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
    public void countDown() {
        sync.releaseShared(1);//释放同步状态
    }
 
    public long getCount() {
        return sync.getCount();
    }
 
    public String toString() {
        return super.toString() + "[Count = " + sync.getCount() + "]";
    }
}

同步屏障CyclicBarrier

使用

CyclicBarrier意思是可循环使用(Cyclic)的屏障(Barrier)。可以让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。

CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程通过调用CyclicBarrier的await()方法,通知CyclicBarrier当前线程已经到达屏障,然后当前线程被阻塞。

CyclicBarrier与CountDownLatch的区别

CyclicBarrier的计数器可以使用reset()进行重置,而CountDownLatch的计数器不可重置只能使用一次。如果计算发生错误,可以重置计数器,并让线程重新执行一次。

CyclicBarrier提供了比CountDownLatch更丰富的方法。isBroken()用于了解阻塞的线程是否被中断。getNumberWaiting()方法可以获取被CyclicBarrier阻塞的线程数。

测试类

代码语言:javascript
复制
import java.util.Random; 
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;
 
 
public class CyclicBarrierTest {
 
	private static Random sr=new Random(47); 
	private static AtomicInteger result=new AtomicInteger(0);
	private static int threadCount=10;
	//屏障后面执行汇总
	private static CyclicBarrier barrier=new CyclicBarrier(threadCount,new Accumulate());
	private static class Parser implements Runnable{ 
		String name;
		public Parser(String name){
			this.name=name;
		}
		@Override
		public void run() {
			int sum=0;
			int seed=Math.abs(sr.nextInt()) ;
			Random r=new Random(47); 
			for(int i=0;i<(seed%100*100000);i++){  
				sum+=r.nextInt(seed); 
			}  
			result.addAndGet(sum);
			System.out.println(System.currentTimeMillis()+"-"+name+"线程的解析结果:"+sum);
			try { 
				barrier.await();
				System.out.println(System.currentTimeMillis()+"-"+name+"线程越过屏障!");
			} catch (Exception e) {
				e.printStackTrace();
			}
		} 
	}
	static class Accumulate implements Runnable{ 
		@Override
		public void run() { 
			System.out.println("所有线程解析结束!");
			System.out.println("所有线程的解析结果:"+result);
		} 
	}
	public static void main(String[] args) throws InterruptedException {
		Thread[] threads=new Thread[threadCount];
		for(int i=0;i<threadCount;i++){
			threads[i]=new Thread(new Parser("Parser-"+i));
		} 
		for(int i=0;i<threadCount;i++){
			threads[i].start();
		}   
	} 
}

输出:
1471866228774-Parser-4线程的解析结果:631026992
1471866228930-Parser-3线程的解析结果:-372785277
1471866228961-Parser-1线程的解析结果:-938473891
1471866229008-Parser-7线程的解析结果:-396620018
1471866229008-Parser-2线程的解析结果:-1159985406
1471866229024-Parser-5线程的解析结果:-664234808
1471866229070-Parser-6线程的解析结果:556534377
1471866229117-Parser-9线程的解析结果:-844558478
1471866229383-Parser-0线程的解析结果:919864023
1471866229430-Parser-8线程的解析结果:-2104111089
所有线程解析结束!
所有线程的解析结果:-78376279
1471866229430-Parser-8线程越过屏障!
1471866229430-Parser-2线程越过屏障!
1471866229430-Parser-9线程越过屏障!
1471866229430-Parser-7线程越过屏障!
1471866229430-Parser-1线程越过屏障!
1471866229430-Parser-3线程越过屏障!
1471866229430-Parser-0线程越过屏障!
1471866229430-Parser-6线程越过屏障!
1471866229430-Parser-4线程越过屏障!
1471866229430-Parser-5线程越过屏障!
// 各个线程解析完成的时间不一致,但是越过屏障的时间却是一致的。

控制并发线程数的信号量Semaphore

应用场景

Semaphore(信号量)用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。

Semaphore可以用于做流量控制,特别是公用资源有限的应用场景,比如数据库连接。

当需要批量处理大量的数据文件时,可以开启几十个线程并发读取文件。将文件读取到内存中之后,还需要将解析后的数据存储的数据库中,但是数据库有允许的最大连接数,比如是10个,就必须要控制只有10个线程可以获取到数据库连接。这个时候就可以用Semaphore来做流量的控制。

使用

在构造函数中,可以传入两个参数:

  • permits: 可用的许可证数量
  • fair: 是否公平获取许可证

比如Semaphore(10,true),就表示允许10个线程获取许可证,也就是最大的并发量为10,线程可以通过公平竞争(即先进先出的顺序)的方式获取许可证。

线程使用Semaphore的acquire()方法获取一个许可证,使用完之后调用release()方法归还许可证。还可以用tryAcquire()方法尝试获取许可证。

其他的API

  • public final boolean hasQueuedThreads(): 是否还有线程正在等待获取许可证
  • public final int getQueueLength():判断还有多少个等待获取许可证的线程
  • public int availablePermits():返回此信号量中当前可用的许可证数量
  • public void acquire(): 从此信号量中请求一个许可证
  • public void release(): 从此信号量中释放一个许可证
  • public boolean tryAcquire(): 试图从信号量中请求一个许可证,无可用的许可证时,直接返回不阻塞;

测试类

代码中有30个线程在执行,但是只允许10个并发执行。Semaphore(10)表示允许10个线程获取许可证,也就是最大并发数是10。

代码语言:javascript
复制
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
 
public class SemaphoreTest { 
	private static final int THREAD_COUNT = 30;
	private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
	private static Semaphore s = new Semaphore(10);
	
	public static void main(String[] args) {
		for (int i = 0; i < THREAD_COUNT; i++) {
			threadPool.execute(new Runnable() {
				@Override
				public void run() {
					try { 
						s.acquire();
						System.out.println("save data");
						s.release();
					} catch (InterruptedException e) {
					}
				}
			});
		}
		threadPool.shutdown();
	}
}

线程间交换数据的交换者Exchanger

使用

Exchanger(交换者)是一个用于线程间协作的工具类。Exchanger用于进行线程间的数据交换。它提供一个同步点,供两个线程交换数据。

如果第一个线程先执行exchange()方法,它会一直等待第二个线程也执行exchange方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。

如果两个线程有一个没有执行exchange()方法,则会一直等待,如果担心有特殊情况发生,避免一直等待,可以使用exchange(V x,longtimeout,TimeUnit unit)设置最大等待时长。

应用场景

下面来看一下Exchanger的应用场景。

1、Exchanger可以用于遗传算法,遗传算法里需要选出两个人作为交配对象,这时候会交换两人的数据,并使用交叉规则得出2个交配结果。

2、Exchanger也可以用于校对工作,比如我们需要将纸制银行流水通过人工的方式录入成电子银行流水,为了避免错误,采用AB岗两人进行录入,录入到Excel之后,系统需要加载这两个Excel,并对两个Excel数据进行校对,看看是否录入一致。

测试类

代码语言:javascript
复制
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
public class ExchangerTest {
 
	private static final Exchanger<String> exgr = new Exchanger<String>();
	private static ExecutorService threadPool = Executors.newFixedThreadPool(2);
 
	public static void main(String[] args) {
		threadPool.execute(new Runnable() {
			@Override
			public void run() {
				try {
					String A = "银行流水100";// A录入银行流水数据
					String B=exgr.exchange(A);
					System.out.println("A的视角:A和B数据是否一致:" + A.equals(B) + 
",A录入的是:" + A + ",B录入是:" + B);
				} catch (InterruptedException e) {
				}
			}
		});
		threadPool.execute(new Runnable() {
			@Override
			public void run() {
				try {
					String B = "银行流水200";// B录入银行流水数据
					String A = exgr.exchange(B);
					System.out.println("B的视角:A和B数据是否一致:" + A.equals(B) + 
",A录入的是:" + A + ",B录入是:" + B);
				} catch (InterruptedException e) {
				}
			}
		});
		threadPool.shutdown();
	}
}

输出:

B的视角:A和B数据是否一致:false,A录入的是:银行流水100,B录入是:银行流水200
A的视角:A和B数据是否一致:false,A录入的是:银行流水100,B录入是:银行流水200

LockSupport

使用

LockSupport类,是JUC包中的一个工具类,是用来创建锁和其他同步类的基本线程阻塞原语,可以让线程在任意位置阻塞,也可以在任意位置唤醒。(Basic thread blocking primitives for creating locks and other synchronization classes

核心方法有两个:park()unpark(),其中park()方法用来阻塞当前调用线程,unpark()方法用于唤醒指定线程。

ReentrantLock,ReentReadWriteLocks在线程间等待/通知机制使用的Condition时都会调用LockSupport.park()方法和LockSupport.unpark()方法。

与wait/notify对比

这和Object类的wait()和notify()方法有些类似,这里主要有两点区别:

(1)wait和notify都是Object中的方法,在调用这两个方法前必须先获得锁对象,但是park不需要获取某个对象的锁就可以锁住线程。

(2)notify只能随机选择一个线程唤醒,无法唤醒指定的线程,unpark却可以唤醒一个指定的线程。

代码语言:javascript
复制
public class WaitNotifyTest {
    private static Object obj = new Object();
    public static void main(String[] args) {
        new Thread(new WaitThread()).start();
        new Thread(new NotifyThread()).start();
    }
    static class WaitThread implements Runnable {
        @Override
        public void run() {
            synchronized (obj) {
                System.out.println("start wait!");
                try {
                    obj.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("end wait!");
            }
        }
    }
    static class NotifyThread implements Runnable {
        @Override
        public void run() {
            synchronized (obj) {
                System.out.println("start notify!");
                obj.notify();
                System.out.println("end notify!");
            }
        }
    }
}

输出:
start wait!
start notify!
end notify!
end wait!

测试类

代码语言:javascript
复制
public class LockSupportTest {

    public static void main(String[] args) {
        Thread parkThread = new Thread(new ParkThread());
        parkThread.start();
        System.out.println("开始线程唤醒");
        LockSupport.unpark(parkThread);
        System.out.println("结束线程唤醒");

    }

    static class ParkThread implements Runnable{

        @Override
        public void run() {
            System.out.println("开始线程阻塞");
            LockSupport.park();
            System.out.println("结束线程阻塞");
        }
    }
}

输出:
开始线程阻塞
开始线程唤醒
结束线程阻塞
结束线程唤醒

原理

经过测试我们发现,先唤醒线程,在阻塞线程,线程不会真的阻塞;但是先唤醒线程两次再阻塞两次时就会导致线程真的阻塞。

LockSupport使用了一种名为Permit(许可)的概念来做到阻塞和唤醒线程的功能,通过控制变量_counter进行控制的,类似于(0,1)信号量。

  • 当调用park()方法时,会将_counter置为0,同时判断前值,小于1说明前面被unpark过,则直接退出,否则将使该线程阻塞。
  • 当调用unpark()方法时,会将_counter置为1,同时判断前值,小于1会进行线程唤醒,否则直接退出。
  • 形象的理解,线程阻塞需要消耗凭证(permit),这个凭证最多只有1个。当调用park方法时,如果有凭证,则会直接消耗掉这个凭证然后正常退出;但是如果没有凭证,就必须阻塞等待凭证可用;而unpark则相反,它会增加一个凭证,但凭证最多只能有1个。
  • 为什么可以先唤醒线程后阻塞线程? 因为unpark获得了一个凭证,之后调用park因为有凭证消费,故不会阻塞。
  • 为什么唤醒两次后阻塞两次会阻塞线程。 因为凭证的数量最多为1,连续调用两次unpark和调用一次unpark效果一样,只会增加一个凭证;而调用两次park却需要消费两个凭证。

参考:

《Java并发编程的艺术》

Java并发工具类:https://blog.csdn.net/weixin_36208314/article/details/115077591

Java并发工具类详解:https://blog.csdn.net/sunxianghuang/article/details/52277394

LockSupport功能简介及原理浅析:https://www.cnblogs.com/takumicx/p/9328459.html

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2021-06-13 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 等待多线程完成的CountDownLatch
    • 使用
      • 跟join()方法对比
        • 测试类
          • 源码
          • 同步屏障CyclicBarrier
            • 使用
              • CyclicBarrier与CountDownLatch的区别
                • 测试类
                • 控制并发线程数的信号量Semaphore
                  • 应用场景
                    • 使用
                      • 测试类
                      • 线程间交换数据的交换者Exchanger
                        • 使用
                          • 应用场景
                            • 测试类
                            • LockSupport
                              • 使用
                                • 与wait/notify对比
                                  • 测试类
                                    • 原理
                                    相关产品与服务
                                    数据保险箱
                                    数据保险箱(Cloud Data Coffer Service,CDCS)为您提供更高安全系数的企业核心数据存储服务。您可以通过自定义过期天数的方法删除数据,避免误删带来的损害,还可以将数据跨地域存储,防止一些不可抗因素导致的数据丢失。数据保险箱支持通过控制台、API 等多样化方式快速简单接入,实现海量数据的存储管理。您可以使用数据保险箱对文件数据进行上传、下载,最终实现数据的安全存储和提取。
                                    领券
                                    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档