线程池源码中出现了很多Callable、Future、FutureTask等以前没介绍过的接口,尤其是线程池提交任务时总是把任务封装成FutureTask,今天就来为大家解惑:
Runnable接口只有一个run方法,而run方法的返回值是void,所以线程执行完之后没有返回值。
public interface Runnable {
public abstract void run();
}
在很多场景下,我们通过线程来异步执行任务之后,希望获取到任务的执行结果。比如RPC框架中,需要异步获取任务返回值。这种情况下,Runnable无法获取返回值就无法满足需求了,因此Callable就出现了。
Callable也是一个接口,也只有一个call()方法,不同的是Callable的call()方法有是有返回值的,返回值的类型是一个泛型,泛型由创建Callable对象时指定。
public interface Callable<V> {
V call() throws Exception;
}
要想获得Callable的返回值就需要用到Future接口。Futrue可以监视和控制Callable任务的执行情况,如对执行结果进行取消、查询是否完成、获取结果等。
如:当一个任务通过线程池的submit()方法提交到线程池后,线程池会返回一个Future类型的对象,我们可以通过Future对象来获取任务在线程池中的状态。
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
举例:Future获取Callable任务的返回值
public class FutureExample {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService threadPool = Executors.newCachedThreadPool();
Future<String> future = threadPool.submit(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(2000);
return "结果";
}
});
System.out.println("Callable返回值=" + future.get());
}
}
输出结果:
Callable返回值=结果
FutureTask是Runnable和Future的实现类,既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。
当线程池调用submit()方法来向线程池中提交任务时,无论提交的是Runnable类型的任务,还是提交的是Callable类型的任务,最终都是将任务封装成一个FutureTask对象,我们可以通过这个FutureTask对象来获取任务在线程池中的状态。
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
// 调用newTaskFor()将Callable任务封装成一个FutureTask
RunnableFuture<T> ftask = newTaskFor(task);
// 执行任务
execute(ftask);
return ftask;
}
// newTaskFor
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
// 直接new一个FutureTask对象
return new FutureTask<T>(callable);
}
public class FutureTask<V> implements RunnableFuture<V> {
/** state变量用来保存任务的状态 */
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
/** 提交的任务,Runnable类型的任务会通过Executors.callable()来转变为Callable */
private Callable<V> callable;
/** 用来保存Callable的call()方法的返回值 */
private Object outcome;
/** 执行Callable任务的线程 **/
private volatile Thread runner;
/**
* 任务未完成时,调用get方法获取结果的线程会阻塞等待
* waiters用于保存这些线程
*/
private volatile WaitNode waiters;
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
}
FutureTask任务的状态如下:
// 任务的初始状态,当新建一个FutureTask任务时,state值默认为NEW
private static final int NEW = 0;
// 任务处于完成中,也就是正在执行还未设置返回值
private static final int COMPLETING = 1;
// 任务正常被执行完成,并将任务的返回值赋值给outcome属性之后
private static final int NORMAL = 2;
// 任务出了异常,并将异常对象赋值给outcome属性之后
private static final int EXCEPTIONAL = 3;
// 调用cancle(false),任务被取消了
private static final int CANCELLED = 4;
// 调用cancle(true),任务中断,但是在线程中断之前
private static final int INTERRUPTING = 5;
// 调用cancle(true),任务中断,但是在线程中断之后
private static final int INTERRUPTED = 6;
状态变化如下图:
public void run() {
/*
* 1. 不是NEW状态,不能执行
* 2. 设置runner失败,不能执行
*/
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();// 真正执行任务
ran = true;// 执行成功,设置执行成功标志
} catch (Throwable ex) {
result = null;
ran = false;// 有异常,执行失败
setException(ex);// 设置异常
}
// 如果执行成功,则设置返回结果
if (ran)
set(result);
}
} finally {
runner = null;// 无论是否执行成功,把runner设置为null
int s = state;
// 处理中断
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
/**
* 设置执行结果
*/
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {// 执行完成,设置COMPLETING状态
outcome = v;// 设置执行结果
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 设置完结果,设置NORMAL状态
finishCompletion();// 逐个唤醒waiters中的线程去获取执行结果
}
}
public V get() throws InterruptedException, ExecutionException {
int s = state;
// 如果状态处于NEW或者COMPLETING状态,表示任务还没有执行完成,awaitDone()等待
if (s <= COMPLETING)
s = awaitDone(false, 0L);// 下文详解
// 返回结果,下文详解
return report(s);
}
/**
* 返回执行结果
*/
private V report(int s) throws ExecutionException {
Object x = outcome;
// 任务正常结束时,返回outcome
if (s == NORMAL)
return (V)x;
// 任务被取消了,抛出CancellationException
if (s >= CANCELLED)
throw new CancellationException();
// 这里只能第EXCEPTIONAL状态,表示在执行过程中出现了异常,抛出ExecutionException。
throw new ExecutionException((Throwable)x);
}
/**
* 处于NEW或者COMPLETING状态时,get线程等待
*/
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// ......
for (;;) {
// ......
// 任务处于COMPLETING中,就让当前线程先暂时放弃CPU的执行权
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
// ......
// 如果设置了超时,阻塞至超时时间
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
// 等待一段时间
LockSupport.parkNanos(this, nanos);
}
else
// 如果没有设置超时,会一直阻塞,直到被中断或者被唤醒
LockSupport.park(this);
}
}
将任务状态设置成INTERRUPTING/INTERRUPTED/CANCELLED状态就表示取消了线程,因为在这些状态下任务的run方法是不能执行的。
public boolean cancel(boolean mayInterruptIfRunning) {
/*
* 以下情况不能取消任务:
* 1. 当前任务不是NEW状态,已经被执行了,不能取消
* 2. 当前任务还没有执行,state == NEW,但是CAS设置状态失败,不能取消
*/
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
// 中断
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();// 中断线程
} finally { // final state
// 中断之后,设置INTERRUPTED状态
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();// 唤醒waiters中的线程去获取执行结果
}
return true;
}