前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Future、FutureTask实现原理浅析(源码解读)

Future、FutureTask实现原理浅析(源码解读)

作者头像
一枝花算不算浪漫
发布2018-12-27 15:34:25
7470
发布2018-12-27 15:34:25
举报

前言

最近一直在看JUC下面的一些东西,发现很多东西都是以前用过,但是真是到原理层面自己还是很欠缺。 刚好趁这段时间不太忙,回来了便一点点学习总结。 由于自己水平有限,可能存在大量漏洞和思考不周到的地方,不吝赐教。

Future 模式

一种非常经典的设计模式,这种设计模式主要就利用空间换时间得到概念,也就是说异步执行(需要开启一个新的线程)。 在互联网高并发的应用服务中,我们随处可见这种理念和代码,主要就是使用了这种模式。 Future模式非常适合在处理耗时很长的业务逻辑时进行使用,可以有效的减小系统的响应时间,提高系统的吞吐量。

Future代码示例:
代码语言:javascript
复制
/**
 * @Description:
 * @Author: wangmeng
 * @Date: 2018/12/17-20:44
 */
public class UseFuture implements Callable<String> {

    private String param;

    public UseFuture(String param) {
        this.param = param;
    }

    @Override
    public String call() throws Exception {
        //模拟执行业务逻辑的耗时
        TimeUnit.SECONDS.sleep(3);
        String result = this.param + " 处理完成!";
        return result;
    }

    public static void main(String[] args) throws Exception{
        String queryStr = "query1";
        String queryStr2 = "query2";
        FutureTask<String> future1 = new FutureTask<String>(new UseFuture(queryStr));
        FutureTask<String> future2 = new FutureTask<String>(new UseFuture(queryStr2));

        ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.submit(future1);//异步操作
        executorService.submit(future2);//异步操作

        System.out.println("执行中...");

        TimeUnit.SECONDS.sleep(2);//处理其他相关的任务。
        String result1 = future1.get();
        String result2 = future2.get();

        System.out.println("数据处理完成。。" + result1);
        System.out.println("数据处理完成。。" + result2);
    }
}
应用场景

Future模式有点类似于商品订单,比如在网购时,当看中某一件商品时,就可以提交订单,当订单处理完成后,在家等待商品送货上门即可。 或者说更形象的我们发送Ajax请求的时候,页面是异步的进行后台处理,用户无需一直等待请求的结果,可以继续浏览或操作其他内容。

流程图
流程图
Future实现原理

看到上面示例代码,我们是通过executorService.submit(future1) 来提交线程的,进一步看看里面具体的逻辑。

1、 AbstractExecutorService 中submit()源码:

submit
submit

2、FutureTask中run()源码:

代码语言:javascript
复制
public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

这个是核心代码,首先我们需要知道FutureTask中有一个volatile state全局变量,通过这个值来界定任务是否已经执行完毕。

state
state

将上面run方法一点点拆解如下:

run方法
run方法

先判断state状态,如果不是NEW说明执行完毕,直接return掉。 后面使用CAS操作,判断这个任务是否已经执行,这里FutureTask有个全局的volatile runner字段,这里通过cas将当前线程指定给runner。 这里可以防止callable被执行多次。 接着往下看:

run方法2
run方法2

查看set方法具体实现:

set方法
set方法

继续往下跟,查看finishCompletion方法: FutureTask中有一个WaiteNode单链表,当执行futureTask.get()方法时,多个线程会将等待的线程的next指向下一个想要get获取结果的线程。 finishCompletion主要就是使用Unsafe.unpark()进行唤醒操作。

finish
finish

3,FutureTask.get() 源码 get() 方法会进行自旋操作等待,直到FutureTask中的state状态大于NORMAL(表示自行完成),然后才会通过FutureTask的outcome获取返回值。

get
get

接着往下跟awaitDone方法:

代码语言:javascript
复制
private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }

还是老样子,一点点分析:

await
await
await2
await2

不知道自己理解的是否有偏差,有问题欢迎大家随时指出,感谢备至。 到了这里 就不再讲解了,后面还有report、cancel等方法,大家可以自行参阅源码。

总结

结合上述分析可得 FutureTask 执行活动图如下:

流程图
流程图

同时也可以看出,在 FutureTask 中内部维护了一个单向链表 waiters , 在执行 get 的时候会向其中添加节点:

waiters
waiters

最后特别感谢掘金此位博主的分享,参考如下: FutureTask 源码分析

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018-12-20 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
    • Future 模式
      • Future代码示例:
      • 应用场景
      • Future实现原理
      • 总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档