探索JAVA并发 - 线程池详解

线程池是并发编程中必不可少的一种工具,也是面试高频话题。

线程池,即管理着若干线程的资源池(字面意思)。相比于为每个任务分配一个线程,在线程池中执行任务优势更多:

1.线程复用:线程池中的线程是可以复用的,省去了创建、销毁线程的开销,提高了资源利用率(创建、销毁等操作都是要消耗系统资源的)和响应速度(任务提交过来线程已存在就不用等待线程创建了);

2.合理利用资源:通过调整线程池大小,让所有处理器尽量保持忙碌,又能防止过多线程产生过多竞争浪费资源;

常用的线程池主要是ThreadPoolExecutor 和 ScheduledThreadPoolExecutor(定时任务线程池,继承ThreadPoolExecutor)。

Executor框架

在JAVA中,任务执行的主要抽象不是Thread,而是Executor。Executor基于生产者-消费者模式,提交任务的操作相当于生产者,执行任务的线程相当于消费者。

所谓Executor框架,其实就是定义了一个接口,我们常用的线程池ThreadPoolExecutor 就是对这个接口的一种实现。

public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command 可执行的任务
     * @throws RejectedExecutionException 任务可能被拒绝(当Executor处理不了的时候)
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}

Executors与常用线程池

Executors 其实就是Executor(加s)

Executors是一个Executor的工厂,有很多定义好的工厂方法,可以帮助懒惰的 开发者快速创建一个线程池。下面是几个常用的工厂方法:

  • newFixedThreadPool 固定长度线程池,每次提交任务都会创建一个新线程,直到线程数量达到指定阈值则不再创建新的;
  • newCachedThreadPool 可缓存线程池,每次提交任务都会创建一个新线程(理论上无限制),部分任务执行完后如果没有新的任务,导致某些线程无用武之地,它们将被终结;
  • newSingleThreadExecutor 只有一个线程的线程池;
  • newScheduledThreadPool 可以延时或者定时执行任务的线程池。 public class Executors { public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } }

如果查看上述工厂方法的源码,会发现只是 new 了一个线程池对象返回给调用 者而已,没什么花里胡哨的东西。不过看看构造参数还真不少,通过这种方式 比起我们自己 new 一个线程池要简单多了(才怪)。

线程池构造参数

了解线程池构造参数的意义,能让我们更清楚程序执行逻辑。

  • int corePoolSize : 核心线程数,有新任务来时,如果当前线程小于核心线程,则新建一个线程来执行该任务
  • int maximumPoolSize : 最大线程数,线程池最多拥有的线程数
  • long keepAliveTime : 空闲线程存活时间
  • TimeUnit unit : 空闲线程存活时间的单位
  • BlockingQueue workQueue : 存放待执行任务的阻塞队列,新任务来时,若当前线程数>=最大核心线程数,则放到这个队列(具体逻辑更复杂,请看下面源码分析)
  • ThreadFactory threadFactory : 创建新线程的工厂,一般用来给线程取个名字方便排查问题
  • RejectedExecutionHandler handler : 任务被拒绝后的处理器,默认的处理器会直接抛出异常,建议重新实现
  • 配合源码,效果更佳:
  • public class ThreadPoolExecutor extends AbstractExecutorService { // 构造函数 public ThreadPoolExecutor(int corePoolSize, // 核心线程数 int maximumPoolSize, // 最大线程数 long keepAliveTime, // 空闲线程存活时间 TimeUnit unit, // 空闲线程存活时间的单位 BlockingQueue<Runnable> workQueue, // 存放待执行任务的阻塞队列 ThreadFactory threadFactory, // 创建新线程的工厂 RejectedExecutionHandler handler // 任务被拒绝后的处理器 ) { // ... } // 提交任务 public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * 没翻,懒得翻 * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ // 当前状态值 int c = ctl.get(); // 当前线程数 = workerCountOf(c) 小于 核心线程数 的上限时 // 直接创建一个线程来执行任务 if (workerCountOf(c) < corePoolSize) { // 并发提交场景下可能会失败 if (addWorker(command, true)) return; // 新增成功就可以结束了 // 失败就更新下线程池状态 c = ctl.get(); } // 不能创建核心线程来执行,并不会直接创建非核心线程,而是把任务暂存到阻塞队列 // isRunning(c)判断线程池是否还在运行 // workQueue.offer(command)返回值表示是否成功提交到队列 if (isRunning(c) && workQueue.offer(command)) { // 成功放到队列里了,再检查一下线程池状态 int recheck = ctl.get(); // 如果线程池已经没有运行了,则尝试把新增的任务从队列移除 // remove(command)返回值表示是否移除成功 if (! isRunning(recheck) && remove(command)) reject(command); // 移除成功后,执行拒绝策略 // 检查下当前线程数是否为0,如果是的话新建一个线程 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 线程池没有运行,或者放入队列失败(比如队列已满) // 则创建非核心线程去执行任务,这也失败就只能拒绝了 else if (!addWorker(command, false)) reject(command); }

当对线程池的构造参数和任务处理逻辑有了以上大致的了解后,回想Executors 提供的几个工厂方法,或许会感到所谓提供便利性的方法并不那么便利。因为从方法的名字上来看很难和线程池的配置准确关联,想要清除地知道这些方法创建的线程池如何运作,就需要知道他们用了怎样的构造参数,那为什么不直接使用构造方法呢?

所以尽量使用构造方法是更好的编程习惯,这样不管是作者还是其他开发者,只要看看传了什么参数,就知道这个线程池是怎么运作的了。

线程池创建示例

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class Main {

    public static void main(String[] args) throws Exception {
        AtomicInteger threadCount = new AtomicInteger();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                5,  // 核心线程数
                10, // 最大线程数
                1,  // 空闲线程存活时间
                TimeUnit.MINUTES, // 空闲线程存活时间单位
                new ArrayBlockingQueue<>(100), // 一个指定上限的阻塞队列,存放待执行任务
                new ThreadFactory() {
                    // 自定义一个线程工厂来给线程池里的线程取名字
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "pool-thread-" 
                            + threadCount.incrementAndGet());
                    }
                },
                new RejectedExecutionHandler() {
                    // 自定义一个拒绝处理策略,安慰被线程池拒之门外的小可怜
                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        System.out.println("线程池拒绝了任务: " + r);
                    }
                }
        );
    }

}

