前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Rust Async: futures-timer源码解析

Rust Async: futures-timer源码解析

作者头像
MikeLoveRust
发布2019-08-19 11:54:10
1.4K0
发布2019-08-19 11:54:10
举报

本文转载自:https://zhuanlan.zhihu.com/p/78036342

已获作者授权。

之前的文章已经提到Future,Executor和Reactor是rust异步的三个基本组件。由于io相关的Reactor涉及到操作系统的api,分析Future和Reactor 之间的交互过程往往没那么直观。futures-timer是rust官方提供一个纯用户态实现的定时器基础库,本身代码精炼,通过源码的阅读分析,可以学习到:

  1. 学习Reactor,以及和Future的交互逻辑;
  2. 学习thread::park/unpark的使用;
  3. 学习一个简单的并发链表设计;
  4. 干净地进行资源的释放;

futures-timer对外提供了Delay和Interval,以及充当Reactor角色的Timer,默认情况下会启动一个后台线程运行全局的Timer,高级用户也可以自己创建管理Timer。借助于Delay支持时间重设的特性,Interval只是Delay的一个封装,因此不做论述。

futures-timer 整体架构图

定时器超时的管理一般有红黑树,最小堆和旋转时间轮等数据结构。futures-timer采用的是最小堆,由Timer结构维护。由于设计一个并发的最小堆难度较大,因此 Timer结构有两部分组成,一个是管理定时器超时的堆数据结构,是Timer的私有数据,不能并发访问;一个是用于和外部交互的并发链表,作为存放外部请求的待处理队列。Timer把并发链表包装成TimerHandle,供外部创建和修改定时器。主要的数据结构定义如下:

代码语言:javascript
复制
pub struct Timer {
    inner: Arc<Inner>,
    timer_heap: Heap<HeapTimer>,
}

pub struct TimerHandle {
    inner: Weak<Inner>,
}

struct Inner {
// 保存刚创建或者更新的定时器队列,等待Timer进行处理
    list: ArcList<ScheduledTimer>,
    // Timer一般在一个死循环里,没消息时处于睡眠状态,
    // 当上面的list更新后通过这个waker把Timer唤醒让其继续干活
    waker: AtomicWaker,
}

pub struct Delay {
    state: Option<Arc<Node<ScheduledTimer>>>, // 为None时表示定时器无效
    when: Instant,
}

/// Delay和Timer之间共享的数据
struct ScheduledTimer {
    waker: AtomicWaker, // 用于Timer给Delay发送通知
// 第一个bit标记是否定时器是否已经触发,第二个bit标记是否定时器无效,
// 其他bits标记定时器被重置的次数,每重置一次加1。只有和HeapTimer里保存的一致时才会触发超时通知。
    state: AtomicUsize,

    inner: Weak<Inner>,
    at: Mutex<Option<Instant>>,
    slot: Mutex<Option<Slot>>,
}

Delay的实现

构造

根据上面的架构图,实现是比较显然的, 构造好共享的ScheduledTimer状态,通过TimerHandle插入Timer的list,并唤醒Timer工作。

代码语言:javascript
复制
impl Delay {
	// 创建过程
    pub fn new_handle(at: Instant, handle: TimerHandle) -> Delay {
    	// 尝试把Weak升级到Arc,失败表示Timer已经销毁了
        let inner = match handle.inner.upgrade() {
            Some(i) => i,
            None => {
                return Delay {
                    state: None,
                    when: at,
                }
            }
        };
        // 初始化好内部状态
        let state = Arc::new(Node::new(ScheduledTimer {
            at: Mutex::new(Some(at)),
            state: AtomicUsize::new(0),
            waker: AtomicWaker::new(),
            inner: handle.inner,
            slot: Mutex::new(None),
        }));

        // 尝试插入待处理队列,如果失败表示链表已经封锁,Timer处于析构过程中。
        if inner.list.push(&state).is_err() {
            return Delay {
                state: None,
                when: at,
            };
        }
        // 添加成功,唤醒Timer继续干活,处理队列
        inner.waker.wake();
        Delay {
            state: Some(state),
            when: at,
        }
    }
}

