Rust Async: futures-timer源码解析

本文转载自:https://zhuanlan.zhihu.com/p/78036342

已获作者授权。

之前的文章已经提到Future,Executor和Reactor是rust异步的三个基本组件。由于io相关的Reactor涉及到操作系统的api,分析Future和Reactor 之间的交互过程往往没那么直观。futures-timer是rust官方提供一个纯用户态实现的定时器基础库,本身代码精炼,通过源码的阅读分析,可以学习到:

  1. 学习Reactor,以及和Future的交互逻辑;
  2. 学习thread::park/unpark的使用;
  3. 学习一个简单的并发链表设计;
  4. 干净地进行资源的释放;

futures-timer对外提供了Delay和Interval,以及充当Reactor角色的Timer,默认情况下会启动一个后台线程运行全局的Timer,高级用户也可以自己创建管理Timer。借助于Delay支持时间重设的特性,Interval只是Delay的一个封装,因此不做论述。

futures-timer 整体架构图

定时器超时的管理一般有红黑树,最小堆和旋转时间轮等数据结构。futures-timer采用的是最小堆,由Timer结构维护。由于设计一个并发的最小堆难度较大,因此 Timer结构有两部分组成,一个是管理定时器超时的堆数据结构,是Timer的私有数据,不能并发访问;一个是用于和外部交互的并发链表,作为存放外部请求的待处理队列。Timer把并发链表包装成TimerHandle,供外部创建和修改定时器。主要的数据结构定义如下:

pub struct Timer {
    inner: Arc<Inner>,
    timer_heap: Heap<HeapTimer>,
}

pub struct TimerHandle {
    inner: Weak<Inner>,
}

struct Inner {
// 保存刚创建或者更新的定时器队列,等待Timer进行处理
    list: ArcList<ScheduledTimer>,
    // Timer一般在一个死循环里,没消息时处于睡眠状态,
    // 当上面的list更新后通过这个waker把Timer唤醒让其继续干活
    waker: AtomicWaker,
}

pub struct Delay {
    state: Option<Arc<Node<ScheduledTimer>>>, // 为None时表示定时器无效
    when: Instant,
}

/// Delay和Timer之间共享的数据
struct ScheduledTimer {
    waker: AtomicWaker, // 用于Timer给Delay发送通知
// 第一个bit标记是否定时器是否已经触发,第二个bit标记是否定时器无效,
// 其他bits标记定时器被重置的次数,每重置一次加1。只有和HeapTimer里保存的一致时才会触发超时通知。
    state: AtomicUsize,

    inner: Weak<Inner>,
    at: Mutex<Option<Instant>>,
    slot: Mutex<Option<Slot>>,
}

Delay的实现

构造

根据上面的架构图,实现是比较显然的, 构造好共享的ScheduledTimer状态,通过TimerHandle插入Timer的list,并唤醒Timer工作。

impl Delay {
	// 创建过程
    pub fn new_handle(at: Instant, handle: TimerHandle) -> Delay {
    	// 尝试把Weak升级到Arc,失败表示Timer已经销毁了
        let inner = match handle.inner.upgrade() {
            Some(i) => i,
            None => {
                return Delay {
                    state: None,
                    when: at,
                }
            }
        };
        // 初始化好内部状态
        let state = Arc::new(Node::new(ScheduledTimer {
            at: Mutex::new(Some(at)),
            state: AtomicUsize::new(0),
            waker: AtomicWaker::new(),
            inner: handle.inner,
            slot: Mutex::new(None),
        }));

        // 尝试插入待处理队列,如果失败表示链表已经封锁,Timer处于析构过程中。
        if inner.list.push(&state).is_err() {
            return Delay {
                state: None,
                when: at,
            };
        }
        // 添加成功,唤醒Timer继续干活,处理队列
        inner.waker.wake();
        Delay {
            state: Some(state),
            when: at,
        }
    }
}

重置超时时间

Delay的时间重置是整个代码复杂度的主要来源,在重置时,内部的ScheduledTimer可能存在于Timer的list中(Delay刚创建完,Timer还没来得及处理), 也可能存在于Timer的Heap中(Delay创建后,已被Timer处理放进堆里进行调度)。当在Heap上的时候,按正常流程是应该先把他从Heap上移除掉,再更新时间重新插入, 但是前面已经提到,Heap是不支持并发操作的。为了解决这个问题,ScheduledTimer的state字段的高bit位保存了一个计数器,初始为0, 每次reset的时候递增。同时Timer在把Delay插入Heap的时候也保存了当时的计数器。当计数器超时的时候,会把Heap里保存的和当前的计数器进行比较,如果不一致,表示在插入Heap之后被重置过。

 fn _reset(&mut self, at: Instant) -> Result<(), ()> {
        let state = match self.state {
            Some(ref state) => state,
            None => return Err(()), // 这种情况表示在Delay构建的时候,Timer已经挂掉了
        };
        if let Some(timeouts) = state.inner.upgrade() {
            let mut bits = state.state.load(SeqCst);
            loop {
                // 定时器无效,直接返回
                if bits & 0b10 != 0 {
                    return Err(());
                }
                // 递增计数器,并更新到state里
                let new = bits.wrapping_add(0b100) & !0b11;
                match state.state.compare_exchange(bits, new, SeqCst, SeqCst) {
                    Ok(_) => break,
                    Err(s) => bits = s,
                }
            }
            *state.at.lock().unwrap() = Some(at);
            // 插入list,并唤醒Timer
            timeouts.list.push(state)?;
            timeouts.waker.wake();
        }

        Ok(())
    }

