专栏首页Rust学习专栏Rust网络编程框架-深入理解Tokio中的管道
原创

Rust网络编程框架-深入理解Tokio中的管道

我们在上文《Rust网络编程框架-Tokio进阶》介绍了async/await和锁的基本用法,并完成了一个Server端的DEMO代码。本文继续来探讨这个话题。

客户端代码DEMO

上文中依靠telnet来触发服务端代码的执行,本文我们将自己实现一个客户端。由于笔者也没有从之前比如GO、JAVA等语言的套路中完全走出来,我最初的实现是这样的

#[tokio::main]async fn main() {

      let mut client = client::connect("127.0.0.1:6379").await.unwrap();

    // 生成一个读取key的任务

    let t1 = tokio::spawn(async {

        let res = client.get("hello").await;

    });

    // 生成一个设置key的任务

    let t2 = tokio::spawn(async {

        client.set("foo", "bar".into()).await;

    });



    t1.await.unwrap();

    t2.await.unwrap();

}

但是以上代码根本就无法编译,因为tokio任务T1和T2都需要使用client,但是client并没有像上文中Arc::<Mutex::<HashMap>>一样实现copy方法,你还不能clone一个client分别给t1和t2使用,当然我们可以使用Mutex来解决任务之间的矛盾问题,但正如我们上文所说互斥锁的最大问题就是在同一时刻只能有一个任务执行到被加锁的关键代码,这样做法的效率又是问题。

使用消息传递的方案

使用channel管道进行消息传递,其实就是我们在并发编程框架中常用的生产者消费者模式。这个设计模式在本例当中其实就是生成两个任务,一个专门用来产生消息,另一个专门用来向服务端发送消息,channel管道其实就是一个消息的缓冲区,在发送任务繁忙时,产生的消息其实都在消息队列中缓冲,一旦有发送任务缓过劲来,就可以从管道里取新消息进行发送,与Mutex的互斥锁方案相比,channel管理的方式明显可以做得更大的性能与吞吐量。

在Tokio中提供以下四种管道的工作模式 Mpsc:Multi-Producer,Single-Consumer,也就是多生产者,单一消费者模式。

Oneshot:单一模式,也就是单一生产者,单一消费模式。

广播(broadcast)模式:Multi-producer multi-consumer。多生产者,多消费者的多对多模式。

观察(watch)模式:Single-Producer,multi-consumer。单生产者,多消费者的模式,这个模式与其它模式略有不同,每个接收者都只能看到最近的值。

这里笔者要特别提示大家,注意Tokio当中的channel管道与Rust原生channel和crossbeam提供的Channel不是同一个概念,Tokio中对于消费者来说,调用recv API返回的还是一个Future对象,recv接收消息操作并不会阻塞进程,这也是Tokio设计的一贯风格。以MPSC为例,使用样例如下:

use tokio::sync::mpsc;

#[tokio::main]async fn main() {

    let (tx, mut rx) = mpsc::channel(32);

    let tx2 = tx.clone();//clone之后可以将channel指派给不同任务

    tokio::spawn(async move {

        tx.send("sending from first handle").await;//必须调用await才会阻塞

    });



    tokio::spawn(async move {

        tx2.send("sending from second handle").await;

    });



    while let Some(message) = rx.recv().await {

        println!("GOT = {}", message);

    }

}

使用管道方式完整的客户端代码及注释如下:

use tokio::sync::mpsc;

use mini_redis::client;

use mini_redis::Command::*;

use bytes::Bytes;

//先定义redis的命令类型

#[derive(Debug)]

enum Command {

    Get {

        key: String,

    },

    Set {

        key: String,

        val: Bytes,

    }

}



#[tokio::main]

async fn main() {

//首先建立MPSC模式的通道



    let (tx, mut rx) = mpsc::channel(32);

//消费者允许多个,可以克隆

let tx2 = tx.clone();

//t1任务执行get操作

    let t1 = tokio::spawn(async move {

    let cmd = Command::Get {

        key: "hello".to_string(),

    };



    tx.send(cmd).await.unwrap();

   });

//t2任务执行set操作

    let t2 = tokio::spawn(async move {

    let cmd = Command::Set {

        key: "foo".to_string(),

        val: "bar".into(),

    };



    tx2.send(cmd).await.unwrap();

});

//manager任务是消费者,接收消息,并向服务端发送信息。



    let manager = tokio::spawn(async move {

    let mut client = client::connect("127.0.0.1:6379").await.unwrap();



    while let Some(cmd) = rx.recv().await {

       use Command::*;

        match cmd {

            Get { key } => {

                

                client.get(&key).await;

                println!("get command send");



            }

            Set { key, val } => {

                client.set(&key, val).await;

                 println!("set command send");

            }

        }

    }

});



t1.await.unwrap();

t2.await.unwrap();

manager.await.unwrap();



}

