前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【JDK并发包基础】工具类详解

【JDK并发包基础】工具类详解

作者头像
我叫刘半仙
发布2018-04-16 16:39:41
1.2K0
发布2018-04-16 16:39:41
举报
文章被收录于专栏:我叫刘半仙我叫刘半仙

       在写并发代码来提升性能时,会遵循某些最佳写法,而不是只用基础的wait和notify来控制复杂的流程。Java.util.concurrent 包是专为 Java并发编程而设计的包,它下有很多编写好的工具,使用这些更高等的同步工具来编写代码,让我们的程序可以不费力气就得到优化。这些工具还在由一些优秀的工程师不断优化和完善,我们不必重复造轮子:

脑图地址,感谢深入浅出 Java Concurrency ,此脑图在这篇基础上修改而来。其中我们先看工具类部分:

       1.ReentrantLock

       2.Condition

       3.Semaphore

       4.ReentrantReadWriteLock

       5.CountDownLatch

       6.CyclicBarrrer

1.ReentrantLock

       1.1可重入

       单线程可以重复进入,但必须重复退出。

代码语言:javascript
复制
public class ReentrantLock1 implements Runnable{
	    private static int  a    = 0;
	    ReentrantLock lock = new ReentrantLock();

	    public void run() {
	    	for (int i = 0; i < 10000000; i++) {
	    		//一个线程拿到几个许可 就得释放几次,不然就造成等待 可在命令行查看jps -->jstack 9016
		        lock.lock(); //获取锁
		        lock.lock(); //获取锁
		        try {
		            a++;
		        } finally {
		            lock.unlock(); //释放锁
		            //lock.unlock(); //释放锁
		        }
	    	}
	    }

	  public static void main(String[] args) throws InterruptedException {
		  ReentrantLock1 rt = new ReentrantLock1();
		  Thread  t1 = new Thread(rt);
		  Thread  t2 = new Thread(rt);
		  t1.start();t2.start();
		  t1.join();t2.join();
		  System.out.println(a);
	}
}

一个线程拿到几个许可,就得释放几次,不然就造成线程等待,可在命令行查看等待的线程:jps -->jstack  [ option ] pid  

       1.2可中断

       长期线程在锁上等待问题,可以通过中断来解决:

代码语言:javascript
复制
public class ReentrantLock2 implements Runnable{
	    int  lock ;
	    private static  ReentrantLock lock2 = new ReentrantLock();
	    private static  ReentrantLock lock1 = new ReentrantLock();

	    public ReentrantLock2(int lock){
	    	this.lock=lock;
	    }
	    //两个线程控制加锁顺序,构造死锁现象.
        //lock1去申请lock2,lock2会申请lock1,如果使用lock方法,不太有办法把它解开.
        //使用lockInterruptibly加锁,可以使用中断导致线程正常结束
	    public void run() {
	    	try {
				if(lock==1){
                   //可中断的加锁.如果不加这个,只是简单的lock,是不会响应中断的
					lock1.lockInterruptibly();
						Thread.sleep(500);
					lock2.lockInterruptibly();
				}else{
					lock2.lockInterruptibly();
						Thread.sleep(500);
					lock1.lockInterruptibly();
				}
			} catch (InterruptedException e) {
				e.printStackTrace();
				//实际业务中可以在中断后做一些补救措施....
			}finally{
				if(lock1.isHeldByCurrentThread()){//如果拿了这把锁
					lock1.unlock();//解锁
				}
				if(lock2.isHeldByCurrentThread()){
					lock2.unlock();
				}
				System.out.println(Thread.currentThread().getId()+":线程退出");
			}
	    	
	    }

