首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Rust Async: async-task源码分析

Rust Async: async-task源码分析

作者头像
MikeLoveRust
发布2019-11-24 16:13:32
1.4K0
发布2019-11-24 16:13:32
举报

本文转载自知乎地址:https://zhuanlan.zhihu.com/p/92679351?utm_source=wechat_session&utm_medium=social&utm_oi=51691969839104

async-std是rust异步生态中的基础运行时库之一,核心理念是合理的性能 + 用户友好的api体验。经过几个月密集的开发,前些天已经发布1.0稳定版本。因此是时候来一次深入的底层源码分析。async-std的核心是一个带工作窃取的多线程Executor,而其本身的实现又依赖于async-task这个关键库,因此本文主要对async-task的源码进行分析。

当Future提交给Executor执行时,Executor需要在堆上为这个Future分配空间,同时需要给它分配一些状态信息,比如Future是否可以执行(poll),是否在等待被唤醒,是否已经执行完成等等。我们一般把提交给Executor执行的Future和其连带的状态称为 task。async-task这个库就是对task进行抽象封装,以便于Executor的实现,其有几个创新的特性:

  1. 整个task只需要一次内存分配;
  2. 完全隐藏了RawWaker,以避免实现Executor时处理unsafe代码的麻烦;
  3. 提供了 JoinHandle,这样spawn函数对Future没有 Output=()的限制,极大方便用户使用;

使用方式

async-task只对外暴露了一个函数接口以及对应了两个返回值类型:

pub fn spawn<F, R, S, T>(future: F, schedule: S, tag: T) -> (Task<T>, JoinHandle<R, T>)where    F: Future<Output = R> + Send + 'static,    R: Send + 'static,    S: Fn(Task<T>) + Send + Sync + 'static,    T: Send + Sync + 'static,

其中,参数future表示要执行的Future,schedule是一个闭包,当task变为可执行状态时会调用这个函数以调度该task重新执行,tag是附带在该task上的额外上下文信息,比如task的名字,id等。返回值Task就是构造好的task对象,JoinHandle实现了Future,用于接收最终执行的结果。

值得注意的是spawn这个函数并不会做类似在后台进行计算的操作,而仅仅是分配内存,创建一个task出来,因此其实叫create_task反而更为恰当且好理解。

Task提供了如下几个方法:

    // 对该task进行调度
    pub fn schedule(self);
    // poll一次内部的Future,如果Future完成了,则会通知JoinHandle取结果。否则task进
    // 入等待,直到被被下一次唤醒进行重新调度执行。
    pub fn run(self);
    // 取消task的执行
    pub fn cancel(&self);
    // 返回创建时传入的tag信息
    pub fn tag(&self) -> &T;

JoinHandle实现了Future trait,同时也提供了如下几个方法:

    // 取消task的执行
    pub fn cancel(&self);
    // 返回创建时传入的tag信息
    pub fn tag(&self) -> &T;

同时,Task和JoinHandle都实现了Send+Sync,所以他们可以出现在不同的线程,并通过tag方法可以同时持有 &T,因此spawn函数对T有Sync的约束。

借助于async_task的抽象,下面的几十行代码就实现了一个共享全局任务队列的多线程Executor:

use std::future::Future;
use std::thread;

use crossbeam::channel::{unbounded, Sender};
use futures::executor;
use once_cell::sync::Lazy;

static QUEUE: Lazy<Sender<async_task::Task<()>>> = Lazy::new(|| {
    let (sender, receiver) = unbounded::<async_task::Task<()>>();
    for _ in 0..4 {
        let recv = receiver.clone();

        thread::spawn(|| {
            for task in recv {
                task.run();
            }
        });
    }

    sender
});

fn spawn<F, R>(future: F) -> async_task::JoinHandle<R, ()>
where
    F: Future<Output = R> + Send + 'static,
    R: Send + 'static,
{
    let schedule = |task| QUEUE.send(task).unwrap();
    let (task, handle) = async_task::spawn(future, schedule, ());

    task.schedule();

    handle
}

fn main() {
    let handles: Vec<_> = (0..10).map(|i| {
        spawn(async move {
            println!("Hello from task {}", i);
        })
    }).collect();

    // Wait for the tasks to finish.
    for handle in handles {
        executor::block_on(handle);
    }
}

Task的结构图

通常rust里的并发数据结构会包含底层的实现,一般叫Inner或者RawXXX,包含大量裸指针等unsafe操作,然后再其基础上进行类型安全包装,提供上层语义。比如channel,上层暴露出 SenderReceiver,其行为不一样,但内部表示是完全一样的。async-task也类似,JoinHandle, Task以及调用Future::poll时传递的Waker类型内部都共享同一个RawTask结构。由于JoinHandle本身是一个Future,整个并发结构还有第四个角色-在JoinHandle上调用poll的task传递的Waker,为避免引起混淆就称它为Awaiter吧。整个的结构图大致如下:

