专栏首页健程之道Java 线程池讲解——针对 IO 密集型任务

Java 线程池讲解——针对 IO 密集型任务

针对 IO 密集型的任务,我们可以针对原本的线程池做一些改造,从而可以提高任务的处理效率。

基本

阿里巴巴泰山版java开发手册中有这么一条:

线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,
这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

那么如果要使用 ThreadPoolExecutor ,那就先来看看构造方法中的所有入参:

corePoolSize : 核心线程数,当线程池中的线程数量为 corePoolSize 时,即使这些线程处于空闲状态,也不会销毁(除非设置 allowCoreThreadTimeOut)。
maximumPoolSize : 最大线程数,线程池中允许的线程数量的最大值。
keepAliveTime : 线程空闲时间,当线程池中的线程数大于 corePoolSize 时,多余的空闲线程将在销毁之前等待新任务的最长时间。
workQueue : 任务队列
unit :线程空闲时间的单位。
threadFactory :线程工厂,线程池创建线程时使用的工厂。
handler : 拒绝策略,因达到线程边界和任务队列满时,针对新任务的处理方法。

这么说可能有些难以理解,你可以结合下图进行参考:

那么由此我们可以知道,当大量任务被放入线程池之后,先是被核心线程执行,多余的会被放进队列里,当队列满了之后才会创建额外的线程进行处理,再多就会采取拒绝策略。

但这样真的能满足我们的所有需求吗?

任务的分类

正常来说,我们可以把需要处理的任务按照消耗资源的不同,分为两种:CPU 密集型IO 密集型

CPU 密集型

既然名字里带有CPU了,说明其消耗的主要资源就是 CPU 了。

具体是指那种包含大量运算、在持有的 CPU 分配的时间片上一直在执行任务、几乎不需要依赖或等待其他任何东西。

这样的任务,在我的理解中,处理起来其实没有多少优化空间,因为处理时几乎没有等待时间,所以一直占有 CPU 进行执行,才是最好的方式。

唯一能想到优化的地方,就是当单个线程累计较多任务时,其他线程能进行分担,类似fork/join框架的概念。

设置线程数时,针对单台机器,最好就是有几个 CPU ,就创建几个线程,然后每个线程都在执行这种任务,永不停歇。

IO 密集型

和上面一样,既然名字里带有IO了,说明其消耗的主要资源就是 IO 了。

我们所接触到的 IO ,大致可以分成两种:磁盘 IO网络 IO

磁盘 IO ,大多都是一些针对磁盘的读写操作,最常见的就是文件的读写,假如你的数据库、 Redis 也是在本地的话,那么这个也属于磁盘 IO。

网络 IO ,这个应该是大家更加熟悉的,我们会遇到各种网络请求,比如 http 请求、远程数据库读写、远程 Redis 读写等等。

IO 操作的特点就是需要等待,我们请求一些数据,由对方将数据写入缓冲区,在这段时间中,需要读取数据的线程根本无事可做,因此可以把 CPU 时间片让出去,直到缓冲区写满。

既然这样,IO 密集型任务其实就有很大的优化空间了(毕竟存在等待),那现有的线程池可以很好的满足我们的需求吗?

线程池的优化

还记得上面说的, ThreadPoolExecutor 针对多余任务的处理,是先放到等待队列中,当队列塞满后,再创建额外的线程进行处理。

假设我们的任务基本都是 IO 密集型,我们希望程序可以有更高的吞吐量,可以在更短的时间内处理更多的任务,那么上面的 ThreadPoolExecutor 明显是不满足我们的需求,那该如何解决呢?

也许再来看看 ThreadPoolExecutor 的 execute 方法,会让我们有一些思路:

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        // 如果当前活跃线程数,小于核心线程数
        if (workerCountOf(c) < corePoolSize) {
            // 则优先创建线程
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 如果任务可以成功放入队列中
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 如果不可以成功放入队列,则创建线程
        else if (!addWorker(command, false))
            // 如果无法继续创建线程,则拒绝任务
            reject(command);
    }

