聊一下为什么要使用线程池? 程序的运行本质,就是通过使用系统资源(CPU、内存、网络、磁盘等等)来完成信息的处理,比如在JVM中创建一个对象实例需要消耗CPU的和内存资源,如果你的程序需要频繁创建大量的对象,并且这些对象的存活时间短就意味着需要进行频繁销毁,那么很有可能这段代码就成为了性能的瓶颈。总结下来其实就以下几点。
简单言之,线程池就是将用过的对象保存起来,等下一次需要这个对象的时候,直接从对象池中拿出来重复使用,避免频繁的创建和销毁。在Java中万物皆对象,那么线程也是一个对象,Java线程是对于操作系统线程的封装,创建Java线程也需要消耗操作系统的资源,因此就有了线程池。
首先了解一下线程池创建以及工作原理。线程池创建主要由ThreadPoolExecutor
类完成。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
各个参数说明:
corePoolSize
:线程池中核心线程数的数量
maximumPoolSize
:在线程池中允许存在的最大线程数
keepAliveTime
:如果当前线程池中的线程数超过了corePoolSize,那么如果在keepAliveTime时间内都没有新的任务需要处理,那么超过corePoolSize的这部分线程就会被销毁。默认情况下是不会回收core线程的,可以通过设置allowCoreThreadTimeOut改变这一行为。
unit
:时间单位
workQueue
:工作队列,线程池中的当前线程数大于核心线程的话,那么接下来的任务会放入到队列中
threadFactory
:通过工厂模式来生产线程。创建线程都是通过ThreadFactory来实现的,如果没指定的话,默认会使用Executors.defaultThreadFactory(),一般来说,我们会在这里对线程设置名称、异常处理器等。
handler
:如果超过了最大线程数,那么就会执行我们设置的拒绝策略。
线程池工作流程图如下:
1-> 当任务提交时,线程池先检查当前线程数;如果当前线程数小于核心线程数(corePoolSize),则创建线程并执行任务;比如开始提交任务时,线程数为0;
2->当线程任务不断增加时,创建的线程数等于核心线程数(corePoolSize),则新增的任务将会被添加到工作队列中(workQueue),等待核心线程将当前任务执行结束,重新从工作队列中获取任务执行;
3->当任务非常多时,并且达到工作队列的最大容量,但是当前线程数小于最大线程数(maximumPoolSize),线程池会在核心线程的基础上继续创建线程(非核心线程)执行任务;
4->当任务继续增加时,线程池的线程数达到最大线程数;如果任务继续增加,此时线程池则会采取拒绝策略拒绝执行任务,默采用AbortPolicy策略
,抛出异常。
ThreadPoolExecutor提供了四种拒绝策略:
public static class AbortPolicy implements RejectedExecutionHandler { /** * Creates an {@code AbortPolicy}. */ public AbortPolicy() { } /** *直接抛异常出去 */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } }
public static class CallerRunsPolicy implements RejectedExecutionHandler { /** * Creates a {@code CallerRunsPolicy}. */ public CallerRunsPolicy() { } /** *将此任务交给调用者直接执行 */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
public static class DiscardOldestPolicy implements RejectedExecutionHandler { /** * Creates a {@code DiscardOldestPolicy} for the given executor. */ public DiscardOldestPolicy() { } /** * 丢弃最老的任务 */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
对于当前任务不做任何操作
,简单言之:直接丢弃。public static class DiscardPolicy implements RejectedExecutionHandler { /** * Creates a {@code DiscardPolicy}. */ public DiscardPolicy() { } /** *不执行任何操作 */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }
除了以上四种拒绝策略,Java还支持自定义拒绝策略,我们实现RejectExecutionHandler
接口即可;
上面我们了解了线程池的参数、工作流程、拒绝策略,下面我们了解一下如何设置参数能够达到线程池的最大利用率呢? 正确的定制线程池的长度,需要了解当前计算机配置、所需资源的情况以及任务的特性。比如部署的计算机安装了多少个CPU?多少的内存?任务主要执行是IO密集型还是CPU密集型?所执行任务是否需要数据库连接这样的稀缺资源?
N:CPU的数量 U:目标CPU的使用率,0<=U<=1 W/C:等待时间与计算时间的比率 那么最优的线程池的大小就是=NU(1+W/C)
了解了上面四种线程池后,大家应该会明白阿里巴巴规约中为什么会推荐手动创建线程池。
解释
FixedThreadPool和SingleThreadExecutor:这两个线程池的实现方式,可以看到它设置的工作队列都是LinkedBlockingQueue,该队列是一个链表形式的队列,此队列是没有长度限制的,是一个无界队列
,那么此时如果有大量请求,就有可能造成OOM。
CachedThreadPool和ScheduledThreadPool:这两个线程池的实现方式,可以看到它设置的最大线程数都是Integer.MAX_VALUE
,那么就相当于允许创建的线程数量为Integer.MAX_VALUE
。此时如果有大量请求来的时候也有可能造成OOM。
手动创建一个线程池,并实现多线程执行任务。
public class ThreadPoolTask { private static final Logger LOG = LoggerFactory.getLogger(ThreadPoolTask.class); public static void main(String[] args) { List<String> idList = new LinkedList<>(); for (int i =0; i < 20; i ++) { idList.add("i" + i); } //默认20个任务未被执行false Map<String, Object> taskMap = new LinkedHashMap<>(); idList.forEach(e -> taskMap.put(e, Boolean.FALSE)); //自定义线程池 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(4)); //多线程执行任务 idList.forEach(id -> { Future<?> taskResult = threadPoolExecutor.submit(new HandlerTask(taskMap, id)); try { taskResult.get(); } catch (Exception e) { e.printStackTrace(); } }); //关闭线程池 threadPoolExecutor.shutdown(); } } class HandlerTask implements Runnable { private final Map<String, Object> taskMap; private final String id; public HandlerTask(Map<String, Object> taskMap, String id) { this.taskMap = taskMap; this.id = id; } @Override public void run() { System.out.println("start=" + Thread.currentThread().getName()); handler(id); System.out.println("end=" + Thread.currentThread().getName()); } private void handler(String id) { Object o = taskMap.get(id); //判断是否被执行过 if (!Boolean.TRUE.equals(o)) { //省略业务逻辑 taskMap.put(id, true); } } }
本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。
我来说两句