专栏首页Rust学习专栏Rust网络编程框架-Tokio进阶
原创

Rust网络编程框架-Tokio进阶

我们在上文《小朋友也能听懂的Rust网络编程框架知识-Tokio基础篇》对于Tokio的基础知识进行了一下初步的介绍,本文就对于Tokio的用法及原理进行进一步的介绍与说明。

目前市面上绝大多数编程语言所编写的程序,执行程序与代码编写顺序完全相同,当然有的读者可能会提到CPU的乱序执行机制,但乱序执行从本质上讲还是顺序提交的,程序在第一行执行完成之后再去执行下一行,并以此类推,是通用的编程模式。

在这种传统的式编程范式中,当程序遇到耗时操作时,会一直阻塞直到操作完成。比如建立TCP连接可能需要与网络上的对端节点进行若干次握手,这可能会花费相当多的时间。在此期间,线程被阻塞而无法完成其它操作。

在传统的编程范式中往往使用回调机制来进行资源调配的优化,对于不能立即完成的操作将被挂起到后台,这种情况下线程不会被阻塞,可以继续执行其它任务。一旦操作完成,该任务的回调函数将被调用,从而使任务最终完成。尽管回调模式可以带来使应用程序的效率更高,但也会导致程序更复杂。开发者需要跟踪异步操作完成后恢复工作所需的所有状态,从我的经验来看,这是一项特别乏味而且极容易出错的工作任务。

为什么需要异步调用

以下例程部分依赖于mini-redis模块在执行了cargo install mini-redis之后,并在Cargo.toml最后加入以下配置项之后,

tokio = { version = "1", features = ["full"] }

mini-redis = "0.4"

即可顺利执行下列代码:

use tokio::net::{TcpListener, TcpStream};

use mini_redis::{Connection, Frame};

#[tokio::main]

async fn main() {

// 绑定端口

    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();

    loop {

        // 监控端口消息,对于每个socket请求,都启动一个folk进程,进行处理

        let (socket, _) = listener.accept().await.unwrap();

        Process(socket).await;

    }

}

async fn Process(socket: TcpStream) {

    let mut connection = Connection::new(socket);

    if let Some(frame) = connection.read_frame().await.unwrap() {

        println!("WE GOT: {:?}", frame);

        let response = Frame::Error("not finished".to_string());

        connection.write_frame(&response).await.unwrap();

    }

}

以上代码可能我们在其它语言编程中所经常遇到的,对于每个Socket连接都通过一个线程来处理(当然这里只是以Rust为例说明,在Tokio中不推荐这种做法,我也就没有另行启动线程)并且最关键的一点是process(socket).await;是同步调用,也就是说在线程阻塞在process函数时并没有其它事情可做,整个线程必须要等到响应被完全写入socket stream才能返回。而这种并发处理与我们尽可能多的同时处理更多请求的初衷是不一致的。

这里笔者必须要指出,并发和并行完全是两件事。多个任务交替执行是并发,并行是有多个人,一个人负责一个任务。而Rust的Tokio最大就是并发效率很高,线程并不需要去等待那些无效的任务,众多并发任务之间由Tokio去统一调度。

Tokio的答案

Rust使用spawn关键字来建立此类并发任务的任务池,按照笔者的理解,这和线程池不是一个概念,因为并发的任务可能有多个线程共同处理,也可能只有一个线程就搞定了。在使用Rust这种并发任务的异步函数使用async关键字修饰,在异步函数的函数体内任何类似于await的阻塞调用用都会使任务将控制权交还给线程。当操作进程在后台时,线程可以做其他工作。操作产生的结果也将形成一个Future,也就是未来才会产生的值被系统以变通的方式优化处理,改写后的代码如下:

use tokio::net::{TcpListener, TcpStream};

use mini_redis::{Connection, Frame};

#[tokio::main]

