专栏首页Rust语言学习交流【原创】Rust tokio 如何以异步非阻塞方式运行大量任务

【原创】Rust tokio 如何以异步非阻塞方式运行大量任务

tokio 官方给了一个完整的例子:手动构建 runtime ,利用 block_on 来运行多个任务。tokio 的任务是由 tokio::spawn 之类的函数产生的 JoinHandle 类型,而且是个 Future

而下面利用 #[tokio::main] 和 await 编写了等价的版本(为了直观对比任务完成的实际顺序和总耗时,我对 sleep 的时间做了一些简化):

use std::time::Instant;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let now = Instant::now();

    let mut handles = Vec::with_capacity(10);
    for i in 0..10 {
        handles.push(tokio::spawn(my_bg_task(i)));
    }

    // Do something time-consuming while the background tasks execute.
    std::thread::sleep(Duration::from_millis(120));
    println!("Finished time-consuming task.");

    // Wait for all of them to complete.
    for handle in handles {
        handle.await?;
    }

    println!("总耗时:{} ms", now.elapsed().as_millis());
    Ok(())
}

async fn my_bg_task(i: u64) {
    let millis = 100;
    println!("Task {} sleeping for {} ms.", i, millis);
    sleep(Duration::from_millis(millis)).await;
    println!("Task {} stopping.", i);
}

输出结果:

Task 0 sleeping for 100 ms.
Task 1 sleeping for 100 ms.
Task 2 sleeping for 100 ms.
Task 3 sleeping for 100 ms.
Task 4 sleeping for 100 ms.
Task 5 sleeping for 100 ms.
Task 6 sleeping for 100 ms.
Task 7 sleeping for 100 ms.
Task 8 sleeping for 100 ms.
Task 9 sleeping for 100 ms.
Task 9 stopping.
Task 0 stopping.
Task 1 stopping.
Task 2 stopping.
Task 3 stopping.
Task 4 stopping.
Task 5 stopping.
Task 6 stopping.
Task 7 stopping.
Task 8 stopping.
Finished time-consuming task.
总耗时:120 ms

如果把主线程的的 sleep 时间改成 100 ms:std::thread::sleep(Duration::from_millis(100)); 则产生下面的结果:

Task 0 sleeping for 100 ms.
Task 1 sleeping for 100 ms.
Task 2 sleeping for 100 ms.
Task 3 sleeping for 100 ms.
Task 4 sleeping for 100 ms.
Task 5 sleeping for 100 ms.
Task 6 sleeping for 100 ms.
Task 7 sleeping for 100 ms.
Task 8 sleeping for 100 ms.
Task 9 sleeping for 100 ms.
Finished time-consuming task.
Task 3 stopping.
Task 0 stopping.
Task 1 stopping.
Task 2 stopping.
Task 9 stopping.
Task 4 stopping.
Task 5 stopping.
Task 6 stopping.
Task 7 stopping.
Task 8 stopping.
总耗时:103 ms

可以看到,my_bg_task 实际是异步非阻塞执行的 👍 :

  • 异步:因为每个任务不必等待其结果就可以开始下一个任务,即;
// 异步
Task 0 sleeping for 100 ms.
Task 1 sleeping for 100 ms.
...

// 同步
Task 0 sleeping for 100 ms.
Task 0 stopping.
Task 1 sleeping for 100 ms.
Task 1 stopping.
...
  • 非阻塞:每个任务之间可以快速切换,不必等待其他任务完成才切换,这个例子表现在:
    • 任务 0-9 以乱序方式 stop
    • Finished time-consuming task.Task x stopping. 的打印顺序只与任务各自的运行 (sleep) 时间有关,与源代码的声明执行顺序无关。只有任务之间快速切换才能做到这一点。回顾官网的例子:10 个任务的 sleep 时间线性递减 (let millis = 1000 - 50 * i;),从 6 个任务开始小于主线程 sleep 任务的时间(750 ms),而等待 10 个任务执行的语句 for handle in handles { ... } 显然位于 std::thread::sleep 之后,所以任务之间非阻塞执行的话,打印结果为 sleep 时间越短的任务先完成,时间越长的任务后完成,总耗时为任务中的最长耗时:
Task 0 sleeping for 1000 ms.
Task 1 sleeping for 950 ms.
Task 2 sleeping for 900 ms.
Task 3 sleeping for 850 ms.
Task 4 sleeping for 800 ms.
Task 5 sleeping for 750 ms.
Task 6 sleeping for 700 ms.
Task 7 sleeping for 650 ms.
Task 8 sleeping for 600 ms.
Task 9 sleeping for 550 ms.
Task 9 stopping.
Task 8 stopping.
Task 7 stopping.
Task 6 stopping.
Finished time-consuming task.
Task 5 stopping.
Task 4 stopping.
Task 3 stopping.
Task 2 stopping.
Task 1 stopping.
Task 0 stopping.
总耗时:1001 ms // 非常完美

