FutureTask:一个可取消的异步任务执行类,这个类提供了Future接口的基本实现,主要有以下功能:
先看一下类关系图:
FutureTask类关系图.jpg
修饰符 | 变量名 | 描述 |
---|---|---|
private Callable | callable | 任务的执行体 |
private Object | outcone | 最终输出的结果 |
private volatile Thread | runner | 异步执行任务的线程 |
private volatile WaitNode | waiters | 获取任务结果的等待线程(是一个链式列表) |
private volatile int | state | 当前一步任务的状态 |
private static final int | NEW | 任务初始化状态 |
private static final int | COMPLETING | 任务已经完成,但结果还没有赋值给outcome |
private static final int | NORMAL | 任务执行完成 |
private static final int | EXCEPTIONAL | 任务执行异常 |
private static final int | INTERRUPTING | 任务被中断中 |
private static final int | INTERRUPTED | 任务被中断 |
通过传入Callable来构造一个任务
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
通过传人Runnable来构造一个任务
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
以上是两个构造函数,有构造函数可知,回家传入参数转换成Callable,然后放到callable变量里,将任务执行状态置为NEW。
公共方法概要如下:
修饰符 | 方法 | 描述 |
---|---|---|
boolean | cancel(boolean mayInterruptIfRunning) | 取消或者中断任务(true为中断,false为取消) |
V | get() | 返回执行结果,当为完成执行时,则阻塞线程 |
V | get(long timeout, TimeUnit unit) | 获得执行结果,当时间超出设定时间时,则返回超时 |
boolean | isCancelled() | 返回任务是否已经取消 |
boolean | isDone() | 判断任务是否执行完毕 |
执行任务时可能有4种状态转换:
public void run() {
//如果状态不是初始化后,则返回(避免重复执行)
if (state != NEW ||
!U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//执行主要方法
result = c.call();
ran = true;
} catch (Throwable ex) {
//异常处理
result = null;
ran = false;
setException(ex);
}
//如果执行成功,则设置结果和状态
if (ran)
set(result);
}
} finally {
// 将执行任务的执行线程置空
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
//中断处理
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
/**
* 执行结果的赋值操作, 子类可重写
**/
protected void set(V v) {
//为state赋值COMPLETING
if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
outcome = v;
//赋值完成后,为state赋值NORMAL
U.putOrderedInt(this, STATE, NORMAL);
//完成处理
finishCompletion();
}
}
/**
* 在任务执行完成(包括取消、正常结束、发生异常), 将等待线程列表唤醒
* 同时让任务执行体置空
**/
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (U.compareAndSwapObject(this, WAITERS, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
//释放许可
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
//完成之后需要执行的方法,该方法可重写
done();
callable = null; // to reduce footprint
}
public boolean cancel(boolean mayInterruptIfRunning) {
//如果任务状态不是初始化状态,则取消任务
if (!(state == NEW &&
U.compareAndSwapInt(this, STATE, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
//则让执行线程中断(在这个过程中执行线程可能会改变任务的状态)
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
//改变任务状态为INTERRUPTED
U.putOrderedInt(this, STATE, INTERRUPTED);
}
}
} finally {
//处理任务完成的结果
finishCompletion();
}
return true;
}
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
//返回结果值
return report(s);
}
/**
* 等待任务的执行结果
* timed: 是否有时间限制 nanos: 限制的时间
**/
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
long startTime = 0L; // Special value 0L means not yet parked
WaitNode q = null;
boolean queued = false;
for (;;) {
int s = state;
/**
* 1. 首先判断任务状态是否是完成状态, 是就直接返回结果
* 2. 如果1为false,并且任务的状态是COMPLETING, 也就是在set()任务结果时被阻塞了,则让出当前线程cpu资源
* 3. 如果前两步false,并且q==null,则初始化一个当前线程的等待节点
* 4. 下一次循环体, 如果前3步依然是false,并且当前节点没有加入到等待列表,
* 则将当前线程节点放在等待列表的第一个位置
* 5. 在下一次循环, 如果前4步为false, 如果是时间范围内等待的,则判断当前时间是否过期,
* 过期则将线程节点移出等待队列并返回任务状态结果, 如果没过期,则让当前线程阻塞一定时间
* 6. 如果不是时间范围内等待, 并且前5步均为false,则让线程阻塞,直到被唤醒
**/
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING)
Thread.yield();
else if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
else if (q == null) {
if (timed && nanos <= 0L)
return s;
q = new WaitNode();
}
else if (!queued)
queued = U.compareAndSwapObject(this, WAITERS,
q.next = waiters, q);
else if (timed) {
final long parkNanos;
if (startTime == 0L) { // first time
startTime = System.nanoTime();
if (startTime == 0L)
startTime = 1L;
parkNanos = nanos;
} else {
long elapsed = System.nanoTime() - startTime;
if (elapsed >= nanos) {
removeWaiter(q);
return state;
}
parkNanos = nanos - elapsed;
}
// nanoTime may be slow; recheck before parking
if (state < COMPLETING)
LockSupport.parkNanos(this, parkNanos);
}
else
LockSupport.park(this);
}
}
注:如果你在源码中看到sun.misc.Unsafe,这个类是Java中直接操作内存的,可以理解为C语言中的指针,由于安全原因,Java并不向外部透露此类,使用时可以通过反射的方法。如果想要了解具体使用方法,请自行百度~
ExecutorService executor = new ScheduledThreadPoolExecutor(2);
FutureTask<String> future = new FutureTask<>(() -> {
System.out.println("FutureTask sleep..., Time is " + System.currentTimeMillis());
Thread.sleep(5000);
return "OK";
});
executor.execute(future);
System.out.println("MainThread sleep..., Time is " + System.currentTimeMillis());
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String result = "";
try {
result = future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
System.out.println("result is " + result + ", Time is " + System.currentTimeMillis());
执行结果如下:
MainThread sleep..., Time is 1491225495421
FutureTask sleep..., Time is 1491225495421
result is OK, Time is 1491225500426
作 者:ChanghuiN
原文链接:https://cloud.tencent.com/developer/article/1333329
版权声明:非特殊声明均为本站原创作品,转载时请注明作者和原文链接。