重置超时时间

Delay的时间重置是整个代码复杂度的主要来源,在重置时,内部的ScheduledTimer可能存在于Timer的list中(Delay刚创建完,Timer还没来得及处理), 也可能存在于Timer的Heap中(Delay创建后,已被Timer处理放进堆里进行调度)。当在Heap上的时候,按正常流程是应该先把他从Heap上移除掉,再更新时间重新插入, 但是前面已经提到,Heap是不支持并发操作的。为了解决这个问题,ScheduledTimer的state字段的高bit位保存了一个计数器,初始为0, 每次reset的时候递增。同时Timer在把Delay插入Heap的时候也保存了当时的计数器。当计数器超时的时候,会把Heap里保存的和当前的计数器进行比较,如果不一致,表示在插入Heap之后被重置过。

代码语言:javascript
复制
 fn _reset(&mut self, at: Instant) -> Result<(), ()> {
        let state = match self.state {
            Some(ref state) => state,
            None => return Err(()), // 这种情况表示在Delay构建的时候,Timer已经挂掉了
        };
        if let Some(timeouts) = state.inner.upgrade() {
            let mut bits = state.state.load(SeqCst);
            loop {
                // 定时器无效,直接返回
                if bits & 0b10 != 0 {
                    return Err(());
                }
                // 递增计数器,并更新到state里
                let new = bits.wrapping_add(0b100) & !0b11;
                match state.state.compare_exchange(bits, new, SeqCst, SeqCst) {
                    Ok(_) => break,
                    Err(s) => bits = s,
                }
            }
            *state.at.lock().unwrap() = Some(at);
            // 插入list,并唤醒Timer
            timeouts.list.push(state)?;
            timeouts.waker.wake();
        }

        Ok(())
    }

析构

理论上Delay的析构函数不作任何处理也是没有问题的,等到了超时时间Timer会自动将其移出Heap。为了避免一直占用Heap,可以通过TimerHandle给Timer发送消息,使其尽快清理掉无效的Delay。

代码语言:javascript
复制
fn drop(&mut self) {
        let state = match self.state {
            Some(ref s) => s,
            None => return,
        };
        if let Some(timeouts) = state.inner.upgrade() {
        	// 超时时间设为None,Timer收到消息后,会将该Delay立刻从Heap中移除
            *state.at.lock().unwrap() = None;
            if timeouts.list.push(state).is_ok() {
                timeouts.waker.wake();
            }
        }
    }

实现Future Trait

代码语言:javascript
复制
impl Future for Delay {
    type Output = io::Result<()>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let state = match self.state {
            Some(ref state) => state,
            None => {
                let err = Err(io::Error::new(io::ErrorKind::Other, "timer has gone away"));
                return Poll::Ready(err);
            }
        };
//先快速地直接尝试下是否超时,这行代码没有也是ok的
        if state.state.load(SeqCst) & 1 != 0 {
            return Poll::Ready(Ok(()));
        }

// 注册好Executor传下来的waker,这样后面到了超时时Timer可以通知Executor再次poll
        state.waker.register(&cx.waker());

// 进行正式的检查
        match state.state.load(SeqCst) {
            n if n & 0b01 != 0 => Poll::Ready(Ok(())),
            n if n & 0b10 != 0 => Poll::Ready(Err(io::Error::new(
                io::ErrorKind::Other,
                "timer has gone away",
            ))),
            _ => Poll::Pending,
        }
    }
}

Timer的实现

定时器超时处理

根据前面的分析,这部分已经比较明显了,从Heap里面取出超时的定时器,如果重置计数器匹配,就发通知(Delay在poll的时候已经注册了waker)。

代码语言:javascript
复制
impl Timer {

