前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【JDK并发包基础】线程池详解

【JDK并发包基础】线程池详解

作者头像
我叫刘半仙
发布2018-04-16 16:37:01
1.3K0
发布2018-04-16 16:37:01
举报
文章被收录于专栏:我叫刘半仙我叫刘半仙

为了更好的控制多线程,JDK提供了一套线程框架Executor来帮助程序员有效的进行线程控制。Java.util.concurrent 包是专为 Java并发编程而设计的包,它下有很多编写好的工具:

脑图地址,感谢深入浅出 Java Concurrency ,此脑图在这篇基础上修改而来。其中有一个比较重要的线程工厂类:Executors。 Executors工厂会提供常用四类线程池的创建。

       以前当我们每次执行一个任务时用new Thread,频繁创建对象会导致系统性能差,线程缺乏统一管理,可能无限制新建线程,相互之间竞争导致系统耗尽,并且缺乏定时任务,中断等功能。线程池可以有效的提高系统资源的使用率,同时避免过多资源竞争,重用存在的线程,减少对象创建。Java通过Executors创建不同功能的线程池,若Executors无法满足需求,我们也可以创建自定义的线程池。文章分为以下部分讲解:

       1.newFixedThreadPool()方法

       2. newSingleThreadExecutor()方法

       3.newCachedThreadPool()方法

       4.newScheduledThreadPool()方法

       5.自定义线程池

在讲述之前,因为上面5条均会用到ThreadPoolExecutor这个类,所以我们先来看看ThreadPoolExecutor中线程执行任务的示意图,它的执行任务分两种情况:

     1).Execute()方法会创建一个线程然后执行一个任务。

     2).这个线程在执行完1之后,会反复从BlockingQueue队列中获取任务来执行。如果图中所示三个线程同时间在执行任务,还有任务进来则会放入BlockingQueue队列中暂缓起来等待线程空闲去执行。再者,这3个线程正在使用,队列也满了的话(有界队列的情况),还有任务进来,则会实行拒绝策略。(take()和poll()都是取头元素节点,区别在于前者会删除元素,后者不会)

1.newFixedThreadPool()方法

       创建一个固定数量的线程池,里面的线程数始终不变,当有一个线程提交时,若线程池中有空闲的线程,则立即执行。若没有,则会暂缓在一个阻塞队列LinkedBlockingQueue中等待有空闲的线程去执行。newFixedThreadPool()方法的源码如下(LinkedBlockingQueue的详解可以看博主的上一篇文章:【JDK并发包基础】并发容器详解):

代码语言:javascript
复制
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, //核心线程数
                                      nThreads,//最大线程数
                                      0L, //空闲时保持线程活着的时间
                                      TimeUnit.MILLISECONDS,//上述时间的单位
                                      new LinkedBlockingQueue<Runnable>());//线程池没空闲,则新任务放在这个队列里
    }

       现在我们思考一下:假如有Thread1、Thread2、Thread3、Thread4四条线程分别统计C、D、E、F四个盘的大小,所有线程都统计完毕交给Thread5线程去做汇总,应当如何实现?

       第一种方式是用join()来做,不推荐:

       推荐使用线程池的方式:

代码语言:javascript
复制
public static void main(String[] args) throws InterruptedException { 
  //用CountDownLatch实现,CountDownLatch传入4相当于一个计时器,一个await需要4次countDown才能唤醒
  final CountDownLatch countDownLatch= new CountDownLatch(4);
	        Runnable run1= new Runnable() {
	            @Override
	            public void run() {
	                try {
	                    Thread.sleep(3000);
	                    System.out.println("统计C盘");
	                    countDownLatch.countDown();//单任务,把计数器减1
	                } catch (InterruptedException e) {
	                    e.printStackTrace();
	                }
	            }
	        };
	        Runnable run2= new Runnable() {
	            @Override
	            public void run() {
	                try {
	                    Thread.sleep(3000);
	                    System.out.println("统计D盘");
	                    countDownLatch.countDown();
	                } catch (InterruptedException e) {
	                    e.printStackTrace();
	                }
	            }
	        };
	        Runnable run3= new Runnable() {
	            @Override
	            public void run() {
	                try {
	                    Thread.sleep(3000);
	                    System.out.println("统计E盘");
	                    countDownLatch.countDown();
	                } catch (InterruptedException e) {
	                    e.printStackTrace();
	                }
	            }
	        };
	        Runnable run4= new Runnable() {
	            @Override
	            public void run() {
	                try {
	                    Thread.sleep(3000);
	                    System.out.println("统计F盘");
	                    countDownLatch.countDown();
	                } catch (InterruptedException e) {
	                    e.printStackTrace();
	                }
	            }
	        };
            //创建固定线程的线程池
	        ExecutorService service= Executors.newFixedThreadPool(4);
	        service.submit(run1);
	        service.submit(run2);
	        service.submit(run3);
	        service.submit(run4);
//	        new Thread(run1).start();
//	        new Thread(run2).start();
//	        new Thread(run3).start();
//	        new Thread(run4).start();
	        countDownLatch.await();//主线程,即第5线程等待
	        System.out.println("合计C,D,E,F");
	        service.shutdown();
}

       运行结果如下,统计前四个盘大小可以没有顺序,但合计始终在最后:

