前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >这篇文章是我在评论区学到的。

这篇文章是我在评论区学到的。

作者头像
why技术
发布2024-07-29 09:29:14
1120
发布2024-07-29 09:29:14
举报
文章被收录于专栏:why技术

你好呀,我是歪歪。

上周发了这篇文章《线程池遇到父子任务,有大坑,要注意!》

里面描述了一个线程池遇到父子任务的情况。

总结来说就是如果线程池的任务之间存在父子关系,那么请不要使用同一个线程池。如果使用了同一个线程池,可能会因为子任务进了队列,导致父任务一直等待,出现假死现象。

然后评论区有这样的一个评论:

comparable future 加 join,啥意思呢?

歪师傅这种经验老道的程序员一眼就 get 到了。

之前版本的代码里面,我为了让所有子任务完成后,父任务才继续执行,用 CountDownLatch 来做了一个栅栏:

其实这里也可以用“comparable future 加 join”。

我再给你一版代码:

代码语言:javascript
复制
public class Main {

    public static void main(String[] args) {
        ExecutorService executorService = new ThreadPoolExecutor(5, 10,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(100));
        StopWatch watch = new StopWatch("MainTest");
        CompletableFuture[] mainFutureArr = new CompletableFuture[10];
        watch.start();
        for (int i = 0; i < 10; i++) {
            int finalI = i;
            CompletableFuture<Void> mainFuture = CompletableFuture.runAsync(() -> {
                System.out.println("当前线程" + Thread.currentThread().getName() + ",---【任务" + finalI + "】开始执行---");
                //模拟获取数据
                List<String> arrayList = getDataFromDB();
                CompletableFuture[] subFutureArr = new CompletableFuture[arrayList.size()];
                for (int j = 0; j < arrayList.size(); j++) {
                    subFutureArr[j] = getFuture(finalI, arrayList.get(j), executorService);
                }
                //等待所有子任务完成
                CompletableFuture.allOf(subFutureArr).join();
                System.out.println("当前线程" + Thread.currentThread().getName() + ",---【任务" + finalI + "】执行完成---");
            }, executorService);
            mainFutureArr[i] = mainFuture;
        }
        //等待所有父任务完成
        CompletableFuture.allOf(mainFutureArr).join();
        watch.stop();
        System.out.println(watch.prettyPrint());
    }

    private static CompletableFuture<Void> getFuture(int finalI, String str, ExecutorService executorService) {
        return CompletableFuture.runAsync(() -> {
            System.out.println("当前线程" + Thread.currentThread().getName() + ",【任务" + finalI + "】开始处理数据=" + str);
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }, executorService);
    }

    private static List<String> getDataFromDB() {
        ArrayList<String> arrayList = new ArrayList<>();
        arrayList.add("1");
        arrayList.add("2");
        return arrayList;
    }
}

和之前一版代码实现的逻辑是一模一样的,只是写法不一样。

提交任务的方式变化了:

原提交任务方式:java.util.concurrent.ExecutorService#submit(java.lang.Runnable) 新提交任务方式:java.util.concurrent.CompletableFuture#runAsync(java.lang.Runnable, java.util.concurrent.Executor)

然后是等待线程的方式变化了:

原等待线程方式:java.util.concurrent.CountDownLatch#await() 新等待线程方式:java.util.concurrent.CompletableFuture#join()

其实你对比新老逻辑会发现,之前线程池是线程池,栅栏是栅栏,是我们写代码的时候把这两个组合在了一起,实现了我们的功能。

而老逻辑的两个方法都是在 CompletableFuture 里面,整体看起来确实更加直观。

虽然任何一种写法都能很好的完成需求,但是多学一种,总归是有好处的。

asyncTool

除了上面这种写法之外,还有人看了文章后悄悄给我说了一个框架:asyncTool。

我还是很听话的,于是在网上搜了一下,发现这确实是一个封装的很好的线程编排框架:

https://gitee.com/jd-platform-opensource/asyncTool

在项目的介绍里面,图示了这样的几个场景。

多个执行单元的串行请求:

多个执行单元的并行请求:

阻塞等待,串行的后面跟多个并行:

串并行相互依赖:

复杂场景:

还有很多其他的介绍,比如作者分析了好几种并发场景下可能存在的需求,你可以打开前面的链接,自己看看:

我这里只是把项目特点部分单独截个图,因为从这个介绍上来看,确实是厉害的:

解决任意的多线程并行、串行、阻塞、依赖、回调的并发框架,可以任意组合各线程的执行顺序,带全链路回调和超时控制。

这个项目拉到本地后,可以直奔 test 模块,里面有各种各样的测试案例,是摸清楚这个框架原理的一个好抓手:

比如这个测试用例,我看了一下,这两个测试用例就很适合我的需求:

parallel.TestPar#testMulti3 parallel.TestPar#testMulti3Reverse

那么问题就来了?

为什么会有两个测试用例呢,注释还一模一样?

这两个测试用例在写法上有点差异:

testMulti3 的关键方法是 next,而 testMulti3Reverse 的关键方法是 depend。

拿这个具体的需求来说:

0 执行完,同时 1 和 2 , 1\2 都完成后 3

用 next 是需要先构建任务 3,然后构建 1 和 2,并把它们的 next 设置为 3。最后才是构建 0,并关联上 1 和 2。

用 depend 是先构建 0,然后构建 1,2,并把它们的 depend 设置为 0。最后才是构建 3,并 depend 设置为 1,2。

有人喜欢倒着写,有人喜欢顺着写。没关系,两种写法作者都给你提供了。

这个小细节让我觉得作者是真的很用心了。

我个人喜欢顺着写,所以我就直接跑 testMulti3Reverse 这个案例了。

在跑之前,我们可以看到每个任务的执行时间都是 1s:

所以,按照我们的预期应该是项目启动后 0 先完成,然后 1,2 同时执行,1s 后 3 开始执行。

跑一把看看输出是不是这样的:

从输出结果来看,确实是先执行了 0 ,然后 1,2 同时执行,最后执行的 3,符合我们的需求。

那么底层是如何实现的呢?

有源码,也有 Demo,你直接上手盘它啊。

这个框架的入口就是这个方法:

com.jd.platform.async.executor.Async#beginWork(long, java.util.concurrent.ExecutorService, java.util.List<com.jd.platform.async.wrapper.WorkerWrapper>)

你去找到这个方法的时候,还会贴心的看到作者写的:出发点。

通过 Debug 我们可以看到,workerWrappers 这个入参就是我们的 0 号任务,并且里面也有了后续任务集合:

继续往下 Debug,你会来到这个地方:

com.jd.platform.async.wrapper.WorkerWrapper#beginNext

在这个方法里面,有个 for 循环把 1 和 2 任务扔到线程池,然后等待 1 和 2 执行完成。

你看这里的逻辑,搞个 CompletableFuture[] 数组,然后有个 for 循环,循环里面 CompletableFuture.runAsync() 方法,最后把数组放到 allOf 里面 CompletableFuture.allOf(futures) 再来个 get() 方法阻塞等待。

眼熟不眼熟?

和我们前面 Demo 里面的这段代码,不能说相差无几,只能说一模一样:

这并不是巧合,是因为不管这个 asyncTool 玩儿的多花里胡哨,最底层还是基于 CompletableFuture 来做的,所以在同样的需求下,代码是类似的。

我这里主要只是给你分享一下,让你知道有这样的一个异步线程编排框架的存在。

如果你感兴趣,可以拉一下源码,作者在写代码的时候,就留下了大量的注释,学习成本并不高:

另外,作者在 QuickStart 里面也提到了。

如果需要深入了解这个框架是如何一步一步实现的,从接到需求,到每一步的思考,每个类为什么这么设计,为什么有这些方法,也就是如何从 0 到 1 开发出这个框架,可以看看作者的这四篇文章:

https://blog.csdn.net/tianyaleixiaowu/category_9637010.html

学明白了,这就是你的了。

当你遇到一个复杂的涉及到异步任务编排的需求的时候,你就可以把这个掏出来看看,应该是能比较优雅的解决你遇到的问题。

思考

