Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >专栏 >Java中的 Threadpoolexecutor类

Java中的 Threadpoolexecutor类

作者头像
呼延十
发布于 2019-06-26 09:21:29
发布于 2019-06-26 09:21:29
51600
代码可运行
举报
文章被收录于专栏:呼延呼延
运行总次数:0
代码可运行

前言

在之前的文章Java中executors提供的的4种线程池中,学习了一下Executors类中提供的四种线程池.

在该文中提到,这四种线程池只是四个静态工厂方法而已,本质上其实是调用的ThreadPoolExecutor类的构造方法,并且对其中的一些参数进行了了解.比如corePoolSize,maximumPoolSize等等.

但是对其中剩余的两个参数,queen阻塞队列,handler拒绝策略等没有了解的十分透彻.因此今天来补充一下.

本文主要对以上两个参数的作用以及实现方法,使用方法来学习一下,中间可能夹杂部分ThreadPoolExecutor的源码学习.

阻塞队列

对阻塞队列完全不了解的同学可以查看一下这篇文章,Java中对阻塞队列的实现.

这里不会在对阻塞队列的原理做过多的探讨,主要聚焦于在线程池中阻塞队列的作用.

我前一阵面试的时候,对线程池这一块仅限于使用,一知半解(现在也是呢哈哈哈),在一次面试中问到了线程池中阻塞队列的作用,以及在什么情景下任务会被放入阻塞队列,而我一脸懵逼,今天也回答一下这个问题.

