前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >FutureTask源码分析

FutureTask源码分析

作者头像
haifeiWu
发布2018-09-11 10:19:27
3860
发布2018-09-11 10:19:27
举报

FutureTask:一个可取消的异步任务执行类,这个类提供了Future接口的基本实现,主要有以下功能:

  • 异步执行任务
  • 可以开始、取消以及查看任务是否完成
  • 如果任务没有执行完,get方法会导致线程阻塞
  • 一旦一个执行任务已经完成就不能再次开始和结束(除非执行时通过runAndReset()方法)

类关系

先看一下类关系图:

FutureTask类关系图.jpg

  • Future为Java Tuture模式接口,Runable是实现异步操作的接口
  • RunnableFuture同时继承了以上两个接口,所以同时具有两种功能
  • 而FutureTask为RunnableFuture的实现类

成员变量

修饰符

变量名

描述

private Callable

callable

任务的执行体

private Object

outcone

最终输出的结果

private volatile Thread

runner

异步执行任务的线程

private volatile WaitNode

waiters

获取任务结果的等待线程(是一个链式列表)

private volatile int

state

当前一步任务的状态

private static final int

NEW

任务初始化状态

private static final int

COMPLETING

任务已经完成,但结果还没有赋值给outcome

private static final int

NORMAL

任务执行完成

private static final int

EXCEPTIONAL

任务执行异常

private static final int

INTERRUPTING

任务被中断中

private static final int

INTERRUPTED

任务被中断

构造函数及方法

通过传入Callable来构造一个任务

代码语言:javascript
复制
public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}

通过传人Runnable来构造一个任务

代码语言:javascript
复制
public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

以上是两个构造函数,有构造函数可知,回家传入参数转换成Callable,然后放到callable变量里,将任务执行状态置为NEW。

公共方法概要如下:

修饰符

方法

描述

boolean

cancel(boolean mayInterruptIfRunning)

取消或者中断任务(true为中断,false为取消)

V

get()

返回执行结果,当为完成执行时,则阻塞线程

V

get(long timeout, TimeUnit unit)

获得执行结果,当时间超出设定时间时,则返回超时

boolean

isCancelled()

返回任务是否已经取消

boolean

isDone()

判断任务是否执行完毕

执行状态转换

执行任务时可能有4种状态转换:

  1. 任务顺利执行:NEW -> COMPLETING -> NORMAL
  2. 任务执行异常:NEW -> COMPLETING -> EXCEPTIONAL
  3. 任务取消:NEW -> CANCELLED
  4. 任务中断:NEW -> INTERRUPTING -> INTERRUPTED

主要方法

run()

代码语言:javascript
复制
public void run() {
	//如果状态不是初始化后,则返回(避免重复执行)
    if (state != NEW ||
        !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
            	//执行主要方法
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
            	//异常处理
                result = null;
                ran = false;
                setException(ex);
            }
            //如果执行成功,则设置结果和状态
            if (ran)
                set(result);
        }
    } finally {
        // 将执行任务的执行线程置空
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        //中断处理
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

set()

代码语言:javascript
复制
/**
* 执行结果的赋值操作, 子类可重写
**/
protected void set(V v) {
	//为state赋值COMPLETING
    if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
        outcome = v;
        //赋值完成后,为state赋值NORMAL
        U.putOrderedInt(this, STATE, NORMAL);
        //完成处理
        finishCompletion();
    }
}

finishCompletion()

代码语言:javascript
复制
/**
* 在任务执行完成(包括取消、正常结束、发生异常), 将等待线程列表唤醒
* 同时让任务执行体置空
**/
private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        if (U.compareAndSwapObject(this, WAITERS, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    //释放许可
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }
	//完成之后需要执行的方法,该方法可重写
    done();

    callable = null;        // to reduce footprint
}

cancel(boolean mayInterruptIfRunning)

