面试官:怎样去运用线程池?工作中如何使用?
工作中,我们有时候需要实现一些耗时的任务。比如:将 Word 转换成 PDF 存储的需求。
假设我们不使用线程池。那么每次请求都会开启新的线程,如果请求过多,就会导致资源耗尽,系统宕机。
import org.junit.Test; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ThreadVs { /** * 老的处理方式 */ @Test public void oldHandle() throws InterruptedException { /** * 使用循环来模拟许多用户请求的场景 */ for (int request = 1; request <= 100; request++) { new Thread(() -> { System.out.println("文档处理开始!"); try { // 将Word转换为PDF格式:处理时长很长的耗时过程 Thread.sleep(1000L * 30); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("文档处理结束!"); }).start(); } Thread.sleep(1000L * 1000); } }
在这招场景下,我们就可以使用线程池了。
/** * 新的处理方式 */ @Test public void newHandle() throws InterruptedException { /** * 开启了一个线程池:线程个数是10个 */ ExecutorService threadPool = Executors.newFixedThreadPool(10); /** * 使用循环来模拟许多用户请求的场景 */ for (int request = 1; request <= 100; request++) { threadPool.execute(() -> { System.out.println("文档处理开始!"); try { // 将Word转换为PDF格式:处理时长很长的耗时过程 Thread.sleep(1000L * 30); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("文档处理结束!"); }); } Thread.sleep(1000L * 1000); }
现在使用场景有了,但我们应该还需求向面试官解释线程池是怎么使用的?
先解释一番什么是线程池。
❝ 线程池顾名思义就是事先创建若干个可执行的线程放入一个池中(容器),需要的时候从池中获取线程不用自行创建,使用完毕不需要销毁线程而是放回池中,从而减少创建和销毁线程对象的开销。任何池化技术都是减低资源消耗,例如我们常用的数据库连接池。 ❞
从上面我们也可以看出,为什么要使用线程池了。
❝ 降低资源消耗;提高响应速度;提高线程的可管理性。 ❞
到这里,我认为整个问题的回答还不算完美。我们还应该讲一讲线程池是如何实现的?或者说让你自己写一个线程池,你会如何实现?
设计过程中我们需要思考的问题
第一次需求分析:简陋版本
下图是最简陋的线程池版本:具有的功能有
该设计方案如下图所示:
看完上图,我们需要考虑下面几个问题:
第二次需求分析:改进版本
改进版依然需要解决的三个问题
这个时候,面试官已经看出你的整个思考过程了。虽然不完美,但是说明你确实是熟悉线程池的。在这个时候,我们就需要参考巨人的设计了:ThreadPoolExecutor
。
corePoolSize 核心线程数量; maximumPoolSize 最大线程数量; keepAliveTime 线程空闲后的存活时间(没有任务后); unit 时间单位; workQueue 用于存放任务的阻塞队列; threadFactory 线程工厂类; handler 当队列和最大线程池都满了之后的饱和策略
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); // 这几个参数都是必须要有的 if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
接着再分析一下 java 源码里面设计的线程池的处理流程。
注意1个问题:
❝ 阻塞队列未满,是不会创建新的线程的 ❞
第二个,线程池可选择的阻塞队列。
插入移除操作:插入操作和移除操作
下面是三种阻塞队列的 Java 代码实现。
import org.junit.Test; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.SynchronousQueue; public class QueueTest { @Test public void arrayBlockingQueue() throws InterruptedException { /** * 基于数组的有界阻塞队列,队列容量为10 */ ArrayBlockingQueue queue = new ArrayBlockingQueue<Integer>(10); // 循环向队列添加元素 for (int i = 0; i < 20; i++) { queue.put(i); System.out.println("向队列中添加值:" + i); } } @Test public void linkedBlockingQueue() throws InterruptedException { /** * 基于链表的有界/无界阻塞队列,队列容量为10 */ LinkedBlockingQueue queue = new LinkedBlockingQueue<Integer>(); // 循环向队列添加元素 for (int i = 0; i < 20; i++) { queue.put(i); System.out.println("向队列中添加值:" + i); } } @Test public void test() throws InterruptedException { /** * 同步移交阻塞队列 */ SynchronousQueue queue = new SynchronousQueue<Integer>(); // 插入值 new Thread(() -> { try { queue.put(1); System.out.println("插入成功"); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); // 删除值 /* new Thread(() -> { try { queue.take(); System.out.println("删除成功"); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); */ Thread.sleep(1000L * 60); } }
第三个问题,线程池可选择的饱和策略。
当阻塞队列满和最大线程数满了的时候,饱和策略就会发挥作用
线程池的执行示意图
// 线程数量无限大的线程池,需要小心 /***创建一个线程池,该线程池根据需要创建新线程将重用先前构造的可用的线程。 *这些池通常可以提高性能执行许多短暂的异步任务的程序。 *调用{@code execute}将重用以前构造的线程(如果有)。 * 如果没有现有线程可用,则新线程将被创建并添加到池中。 *具有的线程 *六十秒未使用将终止并从缓存中删除 *因此,闲置足够长时间的池将不消耗任何资源。请注意,类似的池属性, *但细节不同(例如,超时参数)可以使用{@link ThreadPoolExecutor}构造函数创建。 * @返回新创建的线程池 */ public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
/** * 线程数量固定的线程池 * nThreads 核心线程数和最大核心线程数 * LinkedBlockingQueue 无界阻塞队列,注意这个是无界的无限长的任务队列 */ public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
/** * 单一线程的线程池 * LinkedBlockingQueue 无界阻塞队列,注意这个是无界的无限长的任务队列 */ public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
接下来需要注意的是:如何向线程池提交任务?
向线程池提交任务的两种方式:
@Test public void submitTest() throws ExecutionException, InterruptedException { // 创建线程池 ExecutorService threadPool = Executors.newCachedThreadPool(); /** * 利用submit方法提交任务,接收任务的返回结果 */ Future<Integer> future = threadPool.submit(() -> { Thread.sleep(1000L * 10); return 2 * 5; }); /** * 阻塞方法,直到任务有返回值后,才向下执行 */ Integer num = future.get(); System.out.println("执行结果:" + num); }
@Test public void executeTest() throws InterruptedException { // 创建线程池 ExecutorService threadPool = Executors.newCachedThreadPool(); /** * 利用execute方法提交任务,没有返回结果 */ threadPool.execute(() -> { try { Thread.sleep(1000L * 10); } catch (InterruptedException e) { e.printStackTrace(); } Integer num = 2 * 5; System.out.println("执行结果:" + num); }); Thread.sleep(1000L * 1000); }
最后再总结一下线程池的状态流转。
如果本文的所有内容你都能够回答总结出来,我相信你一定会获得面试官的认可。肯定不缺高薪 Offer 了!
本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。
我来说两句