有返回值的提交方式

submit

ThreadPoolExecutor.execute() 方法是没有返回值的,也就是说把任务提交给线程池后,我们就失去了它的消息,除非你还保留着它的引用,并且在里面有维护状态。如果不想这么麻烦,可以使用ThreadPoolExecutor.submit()来提交任务,这个方法会返回一个 Future 对象,通过这个对象可以知道任务何时被执行完。

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class Main {

    public static void main(String[] args) throws Exception {
        // 线程池定义
        // ...

        Future<?> future = executor.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("我要关注: 一杯82年的JAVA");
            }
        });
        Object r = future.get();
        System.out.println("返回:" + r);
        executor.shutdown();
    }

}

/* 输出: 

我要关注: 一杯82年的JAVA
返回:null

*/

可以看到 Future.get() 是有返回值的,但是上面的例子返回了 null,因为任务是 一个Runnable 实现,run 方法没有返回值。

submit Callable

如果想任务有返回值,可以使用 Callable 作为任务定义。

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class Main {

    public static void main(String[] args) throws Exception {
        // 线程池定义
        // ...

        Future<String> future = executor.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("I'm fine, and you?");
                return "我要关注: 一杯82年的JAVA";
            }
        });
        String r = future.get();
        System.out.println("返回:" + r);
        executor.shutdown();
    }

}

/* 返回:

I'm fine, and you?
返回:我要关注: 一杯82年的JAVA

*/

submit实现原理

为什么 submit 就可以让用户等待、获取任务返回?从源码讲起:

public abstract class AbstractExecutorService implements ExecutorService {

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        // 把任务用一个RunnableFuture又给包装了一下
        RunnableFuture<T> ftask = newTaskFor(task);
        // 最后还是调用了没有返回值的execute
        execute(ftask);
        return ftask;
    }

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }
}

// 看看这个包装类
public class FutureTask<V> implements RunnableFuture<V> {

     private Callable<V> callable;
     private volatile int state;

     // 也是Runable的一种实现,所以能在线程池中被执行
     public void run() {
        // 有个表示状态的标识 
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    // 执行用户的逻辑,获得返回值
                    // 这个步骤可能需要点时间
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

    // 获取执行结果,阻塞直到状态改变
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }
}

小结:submit 时用一个FutureTask 把用户提交的Callable包装起来,再把FutureTask 提交给线程池执行,FutureTask.run 运行时会执行 Callable 中的业务代码,并且过程中 FutureTask 会维护一个状态标识,根据状态标识,可以知道任务是否执行完成,也可以阻塞到状态为完成获取返回值。