整个task在堆上一次分配,内存布局按Header,Tag, Schedule,Future/Output排列。由于Future和Output不同时存在,因此他们共用同一块内存。

  • JoinHandle:只有一个,不访问Future,可以访问Output,一旦销毁就不再生成;
  • Task:主要访问Future,销毁后可以继续生成,不过同一时间最多只有一个,这样可以避免潜在的多个Task对Future进行并发访问的bug;
  • Waker:可以存在多份,主要访问schedule数据,由于spawn函数的参数要求schedule必须是Send+Sync,因此多个waker并发调用是安全的。
  • Header:本身包含三个部分,state是一个原子变量,包含引用计数,task的执行状态,awaiter锁等信息;awaiter保存的是JoinHandle所在的task执行时传递的Waker,用于当Output生成后通知JoinHandle来取;vtable是一个指向静态变量的虚表指针。

task中的状态

所有的并发操作都是通过Header中的state这个原子变量来进行同步协调的。主要有以下几种flag:

  1. constSCHEDULED:usize=1<<0; task已经调度准备下一次执行,这个flag可以和RUNGING同时存在。
  2. constRUNNING:usize=1<<1; 这个task正在执行中,这个flag可以和SCHEDULED同时存在。
  3. constCOMPLETED:usize=1<<2; 这个task的future已经执行完成。
  4. constCLOSED:usize=1<<3; 表示这个task要么被cancel掉了,要么output被JoinHandle取走了,是一个终结状态。
  5. constHANDLE:usize=1<<4; 表示JoinHandle存在。
  6. constAWAITER:usize=1<<5; 表示JoinHandle正在等待Output,用于快速判断Header里的awaiter不为None,避免获取锁的操作。
  7. constLOCKED:usize=1<<6; 读写Header里的awaiter时,需要设置这个字段,标识是否处于locked状态。
  8. constREFERENCE:usize=1<<7; 从第7bit开始到最高位当作引用计数用,代表Task和Waker的总数,主要JoinHandle在HANDLE的flag里跟踪。

JoinHandle的实现分析

JoinHandle::cancel

为避免并发问题,JoinHandle不接触Future数据,而由于取消task的执行需要析构Future数据,因此cancel操作通过重新schedule一次,把操作传递给Task执行。

impl<R, T> JoinHandle<R, T> {
    pub fn cancel(&self) {
        let ptr = self.raw_task.as_ptr();
        let header = ptr as *const Header;

        unsafe {
            let mut state = (*header).state.load(Ordering::Acquire);

            loop {
                // 如果task已经结束或者closed,什么也不做。
                if state & (COMPLETED | CLOSED) != 0 {
                    break;
                }

                let new = if state & (SCHEDULED | RUNNING) == 0 {
                    // 如果不处于scheduled或running状态,那么下面就需要调用schedule
                    // 函数通知Task,因此要加上SCHEDULED 和增加引用计数
                    (state | SCHEDULED | CLOSED) + REFERENCE
                } else {
                    // 否则要么task已经schedue过了,过段时间会重新执行,要么当前正在
                    // 运行,因此只需要设置closed状态,task执行完后会收到close状态并
                    // 进行处理。
                    state | CLOSED
                };

                match (*header).state.compare_exchange_weak(
                    state,
                    new,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => {
                        // 重新schedule以便executor将Future销毁
                        if state & (SCHEDULED | RUNNING) == 0 {
                            ((*header).vtable.schedule)(ptr);
                        }

                        // 如果有awaiter的话,通知相应的的task。
                        if state & AWAITER != 0 {
                            (*header).notify();
                        }

                        break;
                    }
                    Err(s) => state = s,// 失败重试
                }
            }
        }
    }
}

JoinHandle::drop

由于整个task的所有权是由JoinHandle,Task和Waker共享的,因此都需要手动实现drop。Output只会由JoinHandle访问,因此如果有的话也要一同销毁。