	  public static void main(String[] args) throws InterruptedException {
		  ReentrantLock2 rt1 = new ReentrantLock2(1);
		  ReentrantLock2 rt2 = new ReentrantLock2(2);
		  Thread  t1 = new Thread(rt1);
		  Thread  t2 = new Thread(rt2);
		  t1.start();t2.start();
		  Thread.sleep(1000);
		  //中断其中一个线程
		  //DeadLockCheckChecker.check();放开它就可以中断线程
	}
}
class DeadLockCheckChecker {
	  private  final static ThreadMXBean mbean=ManagementFactory.getThreadMXBean();
	    final static Runnable deadLockCheck=new Runnable() {
	        @Override
	        public void run() {
	            while (true){
	                long[] deadLockedThreadIds=mbean.findDeadlockedThreads();
	                if(deadLockedThreadIds!=null){
	                    ThreadInfo[] threadInfos=mbean.getThreadInfo(deadLockedThreadIds);
	                    for(Thread t: Thread.getAllStackTraces().keySet()){
	                        for(int i=0;i<threadInfos.length;i++){
	                            if(t.getId()==threadInfos[i].getThreadId()){//如果检查到了死锁
	                                t.interrupt();//中断当前线程
	                            }
	                        }
	                    }
	                }
	                try {
	                    Thread.sleep(1000);
	                } catch (InterruptedException e) {
	                }
	            }

	        }
	    };


	    public static void check(){
	        Thread thread=new Thread(deadLockCheck);
	        thread.setDaemon(true);//如果说整个程序都退出了,我就没有必要做死锁检查,所以要设置为守护线程
	        thread.start();
	    }
}

       可在命令行查看死锁:jps -->jstack  [ option ] pid  

       1.3可限时

       可限时也是一个避免永久等待构成死锁的解决方法:

代码语言:javascript
复制
public class ReentrantLock3 implements Runnable{
	    ReentrantLock lock = new ReentrantLock();

    public void run() {
	        try {
	        	if(lock.tryLock(2,TimeUnit.SECONDS)){
	        		Thread.sleep(3000);
	        	}else{
	        		System.out.println("申请锁失败");
	        	}
	        }catch(InterruptedException e){
	        	e.printStackTrace();
	        }finally {
	        	if(lock.isHeldByCurrentThread()){//如果线程持有这把锁
	        		 lock.unlock(); //释放锁
	        	}
	        }
	    }

	  public static void main(String[] args) throws InterruptedException {
		  ReentrantLock3 rt = new ReentrantLock3();
		  Thread  t1 = new Thread(rt);
		  Thread  t2 = new Thread(rt);
		  t1.start();t2.start();
	}
}

       1.4公平锁

       先来的线程先得到锁。如果先来的线程一直拿不到锁,则会产生饥饿现象,公平锁虽然不会产生饥饿现象,因为产生排队问题,会导致程序效率差。通过阅读ReentrantLock的源码发现:默认是非公平的,如果传入true,则是公平锁:

2.Condition

       Condition与ReentrantLock的关系就类似于synchronized与Object.wait()/notify()。但是它与ReentrantLock结合使用,有await和signal与之对应.

       利用Condition实现顺序执行:

代码语言:javascript
复制
public class ReentrantLockOfCondition {

	volatile private static int nextPrintWho = 1;
	private static ReentrantLock lock = new ReentrantLock();
	final private static Condition conditionA = lock.newCondition();
	final private static Condition conditionB = lock.newCondition();
	final private static Condition conditionC = lock.newCondition();

	public static void main(String[] args) {

		Thread threadA = new Thread() {
			public void run() {
				try {
					lock.lock();
					while (nextPrintWho != 1) {
						conditionA.await();
					}
					for (int i = 0; i < 3; i++) {
						System.out.println("ThreadA " + (i + 1));
					}
					nextPrintWho = 2;
					conditionB.signalAll();
				} catch (InterruptedException e) {
					e.printStackTrace();
				} finally {
					lock.unlock();
				}
			}
		};

		Thread threadB = new Thread() {
			public void run() {
				try {
					lock.lock();
					while (nextPrintWho != 2) {
						conditionB.await();
					}
					for (int i = 0; i < 3; i++) {
						System.out.println("ThreadB " + (i + 1));
					}
					nextPrintWho = 3;
					conditionC.signalAll();
				} catch (InterruptedException e) {
					e.printStackTrace();
				} finally {
					lock.unlock();
				}
			}
		};

		Thread threadC = new Thread() {
			public void run() {
				try {
					lock.lock();
					while (nextPrintWho != 3) {
						conditionC.await();
					}
					for (int i = 0; i < 3; i++) {
						System.out.println("ThreadC " + (i + 1));
					}
					nextPrintWho = 1;
					conditionA.signalAll();
				} catch (InterruptedException e) {
					e.printStackTrace();
				} finally {
					lock.unlock();
				}
			}
		};
		Thread[] aArray = new Thread[5];
		Thread[] bArray = new Thread[5];
		Thread[] cArray = new Thread[5];

		for (int i = 0; i < 5; i++) {
			aArray[i] = new Thread(threadA);
			bArray[i] = new Thread(threadB);
			cArray[i] = new Thread(threadC);

			aArray[i].start();
			bArray[i].start();
			cArray[i].start();
		}

	}
}

