【JDK并发包基础】工具类详解

       在写并发代码来提升性能时,会遵循某些最佳写法,而不是只用基础的wait和notify来控制复杂的流程。Java.util.concurrent 包是专为 Java并发编程而设计的包,它下有很多编写好的工具,使用这些更高等的同步工具来编写代码,让我们的程序可以不费力气就得到优化。这些工具还在由一些优秀的工程师不断优化和完善,我们不必重复造轮子:

脑图地址,感谢深入浅出 Java Concurrency ,此脑图在这篇基础上修改而来。其中我们先看工具类部分:

       1.ReentrantLock

       2.Condition

       3.Semaphore

       4.ReentrantReadWriteLock

       5.CountDownLatch

       6.CyclicBarrrer

1.ReentrantLock

       1.1可重入

       单线程可以重复进入,但必须重复退出。

public class ReentrantLock1 implements Runnable{
	    private static int  a    = 0;
	    ReentrantLock lock = new ReentrantLock();

	    public void run() {
	    	for (int i = 0; i < 10000000; i++) {
	    		//一个线程拿到几个许可 就得释放几次,不然就造成等待 可在命令行查看jps -->jstack 9016
		        lock.lock(); //获取锁
		        lock.lock(); //获取锁
		        try {
		            a++;
		        } finally {
		            lock.unlock(); //释放锁
		            //lock.unlock(); //释放锁
		        }
	    	}
	    }

	  public static void main(String[] args) throws InterruptedException {
		  ReentrantLock1 rt = new ReentrantLock1();
		  Thread  t1 = new Thread(rt);
		  Thread  t2 = new Thread(rt);
		  t1.start();t2.start();
		  t1.join();t2.join();
		  System.out.println(a);
	}
}

一个线程拿到几个许可,就得释放几次,不然就造成线程等待,可在命令行查看等待的线程:jps -->jstack  [ option ] pid  

       1.2可中断

       长期线程在锁上等待问题,可以通过中断来解决:

public class ReentrantLock2 implements Runnable{
	    int  lock ;
	    private static  ReentrantLock lock2 = new ReentrantLock();
	    private static  ReentrantLock lock1 = new ReentrantLock();

	    public ReentrantLock2(int lock){
	    	this.lock=lock;
	    }
	    //两个线程控制加锁顺序,构造死锁现象.
        //lock1去申请lock2,lock2会申请lock1,如果使用lock方法,不太有办法把它解开.
        //使用lockInterruptibly加锁,可以使用中断导致线程正常结束
	    public void run() {
	    	try {
				if(lock==1){
                   //可中断的加锁.如果不加这个,只是简单的lock,是不会响应中断的
					lock1.lockInterruptibly();
						Thread.sleep(500);
					lock2.lockInterruptibly();
				}else{
					lock2.lockInterruptibly();
						Thread.sleep(500);
					lock1.lockInterruptibly();
				}
			} catch (InterruptedException e) {
				e.printStackTrace();
				//实际业务中可以在中断后做一些补救措施....
			}finally{
				if(lock1.isHeldByCurrentThread()){//如果拿了这把锁
					lock1.unlock();//解锁
				}
				if(lock2.isHeldByCurrentThread()){
					lock2.unlock();
				}
				System.out.println(Thread.currentThread().getId()+":线程退出");
			}
	    	
	    }

	  public static void main(String[] args) throws InterruptedException {
		  ReentrantLock2 rt1 = new ReentrantLock2(1);
		  ReentrantLock2 rt2 = new ReentrantLock2(2);
		  Thread  t1 = new Thread(rt1);
		  Thread  t2 = new Thread(rt2);
		  t1.start();t2.start();
		  Thread.sleep(1000);
		  //中断其中一个线程
		  //DeadLockCheckChecker.check();放开它就可以中断线程
	}
}
class DeadLockCheckChecker {
	  private  final static ThreadMXBean mbean=ManagementFactory.getThreadMXBean();
	    final static Runnable deadLockCheck=new Runnable() {
	        @Override
	        public void run() {
	            while (true){
	                long[] deadLockedThreadIds=mbean.findDeadlockedThreads();
	                if(deadLockedThreadIds!=null){
	                    ThreadInfo[] threadInfos=mbean.getThreadInfo(deadLockedThreadIds);
	                    for(Thread t: Thread.getAllStackTraces().keySet()){
	                        for(int i=0;i<threadInfos.length;i++){
	                            if(t.getId()==threadInfos[i].getThreadId()){//如果检查到了死锁
	                                t.interrupt();//中断当前线程
	                            }
	                        }
	                    }
	                }
	                try {
	                    Thread.sleep(1000);
	                } catch (InterruptedException e) {
	                }
	            }

	        }
	    };