客户端执行结果如下:

get command send

set command send

注意:客户端需要在服务端启动的情况下才能运行,完整的服务端代码如下:

use tokio::net::TcpListener;

use std::collections::HashMap;

use std::sync::{Arc, Mutex};

use tokio::net::TcpStream;

use mini_redis::{Connection, Frame};

use bytes::Bytes;

type Db =Arc<std::sync::Mutex<HashMap<String, Bytes>>>;

#[tokio::main]

async fn main() {

    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();



    println!("Listening");



    let db = Arc::new(Mutex::new(HashMap::new()));



    loop {

        let (socket, _) = listener.accept().await.unwrap();

        // Clone the handle to the hash map.

        let db = db.clone();



        println!("Accepted");

        tokio::spawn(async move {

            process(socket, db).await;

        });

    }

}



async fn process(socket: TcpStream, db: Db) {

    use mini_redis::Command::{self, Get, Set};

    let mut connection = Connection::new(socket);



    while let Some(frame) = connection.read_frame().await.unwrap() {

        let response = match Command::from_frame(frame).unwrap() {

            Set(cmd) => {

                let mut db = db.lock().unwrap();

                println!("set command got");

                db.insert(cmd.key().to_string(), cmd.value().clone());

                Frame::Simple("OK".to_string())

            }           

            Get(cmd) => {

                let db = db.lock().unwrap();

                println!("get command got");



                if let Some(value) = db.get(cmd.key()) {

                    Frame::Bulk(value.clone())

                } else {

                    Frame::Null

                }

            }

            cmd => panic!("unimplemented {:?}", cmd),

        };



        // Write the response to the client

        connection.write_frame(&response).await.unwrap();

    }

}

读写分离

Tokio中对于I/O的读写操作方式与标准Rust的API基本相同,只是Tokio的读写都是异步的,在使用Tokio的读(AsyncRead)和写(AsyncWrite)等API,必须与.await一起使用,才能阻塞。比如下列代码是肯定不能编译通过的。

use tokio::fs::File;

use tokio::io::{self, AsyncReadExt};



#[tokio::main]

async fn main() -> io::Result<()> {

    let mut f = File::open("beyondma.txt");

    let mut buffer = [0; 10];



    // read up to 10 bytes

    let n = f.read(&mut buffer[..]);



    println!("The bytes: {:?}", &buffer[..n]);

    Ok(())

}

上述代码需要进行修改才能运行,如下:

use tokio::fs::File;

use tokio::io::{self, AsyncReadExt};



#[tokio::main]

async fn main() -> io::Result<()> {

    let mut f = File::open("beyondma.txt").await?;

    let mut buffer = [0; 10];



    // read up to 10 bytes

    let n = f.read(&mut buffer[..]).await?;



    println!("The bytes: {:?}", &buffer[..n]);

    Ok(())

}

另外注意:当read()返回Ok(0)时,表示Stream已经关闭,对于TcpStream实例,Ok(0)代表socket已关闭,如果代码运行在一个循环当中,此时应该退出循环。

在上一节的示例代码中,对于socket的读写都是由一个任务完成的,为了通过读写分离,来达到更高效率的,我们必须将TcpStream拆分为读和写两个handle。对于tokio的框架来看,读写分享使用io::split来实现。例程如下:

use tokio::io::{self, AsyncReadExt, AsyncWriteExt};

use tokio::net::TcpListener;



#[tokio::main]

async fn main() -> io::Result<()> {

    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();



    loop {

        let (mut socket, _) = listener.accept().await?;



        tokio::spawn(async move {

            let mut buf = vec![0; 1024];



            loop {

                match socket.read(&mut buf).await {

                    //记住ok(0)需要直接返回

                    Ok(0) => return,

                    Ok(n) => {

                        // Copy the data back to socket

                        if socket.write_all(&buf[..n]).await.is_err() {

                          

                            return;

                        }

                    }

                    Err(_) => {

                

                        return;

                    }

                }

            }

        });

    }

}

这是一个典型的回显输入的Echo Server,另外启动一个终端执行telnett 结果如下:

telnet 127.0.0.1 6380

Trying 127.0.0.1...

Connected to 127.0.0.1.

Escape character is '^]'.

skldjsfl

skldjsfl

sksdkj