impl<R, T> Drop for JoinHandle<R, T> {
    fn drop(&mut self) {
        let ptr = self.raw_task.as_ptr();
        let header = ptr as *const Header;

        let mut output = None;

        unsafe {
            // 由于很多时候JoinHandle不用,会在刚创建的时候直接drop掉,因此针对这种情
            // 况作一个特殊化处理。这样一个原子操作就完成了。
            if let Err(mut state) = (*header).state.compare_exchange_weak(
                SCHEDULED | HANDLE | REFERENCE,
                SCHEDULED | REFERENCE,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                loop {
                    // 如果task完成了,但是还没有close掉,说明output还没有被取走,需
                    // 要在这里取出来进行析构。
                    if state & COMPLETED != 0 && state & CLOSED == 0 {
                        // 标记为closed,这样就可以安全地读取output的数据。
                        match (*header).state.compare_exchange_weak(
                            state,
                            state | CLOSED,
                            Ordering::AcqRel,
                            Ordering::Acquire,
                        ) {
                            Ok(_) => {
                                output =
                                    Some((((*header).vtable.get_output)(ptr) as *mut R)
                                    .read());

                                // 更新状态重新循环
                                state |= CLOSED;
                            }
                            Err(s) => state = s,
                        }
                    } else {
                        // 进到这里说明task要么没完成,要么已经closed了。
                        let new = if state & (!(REFERENCE - 1) | CLOSED) == 0 {
                            // Task和Waker都已经没了,并且没closed,根据进else的条
                            // 件可知task没完成,Future还在,重新schedule一次,让
                            // executor把Future析构掉。
                            SCHEDULED | CLOSED | REFERENCE
                        } else {
                            // 移除HANDLE flag
                            state & !HANDLE
                        };

                        match (*header).state.compare_exchange_weak(
                            state,
                            new,
                            Ordering::AcqRel,
                            Ordering::Acquire,
                        ) {
                            Ok(_) => {
                                // 如果这是最后一个引用
                                if state & !(REFERENCE - 1) == 0 {
                                    if state & CLOSED == 0 {
                                        //并且没closed,根据进else的条件可知task没
                                        // 完成,重新schedule一次,析构Future
                                        ((*header).vtable.schedule)(ptr);
                                    } else {
                                        // task已经完成了,output也已经在上面读出
                                        // 来了,同时也是最后一个引用,需要把task自
                                        // 身析构掉。
                                        ((*header).vtable.destroy)(ptr);
                                    }
                                }

                                // 还有其他引用在,资源的释放由他们负责。
                                break;
                            }
                            Err(s) => state = s,
                        }
                    }
                }
            }
        }

        // 析构读取出来的output
        drop(output);
    }
}

JoinHandle::poll

检查Output是否已经可以拿,没有的话注册cx.waker()等通知。

impl<R, T> Future for JoinHandle<R, T> {
    type Output = Option<R>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let ptr = self.raw_task.as_ptr();
        let header = ptr as *const Header;

        unsafe {
            let mut state = (*header).state.load(Ordering::Acquire);

            loop {
                // task已经closed了,没output可拿。
                if state & CLOSED != 0 {
                    // 大部分可情况下,header里的awaiter就是cx.waker,也有例外,因
                    // 此一并进行通知。
                    (*header).notify_unless(cx.waker());
                    return Poll::Ready(None);
                }

                // 如果task还没完成
                if state & COMPLETED == 0 {
                    // 那么注册当前的cx.waker到Header::awaiter里,这样完成了可以收
                    // 到通知。
                    abort_on_panic(|| {
                        (*header).swap_awaiter(Some(cx.waker().clone()));
                    });

                    // 要是在上面注册前正好task完成了,那么就收不到通知了,因此注册后
                    // 需要重新读取下状态看看。
                    state = (*header).state.load(Ordering::Acquire);

                    // task已经closed了,没output可拿,返回None。
                    if state & CLOSED != 0 {
                        // 这里我分析下来是不需要再通知了,提了个pr等作者回应。
                        (*header).notify_unless(cx.waker());
                        return Poll::Ready(None);
                    }

                    // task还没完成,上面已经注册了waker,可以直接返回Pending。
                    if state & COMPLETED == 0 {
                        return Poll::Pending;
                    }
                }

                // 到这里说明task已经完成了。把它设置为closed状态,就可以拿output了。
                match (*header).state.compare_exchange(
                    state,
                    state | CLOSED,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => {
                        // 设置closed成功,通知其他的awaiter。由于上面是原子的swap操
                        // 作,且一旦设置为closed,awaiter就不会再变更了,因此可以
                        // 用AWAITER这个flag进行快速判断。
                        if state & AWAITER != 0 {
                            (*header).notify_unless(cx.waker());
                        }

                        // 读取出Output并返回。
                        let output = ((*header).vtable.get_output)(ptr) as *mut R;
                        return Poll::Ready(Some(output.read()));
                    }
                    Err(s) => state = s,
                }
            }
        }
    }
}

Task的实现分析

Task::schedule

