前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Rust异步编程之Future初探

Rust异步编程之Future初探

作者头像
newbmiao
发布2024-01-10 14:25:56
3870
发布2024-01-10 14:25:56
举报
文章被收录于专栏:学点Rust学点Rust

RustFuture是用来实现异步编程的。今天我们围绕其了解下Rust的异步编程是如何构建。

Rustasync就能轻松创建开销很小的可异步执行的函数,在await时其才会被调度执行。

其比较轻量级,有别于异步多线程,依托在操作系统线程之上,构建大量并发则需要大量的线程资源,对资源的消耗比较大。

比如下边用async构建异步任务:

代码语言:javascript
复制
async fn async_fn() {
    // handle async logic
}

#[tokio::main]
async fn main() {
    async_fn().await
}

文章目录

  • 状态机
  • 调度
  • 运行时
  • async
  • pin

状态机

async其实也是帮你自动实现了下边的Future trait,用结构体维护了一个状态机

代码语言:javascript
复制
trait Future {
    type Output;
    fn poll(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Self::Output>;
}

Future定义一个poll方法,可以查询异步任务状态。对于异步任务,有PendingReady两种状态,Pending时会让出控制,等待可以处理时再被唤醒继续处理,如此重复,直到Ready

我们来尝试通过实现一个DelayFuture了解这个状态流转的过程

代码语言:javascript
复制
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

struct Delay {
    when: Instant,
}

impl Future for Delay {
    type Output = &'static str;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<&'static str> {
        if Instant::now() >= self.when {
            Poll::Ready("done")
        } else {
            // 还未ready,注册下一次唤醒
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

#[tokio::main]
async fn main() {
    let when = Instant::now() + Duration::from_millis(3);
    let future = Delay { when };

    let out = future.await;
    assert_eq!(out, "done");
}

Delay每次poll时会检查,时间是否满足,满足则Ready,否则 schedule 下一次执行并返回Pending

状态机是有了,Future怎么调度呢?

调度

Rust需要运行时runtime来调度异步任务taskruntime负责调度,检查future的状态。

调度一般在Pending时会交出task的控制,并schedule下一次什么时候唤醒(wake)。

流程处理展开来说,常规Ready处理:

代码语言:javascript
复制

Pending时, future要被schedule下一次唤醒,而每次唤醒可能不会都是在同一个task上执行。这里用于唤醒的waker会在每次poll时以context传递下去,

代码语言:javascript
复制

运行时

了解了调度,我们再展开说下运行时。rust的运行时没在标准库中实现,需要依赖第三方的运行时,常用的有tokio

就比如如下的tokio宏实际是添加了一个多线程(multi thread)的运行时,会阻塞当前线程直到异步任务完成。

代码语言:javascript
复制
#[tokio::main]
async fn main() {
    println!("hello");
}

// tranform to
fn main() {
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap()
        .block_on(async {
            println!("Hello world");
        })
}

当然也可以用单线程的运行时(current thread

代码语言:javascript
复制
#[tokio::main(flavor = "current_thread")]
async fn main() {
    println!("Hello world");
}
// tranform to
fn main() {
    tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap()
        .block_on(async {
            println!("Hello world");
        })
}

async

其实一般很少直接去实现Future trait, 直接使用async去自动实现Future trait就足够了。上边Delay完全可以这么实现,简洁且高效

代码语言:javascript
复制
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
use tokio::sync::Notify;

async fn delay(dur: Duration) {
    let when = Instant::now() + dur;
    let notify = Arc::new(Notify::new());
    let notify_clone = notify.clone();

    thread::spawn(move || {
        let now = Instant::now();

        if now < when {
            thread::sleep(when - now);
        }

        notify_clone.notify_one();
    });

    notify.notified().await;
}

#[tokio::main]
async fn main() {
    delay(Duration::from_secs(1)).await;
}

pin

还记得future trait上参数有个Pin<&mut Self>, 为什么要Pin future的引用?

来看下边一段代码:

代码语言:javascript
复制
async fn my_async_fn() {
    // async logic here
}

#[tokio::main]
async fn main() {
    let mut future = my_async_fn();
    (&mut future).await;
    // error:
    // within `impl Future<Output = ()>`, the trait `Unpin` is not implemented for `[async fn body@src/main.rs:1:24: 3:2]`
}

当尝试执行一个异步函数的引用时,编译器会报错要求其是Unpin trait

为什么呢?

future本质是一个封装的状态机结构体,调度时会被移动,如果其包含引用,引用的地址要能保证生命周期至少在其完成前还存活,不然就会出现引用一个已失效的地址。

所以 Rust 引入了Unpin trait。这个Unpin是代表其不需要固定地址,可以安全引用。

常规的类型一般都是实现了的。对于未实现的!Unpin类型,一般可以将其Box::pin到堆上或用宏pin!到栈上来确保其地址在future移动期间是有效的。

代码如:

代码语言:javascript
复制
use tokio::pin;

async fn my_async_fn() {
    // async logic here
}

#[tokio::main]
async fn main() {
    let future = my_async_fn();
    // option 1
    pin!(future);
    (&mut future).await;

    // option 2
    // let pinned_fut = Box::pin(future);
    // pinned_fut.await;
}

好了,今天就聊到这里,下一篇我们再聊聊多个异步同时怎么处理。

Pin感兴趣可以看看官方更详细的文档:Pinning[1]

异步编程更深入了解的话也推荐看下 tokio 的这篇:Async in depth[2]

参考资料

[1]

Pinning: https://rust-lang.github.io/async-book/04_pinning/01_chapter.html

[2]

Async in depth: https://tokio.rs/tokio/tutorial/async

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2024-01-08,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 菜鸟Miao 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 状态机
  • 调度
  • 运行时
  • async
  • pin
    • 参考资料
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档