一般情况下,对于 async block/fn 你至少有以下一些做法:

  1. 对 async block/fn 调用 .await 来等待结果;
  2. 对可列举的少数 Future 调用 join! 或者 select! 来同时等待多个结果 或者 等待多个分支的第一个结果;
  3. 对大量 Future 调用 join 或者 select 一类支持传入 Vec / iter 参数类型的函数,比如这个例子中的 for handle in handles { ... } 部分就可以改写成 futures::future::join_all(handles).await;
  4. 把 async block/fn 变成任务,然后调用 Runtime::block_on (等价地,对任务 await)来执行许多任务。

容易犯的错误是,希望异步非阻塞时,对所有 async block/fn 进行了 await,而没有进行任务化处理(即 把 Future 通过 spwan 函数转化成任务):

use std::time::Instant;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let now = Instant::now();

    let mut handles = Vec::with_capacity(10);
    for i in 0..10 {
        handles.push(my_bg_task(i)); // 没有把 Future 变成任务
    }

    std::thread::sleep(Duration::from_millis(120));
    println!("Finished time-consuming task.");

    for handle in handles {
        handle.await; // 而且每个 handle 必须执行完才能执行下一个 handle
    }
    println!("总耗时:{} ms", now.elapsed().as_millis());
}

async fn my_bg_task(i: u64) {
    let millis = 100;
    println!("Task {} sleeping for {} ms.", i, millis);
    sleep(Duration::from_millis(millis)).await;
    println!("Task {} stopping.", i);
}

运行结果:同步阻塞

Finished time-consuming task.
Task 0 sleeping for 100 ms.
Task 0 stopping.
Task 1 sleeping for 100 ms.
Task 1 stopping.
Task 2 sleeping for 100 ms.
Task 2 stopping.
Task 3 sleeping for 100 ms.
Task 3 stopping.
Task 4 sleeping for 100 ms.
Task 4 stopping.
Task 5 sleeping for 100 ms.
Task 5 stopping.
Task 6 sleeping for 100 ms.
Task 6 stopping.
Task 7 sleeping for 100 ms.
Task 7 stopping.
Task 8 sleeping for 100 ms.
Task 8 stopping.
Task 9 sleeping for 100 ms.
Task 9 stopping.
总耗时:1130 ms

或者像这样:

use std::time::Instant;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let now = Instant::now();

    let mut handles = Vec::with_capacity(10);
    for i in 0..10 {
        handles.push(my_bg_task(i)); // 没有把 Future 变成任务
    }

    std::thread::sleep(Duration::from_millis(120));
    println!("Finished time-consuming task.");

    futures::future::join_all(handles).await; // 但是 join_all 会等待所有 Future 并发执行完
    println!("总耗时:{} ms", now.elapsed().as_millis());
}

async fn my_bg_task(i: u64) {
    let millis = 100;
    println!("Task {} sleeping for {} ms.", i, millis);
    sleep(Duration::from_millis(millis)).await;
    println!("Task {} stopping.", i);
}

运行结果:异步阻塞

Finished time-consuming task.
Task 0 sleeping for 100 ms.
Task 1 sleeping for 100 ms.
Task 2 sleeping for 100 ms.
Task 3 sleeping for 100 ms.
Task 4 sleeping for 100 ms.
Task 5 sleeping for 100 ms.
Task 6 sleeping for 100 ms.
Task 7 sleeping for 100 ms.
Task 8 sleeping for 100 ms.
Task 9 sleeping for 100 ms.
Task 0 stopping.
Task 1 stopping.
Task 2 stopping.
Task 3 stopping.
Task 4 stopping.
Task 5 stopping.
Task 6 stopping.
Task 7 stopping.
Task 8 stopping.
Task 9 stopping.
总耗时:221 ms

P.S. 关于代码中 std::thread::sleeptokio::time::sleep 的区别,参考这篇文章 Async: What is blocking? (by Alice Ryhl) 。