    pub fn advance_to(&mut self, now: Instant) {
        loop {
            match self.timer_heap.peek() {
                Some(head) if head.at <= now => {}
                Some(_) => break,
                None => break,
            };

            // Flag the timer as fired and then notify its task, if any, that's
            // blocked.
            let heap_timer = self.timer_heap.pop().unwrap();
            *heap_timer.node.slot.lock().unwrap() = None;
            let bits = heap_timer.gen << 2;
            match heap_timer
                .node
                .state
                .compare_exchange(bits, bits | 0b01, SeqCst, SeqCst)
            {
                Ok(_) => heap_timer.node.waker.wake(),
                Err(_b) => {}
            }
        }
    }
}

析构

Timer析构过程中,外部依然可以通过TimerHandle访问并发链表,所以需要小心处理。处理方式是先封锁掉链表,防止后续的插入动作,然后将链表和Heap中现有的元素全部设置为失效,并发送通知。

代码语言:javascript
复制
fn drop(&mut self) {
    let mut list = self.inner.list.take_and_seal();
    while let Some(t) = list.pop() {
        self.invalidate(t);
    }
    while let Some(t) = self.timer_heap.pop() {
        self.invalidate(t.node);
    }
}

fn invalidate(&mut self, node: Arc<Node<ScheduledTimer>>) {
    node.state.fetch_or(0b10, SeqCst);
    node.waker.wake();
}

Future trait的实现

为了方便与其他库的集成,futures-timer把对Timer的唤醒抽离出来,把Timer实现为Future,这样可以把Timer集成到Executor中执行,也可以用一个线程单独跑。Future的实现主要工作就是注册waker,这样外面对链表更新的时候可以唤醒Timer,然后依次把链表元素取出进行处理。

代码语言:javascript
复制
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.inner.waker.register(cx.waker());
        let mut list = self.inner.list.take();
        while let Some(node) = list.pop() {
            let at = *node.at.lock().unwrap();
            match at {
                Some(at) => self.update_or_add(at, node),
                None => self.remove(node), //根据前面分析为None时,表示Delay析构了
            }
        }
        Poll::Pending
    }

后台Reactor运行Timer的原理

根据上面的架构图,不难想到后台的Reactor主要就是一个大循环里不停地从list里取出请求进行处理,然后看看Heap里有没有超时的,没有就等待。

代码语言:javascript
复制
fn run(mut timer: Timer, done: Arc<AtomicBool>) {
    // 这个waker调用的时候会调用thread::unpark唤醒当前线程
    let mut waker = current_thread_waker();
    let mut cx = Context::from_waker(&mut waker);

    while !done.load(Ordering::SeqCst) {
        // 处理现有的链表里的请求,并注册好waker
        drop(timer.poll_unpin(&mut cx));

        timer.advance(); // 通知所有超时的定时器
        match timer.next_event() {
            // heap里还有没触发的定时器,最早的将在when时触发
            Some(when) => {
                let now = Instant::now();
                if now < when {
                    // 时间没到,让线程阻塞
                    thread::park_timeout(when - now)
                } else {
                    // 上面的advance调用比较耗时可能存在这种情况,调用完又有超时的
                    // .. continue...
                }
            }
            // heap里已经没有定时器了,那就一直阻塞,直到外部往list里面插入元素唤醒
            None => thread::park(),
        }
    }
}

注意这段代码能够工作有一个关于thread::park的非常重要的细节:其他线程在先调用thread::unpark,然后reactor线程调用thread::park时,不会阻塞。这样在调用timer.poll之后,如果外部调用了unpark,那么thread::park能够立即返回,重新进行一轮循环,否则整个系统就可能因为丢失通知而全部卡死。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-08-14,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Rust语言学习交流 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • futures-timer 整体架构图
  • Delay的实现
    • 构造
      • 重置超时时间
        • 析构
          • 实现Future Trait
          • Timer的实现
            • 定时器超时处理
              • 析构
                • Future trait的实现
                  • 后台Reactor运行Timer的原理
                  领券
                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档