这个函数先通过Task内部保存的指针指向Header,并从Header的vtable字段中拿到schedule函数指针,这个函数最终调用的是用户调用spawn时传入的schedule闭包。因此本身很直接。

Task::run

这个函数先通过Task内部保存的指针指向Header,并从Header的vtable字段中拿到run函数指针,其指向RawTask::run,实现如下:

首先根据指针参数强转为RawTask,并根据Header的vtable拿到RawWakerVTable,构造好Waker和Context,为调用Future::poll做准备。

unsafe fn run(ptr: *const ()) {
    let raw = Self::from_ptr(ptr);

    let waker = ManuallyDrop::new(Waker::from_raw(RawWaker::new(
        ptr,
        &(*raw.header).vtable.raw_waker,
    )));
    let cx = &mut Context::from_waker(&waker);
    
    //...
}

然后获取当前的state,循环直到更新state的RUNING成功为止。

    let mut state = (*raw.header).state.load(Ordering::Acquire);
    loop {
        // 如果task已经closed,那么Future可以直接析构掉,并返回。
        if state & CLOSED != 0 {
            if state & AWAITER != 0 {
                (*raw.header).notify();
            }

            Self::drop_future(ptr);

            // 扣掉当前task的引用计数,因为run函数的参数是self。
            Self::decrement(ptr);
            return;
        }

        // 移除SCHEDULED状态,并标记RUNING
        match (*raw.header).state.compare_exchange_weak(
            state,
            (state & !SCHEDULED) | RUNNING,
            Ordering::AcqRel,
            Ordering::Acquire,
        ) {
            Ok(_) => {
            	// 更新state到新的状态,后面的代码还要复用state。
                state = (state & !SCHEDULED) | RUNNING;
                break;
            }
            Err(s) => state = s,
        }
    }

标记为RUNING状态后,就可以开始正式调用Future::poll了,不过在调用前设置Guard,以便poll函数panic时,可以调用Guard的drop函数保证状态一致。

    let guard = Guard(raw);
    let poll = <F as Future>::poll(Pin::new_unchecked(&mut *raw.future), cx);
    mem::forget(guard); // 没panic,移除掉guard.drop的调用。

    match poll {
        Poll::Ready(out) => {
        	/// ...
        }
        Poll::Pending => {
            // ...
        }
    }

如果Future完成了,那么先把Future析构掉,腾出内存把output写进去。并循环尝试将RUNING状态去掉。

