前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >ThreadPoolExecutor的submit正确的使用方式

ThreadPoolExecutor的submit正确的使用方式

作者头像
查拉图斯特拉说
发布2023-10-25 15:53:00
1920
发布2023-10-25 15:53:00
举报
文章被收录于专栏:后端架构后端架构

项目场景:

线程池的地方用的还是挺多的,一般来说用的多的还是execute方法,submit方法还是用的挺少的,一般ThreadPoolExecutorsubmit 方法通常用于将一个任务提交到线程池中执行。这个方法会返回一个 Future 对象,可以用来检查任务的执行状态,获取任务的返回值或者取消任务的执行。

使用 submit 方法可以将任务提交到线程池中,由线程池中的线程来执行任务,从而避免了为每个任务创建线程的开销。同时,线程池可以限制同时执行的任务数量,避免资源被过度占用。


问题描述

提示:部分代码

某台服务器上配置了一个agent服务用来做命令执行,发现队列老是堆积。消费不过来明明用了线程池也发现任务队列没有满,奇怪。 项目日志:

代码语言:javascript
复制
 Add task [com.timelinecapital.util.agent.commands.ops.CommandSHLoginExecuteRunner@7751c119] ThreadPool status: FixedThreadPool:ActiveCount:0,CompletedTaskCount:1680,TaskCount:1681,PoolSize:6,CorePoolSize:6,QueueSize:1

项目代码:

代码语言:javascript
复制
ThreadPoolExecutor service = null;
    @Test
    public void testPools() throws ExecutionException, InterruptedException, TimeoutException {
        final int corePoolSize = Runtime.getRuntime().availableProcessors();
        log.info("corePoolSize:{}", corePoolSize);
        final int maximumPoolSize = corePoolSize * 2;
        final long keepAliveTime = 20;
        final TimeUnit unit = TimeUnit.SECONDS;
        final BlockingQueue<Runnable> workQueue = new LinkedBlockingDeque<>();
        final ThreadFactory threadFactory = new ThreadFactory() {
            private final AtomicInteger mThreadNum = new AtomicInteger(1);

            @Override
            public Thread newThread(final Runnable r) {
                return new Thread(r, "command-thread-" + mThreadNum.getAndIncrement());
            }
        };
        service = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime,
                unit, workQueue, threadFactory);
        for (int i = 0; i < maximumPoolSize; i++) {
            log.info("pool========{}", showStatus());
            final Future<?> future =  service.submit((Callable) () -> {
                log.info("thread name start:{}========", Thread.currentThread().getName());
                try {
                    TimeUnit.SECONDS.sleep(3);
                    log.info("thread name end:{}========", Thread.currentThread().getName());
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                return null;
            });
            Object o = future.get(10, TimeUnit.SECONDS);
        }
        log.info("complate");
        try {
            Thread.currentThread().join();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    String showStatus() {
        final ThreadPoolExecutor executor = (ThreadPoolExecutor) service;
        final StringBuilder info = new StringBuilder();
        info.append("FixedThreadPool:");
        info.append("ActiveCount:").append(executor.getActiveCount()).append(",");
        info.append("CompletedTaskCount:").append(executor.getCompletedTaskCount()).append(",");
        info.append("TaskCount:").append(executor.getTaskCount()).append(",");
        info.append("PoolSize:").append(executor.getPoolSize()).append(",");
        info.append("CorePoolSize:").append(executor.getCorePoolSize()).append(",");
        final BlockingQueue<Runnable> queue = executor.getQueue();
        if (queue != null) {
            info.append("QueueSize:").append(queue.size());
        }
        return info.toString();
    }

原因分析:

提示:跑了一次看到日志按照单线程的方式执行,瞬间顿悟。

代码语言:javascript
复制
2023-08-04 11:10:25 INFO  UtilsTest                       :105 - corePoolSize:12
2023-08-04 11:10:25 INFO  UtilsTest                       :121 - pool========FixedThreadPool:ActiveCount:0,CompletedTaskCount:0,TaskCount:0,PoolSize:0,CorePoolSize:12,QueueSize:0
2023-08-04 11:10:25 INFO  UtilsTest                       :123 - thread name start:command-thread-1========
2023-08-04 11:10:28 INFO  UtilsTest                       :126 - thread name end:command-thread-1========
2023-08-04 11:10:28 INFO  UtilsTest                       :121 - pool========FixedThreadPool:ActiveCount:0,CompletedTaskCount:1,TaskCount:1,PoolSize:1,CorePoolSize:12,QueueSize:0
2023-08-04 11:10:28 INFO  UtilsTest                       :123 - thread name start:command-thread-2========
2023-08-04 11:10:31 INFO  UtilsTest                       :126 - thread name end:command-thread-2========
2023-08-04 11:10:31 INFO  UtilsTest                       :121 - pool========FixedThreadPool:ActiveCount:0,CompletedTaskCount:2,TaskCount:2,PoolSize:2,CorePoolSize:12,QueueSize:0
2023-08-04 11:10:31 INFO  UtilsTest                       :123 - thread name start:command-thread-3========
2023-08-04 11:10:34 INFO  UtilsTest                       :126 - thread name end:command-thread-3========
2023-08-04 11:10:34 INFO  UtilsTest                       :121 - pool========FixedThreadPool:ActiveCount:0,CompletedTaskCount:3,TaskCount:3,PoolSize:3,CorePoolSize:12,QueueSize:0
2023-08-04 11:10:34 INFO  UtilsTest                       :123 - thread name start:command-thread-4========

原来submit的方式用错了,不应该直接这么get的,这样就跟没有开线程池一样,因为future.get(10, TimeUnit.SECONDS)会阻塞线程继续执行,线程池的最大使用效率没有返回出来,只用到一个单线程在执行,结果等于没有用。 从查看submit的源码来看,其实也是调用了java.util.concurrent.Executor#execute方法,只是换了线程实现而已,又让我想起那句话,之前不懂代码的时候看代码是代码,后面懂代码了,看代码就是看方法,现在深入代码底层看代码还是代码,惯性是个恐怖的事情。

代码语言:javascript
复制
    /**
     * @throws NullPointerException if the task is null
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
     */
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
类图
类图

解决方案:

提示:取消立马获 future.get(10, TimeUnit.SECONDS)方式

最后只能修改业务逻辑,因为对执行结果不是特别需求,所有可以改成execute方式,当然如果逻辑对返回值的需求特别的可以解耦,使用生产者消费者模式,一边计算一边处理,实现逻辑可以这样,在submit返回的Future对象存储在一个集合里面,在另一边可以批次处理也可以单次处理,批次处理就判断所有的submit执行完之后处理,单次处理就使用队列集合,一次取一个值理论情况下不会阻塞太久。

总结

习惯了用execute就忘记了submit的正确使用方式,惯性是很恐怖的,还是得多多跑跑单元测试。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2023-08-04,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 项目场景:
  • 问题描述
  • 原因分析:
  • 解决方案:
  • 总结
相关产品与服务
对象存储
对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档