回到我们自己的 Demo 中,当我用 CompletableFuture 改造完成之后,我还发现了一个小细节。

如果你还记得前一篇文章,那你应该知道是因为父子线程使用了同一个线程池导致的。

在我使用 CompletableFuture 的写法时,如果我不指定线程池,也就是这样:

会发什么呢?

程序会正常执行完成:

那么问题就来了:为什么不指定线程池的时候,反而没有问题呢?

其实你从日志输出中也能发现端倪:

当前线程ForkJoinPool.commonPool-worker-6

这说明什么?

是不是说明 runAsync 方法内置了一个默认的线程池?

而这个默认线程池没有出现问题,那我们是不是完全有理由猜测,它这个线程池的核心线程数很大,所以没有任何一个子任务进队列。

那到底是多大呢?

我也不知道,但是我知道大力出奇迹。

所以:

我把循环扩大到了 10000 次,并且在任务里面 sleep 1s,方便任务快速占满核心线程。

在程序运行之前,你觉得线程编号会到多少去?

我寻思着怎么也得千儿八百的吧。

嘿,你猜怎么着?

有意思的事情就来了:

线程池编号从 1 开始只到了 11。

后面都是在复用这 11 个线程,且程序在运行了 917s 之后正常结束了,没有出现阻塞的情况:

这个结果确实有点让人摸不着头脑。

但是我们即有 Demo 也有源码啊。

盘它源码,扒它底裤不就完了吗。

这个 runAsync 方法一进来,我们就直接看到了答案:

java.util.concurrent.CompletableFuture#runAsync(java.lang.Runnable)

可以看到当 USE_COMMON_POOL 为 false 时会使用 new ThreadPerTaskExecutor() 方法来搞个线程池。为 true 的时候,会使用 ForkJoinPool.commonPool() 来搞个线程池。

而 ThreadPerTaskExecutor 方法非常的简单粗暴:

就是每来一个任务就起一个线程,对应我们的 Demo 那就是必然不会出现父子等待的情况。

如果在歪师傅的机器上, USE_COMMON_POOL 为 false 的话,那就直接破案了啊。

然而,很不幸,它是 true。

所以我们看看为 true 的时候,ForkJoinPool.commonPool() 是怎么个事儿。

commonPool 方法进来之后直接返回了一个 common 对象,这个对象是 ForkJoinPool 类:

重启项目之后,可以观察到这个对象是在 ForkJoinPool 类初始化的时候就生成了:

其中 parallelism 的值为 11,从命名上猜也能猜出来了,这个值和 ForkJoinPool 里工作线程数量有关。

但是 11 这个值到底是怎么来的呢?

通过代码可以看到,这个值可以通过配置进行指定,如果不指定,则获取默认值。

默认值,为 CPU 核心数减一。

我的机器是 12 核的,所以 12-1=11,它就是这样来的:

而这里的这个 11 和我们前面分析日志“线程池编号从 1 开始只到了 11”也呼应上了。

所以,同样的代码,在你的电脑上跑,可能就不是 11 了,这个小细节需要注意一下。

现在我们拿到一个 ForkJoinPool 类了,只需要搞懂它的工作原理就行了。

剩下的部分,就当是一个思考题吧。

关于这个 ForkJoinPool 部分我其实是写了一点的,我又都删了,一个原因是这部分写起来确实感觉有点难度,你去看源码就知道它的源码可读性真不高。另外一个主要的原因是因为写的过程中我翻到了一篇文章:《一次线程池引发的线上故障分析》

我们要找的答案就在这篇文章里面:

我是在查阅资料的时候看到这篇文章的,看完之后,怎么说呢?

就是说:绝了,我们遇到的问题简直就是如出一辙。

而这是一篇 2020 年的文章,四年过去了,问题还是那个问题,但是遇到这个问题的程序员都换了一批又一批了。

也许再过四年,又有一个程序员遇到了这个问题,然后在记录问题的时候,搜索到了我的这篇文章。

那我就提前给这个小伙子或者小姑娘打个招呼吧:

朋友,你好呀,我是歪歪。

·············· END ··············

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2024-07-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 why技术 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • asyncTool
  • 思考
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档