	    public static void check(){
	        Thread thread=new Thread(deadLockCheck);
	        thread.setDaemon(true);//如果说整个程序都退出了,我就没有必要做死锁检查,所以要设置为守护线程
	        thread.start();
	    }
}

       可在命令行查看死锁:jps -->jstack  [ option ] pid  

       1.3可限时

       可限时也是一个避免永久等待构成死锁的解决方法:

public class ReentrantLock3 implements Runnable{
	    ReentrantLock lock = new ReentrantLock();

    public void run() {
	        try {
	        	if(lock.tryLock(2,TimeUnit.SECONDS)){
	        		Thread.sleep(3000);
	        	}else{
	        		System.out.println("申请锁失败");
	        	}
	        }catch(InterruptedException e){
	        	e.printStackTrace();
	        }finally {
	        	if(lock.isHeldByCurrentThread()){//如果线程持有这把锁
	        		 lock.unlock(); //释放锁
	        	}
	        }
	    }

	  public static void main(String[] args) throws InterruptedException {
		  ReentrantLock3 rt = new ReentrantLock3();
		  Thread  t1 = new Thread(rt);
		  Thread  t2 = new Thread(rt);
		  t1.start();t2.start();
	}
}

       1.4公平锁

       先来的线程先得到锁。如果先来的线程一直拿不到锁,则会产生饥饿现象,公平锁虽然不会产生饥饿现象,因为产生排队问题,会导致程序效率差。通过阅读ReentrantLock的源码发现:默认是非公平的,如果传入true,则是公平锁:

2.Condition

       Condition与ReentrantLock的关系就类似于synchronized与Object.wait()/notify()。但是它与ReentrantLock结合使用,有await和signal与之对应.

       利用Condition实现顺序执行:

public class ReentrantLockOfCondition {

	volatile private static int nextPrintWho = 1;
	private static ReentrantLock lock = new ReentrantLock();
	final private static Condition conditionA = lock.newCondition();
	final private static Condition conditionB = lock.newCondition();
	final private static Condition conditionC = lock.newCondition();

	public static void main(String[] args) {

		Thread threadA = new Thread() {
			public void run() {
				try {
					lock.lock();
					while (nextPrintWho != 1) {
						conditionA.await();
					}
					for (int i = 0; i < 3; i++) {
						System.out.println("ThreadA " + (i + 1));
					}
					nextPrintWho = 2;
					conditionB.signalAll();
				} catch (InterruptedException e) {
					e.printStackTrace();
				} finally {
					lock.unlock();
				}
			}
		};

		Thread threadB = new Thread() {
			public void run() {
				try {
					lock.lock();
					while (nextPrintWho != 2) {
						conditionB.await();
					}
					for (int i = 0; i < 3; i++) {
						System.out.println("ThreadB " + (i + 1));
					}
					nextPrintWho = 3;
					conditionC.signalAll();
				} catch (InterruptedException e) {
					e.printStackTrace();
				} finally {
					lock.unlock();
				}
			}
		};

		Thread threadC = new Thread() {
			public void run() {
				try {
					lock.lock();
					while (nextPrintWho != 3) {
						conditionC.await();
					}
					for (int i = 0; i < 3; i++) {
						System.out.println("ThreadC " + (i + 1));
					}
					nextPrintWho = 1;
					conditionA.signalAll();
				} catch (InterruptedException e) {
					e.printStackTrace();
				} finally {
					lock.unlock();
				}
			}
		};
		Thread[] aArray = new Thread[5];
		Thread[] bArray = new Thread[5];
		Thread[] cArray = new Thread[5];

		for (int i = 0; i < 5; i++) {
			aArray[i] = new Thread(threadA);
			bArray[i] = new Thread(threadB);
			cArray[i] = new Thread(threadC);

			aArray[i].start();
			bArray[i].start();
			cArray[i].start();
		}

	}
}

3.Semaphore

       翻译为信号量,允许多个线程进入临界区。说白了就是一个广义上的锁,相当于共享锁。比如信号量中我可以给它指定10个许可,每一个许可可以分配给若干个线程(当然一个线程也可以拿多个许可),拿到许可线程可以执行,如果许可分发完了,后面的线程就和锁一样去做等待。换句话说,当信号量等于1的时候,就相当于一把锁。

      比如早期做限流时,我们系统是8核cpu,设置同时请求任务为8个,超过8个,可以用信号量让线程等待来加以控制:

public class UseSemaphore {  
  
    public static void main(String[] args) {  
        // 线程池  
        ExecutorService exec = Executors.newCachedThreadPool();  
        // 只能8个线程同时访问  
        final Semaphore semp = new Semaphore(8);  
        // 模拟20个客户端访问  
        for (int index = 0; index < 20; index++) {  
            final int NO = index;  
            Runnable run = new Runnable() {  
                public void run() {  
                    try {  
                        // 获取许可  
                        semp.acquire();  
                        System.out.println("Accessing: " + NO);  
                        //模拟实际业务逻辑
                        Thread.sleep((long) (Math.random() * 10000));  
                    } catch (InterruptedException e) {
                    	e.printStackTrace();
                    }finally{
                    	 // 访问完后,释放  
                        semp.release();  
                    }
                }  
            };  
            exec.execute(run);  
        } 
        
        try {
			Thread.sleep(10);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
        //System.out.println(semp.getQueueLength());
        // 退出线程池  
        exec.shutdown();  
    }  
}  

       一般限流java用的不多,实际中一种思路是用redis,存用户的session到redis中,统计一分钟之内访问url的次数count,如果count大于额定次数,则是恶意攻击。redis里有个expried,每过60秒,把index清零,然后重新统计。现在互联网公司成熟的限流方案是使用:nginx + lua + redis 防刷和限流

4.ReentrantReadWriteLock

       ReentrantReadWriteLock,首先要做的是与ReentrantLock划清界限, 它和后者都是单独的实现,彼此之间没有继承或实现的关系。读写锁可以很好的提高程序效率,如果读也加锁的话,每次只有一个线程能访问,不符合高并发程序设计。ReentrantLock和synchronized都属于阻塞的并行,会把线程挂起,而ReadWriteLock属于无等待的并发。

       访问情况:读读共享,读写互斥,写写互斥

public class ReadWriteLockTest {
	public static void main(String[] args) throws InterruptedException {
		final Queue3 q3 = new Queue3();
		Thread t1 = new Thread("t1"){
			public void run() {
				q3.get();
		  }
		};
		Thread t2 = new Thread("t2"){
			public void run() {
				q3.get();
		  }
		};
		Thread t3 = new Thread("t3"){
			public void run() {
				q3.get();
		  }
		};
		
        Thread t4 = new Thread("t4"){
			public void run() {
				q3.put(new Random().nextInt(10000));
		  }
		};
		Thread t5 = new Thread("t5"){
			public void run() {
				q3.put(new Random().nextInt(10000));
		  }
		};
		Thread t6 = new Thread("t6"){
			public void run() {
				q3.put(new Random().nextInt(10000));
		  }
		};
		t4.start();t5.start();t6.start();
        t4.join();t5.join();t6.join();
    	t1.start();t2.start();t3.start();
	}
}

class Queue3 {
	private Object data = null;// 共享数据,只能有一个线程能写该数据,但可以有多个线程同时读该数据。
	private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

	public void get() {
		rwl.readLock().lock();// 上读锁,其他线程只能读不能写
		System.out.println(Thread.currentThread().getName()
				+ " be ready to read data!");
		try {
			Thread.sleep((long) (Math.random() * 1000));
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		System.out.println(Thread.currentThread().getName()
				+ " have read data :" + data);
		rwl.readLock().unlock(); // 释放读锁,最好放在finnaly里面
	}

	public void put(Object data) {
		rwl.writeLock().lock();// 上写锁,不允许其他线程读也不允许写
		System.out.println(Thread.currentThread().getName()
				+ " be ready to write data!");
		try {
			Thread.sleep((long) (Math.random() * 1000));
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		this.data = data;
		System.out.println(Thread.currentThread().getName()
				+ " have write data: " + data);
		rwl.writeLock().unlock();// 释放写锁
	}
}

5.CountDownLatch

       实际开发中经常用于监听某些初始化操作,等待初始化完成后,通知主线程继续工作,它相当于一个栅栏

public class UseCountDownLatch {

	public static void main(String[] args) {
		
		//new CountDownLatch(2)表示await过后,得有两个countDown才能唤醒
		final CountDownLatch countDown = new CountDownLatch(2);
		
		Thread t1 = new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					System.out.println("进入线程t1" + "等待其他线程处理完成...");
					countDown.await();
					System.out.println("t1线程继续执行...");
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		},"t1");
		
		Thread t2 = new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					System.out.println("t2线程进行初始化操作...");
					Thread.sleep(3000);
					System.out.println("t2线程初始化完毕,通知t1线程继续...");
					countDown.countDown();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		});
		Thread t3 = new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					System.out.println("t3线程进行初始化操作...");
					Thread.sleep(4000);
					System.out.println("t3线程初始化完毕,通知t1线程继续...");
					countDown.countDown();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		});
		
		t1.start();
		t2.start();
		t3.start();
	}
}