代码语言:javascript
复制
public boolean cancel(boolean mayInterruptIfRunning) {
	//如果任务状态不是初始化状态,则取消任务
    if (!(state == NEW &&
          U.compareAndSwapInt(this, STATE, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        if (mayInterruptIfRunning) {
            try {
            	//则让执行线程中断(在这个过程中执行线程可能会改变任务的状态)
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally { // final state
            	//改变任务状态为INTERRUPTED
                U.putOrderedInt(this, STATE, INTERRUPTED);
            }
        }
    } finally {
    	//处理任务完成的结果
        finishCompletion();
    }
    return true;
}

get()

代码语言:javascript
复制
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    //返回结果值
    return report(s);
}

awaitDone(boolean timed, long nanos)

代码语言:javascript
复制
/**
* 等待任务的执行结果
* timed: 是否有时间限制  nanos: 限制的时间
**/ 
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    long startTime = 0L;    // Special value 0L means not yet parked
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        int s = state;
        /**
        * 1. 首先判断任务状态是否是完成状态, 是就直接返回结果
        * 2. 如果1为false,并且任务的状态是COMPLETING, 也就是在set()任务结果时被阻塞了,则让出当前线程cpu资源
        * 3. 如果前两步false,并且q==null,则初始化一个当前线程的等待节点
        * 4. 下一次循环体, 如果前3步依然是false,并且当前节点没有加入到等待列表,
        *     则将当前线程节点放在等待列表的第一个位置
        * 5. 在下一次循环, 如果前4步为false, 如果是时间范围内等待的,则判断当前时间是否过期,
        *    过期则将线程节点移出等待队列并返回任务状态结果, 如果没过期,则让当前线程阻塞一定时间
        * 6. 如果不是时间范围内等待, 并且前5步均为false,则让线程阻塞,直到被唤醒
        **/
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING)
            Thread.yield();
        else if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }
        else if (q == null) {
            if (timed && nanos <= 0L)
                return s;
            q = new WaitNode();
        }
        else if (!queued)
            queued = U.compareAndSwapObject(this, WAITERS,
                                            q.next = waiters, q);
        else if (timed) {
            final long parkNanos;
            if (startTime == 0L) { // first time
                startTime = System.nanoTime();
                if (startTime == 0L)
                    startTime = 1L;
                parkNanos = nanos;
            } else {
                long elapsed = System.nanoTime() - startTime;
                if (elapsed >= nanos) {
                    removeWaiter(q);
                    return state;
                }
                parkNanos = nanos - elapsed;
            }
            // nanoTime may be slow; recheck before parking
            if (state < COMPLETING)
                LockSupport.parkNanos(this, parkNanos);
        }
        else
            LockSupport.park(this);
    }
}

注:如果你在源码中看到sun.misc.Unsafe,这个类是Java中直接操作内存的,可以理解为C语言中的指针,由于安全原因,Java并不向外部透露此类,使用时可以通过反射的方法。如果想要了解具体使用方法,请自行百度~

用法示例

代码语言:javascript
复制
ExecutorService executor = new ScheduledThreadPoolExecutor(2);
FutureTask<String> future = new FutureTask<>(() -> {
    System.out.println("FutureTask sleep..., Time is " + System.currentTimeMillis());
    Thread.sleep(5000);
    return "OK";
});
executor.execute(future);
System.out.println("MainThread sleep..., Time is " + System.currentTimeMillis());
try {
    Thread.sleep(4000);
} catch (InterruptedException e) {
    e.printStackTrace();
}
String result = "";
try {
    result = future.get();
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}
System.out.println("result is " + result + ", Time is " + System.currentTimeMillis());

执行结果如下:

代码语言:javascript
复制
MainThread sleep..., Time is 1491225495421
FutureTask sleep..., Time is 1491225495421
result is OK, Time is 1491225500426

作 者:ChanghuiN

原文链接:https://cloud.tencent.com/developer/article/1333329

版权声明:非特殊声明均为本站原创作品,转载时请注明作者和原文链接。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 类关系
  • 成员变量
  • 构造函数及方法
  • 执行状态转换
  • 主要方法
    • run()
      • set()
        • finishCompletion()
          • cancel(boolean mayInterruptIfRunning)
            • get()
              • awaitDone(boolean timed, long nanos)
              • 用法示例
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档