async fn main() {

// 绑定端口

    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();

    loop {

        // 监控端口消息,对于每个socket请求,都启动一个folk进程,进行处理

        let (socket, _) = listener.accept().await.unwrap();

        tokio::spawn(async move {

            process(socket).await;

        });

    }

}

async fn Process(socket: TcpStream) {

    let mut connection = Connection::new(socket);

    if let Some(frame) = connection.read_frame().await.unwrap() {

        println!("WE GOT: {:?}", frame);

        let response = Frame::Error("not finished".to_string());

        connection.write_frame(&response).await.unwrap();

    }

}

Tokio的任务通过tokio::spawn来创建,spawn函数返回一个JoinHandle,调用者可以使用JoinHandle它与Tokio的任务进行交互。async修饰的函数的返回值以Future方式返回。调用者可以使用.awai来Future的执行结果。代码如下:

#[tokio::main]async fn main() {

    let handle = tokio::spawn(async {

          "hello beyondma"

    });

     let out = handle.await.unwrap();

    println!("GOT {}", out);

}

上述程序运行结果为

GOT hello beyondma

当Tokio任务执行过程中遇到错误时,JoinHandle将返回一个Err。当任务失败时,或者当任务被强制关闭时,是铁定会返回ERR的。Tokio任务由Tokio调度器管理的最小可执行单元。正如上文所说Tokio的任务可能在同一个线程上执行,也可能在不同的线程上执行,这种多路复用机制可以参考上文《《小朋友也能听懂的Rust网络编程框架知识-Tokio基础篇》》

Tokio任务之间的同步与通信

我们知道Rust有着比较独特的变量生命周期机制,在之前的示例代码当中都是用了move关键字来强制传递变量所属关系的,如下:

tokio::spawn(async move {

process(socket).await;

});

那么如何在各个Tokio任务之间进行通信与状态同步也是个值得在本文中讨论的问题。

这里我们先来讨论比较简单的情况,可以用Arc<Mutex<_>>类型,也就是加互斥锁的哈希表来进行任务间的信息传递与同步,使用clone方法来为每个任务获取自己的哈希表实例。具体如下:

use tokio::net::TcpListener;

use std::collections::HashMap;

use std::sync::{Arc, Mutex};



#[tokio::main]

async fn main() {

    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();



    println!("Listening");



    let hashMap= Arc::new(Mutex::new(HashMap::new()));

    loop {

        let (socket, _) = listener.accept().await.unwrap();

        // Clone the handle to the hash map.

        let thisMap = hashMap.clone();

        println!("Accepted");

        tokio::spawn(async move {

             let mut thisMap = thisMap .lock().unwrap();

             thisMap .insert("hello", "beyondma");

             println!("{:?}",thisMap );

           

        });

    }

}

这样随便找其它终端或者在本机上执行telnet 服务器IP 6379

就可以看到以下结果

Listening

Accepted

{"hello": "beyondma"}

这里这个hashMap的确可以在进程之间进行信息的共享与同步,但是在这种高并发的框架中一般还是推荐使用管道(channel)来进行相关操作。有关channel的话题我们会在下次再深入讲解。