3.Semaphore

       翻译为信号量,允许多个线程进入临界区。说白了就是一个广义上的锁,相当于共享锁。比如信号量中我可以给它指定10个许可,每一个许可可以分配给若干个线程(当然一个线程也可以拿多个许可),拿到许可线程可以执行,如果许可分发完了,后面的线程就和锁一样去做等待。换句话说,当信号量等于1的时候,就相当于一把锁。

      比如早期做限流时,我们系统是8核cpu,设置同时请求任务为8个,超过8个,可以用信号量让线程等待来加以控制:

代码语言:javascript
复制
public class UseSemaphore {  
  
    public static void main(String[] args) {  
        // 线程池  
        ExecutorService exec = Executors.newCachedThreadPool();  
        // 只能8个线程同时访问  
        final Semaphore semp = new Semaphore(8);  
        // 模拟20个客户端访问  
        for (int index = 0; index < 20; index++) {  
            final int NO = index;  
            Runnable run = new Runnable() {  
                public void run() {  
                    try {  
                        // 获取许可  
                        semp.acquire();  
                        System.out.println("Accessing: " + NO);  
                        //模拟实际业务逻辑
                        Thread.sleep((long) (Math.random() * 10000));  
                    } catch (InterruptedException e) {
                    	e.printStackTrace();
                    }finally{
                    	 // 访问完后,释放  
                        semp.release();  
                    }
                }  
            };  
            exec.execute(run);  
        } 
        
        try {
			Thread.sleep(10);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
        //System.out.println(semp.getQueueLength());
        // 退出线程池  
        exec.shutdown();  
    }  
}  

       一般限流java用的不多,实际中一种思路是用redis,存用户的session到redis中,统计一分钟之内访问url的次数count,如果count大于额定次数,则是恶意攻击。redis里有个expried,每过60秒,把index清零,然后重新统计。现在互联网公司成熟的限流方案是使用:nginx + lua + redis 防刷和限流

4.ReentrantReadWriteLock

       ReentrantReadWriteLock,首先要做的是与ReentrantLock划清界限, 它和后者都是单独的实现,彼此之间没有继承或实现的关系。读写锁可以很好的提高程序效率,如果读也加锁的话,每次只有一个线程能访问,不符合高并发程序设计。ReentrantLock和synchronized都属于阻塞的并行,会把线程挂起,而ReadWriteLock属于无等待的并发。

       访问情况:读读共享,读写互斥,写写互斥

代码语言:javascript
复制
public class ReadWriteLockTest {
	public static void main(String[] args) throws InterruptedException {
		final Queue3 q3 = new Queue3();
		Thread t1 = new Thread("t1"){
			public void run() {
				q3.get();
		  }
		};
		Thread t2 = new Thread("t2"){
			public void run() {
				q3.get();
		  }
		};
		Thread t3 = new Thread("t3"){
			public void run() {
				q3.get();
		  }
		};
		
        Thread t4 = new Thread("t4"){
			public void run() {
				q3.put(new Random().nextInt(10000));
		  }
		};
		Thread t5 = new Thread("t5"){
			public void run() {
				q3.put(new Random().nextInt(10000));
		  }
		};
		Thread t6 = new Thread("t6"){
			public void run() {
				q3.put(new Random().nextInt(10000));
		  }
		};
		t4.start();t5.start();t6.start();
        t4.join();t5.join();t6.join();
    	t1.start();t2.start();t3.start();
	}
}

class Queue3 {
	private Object data = null;// 共享数据,只能有一个线程能写该数据,但可以有多个线程同时读该数据。
	private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