本文分享自微信公众号 - Rust语言学习交流(rust-china),作者:苦瓜小仔

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2021-09-01

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Rust网络编程框架-Tokio进阶

    我们在上文《小朋友也能听懂的Rust网络编程框架知识-Tokio基础篇》对于Tokio的基础知识进行了一下初步的介绍,本文就对于Tokio的用法及原理进行进一步...

    beyondma
  • 【大家的项目】Rbatis - 制作 Rust 语言堪比 Mybatis 的异步 ORM 框架

    因为是复刻Java系的Mybatis,因此框架暂命名 Rbatis。小部分功能还在进行中。github链接https://github.com/rbatis/r...

    MikeLoveRust
  • 透过 rust 探索系统的本原:并发篇

    rust 是一门非常优秀的语言,我虽然没有特别正式介绍过 rust 本身,但其实已经写了好多篇跟 rust 相关的文章:

    tyrchen
  • 【译文】Rust futures: async fn中的thread::sleep和阻塞调用

    近来,关于Rust的futures和async/await如何工作(“blockers”,哈哈),我看到存在一些普遍的误解。很多新用户为async/await带...

    袁承兴
  • Rust 视界 | async-std 团队发布 Async Http 套件

    本文是对Yoshua Wuyts 博客文章的摘录,以及一些私人观点。原文地址:https://blog.yoshuawuyts.com/async-http/ ...

    MikeLoveRust
  • 【Rust日报】2020-04-27 - smol异步库,teleforking, traits

    这个运行环境库扩展了rust语言标准类库 并结合了异步机制,仅用1500行代码写成!请大家阅读文档 并参考提供的例子这样就可以开始自己动手写具有异步运行机制的程...

    MikeLoveRust
  • 【译文】Rust异步生态系统

    Rust目前仅提供编写异步代码最基础的能力。重要的是,标准库尚未提供执行器,任务,反应器,组合器以及底层I/O futures和特质。同时,社区提供的异步生态系...

    袁承兴
  • 【翻译】200行代码讲透RUST FUTURES (3)

    什么是Future? Future是一些将在未来完成的操作。 Rust中的异步实现基于轮询,每个异步任务分成三个阶段:

    MikeLoveRust
  • 那些必须要了解的Serverless时代的并发神器-Rust语言Tokio框架基础

    今天我们继续高并发的话题,传统的云计算技术,本质上都是基于虚拟机的,云平台可以将一些性能强劲的物理服务器,拆分成若干个虚拟机,提供给用户使用,但在互联网发展到今...

    beyondma
  • 【Rust每周一知】Rust 异步入门

    这是一篇博文翻译,略有删减,整理代码方便统一阅读,Github链接:https://github.com/lesterli/rust-practice/tree...

    MikeLoveRust
  • Rust网络编程框架-深入理解Tokio中的管道

    我们在上文《Rust网络编程框架-Tokio进阶》介绍了async/await和锁的基本用法,并完成了一个Server端的DEMO代码。本文继续来探讨这个话题。

    beyondma
  • 【Rust日报】2021-04-09 tokio 计划发布基于 io-uring 的新运行时

    tokio 今天发布了新的 RFC,提出了新的支持 io-uring 异步运行时的计划。

    MikeLoveRust
  • 深度分析:前端中的后端-实现篇

    当我有一个想法,并且这个想法很有意思,正好戳中我技能的盲区时,我便有一种强大的要将其实验一番的冲动。自从上周做一个「前端中的后端」的想法出炉后,这周我几乎寝食难...

    tyrchen
  • Linkerd最先进的Rust代理|Linkerd2-proxy

    部分由于Linkerd的性能数字和一流的安全审计报告,最近对Linkerd2-proxy(Linkerd使用的底层代理)的兴趣激增。作为一名Linkerd2维护...

    CNCF
  • 透过 Rust 探索系统的本原:网络篇

    如今所有的应用程序几乎都离不开网络。从应用开发的角度,绝大多数应用以及其后端系统都工作在应用层:

    tyrchen
  • TOKIO ASYNC&AWAIT 初探

    rust的async/await终于在万众瞩目之下稳定下来了,一起来尝尝鲜. 这篇文章主要是介绍基于tokio 0.2做一个服务程序员的小工具githubdns...

    MikeLoveRust
  • Rust 不适合开发 Web API

    Rust 是一门神奇的编程语言,有非常好的 CLI 工具,比如 ripgrep 和 exa。像 Cloudflare 这样的公司正在使用并鼓励人们写 Rust ...

    深度学习与Python
  • 浅谈Rust和Golang协程设计

    根据维基百科的定义,协程,是指在非抢占式地处理多任务场景下,用于生成子程序的计算机程序组件,它允许在执行过程中被暂停或恢复。

    云云众生
  • 零成本异步 I/O (上)

    async 是一个修饰符,它可以应用在函数上,这种函数不会在调用时一句句运行完成,而是立即返回一个 Future 对象,这个 Future 对象最终将给出这个函...

    MikeLoveRust

扫码关注云+社区

领取腾讯云代金券