针对放入队列的操作,如果队列放入失败,线程池就会选择去创建线程了。因此,我们或许可以尝试自定义线程池,针对 offer 操作,做一些自定义处理。

也就是将任务放入队列时,先检查线程池的线程数是否小于最大线程数,如果是,则拒绝放入队列,否则,再尝试放入队列中。

如果你有看过 dubbo 或者 tomcat 的线程池,你会发现他们就有这样的实现方法。

比如 dubbo 中的 TaskQueue,我们来看看它的 offer 方法:

    @Override
    public boolean offer(Runnable runnable) {
        if (executor == null) {
            throw new RejectedExecutionException("The task queue does not have executor!");
        }

        int currentPoolThreadSize = executor.getPoolSize();
        // 如果有空闲等待的线程,则将任务放入队列中,让线程去处理任务
        if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
            return super.offer(runnable);
        }

        // 如果当前线程数小于最大线程数,则返回 false ,让线程池去创建新的线程
        if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
            return false;
        }

        // 否则,就将任务放入队列中
        return super.offer(runnable);
    }

这样就可以让线程池优先新建线程了。需要注意的是,此时的队列因为需要根据线程池中的线程数决定是否放入任务成功,所以需要持有executor对象,这点不要忘记奥。

总结

通过本篇文章,主要是让大家重新了解了一下 ThreadPoolExecutor ,并针对高吞吐场景下如何进行局部优化。

有兴趣的话可以访问我的博客或者关注我的公众号,说不定会有意外的惊喜。

https://death00.github.io/

公众号:健程之道

本文分享自微信公众号 - 健程之道(JianJianCoder),作者:健健壮

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2020-05-20

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Java中Synchronized的优化原理

    我们知道,从 JDK1.6 开始,Java 对 Synchronized 同步锁做了充分的优化,甚至在某些场景下,它的性能已经超越了 Lock 同步锁。那么就让...

    健程之道
  • Java面试-interrupt

    我们都知道,Java中停止一个线程不能用stop,因为stop会瞬间强行停止一个线程,且该线程持有的锁并不能释放。大家多习惯于用interrupt,那么使用它又...

    健程之道
  • ThreadLocal的进化——InheritableThreadLocal

    之前有介绍过 ThreadLocal,JDK 后来针对此做了一个升级版本 InheritableThreadLocal,今天就来好好介绍下。

    健程之道
  • Dubbo线程模型

    dubbo线程模型包括线程模型策略和dubbo线程池策略两个方面,下面就依次进行分析。

    luoxn28
  • 一文搞定Java并发编程面试考点

    任何线程都可以设置为守护线程和用户线程,通过方法Thread.setDaemon(bool on);true则把该线程设置为守护线程,反之则为用户线程。Thre...

    黄泽杰
  • 「JAVA」线程生命周期分阶段详解,哲学家们也深感死锁难解

    每个事物都有其生命周期,也就是事物从出生开始到最终消亡这中间的整个过程;在其整个生命周期的历程中,会有不同阶段,每个阶段对应着一种状态,比如:人的一生会经历从婴...

    老夫编程说
  • 线程的生命周期

    线程的六种状态: NEW、RUNNABLE、BIOCKED、WAITING、TIME_WAITING、TERMINATED。

    用户7386338
  • 「JAVA」线程生命周期分阶段详解,哲学家们深感死锁难解

    每个事物都有其生命周期,也就是事物从出生开始到最终消亡这中间的整个过程;在其整个生命周期的历程中,会有不同阶段,每个阶段对应着一种状态,比如:人的一生会经历从婴...

    老夫编程说
  • 没想到,这么简单的线程池用法,深藏这么多坑!

    生产有个对账系统,每天需要从渠道端下载对账文件,然后开始日终对账。这个系统已经运行了很久,前两天突然收到短信预警,没有获取渠道端对账文件。

    andyxh
  • Java 线程池中的线程复用是如何实现的?

    那么就来和大家探讨下这个问题,在线程池中,线程会从 workQueue 中读取任务来执行,最小的执行单位就是 Worker,Worker 实现了 Runnabl...

    用户1516716

扫码关注云+社区

领取腾讯云代金券