2. newSingleThreadExecutor()方法

       创建只有一个线程的线程池,若线程池中有空闲的线程,则立即执行。若没有,则会暂缓在一个阻塞队列LinkedBlockingQueue中等待有空闲的线程去执行,它保证所有任务按照提交顺序执行。我们来看看newSingleThreadExecutor方法的源码:

代码语言:javascript
复制
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService//先不用关注这个
            (new ThreadPoolExecutor(1, //核心线程数
                                    1,//最大线程数
                                    0L,//空闲时保持线程活着的时间
                                    TimeUnit.MILLISECONDS,//上面时间的单位
                                    new LinkedBlockingQueue<Runnable>()));//当线程池没有空闲线程,就放在这个队列里
    }

       应用场景:这个线程池会在仅有的一个线程发生异常时,重新启动一个线程来替代原来的线程执行下去。

3.newCachedThreadPool()方法

       创建一个可根据实际情况调整线程个数的线程池,不限制线程数量。若有任务,则创建线程。若无任务,则不创建线程,并且每一个空闲的线程会在60秒后自动回收。我们来看看源码:

代码语言:javascript
复制
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0,//核心线程数,0表示初始化不创建线程
                                      Integer.MAX_VALUE,//int的最大值,表示不限制线程池容量
                                      60L,//缓存线程60秒
                                      TimeUnit.SECONDS,//单位
                                      new SynchronousQueue<Runnable>());
    }

       源码中的SynchronousQueue这个没有容量的队列一创建,内部就使用take()方法阻塞着,当有一个线程来了直接就执行。

4.newScheduledThreadPool()方法

       创建一个大小无限的线程池,此线程池支持定时以及周期性执行任务的需求。它的源码如下:

代码语言:javascript
复制
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
   return new ScheduledThreadPoolExecutor(corePoolSize);
}
public class ScheduledThreadPoolExecutor  extends ThreadPoolExecutor//注意这里继承了ThreadPoolExecutor
        implements ScheduledExecutorService {

   public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize,//核心线程数,传入
              Integer.MAX_VALUE,//int的最大值,表示不限制线程池容量
              0, //表示没有延迟
              TimeUnit.NANOSECONDS,//单位是纳秒
              new DelayedWorkQueue());
   }

}

       源码中的DelayedWorkQueue是带有延迟时间的一个队列,其中元素只有当指定时间到了,才能够从队列中获取元素,可以做定时的功能。

       创建一个任务,等3秒初始化后每隔1秒打印一句话:

代码语言:javascript
复制
public class ScheduledThread {
	public static void main(String args[]) throws Exception {
    	Temp command = new Temp();
    	//创建一个实现定时器的线程池
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        //command表示具体的任务对象,第一个数字表示初始化的时间,第二个数字表示轮询的时间
        ScheduledFuture<?> scheduleTask = scheduler.scheduleWithFixedDelay(command, 3, 1, TimeUnit.SECONDS);
    }
}
class Temp extends Thread {
    public void run() {
        System.out.println("run");
    }
}

       这个类似于Java的Timer定时器,但项目中用Quartz,跟Spring整合的话,最好用@Scheduled注解。ref:Spring Schedule 任务调度实现

5.自定义线程池

        在上述Executors工厂类创建线程池时,它的创建线程方法内部实现均用了ThreadPoolExecutor这个类,ThreadPoolExecutor可以实现自定义线程池,它的构造方法如下:

代码语言:javascript
复制
 public ThreadPoolExecutor(int corePoolSize,//核心线程数
                           int maximumPoolSize,//最大线程数
                           long keepAliveTime,//线程保持多久
                           TimeUnit unit,//单位
                           BlockingQueue<Runnable> workQueue,//线程池功能
                           ThreadFactory threadFactory,//先不关注这个
                           RejectedExecutionHandler handler)//拒绝策略,比如超过最大线程数了,可以告诉客户服务器繁忙
                             {...}

       这个构造方法对于BlockingQueue队列是什么类型比较关键,它关乎这个自定义线程池的功能。

       1.使用有界队列ArrayBlockingQueue时,实际线程数小于corePoolSize时,则创建线程。若大于corePoolSize时,则任务会加入BlockingQueue队列中,若队列已满,则在实际线程总数不大于maximumPoolSize时,创建新线程。若还大于maximumPoolSize,则执行拒绝策略,或者自定义的其他方式。

       2.使用无界队列LinkedBlockingQueue时,缓冲队列,当实际线程超过corePoolSize核心线程数后放置等待的线程,最后等系统空闲了在这个队列里取,maximumPoolSize参数在这里就没有作用了。因为它是无界队列,所以除非系统资源耗尽,否则不会出现任务入队失败的情况。比如创建任务的速度和处理速度差异很大,无界队列会保持快速增长,直到系统内存耗尽。

       有界队列和无界队列实例如下:

代码语言:javascript
复制
public class ThreadPoolExecutorDemo implements Runnable{
	private static AtomicInteger count = new AtomicInteger(0);
	
	@Override
	public void run() {
		try {
			int temp = count.incrementAndGet();
			System.out.println("任务" + temp);
			Thread.sleep(2000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
	public static void main(String[] args) throws Exception{
		BlockingQueue<Runnable> queue = 
				new LinkedBlockingQueue<Runnable>();
				//new ArrayBlockingQueue<Runnable>(10);
		ExecutorService executor  = new ThreadPoolExecutor(
					5, 		//corePoolSize
					10, 	//使用无界队列LinkedBlockingQueue时,maximumPoolSize这个参数值不起作用
					120L, 	//2分钟
					TimeUnit.SECONDS,
					queue);
		
		for(int i = 0 ; i < 15; i++){//提交15个任务
			executor.execute(new ThreadPoolExecutorDemo());
		}
		Thread.sleep(1000);
		System.out.println("queue size:" + queue.size());
		executor.shutdown();
	}
}

       用LinkedBlockingQueue无界队列执行后结果是每过一段时间5个任务一执行:

对于拒绝策略,即当任务数量超过了系统实际承载能力时该如何处理呢?JDK提供了几种实现策略:

       AbortPolicy:直接抛出异常来阻止系统正常工作。

       CallerRunsPolicy:只要线程池未关闭,会把丢弃的任务先执行。

       DiscardOledestPolicy:丢弃最老的一个请求,尝试再次提交当前任务

       DiscardPolicy:丢弃无法处理的任务,不给于任何处理。

这四种策略个人觉得都不太好,我们可以实现一个自定义策略,在这里实现RejectedExecutionHandler接口就好了:

代码语言:javascript
复制
public class MyThreadPoolExecutor {
	public static void main(String[] args) {
		ThreadPoolExecutor pool = new ThreadPoolExecutor(
				1, 				//coreSize
				2, 				//MaxSize
				60, 			//60
				TimeUnit.SECONDS, 
				new ArrayBlockingQueue<Runnable>(3)			//指定一种队列 (有界队列)
				//new LinkedBlockingQueue<Runnable>()
				, new MyRejected()
				//, new DiscardOldestPolicy()//直接抛出异常
				);
		
		MyTask mt1 = new MyTask(1, "任务1");//第一个任务会直接执行
		MyTask mt2 = new MyTask(2, "任务2");//第二个任务会放入队列里,等第一个任务执行完以后才执行
		MyTask mt3 = new MyTask(3, "任务3");//因为队列里有三个容量,所以任务3也会放入队列里
		MyTask mt4 = new MyTask(4, "任务4");//因为队列里有三个容量,所以任务4也会放入队列里
		MyTask mt5 = new MyTask(5, "任务5");//假如有5个任务,任务1和5同时执行,任务234放在队列里
		MyTask mt6 = new MyTask(6, "任务6");//队列满了,线程池的最大线程数也超过了,则会实行拒绝策略
		
		pool.execute(mt1);
		pool.execute(mt2);
		pool.execute(mt3);
		pool.execute(mt4);
		pool.execute(mt5);
		pool.execute(mt6);
		
		pool.shutdown();
	}
}
class MyRejected implements RejectedExecutionHandler{
	@Override
	//传入当前任务对象和当前线程池对象
	public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
		//1.可以做一些处理,比如用http再创建请求给传数据的客户端,让它重新发送任务。高峰期的时候,系统已经超负荷了,不建议再发送请求
		//2.只是记录日志:id及相关重要的信息,暂缓到磁盘上,在不是高峰期的时候跑一些定时的job解析日志,把没处理的任务再处理一遍或者批处理下,一般用这个
		System.out.println("自定义处理..");
		System.out.println("当前被拒绝任务为:" + r.toString());
	}
}
class MyTask implements Runnable {
	private int taskId;
	private String taskName;
	
	public MyTask(int taskId, String taskName){this.taskId = taskId;this.taskName = taskName;}
	public int getTaskId() {return taskId;}
	public void setTaskId(int taskId) {this.taskId = taskId;}
	public String getTaskName() {return taskName;}
	public void setTaskName(String taskName) {this.taskName = taskName;}
	
	@Override
	public void run() {
		try {
			System.out.println("run taskId =" + this.taskId);
			Thread.sleep(3000);
			//System.out.println("end taskId =" + this.taskId);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}		
	}
	public String toString(){
		return Integer.toString(this.taskId);
	}
}

        运行结果如下:

        到这里已经介绍完了Java并发包下的线程池,博主是个普通的程序猿,水平有限,文章难免有错误,欢迎牺牲自己宝贵时间的读者,就本文内容直抒己见。

系列:

【JDK并发包基础】线程池详解

【JDK并发包基础】并发容器详解

【JDK并发包基础】工具类详解

【JDK并发基础】Java内存模型详解

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1.newFixedThreadPool()方法
  • 2. newSingleThreadExecutor()方法
  • 3.newCachedThreadPool()方法
  • 4.newScheduledThreadPool()方法
  • 5.自定义线程池
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档