Loading [MathJax]/jax/input/TeX/config.js
前往小程序,Get更优阅读体验!
立即前往
社区首页 >专栏 >亲缘性线程池,这是什么鬼?

亲缘性线程池,这是什么鬼?

作者头像
加多
发布于 2020-07-03 02:31:57
发布于 2020-07-03 02:31:57
1.8K00
代码可运行
举报
文章被收录于专栏:Java编程技术Java编程技术
运行总次数:0
代码可运行

一、前言

JDK中的线程池主要解决两个问题:

  • 一方面当执行大量异步任务时候线程池能够提供较好的性能,在不使用线程池的时,每当需要执行异步任务时候是直接 new一线程运行,而线程的创建和销毁是需要开销的。而使用线程池时候,线程池里面的线程是可复用的,不会每次执行异步任务时候都重新创建和销毁线程。
  • 另一方面线程池提供了一种资源限制和管理的手段,比如可以限制线程的个数,动态新增线程等,每个 ThreadPoolExecutor 也保留了一些基本的统计数据,比如当前线程池完成的任务数目等。

JDK中的线程池固然好,但是其不具有亲缘性,也就是当我们顺序向其中投递多个任务后,不能保证具有相同属性的任务顺序执行,本文我们就来看一个可以实现亲缘性的线程池。

二、测试案例

首先我们在做个测试,看看JDK中线程池是否具有亲缘性,我们创建一个Person类,其中id作为唯一标识,data为需要处理的数据,如下代码,我们创建一些Person对象,放到list,然后把任务顺序投递到JDK线程池:

代码语言:javascript
代码运行次数:0
复制
    //0.普通线程池
    static ExecutorService executorService = Executors.newFixedThreadPool(8);
    public static void executeByOldPool(List<Person> personList) {
        personList.stream().forEach(p -> executorService.execute(() -> {
            System.out.println(JSON.toJSONString(p));
        }));
    }
 public static void main(String[] args) {

        //1.创建列表
        List<Person> personList = new ArrayList<>();
        personList.add(Person.builder().id(1).data("1s").build());
        personList.add(Person.builder().id(2).data("2s").build());
        personList.add(Person.builder().id(1).data("11s").build());
        personList.add(Person.builder().id(3).data("3s").build());
        personList.add(Person.builder().id(1).data("111s").build());
        personList.add(Person.builder().id(2).data("22s").build());
        personList.add(Person.builder().id(3).data("33s").build());
        personList.add(Person.builder().id(1).data("1111s").build());

        //2.使用普通线程池执行
        executeByOldPool(personList);
}

执行上面代码,如果线程池是亲缘的,则比如对应id=1的Person的输出应该是和投递到线程池时一致,也就是下面顺序:

代码语言:javascript
代码运行次数:0
复制
{"data":"1s","id":1}
{"data":"11s","id":1}
{"data":"111s","id":1}
{"data":"1111s","id":1}

但是当我们执行上面代码,一个可能的输出为:

代码语言:javascript
代码运行次数:0
复制
{"data":"3s","id":3}
{"data":"2s","id":2}
{"data":"33s","id":3}
{"data":"22s","id":2}
{"data":"1111s","id":1}
{"data":"1s","id":1}
{"data":"11s","id":1}
{"data":"111s","id":1}

可知其并没实现亲缘性,比如id=1的person的data并没有按照投递线程池顺序输出。

究其原因是因为JDK中线程池是不保证先投递到线程池的任务先执行完毕。

三、亲缘性线程池实现

如果想实现亲缘线程池,则这里有大佬w.vela的一个开源实现 https://github.com/PhantomThief/simple-pool

首先我们需要引入其依赖:

代码语言:javascript
代码运行次数:0
复制
        <dependency>
            <groupId>com.github.phantomthief</groupId>
            <artifactId>simple-pool</artifactId>
            <version>0.1.17</version>
        </dependency>

然后上面executeByOldPool方法修改为下面:

代码语言:javascript
代码运行次数:0
复制
    static KeyAffinityExecutor executor = KeyAffinityExecutor.newSerializingExecutor(8,200, "MY-POOL");
    public static void executeByAffinitydPool(List<Person> personList) {
        personList.stream().forEach(p -> executor.executeEx(p.getId(), () -> {
            System.out.println(JSON.toJSONString(p));
        }));
    }