sksdkj

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Rust网络编程框架-Tokio进阶

    我们在上文《小朋友也能听懂的Rust网络编程框架知识-Tokio基础篇》对于Tokio的基础知识进行了一下初步的介绍,本文就对于Tokio的用法及原理进行进一步...

    beyondma
  • 【大家的项目】Rbatis - 制作 Rust 语言堪比 Mybatis 的异步 ORM 框架

    因为是复刻Java系的Mybatis,因此框架暂命名 Rbatis。小部分功能还在进行中。github链接https://github.com/rbatis/r...

    MikeLoveRust
  • 那些必须要了解的Serverless时代的并发神器-Rust语言Tokio框架基础

    今天我们继续高并发的话题,传统的云计算技术,本质上都是基于虚拟机的,云平台可以将一些性能强劲的物理服务器,拆分成若干个虚拟机,提供给用户使用,但在互联网发展到今...

    beyondma
  • Rust高并发编程总结

    Serverless的概念火了,业界已经不再讨论要不要用Serverless的问题了,而是高喊Serverless First的口号力求快速拥抱Serverle...

    beyondma
  • 【Rust日报】2021-02-21 Ballista:在Rust中实现的分布式计算平台

    Ballista-0.4.0 已于昨天发布。Ballista是主要在Rust中实现的分布式计算平台,由Apache Arrow支持。

    MikeLoveRust
  • P99 Conf Talk 汇总 | Rust 在高性能低延迟系统中的应用

    P99 Conf[1] 是一个由 Scylladb[2] 组织的新的跨行业的线上Conf,为工程师而设。该活动以低延迟、高性能设计为中心,范围包括操作系统(内核...

    张汉东
  • 【Rust日报】 2019-08-12:Tokio alpha 版发布,新版本支持async/await

    Read More: https://tokio.rs/blog/2019-08-alphas/

    MikeLoveRust
  • Linkerd最先进的Rust代理|Linkerd2-proxy

    部分由于Linkerd的性能数字和一流的安全审计报告,最近对Linkerd2-proxy(Linkerd使用的底层代理)的兴趣激增。作为一名Linkerd2维护...

    CNCF
  • 两张图展示当前 Rust Web 生态

    由图可见,tokio 生态目前在网络服务和Web 开发方面基本的核心组件都已齐全。尤其是随着 Axum 框架的推出,tokio 在 Web 生态已接近完备。

    张汉东
  • 【Rust 日报】2021-09-30 明天就是国庆长假了!祝愿大家能开开心心过长假!

    Read More: https://mp.weixin.qq.com/s/97_PcKN54ktVBYzqvz6g_Q

    MikeLoveRust
  • Rust学习资源(持续更新)

    袁承兴
  • 介绍一款还不错的Rust ClickHouse客户端

    用过 Rust 的应该都知道,tokio 是异步编程的基石,很多框架都是基于 tokio 之上构建的。目前一些 ClickHouse 的客户端代码比较陈旧,使用...

    Nauu
  • 【Rust日报】2021-09-22 kbio基于io_uring的异步 IO 框架

    Blog:https://blog.hidva.com/2021/09/14/kbio/

    MikeLoveRust
  • 【Rust日报】 2019-07-17:微软安全响应中心:一种主动性的方式来提升安全

    本文简单介绍了在Rust中编写一个工程性更强的组件(crate)所必须要遵循的一些原则:

    MikeLoveRust
  • 【Rust日报】2021-06-07 是什么让 Rust 撩动了我

    这篇文章的目标是那些从 垃圾回收语言 转到 Rust 的人, 例如 python 或者 javascript.

    MikeLoveRust
  • 【Rust每周一库】hyper - 底层http库

    现在说到写应用,网络框架肯定是必不可少的。今天就给大家简单介绍一下hyper。hyper是一个偏底层的http库,支持HTTP/1和HTTP/2,支持异步Rus...

    MikeLoveRust
  • 【Rust日报】 2019-08-11:C++工程师的Rust迁移之道 组合与集成

    对于上述提到的3个问题,在Rust中有一个统一的解决方案,那就是trait系统, 更多内容请看正文。

    MikeLoveRust
  • 【Rust日报】 2019-07-16:「新手向」Rust vs C++ : 实现神经网络

    该站点专注于记录世界各地Rust各大活动的时间线,开源项目,大家可以提交活动信息。

    MikeLoveRust
  • Rust的第二次接触-写个小服务器程序

    蛮久前入门了一下 Rust 语言。它的设计模型非常地吸引C/C++的开发者。但是学习语言嘛还是要练习一下,之前也用它给我们项目写了个命令行小工具。这回拿来写个小...

    owent

扫码关注云+社区

领取腾讯云代金券