要想知道怎么放入,我们直接从execute方法来看,因为一般情况下我们都是通过这个方法来提交任务的,它的代码如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    /**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shutdown or because its capacity has been reached,
     * the task is handled by the current {@code RejectedExecutionHandler}.
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution
     * @throws NullPointerException if {@code command} is null
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

我一般copy源代码都是删除注释的,因为实在太长了…但是这个的源代码我觉得十分的棒,简洁并且极其清晰.可以看一下.

方法上的注释:

将在未来的某个时间执行给定的任务,任务可能会在一个新的线程或者一个旧的线程里执行.

如果任务不可以被执行,可能是因为线程池关闭了或者容量满了,任务将会被RejectedExecutionHandler处理.

看起来是不是没有什么用?其实在大的逻辑上说的很清晰了,接下来是代码中的这一段注释.

分为三步:1.如果当前运行的线程数量小于核心池的数量,试着以给定的任务作为第一个任务去创建一个新的线程.这个添加worker的请求会原子性的检查线程的运行状态以及工作线程的数量,如果添加失败,会返回false.

2.如果这个任务可以被成功的放入队列,我们将在添加一个线程前进行double-check双重检查,因为可能在此期间有一个线程挂掉了或者线程池挂掉了.所以我们再次检查状态,如果必要的话回滚对象,或者新建一个线程.

3.如果我们不能讲任务放进队列,我们将新增一个线程,如果这也失败了,我们知道我们挂掉了或者说线程池的容量满了,然后我们拒绝这个任务.

这就是对上面那个问题的回答.也就是阻塞队列在线程池中的使用方法.

那么使用哪种阻塞队列呢?Java有很多的阻塞队列的实现的.

Executors的四种静态工厂中,使用的阻塞队列实现有两种,LinkedBlockingQueueSynchronousQueue.

LinkedBlockingQueue: 这个阻塞队列在前一篇文章中讲过了,主要强调一点,他可以是一个无界的阻塞队列,可以放下大量的任务.

SynchronousQueue: 这个阻塞队列内部没有容器,不会持有任务,而是将每一个生产者阻塞,知道等到与他配对的消费者.

从上面阻塞队列的使用方法中可以看出来,maximumPoolSize阻塞队列的长度这两个值会互相影响,当阻塞队列很大时,相应的maximumPoolSize可以小一点,对CPU的压力也就会相应的小一点.而当阻塞队列很小的时候,会频繁的出现放入队列失败,然后尝试新建线程,这时会出现两种可能,线程数暴增或者大量的拒绝任务,都不是很好的选择,

因此在决定使用哪种阻塞队列的时候,需要对吞吐量和CPU的压力之间做一个权衡.

拒绝策略

当你的阻塞队列以及线程池容量全部爆掉之后,再次提交任务就会被拒绝,拒绝的策略由构造参数中的handler来提供.

这是ThreadPoolExecutor中默认使用的拒绝策略AbortPolicy:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    /**
     * A handler for rejected tasks that throws a
     * {@code RejectedExecutionException}.
     */
    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }

        /**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

可以看到这个策略比较粗暴,直接抛出了异常.

JDK中还有一些其他的拒绝策略,如下:

  • ThreadPoolExecutor#AbortPolicy:这个策略直接抛出 RejectedExecutionException 异常。
  • ThreadPoolExecutor#CallerRunsPolicy:这个策略将会使用 Caller 线程来执行这个任务,这是一种 feedback 策略,可以降低任务提交的速度。
  • ThreadPoolExecutor#DiscardPolicy:这个策略将会直接丢弃任务。
  • ThreadPoolExecutor#DiscardOldestPolicy:这个策略将会把任务队列头部的任务丢弃,然后重新尝试执行,如果还是失败则继续实施策略。

那么我们能不能自己实现一种策略呢,当然可以,还很简单.

我们实现一种策略,当被拒绝时候,打印一句日志然后给我们发一个邮件好了(不值当鸭).

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
        ThreadPoolExecutor ex = new ThreadPoolExecutor(10, 100, 100L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10), new MyRejectPolicy());


    	private static class MyRejectPolicy implements RejectedExecutionHandler {

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            System.out.println("reject me,555");
            // send email ...
        }
    }

我们新建了一个线程池,核心池大小为10,最大大小为100,存活时间为100s,使用容量为10的LinkedBlockingQueue为工作队列,拒绝策略使用我们自己实现的一个策略,类定义如上所示.

只需要实现RejectedExecutionHandler接口并且实现他的唯一方法即可.

额外的小技巧

在看源代码的过程中,我发现了一个属性, private volatile boolean allowCoreThreadTimeOut; 这个属性可以控制核心池中的线程会不会因为空闲时间过程而死亡,虽然听起来没什么用,因为我们可以通过减小核心池的大小来达到差不多的目的,但是总是有区别的,记录一下,说不定就遇到合适使用的场景了呢.

钩子Hook

在git中,hook十分有用,可以让我们进行很多事情,比如自动化部署,发邮件等等.那么在线程池中怎么能没有呢?

ThreadPoolExecutor提供了三个Hook来让我们执行一些定制化的东西,可以通过继承此类然后重写钩子来实现,三个Hook分别是:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    protected void beforeExecute(Thread t, Runnable r) { }
    
    protected void afterExecute(Runnable r, Throwable t) { }

    protected void terminated() { }

他们分别在任务执行前,执行后,以及线程池终止的时候被调用.让我们来测试一下.

我们的类如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    private static class  MyExecutor extends ThreadPoolExecutor{


        public MyExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        }

        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            System.out.println("before");
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            System.out.println("after");
        }

        @Override
        protected void terminated() {
            System.out.println("executor terminate");

        }
    }

测试代码:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    public static void main(String[] args) {
        ThreadPoolExecutor ex = new MyExecutor(10, 100, 100L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10), new MyRejectPolicy());
        ex.execute(()->{
            for (int i = 0; i < 10; ++i) {
                System.out.println("i:" + i);
            }
        });

        ex.execute(()->{
            for (int i = 0; i < 10; ++i) {
                System.out.println("j:" + i);
            }
        });

        ex.shutdown();
    }

打印输出如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
before
i:0
i:1
i:2
i:3
i:4
i:5
before
i:6
i:7
i:8
i:9
j:0
j:1
j:2
j:3
j:4
j:5
j:6
j:7
after
j:8
j:9
after
executor terminate

可以看到我们对钩子的实现,完全的被执行了,所以我们可以用它做很多东西,比如记录日志,比如发推送消息,比如更加高级一点在执行之前设置ThreadLocal等等.具体操作就看我们的想象力了!

但是请注意一点,钩子中的内容如果执行错误,会影响任务本身的执行结果,要尽力保证钩子的正确性,不要顾此失彼.

完.

参考文章

https://mp.weixin.qq.com/s/Epi-cBVFkeZWgvKvOMQZqw

ChangeLog

2019-05-05 完成

以上皆为个人所思所得,如有错误欢迎评论区指正。

欢迎转载,烦请署名并保留原文链接。

联系邮箱:huyanshi2580@gmail.com

更多学习笔记见个人博客——>呼延十

var gitment = new Gitment({ id: 'Java中的 Threadpoolexecutor类', // 可选。默认为 location.href owner: 'hublanker', repo: 'blog', oauth: { client_id: '2297651c181f632a31db', client_secret: 'a62f60d8da404586acc965a2ba6a6da9f053703b', }, }) gitment.render('container')



本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
java并发线程池---了解ThreadPoolExecutor就够了
总结:线程池的特点是,在线程的数量=corePoolSize后,仅任务队列满了之后,才会从任务队列中取出一个任务,然后构造一个新的线程,循环往复直到线程数量达到maximumPoolSize执行拒绝策略。
intsmaze-刘洋
2018/08/29
2.7K0
面试题-关于Java线程池一篇文章就够了
线程池是一种多线程处理形式,处理过程中将任务提交到线程池,任务的执行交由线程池来管理。
程序新视界
2019/12/20
1.9K0
面试题-关于Java线程池一篇文章就够了
【死磕Java并发】-----J.U.C之线程池:ThreadPoolExecutor
原文出处http://cmsblogs.com/ 『chenssy』 作为Executor框架中最核心的类,ThreadPoolExecutor代表着鼎鼎大名的线程池,它给了我们足够的理由来弄清楚它。 下面我们就通过源码来一步一步弄清楚它。 内部状态 线程有五种状态:新建,就绪,运行,阻塞,死亡,线程池同样有五种状态:Running, SHUTDOWN, STOP, TIDYING, TERMINATED。 private final AtomicInteger ctl = new AtomicIntege
用户1655470
2018/04/26
8290
【死磕Java并发】-----J.U.C之线程池:ThreadPoolExecutor
JDK源码分析-ThreadPoolExecutor
ThreadPoolExecutor 是 JDK 中线程池的实现类,它的继承结构如下:
WriteOnRead
2019/08/30
4080
juc系列-Executor框架
ThreadPoolExecutor是jdk提供的线程池的服务类,基于ThreadPoolExecutor可以很容易将一个实现Runnable接口的任务放入线程池中执行,下面是ThreadPoolExecutor实现:
topgunviper
2022/05/12
3940
juc系列-Executor框架
Java源码之ThreadPoolExecutor
“ ThreadPoolExecutor类是线程池的基础,线程池的目的是为了减少了每个任务调用的开销,在拥有大量异步任务时可以增强的性能,并且还可以提供绑定和管理资源的方法”
每天学Java
2020/06/02
4680
java线程池(三):ThreadPoolExecutor源码分析
在前面分析了Executors工厂方法类之后,我们来看看AbstractExecutorService的最主要的一种实现类,ThreadpoolExecutor。
冬天里的懒猫
2020/09/18
8780
java线程池(三):ThreadPoolExecutor源码分析
详解 ThreadPoolExecutor 的参数含义及源码执行流程?
线程池是为了避免线程频繁的创建和销毁带来的性能消耗,而建立的一种池化技术,它是把已创建的线程放入“池”中,当有任务来临时就可以重用已有的线程,无需等待创建的过程,这样就可以有效提高程序的响应速度。但如果要说线程池的话一定离不开 ThreadPoolExecutor ,在阿里巴巴的《Java 开发手册》中是这样规定线程池的:
小熊学Java
2024/01/10
2400
详解 ThreadPoolExecutor 的参数含义及源码执行流程?
通过了解RejectedExecutionException来分析ThreadPoolExecutor源码
观看本文章之前,最好看一下这篇文章熟悉下ThreadPoolExecutor基础知识。 1.关于Java多线程的一些常考知识点 2.看ThreadPoolExecutor源码前的骚操作
用户2032165
2018/10/09
7070
通过了解RejectedExecutionException来分析ThreadPoolExecutor源码
线程池
在实际开发中,我们的项目里是杜绝在某些业务中直接继承Thread类或者实现Runnalbe接口等方式创建线程的,因为这样创建的每一个线程都会经历创建、运行直至销毁,缺乏统一管理,并且这样会导致无限制创建新线程,线程互相竞争,严重时会占用过多的系统资源或内存溢出(OOM)。在JDK1.5推出的java.util.concurrent(简称JUC)并发工具包中又一并发利器就是线程池,需要做异步或并发执行任务都可以使用线程池。使用线程池可以带来以下好处。
胖虎
2020/12/08
5080
Java并发——线程池(八)
线程池(Thread Pool)是一种基于“池化”思想管理线程的工具,经常出现在多线程服务器中。通过创建一定数量的线程,让这些线程处于就绪状态来提高系统响应速度,在线程使用完成后归还到线程池来达到重复利用的目标,从而降低系统资源的消耗。
翰墨飘香
2024/05/15
1330
ThreadPoolExecutor详解
ThreadPoolExecutor是JDK提供的线程池的基类,其中定义了线程池的核心框架,并且允许客户端通过继承的方式实现自定义的线程池。JDK提供默认的几种线程池都继承了ThreadPoolExecutor类,因此有必要对ThreadPoolExecutor进行详细的分析。
张申傲
2020/09/03
6660
ThreadPoolExecutor详解
java创建线程池代码_java手写线程池
当队列和线程池都满了,说明线程池处于饱和的状态,那么必须采取一种策略处理提交的新任务。这个策略默认是AbortPolicy,表示无法处理新任务时抛出异常
全栈程序员站长
2022/11/14
7940
一文带你彻底弄懂线程池
虽然 Java 对线程的创建、中断、等待、通知、销毁、同步等功能提供了很多的支持,但是从操作系统角度来说,频繁的创建线程和销毁线程,其实是需要大量的时间和资源的。
Java极客技术
2023/12/12
1.2K0
一文带你彻底弄懂线程池
相关推荐
java并发线程池---了解ThreadPoolExecutor就够了
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验