首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >(juc系列)runnable与future等异步设计

(juc系列)runnable与future等异步设计

作者头像
呼延十
发布2021-10-22 11:04:25
2240
发布2021-10-22 11:04:25
举报
文章被收录于专栏:呼延呼延

这是JUC包中,与Future异步等相关的类图.

2021-10-18-18-03-24
2021-10-18-18-03-24

简单的介绍以及记录. 偏意识流.

首先看一下最基本的Runnable

Runnable

Runable接口, 应该由那些想要被线程执行的类来实现.它定义了一个无参数,无返回值的run()方法,负责运行代码段.

通俗点理解,Runnable接口,定义了可被运行这个概念。定义一个类,实现Runnable,将运行逻辑写入run方法,那么这个类可以手动进行执行,也可以交给线程来调度.

Future

Future代表异步计算的结果. 提供了检查计算

  • 计算是否完成
  • 等待完成
  • 获取计算结果

等等方法.

使用get()方法可以获取计算的结果,如果计算还没有完成,就等待.

还提供了判断任务是完成还是取消掉了的方法. 一旦任务完成,就不可以再取消了. 如果你想执行一个可取消的,但是没有结果的计算任务,可以使用Future<?>.然后返回null.

提供了以下方法:

  • cancel 取消任务
  • isCancelled 判断任务是否取消
  • isDone 是否完成
  • get 获取结果
  • get(time) 等待一定的时间来获取结果,超时抛出异常

在JDK中,Future的子接口和子类还是比较多的,一个一个看一下.

ScheduleFuture

public interface ScheduledFuture<V> extends Delayed, Future<V> {
}

实现了FutureDelyed. 在Future的基础上,提供了延迟的功能.

RunnableFuture

    public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
  */
        void run();
    }

同时实现了FutureRunnable. 这个接口的子类,可以被执行,且通过Future拿到结果.

RunnableScheduledFuture

实现了上方的两个接口,这个接口的子类可以被执行,可以被调度,可以拿到结果.

public interface RunnableScheduledFuture<V> extends RunnableFuture<V>, ScheduledFuture<V> {

    /**
     * Returns {@code true} if this task is periodic. A periodic task may
     * re-run according to some schedule. A non-periodic task can be
     * run only once.
     *
     * @return {@code true} if this task is periodic
     */
    boolean isPeriodic();
}

FutureTask

这是RunnableFuture的一个实现类,那么就具有相关的特性

  • 可被执行
  • 可以拿到结果

具体看看实现.

属性
    // 核心状态
    private volatile int state;
    // 状态的一些常量
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    // 底层的Callable
    private Callable<V> callable;
    // 返回值或者抛出的异常
    private Object outcome; // non-volatile, protected by state reads/writes
    // 线程
    private volatile Thread runner;
    // 等待线程
    private volatile WaitNode waiters;
构造方法
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

两个构造方法,分别提供与对CallableRunnable的包装.且将状态赋值为New.

