提供Executor的工厂类
忽略了自定义的ThreadFactory、callable和unconfigurable相关的方法
newFixedxxx:在任意时刻,最多有nThreads个线程在处理task;如果所有线程都在运行时来了新的任务,它会被扔入队列;如果有线程在执行期间因某种原因终止了运行,如果需要执行后续任务,新的线程将取代它
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
newCachedxxx:新任务到来如果线程池中有空闲的线程就复用,否则新建一个线程。如果一个线程超过60秒没有使用,它就会被关闭移除线程池
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
newSingleThreadExecutor:仅使用一个线程来处理任务,如果这线程挂了,会产生一个新的线程来代替它。每一个任务被保证按照顺序执行,而且一次只执行一个
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
使用newFixedxxx方法也能实现类似的作用,但是ThreadPoolExecutor会提供修改线程数的方法,FinalizableDelegatedExecutorService则没有修改的途径,它在DelegatedExecutorService的基础 上仅提供了执行finalize时候去关闭线程,而DelegatedExecutorService仅暴漏ExecutorService自身的方法
newScheduledThreadPool:提供一个线程池来延迟或者定期执行任务
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
new DelayedWorkQueue());
}
newSingleThreadScheduledExecutor:提供单个线程来延迟或者定期执行任务,如果执行的线程挂了,会生成新的。
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
同样,它保证返回的Executor自身的线程数不可修改
从上述的实现可以看出,核心在于三个部分
对应的,相应也有不同的队列去实现不同的场景
它仅仅是包装了ExecutorService的方法,交由传入的ExecutorService来执行,所谓的UnConfigurable实际也就是它没有暴漏配置各种参数调整的方法
static class DelegatedExecutorService extends AbstractExecutorService {
private final ExecutorService e;
DelegatedExecutorService(ExecutorService executor) { e = executor; }
public void execute(Runnable command) { e.execute(command); }
public void shutdown() { e.shutdown(); }
public List<Runnable> shutdownNow() { return e.shutdownNow(); }
public boolean isShutdown() { return e.isShutdown(); }
public boolean isTerminated() { return e.isTerminated(); }
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
return e.awaitTermination(timeout, unit);
}
public Future<?> submit(Runnable task) {
return e.submit(task);
}
public <T> Future<T> submit(Callable<T> task) {
return e.submit(task);
}
public <T> Future<T> submit(Runnable task, T result) {
return e.submit(task, result);
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
return e.invokeAll(tasks);
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
return e.invokeAll(tasks, timeout, unit);
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
return e.invokeAny(tasks);
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return e.invokeAny(tasks, timeout, unit);
}
}
提供一系列的schedule方法,使得任务可以延迟或者周期性的执行,对应schedule方法会返回ScheduledFuture以供确认是否执行以及是否要取消。它的实现ScheduledThreadPoolExecutor也支持立即执行由submit提交的任务
仅支持相对延迟时间,比如距离现在5分钟后执行。类似Timer也可以管理延迟任务和周期任务,但是存在一些缺陷:
ScheduledExecutorService的多线程机制可弥补
1:可以使用try-catch-finally对相应执行快处理;2:通过execute执行的方法可以设置UncaughtExceptionHandler来接收未捕获的异常,并作出处理;3:通过submit执行的,将被封装层ExecutionException重新抛出
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
Executors.defaultThreadFactory
ThreadPoolExecutor可自定义beforeExecutor、afterExecutor可以用来添加日志统计、计时、件事或统计信息收集功能,无论run是正常返回还是抛出异常,afterExecutor都会被执行。如果beforeExecutor抛出RuntimeException,任务和afterExecutor都不会被执行。terminated在所有任务都已经完成,并且所有工作者线程关闭后会调用,此时也可以用来执行发送通知、记录日志等等。
个处理器的系统上,线程池大小设置为
能够实现最优的利用率;
cpu的个数
为CPU的个数,
为CPU的利用率,
为等待时间与计算时间的比率,此时线程池的最优大小为
将一个网站的业务抽象成如下几块
理论上,服务端通过实现约定的接口就可以实现接收请求和处理连续不断的请求过来
ServerSocket socket = new ServerSocket(80);
while(true){
Socket conn = socket.accept();
handleRequest(conn)
}
缺点:每次只能处理一个请求,新请求到来时,必须等到正在处理的请求处理完成,才能接收新的请求
为每个请求创建新的线程提供服务
ServerSocket socket = new ServerSocket(80);
while(true){
final Socket conn = socket.accept();
Runnable task = new Runnable(){
public void run(){
handleRequest(conn);
}
}
new Thread(task).start();
}
缺点:
使用java自带的Executor框架。
private static final Executor exec = Executors.newFixedThreadPool(100);
...
ServerSocket socket = new ServerSocket(80);
while(true){
final Socket conn = socket.accept();
Runnable task = new Runnable(){
public void run(){
handleRequest(conn);
}
}
exec.execute(task);
}
...
线程池策略通过实现预估好的线程需求,限制并发任务的数量,重用现有的线程,解决每次创建线程的资源耗尽、竞争过于激烈和频繁创建的问题,也囊括了线程的优势,解耦了任务提交和任务执行。
renderText(source);
List<ImageData> imageData = new ArrayList<ImageData>();
for(ImageInfo info:scaForImageInfo(source)){
imageData.add(info.downloadImage());
}
for(ImageData data:imageData){
renderImage(data);
}
缺点:图像的下载大部分时间在等待I/O操作执行完成,这期间CPU几乎不做任何工作,使得用户看到最终页面之前要等待过长的时间
渲染过程可以分成两个部分,1是渲染文本,1是下载图像
private static final ExecutorService exec = Executors.newFixedThreadPool(100);
...
final List<ImageInfo> infos=scaForImageInfo(source);
Callable<List<ImageData>> task=new Callable<List<ImageData>>(){
public List<ImageData> call(){
List<ImageData> r = new ArrayList<ImageData>();
for(ImageInfo info:infos){
r.add(info.downloadImage());
}
return r;
}
};
Future<List<ImageData>> future = exec.submit(task);
renderText(source);
try{
List<ImageData> imageData = future.get();
for(ImageData data:imageData){
renderImage(data);
}
}catch(InterruptedException e){
Thread.currentThread().interrupt();
future.cancel(true);
}catche(ExecutionException e){
throw launderThrowable(e.getCause());
}
使用Callable来返回下载的图片结果,使用future来获得下载的图片,这样将减少用户所需要的等待时间。 缺点:图片的下载很明显时间要比文本要慢,这样的并行化很可能速度可能只提升了1%
使用CompletionService。
private static final ExecutorService exec;
...
final List<ImageInfo> infos=scaForImageInfo(source);
CompletionService<ImageData> cService = new ExecutorCompletionService<ImageData>(exec);
for(final ImageInfo info:infos){
cService.submit(new Callable<ImageData>(){
public ImageData call(){
return info.downloadImage();
}
});
}
renderText(source);
try{
for(int i=0,n=info.size();t<n;t++){
Future<ImageData> f = cService.take();
ImageData imageData=f.get();
renderImage(imageData)
}
}catch(InterruptedException e){
Thread.currentThread().interrupt();
}catche(ExecutionException e){
throw launderThrowable(e.getCause());
}
核心思路为为每一幅图像下载都创建一个独立的任务,并在线程池中执行他们,从而将串行的下载过程转换为并行的过程
广告展示如果在一定的时间以内没有获取,可以不再展示,并取消超时的任务。
ExecutorService exe = Executors.newFixedThreadPool(3);
List<MyTask> myTasks = new ArrayList<>();
for (int i=0;i<3;i++){
myTasks.add(new MyTask(3-i));
}
try {
List<Future<String>> futures = exe.invokeAll(myTasks, 1, TimeUnit.SECONDS);
for (int i=0;i<futures.size();i++){
try {
String s = futures.get(i).get();
System.out.println("task execut "+myTasks.get(i).getSleepSeconds()+" s");
} catch (ExecutionException e) {
System.out.println("task sleep "+myTasks.get(i).getSleepSeconds()+" not execute ");
}catch (CancellationException e){
System.out.println("task sleep "+myTasks.get(i).getSleepSeconds()+" not execute ,because "+e);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
exe.shutdown();
invokeAll方法对于没有完成的任务会被取消,通过CancellationException可以捕获,invokeAll返回的序列顺序和传入的task保持一致。结果如下:
task sleep 3 not execute ,because java.util.concurrent.CancellationException
task sleep 2 not execute ,because java.util.concurrent.CancellationException
task execut 1 s