Thread
类new Thread().start()
的方式启动线程public static void main(String[] args) {
System.out.println("线程" + Thread.currentThread().getName() + "开始执行");
new Thread(new Thread01(), "thread01").start();
System.out.println("线程" + Thread.currentThread().getName() + "执行完毕");
}
static class Thread01 extends Thread {
@Override
public void run() {
System.out.println("线程" + Thread.currentThread().getName() + "开始执行");
int i = 10 / 5;
System.out.println("计算结果为" + i);
}
}
Runnable
接口new Thread(线程类).start()
的方式启动线程public static void main(String[] args) {
System.out.println("线程" + Thread.currentThread().getName() + "开始执行");
new Thread(new Thread02(), "thread02").start();
System.out.println("线程" + Thread.currentThread().getName() + "执行完毕");
}
static class Thread02 implements Runnable {
@Override
public void run() {
System.out.println("线程" + Thread.currentThread().getName() + "开始执行");
int i = 10 / 5;
System.out.println("计算结果为" + i);
}
}
Callable<T>
接口FutureTask<T> futureTask = new FutureTask<>(线程类);
new Thread(futureTask).start()
的方式启动线程futureTask.get()
获取返回值public static void main(String[] args) throws Exception {
System.out.println("线程" + Thread.currentThread().getName() + "开始执行");
FutureTask<String> stringFutureTask = new FutureTask<>(new Thread03());
new Thread(stringFutureTask).start();
System.out.println(stringFutureTask.get());
System.out.println("线程" + Thread.currentThread().getName() + "执行完毕");
}
static class Thread03 implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("线程" + Thread.currentThread().getName() + "开始执行");
int i = 10 / 5;
System.out.println("计算结果为" + 10);
return "返回到主程序了" + i;
}
}
newFixedThreadPool
// @param nThreads 线程数量,核心线程数和最大线程数均为该值
// @param threadFactory 创建线程的工厂
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
newSingleThreadExecutor
// @param threadFactory 创建线程的工厂
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
newCachedThreadPool
// @nThreads 线程数量,核心线程数和最大线程数均为该值
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
所有Executors框架提供的线程池底层均为java.util.concurrent.ThreadPoolExecutor
/**
* 通过给定的参数创建一个线程池.
*
* @param corePoolSize 一直存活的线程数,即使空闲,除非设置了allowCoreThreadTimeOut。
* @param maximumPoolSize 最大存活线程数
* @param keepAliveTime 当前存活线程数大于核心线程数时,空闲线程等待新任务最大时间
* @param unit 参数keepAliveTime的时间单位
* @param workQueue 任务执行前保存任务的队列。只保存由execute方法提交的Runnable任务。
* @param threadFactory 新建线程的线程工厂
* @param handler 拒绝策略
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
execute
进行static ExecutorService executorService = Executors.newFixedThreadPool(4);
public static void main(String[] args) {
System.out.println("线程" + Thread.currentThread().getName() + "开始执行");
executorService.execute(new Thread01()); // 没有返回值的异步执行
executorService.execute(new Thread02());
executorService.submit(new Thread01()); // 有返回值的异步执行
executorService.submit(new Thread02());
System.out.println("线程" + Thread.currentThread().getName() + "执行完毕");
}
CompletableFuture
组合式异步编程runAsync
和 supplyAsync
方法CompletableFuture
提供了四个静态方法来创建一个异步操作。
runAsync
方法不支持返回值。supplyAsync
可以支持返回值。public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
当CompletableFuture
的计算结果完成,或者抛出异常的时候,可以执行特定的Action。主要是下面的方法:
//可以处理异常,无返回值
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
//可以处理异常,有返回值
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
可以看到Action的类型是BiConsumer<? super T,? super Throwable>
它可以处理正常的计算结果,或者异常情况。
handle 是执行任务完成时对结果的处理。 handle 方法和 thenApply 方法处理方式基本一样。不同的是 handle 是在任务完成后再执行,还可以处理异常的任务。thenApply 只可以执行正常的任务,任务出现异常则不执行 thenApply 方法。
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);
等待之前任务完成后执行
public <U> CompletionStage<U> thenApply(Function<? super T, ? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T, ? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T, ? extends U> fn, Executor executor);
public <U> CompletionStage<U> thenAccept(Consumer<? super T> action);
public <U> CompletionStage<U> thenAcceptAsync(Consumer<? super T> action);
public <U> CompletionStage<U> thenAcceptAsync(Consumer<? super T> action, Executor executor);
public <U> CompletionStage<U> thenRun(Runnable action);
public <U> CompletionStage<U> thenRunAsync(Runnable action);
public <U> CompletionStage<U> thenRunAsync(Runnable action, Executor executor);
thenCombine:组合两个future,获取两个的结果,返回当前任务的返回值
public <U,V> CompletableFuture<V> thenCombine(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn, Executor executor);
thenAcceptBoth:组合两个future,获取结果,然后处理任务,没有返回值
public <U> CompletableFuture<Void> thenAcceptBoth(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action);
public <U> CompletableFuture<Void> thenAcceptBothAsync(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action);
public <U> CompletableFuture<Void> thenAcceptBothAsync(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action, Executor executor);
runAfterBoth:组合两个future,不需要获取结果,只需要两个future执行完成后就执行该任务
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
Runnable action);
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
Runnable action);
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
Runnable action,
Executor executor);
applyToEither:组合两个future完成其中一个执行,获取它的结果,返回当前任务的返回值
public <U> CompletableFuture<U> applyToEither(
CompletionStage<? extends T> other, Function<? super T, U> fn);
public <U> CompletableFuture<U> applyToEitherAsync(
CompletionStage<? extends T> other, Function<? super T, U> fn);
public <U> CompletableFuture<U> applyToEitherAsync(
CompletionStage<? extends T> other, Function<? super T, U> fn,
Executor executor);
acceptEither:组合两个future完成其中一个执行,获取一个的结果,然后处理任务,没有返回值
public CompletableFuture<Void> acceptEither(
CompletionStage<? extends T> other, Consumer<? super T> action);
public CompletableFuture<Void> acceptEitherAsync(
CompletionStage<? extends T> other, Consumer<? super T> action);
public CompletableFuture<Void> acceptEitherAsync(
CompletionStage<? extends T> other, Consumer<? super T> action,
Executor executor);
runAfterEither:组合两个future完成其中一个执行,不需要获取结果,只需要一个future执行完成后就执行该任务
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
Runnable action);
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
Runnable action);
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
Runnable action,
Executor executor);
allOf:等待所有任务完成
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);
anyOf:只要有一个完成
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);