Tokio的任务非常轻,只需要一个64字节的上下文即可,考虑到Rust中也没有GC机制,因此基于Tokio理论上完全可以做出比Golang支持更多并发的应用程序,这也是笔者会计划用3篇左右的系列文章来对于Tokio进行详细介绍的原因。

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Rust高并发编程总结

    Serverless的概念火了,业界已经不再讨论要不要用Serverless的问题了,而是高喊Serverless First的口号力求快速拥抱Serverle...

    beyondma
  • 【大家的项目】Rbatis - 制作 Rust 语言堪比 Mybatis 的异步 ORM 框架

    因为是复刻Java系的Mybatis,因此框架暂命名 Rbatis。小部分功能还在进行中。github链接https://github.com/rbatis/r...

    MikeLoveRust
  • 【Rust日报】 2019-08-12:Tokio alpha 版发布,新版本支持async/await

    Read More: https://tokio.rs/blog/2019-08-alphas/

    MikeLoveRust
  • 那些必须要了解的Serverless时代的并发神器-Rust语言Tokio框架基础

    今天我们继续高并发的话题,传统的云计算技术,本质上都是基于虚拟机的,云平台可以将一些性能强劲的物理服务器,拆分成若干个虚拟机,提供给用户使用,但在互联网发展到今...

    beyondma
  • 【Rust日报】2021-02-21 Ballista:在Rust中实现的分布式计算平台

    Ballista-0.4.0 已于昨天发布。Ballista是主要在Rust中实现的分布式计算平台,由Apache Arrow支持。

    MikeLoveRust
  • Rust网络编程框架-深入理解Tokio中的管道

    我们在上文《Rust网络编程框架-Tokio进阶》介绍了async/await和锁的基本用法,并完成了一个Server端的DEMO代码。本文继续来探讨这个话题。

    beyondma
  • 【Rust日报】2021-09-22 kbio基于io_uring的异步 IO 框架

    Blog:https://blog.hidva.com/2021/09/14/kbio/

    MikeLoveRust
  • 两张图展示当前 Rust Web 生态

    由图可见,tokio 生态目前在网络服务和Web 开发方面基本的核心组件都已齐全。尤其是随着 Axum 框架的推出,tokio 在 Web 生态已接近完备。

    张汉东
  • 【Rust日报】 2019-07-17:微软安全响应中心:一种主动性的方式来提升安全

    本文简单介绍了在Rust中编写一个工程性更强的组件(crate)所必须要遵循的一些原则:

    MikeLoveRust
  • 介绍一款还不错的Rust ClickHouse客户端

    用过 Rust 的应该都知道,tokio 是异步编程的基石,很多框架都是基于 tokio 之上构建的。目前一些 ClickHouse 的客户端代码比较陈旧,使用...

    Nauu
  • 【Rust 日报】2021-06-06 TheAlgorithms Rust 实现

    大名鼎鼎的 The Algorithms 的 Rust 版本,使用 Rust 实现所有算法。

    MikeLoveRust
  • 【Rust每周一库】hyper - 底层http库

    现在说到写应用,网络框架肯定是必不可少的。今天就给大家简单介绍一下hyper。hyper是一个偏底层的http库,支持HTTP/1和HTTP/2,支持异步Rus...

    MikeLoveRust
  • Rust的第二次接触-写个小服务器程序

    蛮久前入门了一下 Rust 语言。它的设计模型非常地吸引C/C++的开发者。但是学习语言嘛还是要练习一下,之前也用它给我们项目写了个命令行小工具。这回拿来写个小...

    owent
  • Rust学习资源(持续更新)

    袁承兴
  • 【Rust日报】 2019-07-16:「新手向」Rust vs C++ : 实现神经网络

    该站点专注于记录世界各地Rust各大活动的时间线,开源项目,大家可以提交活动信息。

    MikeLoveRust
  • Linkerd最先进的Rust代理|Linkerd2-proxy

    部分由于Linkerd的性能数字和一流的安全审计报告,最近对Linkerd2-proxy(Linkerd使用的底层代理)的兴趣激增。作为一名Linkerd2维护...

    CNCF
  • 【Rust日报】2021-06-07 是什么让 Rust 撩动了我

    这篇文章的目标是那些从 垃圾回收语言 转到 Rust 的人, 例如 python 或者 javascript.

    MikeLoveRust
  • Rust 视界 | async-std 团队发布 Async Http 套件

    本文是对Yoshua Wuyts 博客文章的摘录,以及一些私人观点。原文地址:https://blog.yoshuawuyts.com/async-http/ ...

    MikeLoveRust
  • 【Rust日报】 2019-08-11:C++工程师的Rust迁移之道 组合与集成

    对于上述提到的3个问题,在Rust中有一个统一的解决方案,那就是trait系统, 更多内容请看正文。

    MikeLoveRust

扫码关注云+社区

领取腾讯云代金券