支持生产阻塞的线程池

我们使用线程池的时候,经常使用默认的丢弃策略,那么我们也可以自定义策略,那么下面的文章可以看下。

在各种并发编程模型中,生产者-消费者模式大概是最常用的了。在实际工作中,对于生产消费的速度,通常需要做一下权衡。通常来说,生产任务的速度要大于消费的速度。一个细节问题是,队列长度,以及如何匹配生产和消费的速度。 一个典型的生产者-消费者模型如下:

Paste_Image.png

在并发环境下利用J.U.C提供的Queue实现可以很方便地保证生产和消费过程中的线程安全。这里需要注意的是,Queue必须设置初始容量,防止生产者生产过快导致队列长度暴涨,最终触发OutOfMemory。

对于一般的生产快于消费的情况。当队列已满时,我们并不希望有任何任务被忽略或得不到执行,此时生产者可以等待片刻再提交任务,更好的做法是,把生产者阻塞在提交任务的方法上,待队列未满时继续提交任务,这样就没有浪费的空转时间了。阻塞这一点也很容易,BlockingQueue就是为此打造的,ArrayBlockingQueue和LinkedBlockingQueue在构造时都可以提供容量做限制,其中LinkedBlockingQueue是在实际操作队列时在每次拿到锁以后判断容量。

更进一步,当队列为空时,消费者拿不到任务,可以等一会儿再拿,更好的做法是,用BlockingQueue的take方法,阻塞等待,当有任务时便可以立即获得执行,建议调用take的带超时参数的重载方法,超时后线程退出。这样当生产者事实上已经停止生产时,不至于让消费者无限等待。

于是一个高效的支持阻塞的生产消费模型就实现了。 等一下,既然J.U.C已经帮我们实现了线程池,为什么还要采用这一套东西?直接用ExecutorService不是更方便?

我们来看一下ThreadPoolExecutor的基本结构:

Paste_Image.png

可以看到,在ThreadPoolExecutor中,BlockingQueue和Consumer部分已经帮我们实现好了,并且直接采用线程池的实现还有很多优势,例如线程数的动态调整等。

但问题在于,即便你在构造ThreadPoolExecutor时手动指定了一个BlockingQueue作为队列实现,事实上当队列满时,execute方法并不会阻塞,原因在于ThreadPoolExecutor调用的是BlockingQueue非阻塞的offer方法:

Paste_Image.png

这时候就需要做一些事情来达成一个结果:当生产者提交任务,而队列已满时,能够让生产者阻塞住,等待任务被消费。

关键在于,在并发环境下,队列满不能由生产者去判断,不能调用ThreadPoolExecutor.getQueue().size()来判断队列是否满。 线程池的实现中,当队列满时会调用构造时传入的RejectedExecutionHandler去拒绝任务的处理。默认的实现是AbortPolicy,直接抛出一个RejectedExecutionException。

几种拒绝策略在这里就不赘述了,这里和我们的需求比较接近的是CallerRunsPolicy,这种策略会在队列满时,让提交任务的线程去执行任务,相当于让生产者临时去干了消费者干的活儿,这样生产者虽然没有被阻塞,但提交任务也会被暂停。

Paste_Image.png

但这种策略也有隐患,当生产者较少时,生产者消费任务的时间里,消费者可能已经把任务都消费完了,队列处于空状态,当生产者执行完任务后才能再继续生产任务,这个过程中可能导致消费者线程的饥饿。

参考类似的思路,最简单的做法,我们可以直接定义一个RejectedExecutionHandler,当队列满时改为调用BlockingQueue.put来实现生产者的阻塞:

Paste_Image.png

这样,我们就无需再关心Queue和Consumer的逻辑,只要把精力集中在生产者和消费者线程的实现逻辑上,只管往线程池提交任务就行了。

相比最初的设计,这种方式的代码量能减少不少,而且能避免并发环境的很多问题。当然,你也可以采用另外的手段,例如在提交时采用信号量做入口限制等,但是如果仅仅是要让生产者阻塞,那就显得复杂了。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏java 成神之路

ps 命令详解

47913
来自专栏用户2442861的专栏

系统的环境变量path的作用是什么

 http://blog.csdn.net/libo2006/article/details/1531545

1.8K2
来自专栏Rovo89

Go基本安装

6544
来自专栏C/C++基础

Linux命令(14)——df命令

用于查看Linux文件系统的磁盘空间占用情况。可以利用该命令来获取硬盘被占用了多少空间,以及剩余空间等信息。

1698
来自专栏FreeBuf

Cookie篡改与命令注入

cookie 篡改 (cookie poisoning) 是一项主要以获取模拟和隐私权泄密著称的技术,通过维护客户(或终端用户)身份的会话信息操纵来实现的。通过...

1993
来自专栏北京马哥教育

HTTP cookies 详解

HTTP cookies,通常称之为“cookie”,已经存在很长时间了,但是仍然没有被充分理解。首要问题是存在许多误解,认为 cookie 是后门程序或病毒,...

2884
来自专栏会跳舞的机器人

java并发编程的艺术第九章——java中的线程池

execute()方法适用于任务提交之后没有返回值的这种情况,因为没有返回值,所以提交任务之后我们也无法判断任务是否执行成功。

752
来自专栏java一日一条

Java 线程池(ThreadPoolExecutor)原理分析与使用

在我们的开发中“池”的概念并不罕见,有数据库连接池、线程池、对象池、常量池等等。下面我们主要针对线程池来一步一步揭开线程池的面纱。

952
来自专栏Linux驱动

arm裸板驱动总结(makefile+lds链接脚本+裸板调试)

在裸板2440中,当我们使用nand启动时,2440会自动将前4k字节复制到内部sram中,如下图所示: ? 然而此时的SDRAM、nandflash的控制时序...

3189
来自专栏运维一切

zookeeper的额外端口 原

zookeeper有三个端口2181 2888 3888 但是在启动服务之后,他妈的竟然还有启动一个随机的端口,还挂在0.0.0.0上,转了一圈发现,这玩意竟然...

1022

扫码关注云+社区

领取腾讯云代金券