很多人都知道或者用过线程池,线程池构造方法的参数中有一个参数为拒绝策略。
那么,通过拒绝策略我们可以学到哪些思想?
下面简单讲讲自己的理解。
不知道你有没有思考过,为什么会有那么多编程语言?为什么会有那么多算法?为什么会有那么多设计模式?为什么会有那么多存储方式?为什么会有那么多线程池拒绝策略?
如果你稍微思考,就会发现不同的编程语言、算法、设计模式、存储方式、线程池拒绝策略等适用不同的场景。
(图片来源:美团技术)
启发:没有最好的选择,只有最适合的选择 我们要做的是在某个特定场景下选择最适合的技术,而不是最新、最热的技术。
如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务。
如果由你来设计线程池经典的处理策略你会提供怎样的策略?
如果我们从来没学过这一块,可能就想不出这么多经典的处理策略。
JDK 源码提供的拒绝策略应该是经过深思熟虑,能够覆盖到常见业务场景。
启发:当我们面临大数据量处理时,也可以参考这些策略根据其适用的场景去灵活处理。
不知道你是否思考过,为什么默认的策略是 AbortPolicy?
如果默认是 DiscardPolicy 或者 DiscardOldestPolicy,真正触发拒绝策略时,使用者不容易感知。 使用者对源码理解不深入很容易出现不符合预期的运行效果。
显然超出“承载能力” 丢弃任务并抛出异常是一个更适合的选择。不提供服务,没啥好说的,已经超过能力了,没法提供服务。抛出异常为了让上游感知到并做出相应处理。
另外我们可以看下 Redis 缓存淘汰策略:
1.noeviction(默认策略):对于写请求不再提供服务,直接返回错误(DEL请求和部分特殊请求除外) 2.allkeys-lru:从所有key中使用LRU算法进行淘汰 3.volatile-lru:从设置了过期时间的key中使用LRU算法进行淘汰 4.allkeys-random:从所有key中随机淘汰数据 5.volatile-random:从设置了过期时间的key中随机淘汰 6.volatile-ttl:在设置了过期时间的key中,淘汰过期时间剩余最短的
我们会发现两者也有“惊人”的相似性,都是不提供服务,返回错误。
启发1:当我们遇到超过承载量的场景时,需要提供不同的选择,可以借鉴上述思想,并且默认需要考虑报错,保护自身的同事,让上游感知到。
启发2:可以通过对比经典技术对相似问题的处理来得到比较靠谱的方案 or 思考方向。
作为一个严谨的设计者,就应该预料到自己提供的策略不可能涵盖所有场景。 因此需要支持用户自定义才行。
比如 Dubbo 中提供了 AbortPolicyWithReport
拒绝策略,在触发拒绝策略时可以报告一些信息。
public class AbortPolicyWithReport extends AbortPolicy {
protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class);
private final String threadName;
private final URL url;
private static volatile long lastPrintTime = 0L;
private static Semaphore guard = new Semaphore(1);
public AbortPolicyWithReport(String threadName, URL url) {
this.threadName = threadName;
this.url = url;
}
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
String msg = String.format("Thread pool is EXHAUSTED! Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d), Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!", this.threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(), e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(), this.url.getProtocol(), this.url.getIp(), this.url.getPort());
logger.warn(msg);
this.dumpJStack();
throw new RejectedExecutionException(msg);
}
private void dumpJStack() {
long now = System.currentTimeMillis();
if (now - lastPrintTime >= 600000L) {
if (guard.tryAcquire()) {
Executors.newSingleThreadExecutor().execute(new Runnable() {
public void run() {
String dumpPath = AbortPolicyWithReport.this.url.getParameter("dump.directory", System.getProperty("user.home"));
String OS = System.getProperty("os.name").toLowerCase();
SimpleDateFormat sdf;
if (OS.contains("win")) {
sdf = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss");
} else {
sdf = new SimpleDateFormat("yyyy-MM-dd_HH:mm:ss");
}
String dateStr = sdf.format(new Date());
FileOutputStream jstackStream = null;
try {
jstackStream = new FileOutputStream(new File(dumpPath, "Dubbo_JStack.log." + dateStr));
JVMUtil.jstack(jstackStream);
} catch (Throwable var15) {
AbortPolicyWithReport.logger.error("dump jstack error", var15);
} finally {
AbortPolicyWithReport.guard.release();
if (jstackStream != null) {
try {
jstackStream.flush();
jstackStream.close();
} catch (IOException var14) {
}
}
}
AbortPolicyWithReport.lastPrintTime = System.currentTimeMillis();
}
});
}
}
}
}
PS: 这也是我们学习和理解 volatile、Semaphore 的一个机会。
再比如 Netty 中就提供了通过新建线程执行的策略:NewThreadRunsPolicy
private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
final Thread t = new Thread(r, "Temporary task executor");
t.start();
} catch (Throwable e) {
throw new RejectedExecutionException(
"Failed to start a new thread", e);
}
}
}
这些开源项目的拒绝策略都为我们开了不少脑洞。
我们也可以根据具体业务,扔到消息队列里再消费等方式处理。
启发1:面向未来编程。 当我们设计一些通用工具时,也要留一些拓展性给别人。
启发1: 带着问题学技术或者把技术放在特定场景里学习,更有兴趣,也更容易掌握。
本文简单谈下自己从线程池拒绝策略中学到的一点知识,希望能够对大家有启发。
希望大家在读源码时能多一些思考,多思考为什么,而不是记忆结论。
多问几个问题,如:
如果你觉得本文对你有帮助,欢迎点赞、收藏加关注三连!!