如上代码投递任务到线程池时,我们使用person的id作为key,这可以保证相同的id顺序投递到线程池的任务可以顺序执行,修改后,运行,一个可能的输出为:

代码语言:javascript
代码运行次数:0
复制
{"data":"3s","id":3}
{"data":"1s","id":1}
{"data":"2s","id":2}
{"data":"33s","id":3}
{"data":"11s","id":1}
{"data":"22s","id":2}
{"data":"111s","id":1}
{"data":"1111s","id":1}

如上输出可知对应相同id的Person,其输出与投递到线程池顺序一致。

那么亲缘性线程池如何实现保证顺序内,大家可以看下其代码,其实很简单,就是把相同key的任务按照投递线程池的顺序,放到同一个内存队列(这里我们设置为200大小),每个内存队列有一个线程来消费。那么消费线程有几个那?其实是按照创建线程池时newSerializingExecutor的第一个参数来决定。

四、总结

亲缘性线程池在需要保证顺序消费,并且需要高吞吐量的情况下很用用,必须普通情况下顺序消费的保证是靠单线程来做的(比如rocketmq的顺序消息,消费端消费时)。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
异步编程 - 07 基于JDK中的Future实现异步编程(下)_当Stream遇见CompletableFuture
JDK8中提供了流式对数据进行处理的功能,它的出现允许我们以声明式方式对数据集合进行处理。所谓声明式是相对于我们平时所用的命令式编程来说的,使用声明式编程会让我们对业务的表达更清晰。另外使用流可以让我们很方便地对数据集进行并行处理。
小小工匠
2023/09/09
3480
java进阶|比较器Comparable和Comparator
一,可能对于java的coder来说,这个点很简单,但对于我来说又是很难,想写这篇文章也是很久了,今天就以自己的理解来看下这两个接口,首先我们先看下Comparable接口的结构图。
码农王同学
2020/07/06
7110
JUC内置线程池
整个线程池表现为线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲 1分钟后释放线程。 适合任务数比较密集,但每个任务执行时间较短的情况。
兜兜转转
2023/03/08
1670
JUC内置线程池
看到一个魔改线程池,面试素材加一!
放心,我标题党来着,我觉得面试不会有人考这个玩意,但是工作中是有可能真的会遇到响应的场景。
why技术
2021/12/04
5820
看到一个魔改线程池,面试素材加一!
任务调度线程池
在『任务调度线程池』功能加入之前,可以使用 java.util.Timer 来实现定时功能,Timer 的优点在于简单易用,但 由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个 任务的延迟或异常都将会影响到之后的任务。
一个风轻云淡
2024/01/02
2350
异步编程 - 05 基于JDK中的Future实现异步编程(中)_CompletableFuture
Java8 - 使用工厂方法 supplyAsync创建 CompletableFuture
小小工匠
2023/09/09
2730
血的教训,如何正确使用线程池 submit 和 execute 方法
血的教训之背景:使用线程池对存量数据进行迁移,但是总有一批数据迁移失败,无异常日志打印
好好学java
2019/07/10
3.4K0
异步编程 - 02 显式使用线程和线程池实现异步编程
我们主要探讨如何显式地使用线程和线程池实现异步编程,这包含如何显式使用线程实现异步编程以及使用线程编程的缺点,如何显式使用线程池实现异步编程以及线程池实现原理。
小小工匠
2023/09/07
2550
异步编程 - 02 显式使用线程和线程池实现异步编程
Java中调度线程池ScheduledThreadPoolExecutor原理探究
前面讲解过Java中线程池ThreadPoolExecutor原理探究,ThreadPoolExecutor是Executors中一部分功能,下面来介绍另外一部分功能也就是ScheduledThreadPoolExecutor的实现,后者是一个可以在一定延迟时候或者定时进行任务调度的线程池。
加多
2018/09/06
7360
Java中调度线程池ScheduledThreadPoolExecutor原理探究
线程池总结 原
New Thread的弊端如下:        a、每次New Thread新建对象性能差。        b、线程缺乏统一的管理,可能无限制的新建线程,相互之间竞争,极可能占用过多的系统资源导致死机 或者 OOM。        c、缺乏更多功能,如定时执行、定期执行、线程中断。
wuweixiang
2018/08/14
4210
线程池总结
                                                                            原