关闭线程池

为什么需要关闭线程池?

  1. 如果线程池里的线程一直存活,而且这些线程又不是守护线程,那么会导致虚拟机无法正常退出;
  2. 如果直接粗暴地结束应用,线程池中的任务可能没执行完,业务将处于未知状态;
  3. 线程中有些该释放的资源没有被释放。

怎么关闭线程池?

  1. shutdown 停止接收新任务(继续提交会被拒绝,执行拒绝策略),但已提交的任务会继续执行,全部完成后线程池彻底关闭;
  2. shutdownNow 立即停止线程池,并尝试终止正在进行的线程(通过中断),返回没执行的任务集合;
  3. awaitTermination 阻塞当前线程,直到全部任务执行完,或者等待超时,或者被中断。

由于 shutdownNow 的终止线程是通过中断,这个方式并不能保证线程会提前停止。(关于中断: 如何处理线程中断)

一般先调用 shutdown 让线程池停止接客,然后调用 awaitTermination 等待正在工作的线程完事。

// 你的池子对我打了烊
executor.shutdown();

// 等待一首歌的时间(bei~bei~~)
// 如果超时还没结束返回false,你可以选择再等一首长点的歌,或者不等了
boolean ok = executor.awaitTermination(4, TimeUnit.SECONDS);

扩展线程池

线程池提供了一些扩展的方法,通过重写这些方法可以添加前置、后置操作,让使用更灵活。如 beforeExecute、afterExecute、terminated …

总结

线程池很好用,但使用不当会造成严重的后果,了解它各个属性表示的含义以及执行的流程能帮助我们少踩坑。

举个例子:如果设置了核心线程 < 最大线程数不等(一般都这么设置),但是又设置了一个很大的阻塞队列,那么很可能只有几个核心线程在工作,普通线程一直没机会被创建,因为核心线程满了会优先放到队列里,而不是创建普通线程。

文章来源:

https://acupt.cn/2019/07/30/concurrent-thread-pool/

欢迎关注公众号『easyserverdev』,同时,您也可以加入我的 QQ 群578019391。

本文分享自微信公众号 - 高性能服务器开发(easyserverdev)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-09-23

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏孙小白

深入理解static关键字

如果给一个属性加上static,那么这个属性不再属于某一个对象了,而是属于N个对象,共用同一个static属性。

9420
来自专栏诸葛青云的专栏

不找C++的工作,为什么要学习C++?

许多学编程的认为,特别是新手会觉得:“我又不找c语言的工作,需不需要学c语言?”,就象“我又不找C语言的工作,应不应该学c++”一样;我觉得答案不源于你做不做C...

8640
来自专栏授客的专栏

Loadrunner 脚本开发-从文件读取数据并参数化

if ((file_stream = fopen(filename, "r")) == NULL )

10810
来自专栏大道七哥

深入Java单例模式

原文出处:http://devbean.blog.51cto.com/448512/203501

8220
来自专栏CV学习史

Python vtk学习(1)

Vtk,(visualization toolkit)是一个开源的免费软件系统,主要用于三维计算机图形学、图像处理和可视化。Vtk是在面向对象原理的基础上设计和...

13910
来自专栏孙小白

编程语言分类

    编译型:即把源程序的每一条语句都编译成机器语言,并保存为二进制文件,这样运行时计算机可以直接以机器语言来运行此程序,优点:执行速度很快。缺点:开发效率低...

13920
来自专栏诸葛青云的专栏

资深程序员告诉你:想要将C++学到熟练运用,应该怎么去学?

C++语言从诞生到今天已经经历了将近30个年头。不可否认,它的学习难度都比其它 语言较高。而它的学习难度,主要来自于它的复杂性。现在C++的使用范围比以前已经少...

11850
来自专栏Rude3Knife的后端开发专栏

Java学习笔记(三)——类和对象

在类实例化的过程中自动执行的方法叫做构造方法,它不需要你手动调用。构造方法可以在类实例化的过程中做一些初始化的工作。

8010
来自专栏算法修养

LeetCode 60. Permutation Sequence

康拓展开其实就是表示一个连续序列,其实也不用连续,给定一个序列,可以很快速的按照字典序,列出所有序列。给出特定序列,快速告诉你它是按照字典序排序是第几个,给出排...

6710
来自专栏python-爬虫

django基础

7440

扫码关注云+社区

领取腾讯云代金券

年度创作总结 领取年终奖励