一般开发者是利用 Executors 提供的统一线程创建方法,取创建不同配置的线程池,主要区别在于不同的 ExecutorService类型或者不同的初始参数。
Executors 提供了 5 种不同的线程池创建方式:
线程池工作原理
private fnal BlockingQueue<Runnable> workQueue;
private fnal HashSet<Worker> workers = new HashSet<>();
线程池构造函数如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
private fnal AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 真正决定了工作线程数的理论上限
private satic fnal int COUNT_BITS = Integer.SIZE - 3;
private satic fnal int COUNT_MASK = (1 << COUNT_BITS) - 1;
// 线程池状态,存储在数字的高位
private satic fnal int RUNNING = -1 << COUNT_BITS;
…
// Packing and unpacking ctl
private satic int runStateOf(int c) { return c & ~COUNT_MASK; }
private satic int workerCountOf(int c) { return c & COUNT_MASK; }
private satic int ctlOf(int rs, int wc) { return rs | wc; }
线程池的生命周期
public void execute(Runnable command) {
…
int c = ctl.get();
// 检查工作线程数目,低于corePoolSize则添加Worker
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// isRunning就是检查线程池是否被shutdown
// 工作队列可能是有界的, ofer是比较友好的入队方式
if (isRunning(c) && workQueue.ofer(command)) {
int recheck = ctl.get();
// 再次进行防御性检查
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 尝试添加一个worker,如果失败以为着已经饱和或者被shutdown了
else if (!addWorker(command, false))
reject(command);
}
线程数 = CPU核数 × (1 + 平均等待时间/平均工作时间)
这些时间并不能精准预计,需要根据采样或者概要分析等方式进行计算,然后在实际中验证和调整。