并发容器和线程池
当你的线程需要执行一个后继任务,即完成每个前置任务后,会自动执行下一个任务。这时我们使用CompletableFuture 来实现。
摸鱼的G
2023/02/22
3590
线程池的7种创建方式,强烈推荐你用它...
根据摩尔定律所说:集成电路上可容纳的晶体管数量每 18 个月翻一番,因此 CPU 上的晶体管数量会越来越多。
磊哥
2020/12/21
5670
线程池的7种创建方式,强烈推荐你用它...
线程池创建方式
创建一个可缓存的线程池,若线程数超过处理所需,缓存一段时间后会回收,若线程数不够,则新建线程。
ruochen
2021/11/24
7080
java线程池详解
在java中,执行任务的最小单位是线程。我们知道,线程是一种稀缺的资源,它的创建于销毁是一个非常耗费资源的操作,而Java线程依赖于内核线程,其线程的创建需要进行操作系统状态的切换,为了避免多度消耗资源需要设法重用线程去执行多个任务。而线程池具备缓存和管理线程的功能,可以很好的对线程进行统一分配、监控和调优。
从大数据到人工智能
2022/09/16
6620
java线程池详解
OKHttp源码解析(三)--中阶之线程池和消息队列
android的异步任务一般都是用Thread+Handler或者AsyncTask来实现,其中笔者当初经历过各种各样坑,特别是内存泄漏,当初笔者可是相当的欲死欲仙啊!所以现在很少有开发者还在用这一套来做异步任务,现在一般都是Rxjava为主,当然还有自己自定义的异步任务框架(比如笔者),像RxJava都帮我们写好了对应场景的线程池,这是为什么?
隔壁老李头
2018/08/30
2.5K0
OKHttp源码解析(三)--中阶之线程池和消息队列
一文带你彻底弄懂线程池
虽然 Java 对线程的创建、中断、等待、通知、销毁、同步等功能提供了很多的支持,但是从操作系统角度来说,频繁的创建线程和销毁线程,其实是需要大量的时间和资源的。
Java极客技术
2023/12/12
1.2K0
一文带你彻底弄懂线程池
并发编程之线程池
一、关于ThreadPoolExecutor 为了更好地控制多线程,JDK提供了一套Executor框架,帮助开发人员有效的进行线程控制,其本质就是一个线程池。其中ThreadPoolExecutor是线程池中最核心的一个类,后面提到的四种线程池都是基于ThreadPoolExecutor实现的。 ThreadPoolExecutor提供了四个构造方法,我们看下最重要的一个构造函数: public class ThreadPoolExecutor extends AbstractExecutorServic
lyb-geek
2018/03/27
9500
并发编程之线程池
Java线程池监控小结
最近我们组杨青同学遇到一个使用线程池不当的问题:异步处理的线程池线程将主线程hang住了,分析代码发现是线程池的拒绝策略设置得不合理,设置为CallerRunsPolicy。当异步线程的执行效率降低时,阻塞队列满了,触发了拒绝策略,进而导致主线程hang死。
阿杜
2018/08/06
1.8K0
Java线程池监控小结
Spring-webflux 响应式编程
Spring 提供了两个并行堆栈。一种是基于带有 Spring MVC 和 Spring Data 结构的 Servlet API。另一个是完全反应式堆栈,它利用了 Spring WebFlux 和 Spring Data 的反应式存储库。在这两种情况下,Spring Security 都提供了对两种堆栈的支持。
鱼找水需要时间
2023/02/16
1.5K0
Spring-webflux 响应式编程
JUC学习之共享模型之工具上之线程池浅学
ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量
大忽悠爱学习
2022/01/10
4310
JUC学习之共享模型之工具上之线程池浅学
相关推荐
异步编程 - 07 基于JDK中的Future实现异步编程(下)_当Stream遇见CompletableFuture
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
查看详情【社区公告】 技术创作特训营有奖征文