Runnable 执行
    public void run() {
        // 如果任务状态不对,或者设置当前线程为运行线程失败,就退出
        if (state != NEW ||
            !RUNNER.compareAndSet(this, null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    // 执行任务
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    // 抛出异常
                    setException(ex);
                }
                if (ran)
                    // 设置返回结果
                    set(result);
            }
        } finally {
            // 清理状态
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

实现Runnablerun方法.

  1. 检查状态
  2. 运行内部持有的Callable.
  3. 出错就抛出异常
  4. 成功执行结束就设置返回值.
  5. 清理状态.

当执行成功时,设置相关属性:

    protected void set(V v) {
        // 设置状态为正在完成中
        if (STATE.compareAndSet(this, NEW, COMPLETING)) {
            // 返回值为结果
            outcome = v;
            // 设置状态
            STATE.setRelease(this, NORMAL); // final state
            // 调用完成
            finishCompletion();
        }
    }
    

    private void finishCompletion() {
        // assert state > COMPLETING;
        // 将所有等待者,全部唤醒
        for (WaitNode q; (q = waiters) != null;) {
            if (WAITERS.weakCompareAndSet(this, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        // 调用完成后的hook方法,留给子类去实现一些特殊的逻辑.
        done();

        callable = null;        // to reduce footprint
    }

当任务成功执行完之后,首先将所有等待的线程全部唤醒. 之后调用一个hook方法. 也就是done.这是留给子类的一个方法,可以方便的通过重写来实现特殊逻辑.

Future相关 获取结果
get 结果

get方法有永不超时和带有超时时间的两个版本. 本质上都是调用awaitDone.

    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        // The code below is very delicate, to achieve these goals:
        // - call nanoTime exactly once for each call to park
        // - if nanos <= 0L, return promptly without allocation or nanoTime
        // - if nanos == Long.MIN_VALUE, don't underflow
        // - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
        //   and we suffer a spurious wakeup, we will do no worse than
        //   to park-spin for a while
        long startTime = 0L;    // Special value 0L means not yet parked
        WaitNode q = null;
        boolean queued = false;
        // 自旋进行等待
        for (;;) {
            // 完成了,返回状态
            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            // 等一会
            else if (s == COMPLETING)
                // We may have already promised (via isDone) that we are done
                // so never return empty-handed or throw InterruptedException
                Thread.yield();
            // 线程中断了,取消所有等待者
            else if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }
            // 添加新的等待者
            else if (q == null) {
                if (timed && nanos <= 0L)
                    return s;
                q = new WaitNode();
            }
            else if (!queued)
                queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
            else if (timed) {
                final long parkNanos;
                if (startTime == 0L) { // first time
                    startTime = System.nanoTime();
                    if (startTime == 0L)
                        startTime = 1L;
                    parkNanos = nanos;
                } else {
                    long elapsed = System.nanoTime() - startTime;
                    if (elapsed >= nanos) {
                        removeWaiter(q);
                        return state;
                    }
                    parkNanos = nanos - elapsed;
                }
                // 等待
                // nanoTime may be slow; recheck before parking
                if (state < COMPLETING)
                    LockSupport.parkNanos(this, parkNanos);
            }
            else
                LockSupport.park(this);
        }
    }

这是一个等待执行完成或者超时/中断的方法.

  1. 如果状态显示已经执行完成,返回结果
  2. 如果状态显示正在执行,当前线程让出线程执行时间》
  3. 如果线程被中断了,移除等待者,抛出中断异常.
  4. 如果当前节点没有初始化,就初始化一个Node.
  5. 如果没有入队,就把当前线程放到等待链表中.
  6. 如果有超时设置:
    1. 没超时,就等待一会.
    2. 超时了,移除等待者,返回状态
  7. 如果没有超时设置,当前线程直接阻塞.
removeWaiters 移除等待线程

当一个任务有多个等待线程时,如果正常完成了,那么需要唤醒所有等待线程。

如果在等待过程中,当前线程超时了,那么就需要移除掉自身Node.不再等待.

    private void removeWaiter(WaitNode node) {
        if (node != null) {
            node.thread = null;
            retry:
            for (;;) {          // restart on removeWaiter race
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                    s = q.next;
                    if (q.thread != null)
                        pred = q;
                    else if (pred != null) {
                        pred.next = s;
                        if (pred.thread == null) // check for race
                            continue retry;
                    }
                    else if (!WAITERS.compareAndSet(this, q, s))
                        continue retry;
                }
                break;
            }
        }
    }
取消任务 cancel
    public boolean cancel(boolean mayInterruptIfRunning) {
        // 任务必须还是新创建的状态,如果不是就取消失败
        if (!(state == NEW && STATE.compareAndSet
              (this, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            // 中断线程
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    STATE.setRelease(this, INTERRUPTED);
                }
            }
        } finally {
            // 任务执行完成,虽然是取消导致的完成.
            finishCompletion();
        }
        return true;
    }

判断状态, 然后中断线程,设置任务已经完成.

查看状态
    // 取消了
    public boolean isCancelled() {
        return state >= CANCELLED;
    }

    // 状态不是new. 代表任务完成.
    public boolean isDone() {
        return state != NEW;
    }
ForkJoinTask

这是用于ForkJoinPool的一个任务实现,它以及它的三个子类

  • RecuriveAction
  • RecuriveTask
  • CountedCompleter

已经在ForkJoin框架文章中讲过了,这里不再赘述.

完。

联系我

最后,欢迎关注我的个人公众号【 呼延十 】,会不定期更新很多后端工程师的学习笔记。 也欢迎直接公众号私信或者邮箱联系我,一定知无不言,言无不尽。

以上皆为个人所思所得,如有错误欢迎评论区指正。

欢迎转载,烦请署名并保留原文链接。

联系邮箱:huyanshi2580@gmail.com

更多学习笔记见个人博客或关注微信公众号 <呼延十 >——>

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Runnable
  • Future
    • ScheduleFuture
      • RunnableFuture
        • RunnableScheduledFuture
          • FutureTask
            • 属性
            • 构造方法
            • Runnable 执行
            • Future相关 获取结果
            • ForkJoinTask
        • 联系我
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档