	public void get() {
		rwl.readLock().lock();// 上读锁,其他线程只能读不能写
		System.out.println(Thread.currentThread().getName()
				+ " be ready to read data!");
		try {
			Thread.sleep((long) (Math.random() * 1000));
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		System.out.println(Thread.currentThread().getName()
				+ " have read data :" + data);
		rwl.readLock().unlock(); // 释放读锁,最好放在finnaly里面
	}

	public void put(Object data) {
		rwl.writeLock().lock();// 上写锁,不允许其他线程读也不允许写
		System.out.println(Thread.currentThread().getName()
				+ " be ready to write data!");
		try {
			Thread.sleep((long) (Math.random() * 1000));
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		this.data = data;
		System.out.println(Thread.currentThread().getName()
				+ " have write data: " + data);
		rwl.writeLock().unlock();// 释放写锁
	}
}

5.CountDownLatch

       实际开发中经常用于监听某些初始化操作,等待初始化完成后,通知主线程继续工作,它相当于一个栅栏

代码语言:javascript
复制
public class UseCountDownLatch {

	public static void main(String[] args) {
		
		//new CountDownLatch(2)表示await过后,得有两个countDown才能唤醒
		final CountDownLatch countDown = new CountDownLatch(2);
		
		Thread t1 = new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					System.out.println("进入线程t1" + "等待其他线程处理完成...");
					countDown.await();
					System.out.println("t1线程继续执行...");
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		},"t1");
		
		Thread t2 = new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					System.out.println("t2线程进行初始化操作...");
					Thread.sleep(3000);
					System.out.println("t2线程初始化完毕,通知t1线程继续...");
					countDown.countDown();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		});
		Thread t3 = new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					System.out.println("t3线程进行初始化操作...");
					Thread.sleep(4000);
					System.out.println("t3线程初始化完毕,通知t1线程继续...");
					countDown.countDown();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		});
		
		t1.start();
		t2.start();
		t3.start();
	}
}

6.CyclicBarrrer

       倒数计时器。假设有一个场景:每个线程代表一个跑步运动员,当运动员都准备好后,才能一起出发,只要有一个人没有准备好,大家都等待:

代码语言:javascript
复制
public class UseCyclicBarrier {

	static class Runner implements Runnable {  
	    private CyclicBarrier barrier;  
	    private String name;  
	    
	    public Runner(CyclicBarrier barrier, String name) {  
	        this.barrier = barrier;  
	        this.name = name;  
	    }  
	    @Override  
	    public void run() {  
	        try {  
	            Thread.sleep(1000 * (new Random()).nextInt(5));  
	            System.out.println(name + " 准备OK.");  
	            barrier.await();  
	        } catch (InterruptedException e) {  
	            e.printStackTrace();  
	        } catch (BrokenBarrierException e) {  
	            e.printStackTrace();  
	        }  
	        System.out.println(name + " Go!!");  
	    }  
	} 
	
    public static void main(String[] args) throws IOException, InterruptedException {  
        CyclicBarrier barrier = new CyclicBarrier(3);  // 3 
        ExecutorService executor = Executors.newFixedThreadPool(3);  
        
        executor.submit(new Thread(new Runner(barrier, "张三")));  
        executor.submit(new Thread(new Runner(barrier, "李四")));  
        executor.submit(new Thread(new Runner(barrier, "王五")));  
  
        executor.shutdown();  
    }  
}  

      最后,我们解决高并发可以从以下几个方面入手:1.Nginx网络带宽硬件方面等。2.业务细粒度化。3.限流。4.集群。5.异步。6.缓存。其中最主要的还是业务细粒度化。

系列:

【JDK并发包基础】线程池详解

【JDK并发包基础】并发容器详解

【JDK并发包基础】工具类详解

【JDK并发基础】Java内存模型详解

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1.ReentrantLock
  • 2.Condition
  • 3.Semaphore
  • 4.ReentrantReadWriteLock
  • 5.CountDownLatch
  • 6.CyclicBarrrer
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档