析构

理论上Delay的析构函数不作任何处理也是没有问题的,等到了超时时间Timer会自动将其移出Heap。为了避免一直占用Heap,可以通过TimerHandle给Timer发送消息,使其尽快清理掉无效的Delay。

fn drop(&mut self) {
        let state = match self.state {
            Some(ref s) => s,
            None => return,
        };
        if let Some(timeouts) = state.inner.upgrade() {
        	// 超时时间设为None,Timer收到消息后,会将该Delay立刻从Heap中移除
            *state.at.lock().unwrap() = None;
            if timeouts.list.push(state).is_ok() {
                timeouts.waker.wake();
            }
        }
    }

实现Future Trait

impl Future for Delay {
    type Output = io::Result<()>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let state = match self.state {
            Some(ref state) => state,
            None => {
                let err = Err(io::Error::new(io::ErrorKind::Other, "timer has gone away"));
                return Poll::Ready(err);
            }
        };
//先快速地直接尝试下是否超时,这行代码没有也是ok的
        if state.state.load(SeqCst) & 1 != 0 {
            return Poll::Ready(Ok(()));
        }

// 注册好Executor传下来的waker,这样后面到了超时时Timer可以通知Executor再次poll
        state.waker.register(&cx.waker());

// 进行正式的检查
        match state.state.load(SeqCst) {
            n if n & 0b01 != 0 => Poll::Ready(Ok(())),
            n if n & 0b10 != 0 => Poll::Ready(Err(io::Error::new(
                io::ErrorKind::Other,
                "timer has gone away",
            ))),
            _ => Poll::Pending,
        }
    }
}

Timer的实现

定时器超时处理

根据前面的分析,这部分已经比较明显了,从Heap里面取出超时的定时器,如果重置计数器匹配,就发通知(Delay在poll的时候已经注册了waker)。

impl Timer {

    pub fn advance_to(&mut self, now: Instant) {
        loop {
            match self.timer_heap.peek() {
                Some(head) if head.at <= now => {}
                Some(_) => break,
                None => break,
            };

            // Flag the timer as fired and then notify its task, if any, that's
            // blocked.
            let heap_timer = self.timer_heap.pop().unwrap();
            *heap_timer.node.slot.lock().unwrap() = None;
            let bits = heap_timer.gen << 2;
            match heap_timer
                .node
                .state
                .compare_exchange(bits, bits | 0b01, SeqCst, SeqCst)
            {
                Ok(_) => heap_timer.node.waker.wake(),
                Err(_b) => {}
            }
        }
    }
}

析构

Timer析构过程中,外部依然可以通过TimerHandle访问并发链表,所以需要小心处理。处理方式是先封锁掉链表,防止后续的插入动作,然后将链表和Heap中现有的元素全部设置为失效,并发送通知。

fn drop(&mut self) {
    let mut list = self.inner.list.take_and_seal();
    while let Some(t) = list.pop() {
        self.invalidate(t);
    }
    while let Some(t) = self.timer_heap.pop() {
        self.invalidate(t.node);
    }
}

fn invalidate(&mut self, node: Arc<Node<ScheduledTimer>>) {
    node.state.fetch_or(0b10, SeqCst);
    node.waker.wake();
}

Future trait的实现

为了方便与其他库的集成,futures-timer把对Timer的唤醒抽离出来,把Timer实现为Future,这样可以把Timer集成到Executor中执行,也可以用一个线程单独跑。Future的实现主要工作就是注册waker,这样外面对链表更新的时候可以唤醒Timer,然后依次把链表元素取出进行处理。

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.inner.waker.register(cx.waker());
        let mut list = self.inner.list.take();
        while let Some(node) = list.pop() {
            let at = *node.at.lock().unwrap();
            match at {
                Some(at) => self.update_or_add(at, node),
                None => self.remove(node), //根据前面分析为None时,表示Delay析构了
            }
        }
        Poll::Pending
    }

后台Reactor运行Timer的原理

根据上面的架构图,不难想到后台的Reactor主要就是一个大循环里不停地从list里取出请求进行处理,然后看看Heap里有没有超时的,没有就等待。

fn run(mut timer: Timer, done: Arc<AtomicBool>) {
    // 这个waker调用的时候会调用thread::unpark唤醒当前线程
    let mut waker = current_thread_waker();
    let mut cx = Context::from_waker(&mut waker);

    while !done.load(Ordering::SeqCst) {
        // 处理现有的链表里的请求,并注册好waker
        drop(timer.poll_unpin(&mut cx));

        timer.advance(); // 通知所有超时的定时器
        match timer.next_event() {
            // heap里还有没触发的定时器,最早的将在when时触发
            Some(when) => {
                let now = Instant::now();
                if now < when {
                    // 时间没到,让线程阻塞
                    thread::park_timeout(when - now)
                } else {
                    // 上面的advance调用比较耗时可能存在这种情况,调用完又有超时的
                    // .. continue...
                }
            }
            // heap里已经没有定时器了,那就一直阻塞,直到外部往list里面插入元素唤醒
            None => thread::park(),
        }
    }
}

注意这段代码能够工作有一个关于thread::park的非常重要的细节:其他线程在先调用thread::unpark,然后reactor线程调用thread::park时,不会阻塞。这样在调用timer.poll之后,如果外部调用了unpark,那么thread::park能够立即返回,重新进行一轮循环,否则整个系统就可能因为丢失通知而全部卡死。

原文发布于微信公众号 - Rust语言学习交流(rust-china)

原文发表时间:2019-08-14

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

扫码关注云+社区

领取腾讯云代金券