match poll {
    Poll::Ready(out) => {
        Self::drop_future(ptr);
        raw.output.write(out);

        let mut output = None;

        loop {
            // JoinHandle已经没了,那么output没人取,我们需要析构掉output,并设置为
            // closed状态。
            let new = if state & HANDLE == 0 {
                (state & !RUNNING & !SCHEDULED) | COMPLETED | CLOSED
            } else {
                (state & !RUNNING & !SCHEDULED) | COMPLETED
            };

            match (*raw.header).state.compare_exchange_weak(
                state,
                new,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => {
                    // 如果handle没了,或者跑的时候closed了,那么需要把output再读取
                    // 出来析构掉。
                    if state & HANDLE == 0 || state & CLOSED != 0 {
                        output = Some(raw.output.read());
                    }

                    // 通知JoinHandle来取数据。
                    if state & AWAITER != 0 {
                        (*raw.header).notify();
                    }

                    Self::decrement(ptr);
                    break;
                }
                Err(s) => state = s,
            }
        }
        drop(output);
    }
    Poll::Pending => {
    	// ...
    }

如果没完成的话,循环尝试移除RUNING,同时在poll的时候其他线程不能调用shedule函数,而是设置SCHEDULED,所以需要检查这个flag,如果设置了,则需要代劳。

match poll {
    Poll::Ready(out) => {
    	/// handle ready case ...
    }
    Poll::Pending => {
        loop {
            // poll的时候closed了,这里为啥要移除SCHEDULED状态,暂时不清楚,需要问问
            // 作者。
            let new = if state & CLOSED != 0 {
                state & !RUNNING & !SCHEDULED
            } else {
                state & !RUNNING
            };

            match (*raw.header).state.compare_exchange_weak(
                state,
                new,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(state) => {
                    if state & CLOSED != 0 {
                        // 设置closed状态的那个线程是不能碰Future的,否则和当前线程
                        // 产生内存并发访问冲突。因此代劳析构操作。
                        Self::drop_future(ptr);

                        Self::decrement(ptr);
                    } else if state & SCHEDULED != 0 {
                        // poll的时候其他线程想schedule这个task,但是不能调用,因此
                        // 当前线程代劳。chedule函数接收self,类似move语义,因此这里
                        // 不需要decrement。
                        Self::schedule(ptr);
                    } else {
                        Self::decrement(ptr);
                    }
                    break;
                }
                Err(s) => state = s,
            }
        }
    }
}

在poll时如果发生panic,则Guard负责收拾残局。

fn drop(&mut self) {
    let raw = self.0;
    let ptr = raw.header as *const ();

    unsafe {
        let mut state = (*raw.header).state.load(Ordering::Acquire);

        loop {
            // poll的时候被其他线程closed了,
            if state & CLOSED != 0 {
                // 看代码state一旦处于CLOSED后,schedule不会再运行。这里为啥要移除
                // SCHEDULED状态,暂时不清楚,需要问问作者。
                (*raw.header).state.fetch_and(!SCHEDULED, Ordering::AcqRel);

                // 析构Future
                RawTask::<F, R, S, T>::drop_future(ptr);
                RawTask::<F, R, S, T>::decrement(ptr);
                break;
            }
         
            match (*raw.header).state.compare_exchange_weak(
                state,
                (state & !RUNNING & !SCHEDULED) | CLOSED,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(state) => {
                    // 析构Future
                    RawTask::<F, R, S, T>::drop_future(ptr);

                    // 通知awaitertask已经close了.
                    if state & AWAITER != 0 {
                        (*raw.header).notify();
                    }

                    RawTask::<F, R, S, T>::decrement(ptr);
                    break;
                }
                Err(s) => state = s,
            }
        }
    }
}

Waker相关函数的实现

wake函数

wake函数主要功能是设置SCHEDULE状态,并尝试调用schedule函数,有两个重要的细节需要注意:

  1. task正在执行时不能调用schedule函数;
  2. 当task已经被schedule过了时,也需要额外做一次原子操作,施加Release语义。
unsafe fn wake(ptr: *const ()) {
    let raw = Self::from_ptr(ptr);

    let mut state = (*raw.header).state.load(Ordering::Acquire);

    loop {
        if state & (COMPLETED | CLOSED) != 0 {
            // 如果task完成或者close了,直接drop掉自己,wake的参数是self语义
            Self::decrement(ptr);
            break;
        }

        if state & SCHEDULED != 0 {
            // 这段代码极为关键,如果task已经schedule过了,则重新把读出来的state
            // 设置回去,虽然看起来好像是无用的,其实是为了施加Release同步语义,
            // 把当前线程的内存视图同步到其他线程去。即便是rust标准库,之前也因为
            // 没处理好类似这个情况出过bug。
            match (*raw.header).state.compare_exchange_weak(
                state,
                state,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => {
                    Self::decrement(ptr);
                    break;
                }
                Err(s) => state = s,
            }
        } else {
            // task没schedule过,则设置状态。
            match (*raw.header).state.compare_exchange_weak(
                state,
                state | SCHEDULED,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => {
                    // 如果task当前没有运行,那么可以调用schedule函数。
                    if state & (SCHEDULED | RUNNING) == 0 {
                        // Schedule the task.
                        let task = Task {
                            raw_task: NonNull::new_unchecked(ptr as *mut ()),
                            _marker: PhantomData,
                        };
                        (*raw.schedule)(task);
                    } else {
                        // task正在运行,不需要调用schedule,等运行结束后对应的
                        // 线程会代劳。
                        Self::decrement(ptr);
                    }

                    break;
                }
                Err(s) => state = s,
            }
        }
    }
}

wake_by_ref

这个函数的功能和wake类似,唯一的区别就是wake的参数是self,有move语义,wakebyref是&self。实现差异不大,就不做具体分析了。

clone_waker

waker的clone实现也比较简单,直接将Header里的state的引用计数加一即可。

unsafe fn clone_waker(ptr: *const ()) -> RawWaker {
    let raw = Self::from_ptr(ptr);
    let raw_waker = &(*raw.header).vtable.raw_waker;

    let state = (*raw.header).state.fetch_add(REFERENCE, Ordering::Relaxed);

    if state > isize::max_value() as usize {
        std::process::abort();
    }

    RawWaker::new(ptr, raw_waker)
}

总结

整个task的设计非常精细,api也非常直观,难怪一发布就直接上1.0版本。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-11-20,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Rust语言学习交流 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 使用方式
  • Task的结构图
  • task中的状态
  • JoinHandle的实现分析
    • JoinHandle::cancel
      • JoinHandle::drop
        • JoinHandle::poll
        • Task的实现分析
          • Task::schedule
            • Task::run
            • Waker相关函数的实现
              • wake函数
                • wake_by_ref
                  • clone_waker
                  • 总结
                  领券
                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档