6.CyclicBarrrer

       倒数计时器。假设有一个场景:每个线程代表一个跑步运动员,当运动员都准备好后,才能一起出发,只要有一个人没有准备好,大家都等待:

public class UseCyclicBarrier {

	static class Runner implements Runnable {  
	    private CyclicBarrier barrier;  
	    private String name;  
	    
	    public Runner(CyclicBarrier barrier, String name) {  
	        this.barrier = barrier;  
	        this.name = name;  
	    }  
	    @Override  
	    public void run() {  
	        try {  
	            Thread.sleep(1000 * (new Random()).nextInt(5));  
	            System.out.println(name + " 准备OK.");  
	            barrier.await();  
	        } catch (InterruptedException e) {  
	            e.printStackTrace();  
	        } catch (BrokenBarrierException e) {  
	            e.printStackTrace();  
	        }  
	        System.out.println(name + " Go!!");  
	    }  
	} 
	
    public static void main(String[] args) throws IOException, InterruptedException {  
        CyclicBarrier barrier = new CyclicBarrier(3);  // 3 
        ExecutorService executor = Executors.newFixedThreadPool(3);  
        
        executor.submit(new Thread(new Runner(barrier, "张三")));  
        executor.submit(new Thread(new Runner(barrier, "李四")));  
        executor.submit(new Thread(new Runner(barrier, "王五")));  
  
        executor.shutdown();  
    }  
}  

      最后,我们解决高并发可以从以下几个方面入手:1.Nginx网络带宽硬件方面等。2.业务细粒度化。3.限流。4.集群。5.异步。6.缓存。其中最主要的还是业务细粒度化。

系列:

【JDK并发包基础】线程池详解

【JDK并发包基础】并发容器详解

【JDK并发包基础】工具类详解

【JDK并发基础】Java内存模型详解

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏iOS开发

iOS开发之 Method Swizzling 深入浅出

如果产品经理突然说:"在所有页面添加统计功能,也就是用户进入这个页面就统计一次"。我们会想到下面的一些方法:

49870
来自专栏mini188

java中的锁

java中有哪些锁 这个问题在我看了一遍<java并发编程>后尽然无法回答,说明自己对于锁的概念了解的不够。于是再次翻看了一下书里的内容,突然有点打开脑门的感觉...

56690
来自专栏数据结构与算法

BZOJ1269: [AHOI2006]文本编辑器editor

Descriptio 这些日子,可可不和卡卡一起玩了,原来可可正废寝忘食的想做一个简单而高效的文本编辑器。你能帮助他吗? 为了明确任务目标,可可对“文本编辑器...

30270
来自专栏行者常至

018.多线程-悲观锁、乐观锁、重入锁、读写锁、自旋锁、CAS无锁机制

顾名思义,就是很悲观。每次去拿数据的时候都认为别人会修改,所以都会上锁。这样别人想拿这个数据就会阻塞(block)直到它拿到锁。传统的关系型数据库里面就用到了很...

84510
来自专栏生信小驿站

数据处理第3部分:选择行的基本和高级的方法

原文地址:https://suzan.rbind.io/2018/02/dplyr-tutorial-3/ 作者:Suzan Baert 这是系列dplyr...

10410
来自专栏TechBox

一份走心的iOS开发规范前言约定(一)命名规范(二)编码规范2.14 内存管理规范本文参考文章其他有价值的文章

70290
来自专栏黄Java的地盘

[翻译]WebSocket协议第二章——Conformance Requirements

本文为WebSocket协议的第二章,本文翻译的主要内容为WebSocket协议中相关术语的介绍。

10310
来自专栏一个会写诗的程序员的博客

《Kotlin 程序设计》第七章 Kotlin 编译过程分析第七章 Kotlin 编译过程分析

http://mp.weixin.qq.com/s/lEFRH523W7aNWUO1QE6ULQ

21520
来自专栏jeremy的技术点滴

JVM的Finalization Delay引起的OOM

43880
来自专栏木木玲

ConcurrentHashMap (JDK7) 详解

48390

扫码关注云+社区

领取腾讯云代金券