专栏首页Rust学习专栏Rust高并发编程总结
原创

Rust高并发编程总结

Serverless的概念火了,业界已经不再讨论要不要用Serverless的问题了,而是高喊Serverless First的口号力求快速拥抱Serverless,无服务器并不是Serverless的本质,不需要关心服务器的情况就能高效工作,才是Serverless胜出的核心要义。互联网时代流量的大起大落,很多科技巨头在面对流量的冲击时也都败下阵来,针对前几个月B站的崩溃事件,笔者还曾写过《B站的前端崩了,后端的你别慌》来分析来龙去脉,而Serverless凭借快速伸缩的自动弹性特点,可以从容应对类似的冲击,这也让这种新技术出尽的风头。

在Serverless的喧嚣背后,Rust看似牢牢占据了C位,但其实在高并发这个话题下要总结的模式与套路其实很多,尤其是像Tokio专业的编程框架,对于程序员编写高性能程序的帮助很大。因此本文把之前介绍过的Tokio相关知识点进行一下补充和总结。

Future到底是个什么概念

简单来讲Future不是一个值,而是一种值类型,一种在未来才能得到的值类型。Future对象必须实现Rust标准库中的std::future:: future接口。Future的输出Output是Future完成后才能生成的值。在Rust中Future通过管理器调用Future::poll来推动Future的运算。Future本质上是一个状态机,而且可以嵌套使用,我们来看一下面这个例子,在main函数中,我们实例化MainFuture并调用.await,而MainFuture除了在几个状态之间迁移以外,还会调用一个Delay的Future,从而实现Future的嵌套。

MainFuture以State0状态做为初始状态。当调度器调用poll方法时,MainFuture会尝试尽可能地提升其状态。如果future完成,则返回Poll::Ready,如果MainFuture没有完成,则是由于它等待的DelayFuture没有达到Ready状态,那么此时返回Pending。调度器收到Pending结果,会将这个MainFuture重新放回待调度的队列当中,稍后会再度调用Poll方法来推进Future的执行。具体如下

use std::future::Future;

use std::pin::Pin;

use std::task::{Context, Poll};

use std::time::{Duration, Instant};



struct Delay {

    when: Instant,

}



impl Future for Delay {

    type Output = &'static str;



    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)

        -> Poll<&'static str>

    {

        if Instant::now() >= self.when {

            println!("Hello world");

            Poll::Ready("done")

        } else {

           

            cx.waker().wake_by_ref();

            Poll::Pending

        }

    }

}

enum MainFuture {

    

    State0,

    State1(Delay),

    Terminated,

}



impl Future for MainFuture {

    type Output = ();



    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>)

        -> Poll<()>

    {

        use MainFuture::*;

      

        loop {

            match *self {

                State0 => {

                    let when = Instant::now() +

                        Duration::from_millis(1);

                    let future = Delay { when };

                    println!("init status");

                    *self = State1(future);

                }

                State1(ref mut my_future) => {

                    match Pin::new(my_future).poll(cx) {

                        Poll::Ready(out) => {

                            assert_eq!(out, "done");

                            println!("delay finished this future is ready");

                            *self = Terminated;

                            return Poll::Ready(());

                        }

                        Poll::Pending => {

                            println!("not ready");

                            return Poll::Pending;

                        }

                    }

                }

                Terminated => {

                    panic!("future polled after completion")

                }

            }

        }

    }

}

#[tokio::main]

async fn main() {

    let when = Instant::now() + Duration::from_millis(10);

   

    let mainFuture=MainFuture::State0;

    mainFuture.await;

   

}

当然这个Future的实现存在一个明显的问题,通过运行结果也可以知道调试器明显在需要等待的情况下还执行了很多次的Poll操作,理想状态下需要当Future有进展时再执行Poll操作。不断轮徇的Poll其实就退化成了低效的轮询。

解决之道在于poll函数中的Context参数,这个Context就是Future的waker(),通过调用waker可以向执行器发出信号,表明这个任务应该进行Poll操作了。当Future的状态推进时,调用wake来通知执行器,才是正解这就需要把Delay部分的代码改一下:

let waker = cx.waker().clone();

            let when = self.when;



            // Spawn a timer thread.

            thread::spawn(move || {

                let now = Instant::now();



                if now < when {

                    thread::sleep(when - now);

                }



                waker.wake();

            });

无论是哪种高并发框架,本质上讲都是基于这种Task/Poll机制的调度器, poll做的本质工作就是监测链条上前续Task的执行状态。

let waker = cx.waker().clone();

let when = self.when;

// Spawn a timer thread.

thread::spawn(move || {

let now = Instant::now();

if now < when {

thread::sleep(when - now);

}

waker.wake();

});

用好Poll的机制,就能避免上面出现事件循环定期遍历整个事件队列的调度算法,Poll的精髓就是把状态为ready的事件通知给对应的处理程序,而基于poll设计的如tokio框架进行应用开发时,程序员根本不必关心整个消息传递,只需要用and_then、spawn等方法建立任务链条并让系统工作起来就可以了。

数据祯的实现

帧是数据传输中的最小单位,帧粒度以下的字节数据对于应用来说没有任何意义,同时不完整的帧也应该在帧的处理层进行过滤,read_frame方法在返回之前等待接收到整个帧。对TcpStream::read()的单个调用可能返回任意数量的数据。它可以包含整个框架,部分框架,或多个框架。如果接收到部分帧,数据将被缓冲,并从套接字读取更多数据。如果接收到多个帧,则返回第一个帧,其余的数据将被缓冲,直到下一次调用read_frame。要实现这一点,Connection需要一个读缓冲区字段。数据从套接字读入读缓冲区。当一个帧被解析时,相应的数据将从缓冲区中删除。我们将使用BytesMut作为缓冲区类型。

use bytes::BytesMut;use tokio::net::TcpStream;

pub struct Connection {

    stream: TcpStream,

    buffer: BytesMut,

}

impl Connection {

    pub fn new(stream: TcpStream) -> Connection {

        Connection {

            stream,

            // Allocate the buffer with 4kb of capacity.

            buffer: BytesMut::with_capacity(4096),

        }

    }

}

read_frame函数尝试解析帧。如果有足够的数据来解析帧,则将帧返回给read_frame()的调用者。否则,将尝试从套接字中读取更多数据到缓冲区中。读取更多数据后,再次调用parse_frame()。这一次,如果接收到足够的数据,解析可能会成功。当从流中读取数据时,返回值为0表示不再从对等端接收数据。如果读缓冲区中仍然有数据,这表明已经接收到部分帧,连接正在突然终止。这是一个错误条件,并返回Err。

use mini_redis::{Frame, Result};

pub async fn read_frame(&mut self)

    -> Result<Option<Frame>>

{

    loop {

        if let Some(frame) = self.parse_frame()? {

            return Ok(Some(frame));

        }



        // Ensure the buffer has capacity

        if self.buffer.len() == self.cursor {

            // Grow the buffer

            self.buffer.resize(self.cursor * 2, 0);

        }



        // Read into the buffer, tracking the number

        // of bytes read

        let n = self.stream.read(

            &mut self.buffer[self.cursor..]).await?;



        if 0 == n {

            if self.cursor == 0 {

                return Ok(None);

            } else {

                return Err("connection reset by peer".into());

            }

        } else {

            // Update our cursor

            self.cursor += n;

        }

    }

}

一定要小心的Select

另外还有一个值得注意的点是select,在使用一个以上的通道时,任何一个通道都可以先完成。选择select!关键字将在所有的通道上等待,并将提到最先返回通道上的值。注意select!当等到第一个返回之后,其它未完成的任务将被取消。具体如下:

use tokio::sync::oneshot;



async fn some_operation() -> String {

    

    "hello beyondma".to_string()

}



#[tokio::main]

async fn main() {

    let (mut tx1, rx1) = oneshot::channel();

    let (tx2, rx2) = oneshot::channel();



      tokio::spawn(async {

        let _ = tx1.send("hello beyondma");

    });





    tokio::spawn(async {

        let _ = tx2.send("hi beyondma");

    });



    tokio::select! {

        val = rx1 => {

            println!("rx1 completed first with {:?}", val);

        }

        val = rx2 => {

            println!("rx2 completed first with {:?}", val);

        }

    }

}

以上这段代码的执行结果要不是

hello beyondma

要么是

hi beyondma

不可能出现两个都被输出的情况。

为了解释select的机制,我们自行设计一个MySelect的future,在对MySelect进行poll操作时,将轮询第一个分支。如果已经准备好,则使用该值并完成MySelect。在MySelect.await接收到一个Ready后,整个future被丢弃。具体如下:

use tokio::sync::oneshot;

use std::future::Future;

use std::pin::Pin;

use std::task::{Context, Poll};



struct MySelect {

    rx1: oneshot::Receiver<&'static str>,

    rx2: oneshot::Receiver<&'static str>,

}



impl Future for MySelect {

    type Output = ();



    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {

        if let Poll::Ready(val) = Pin::new(&mut self.rx1).poll(cx) {

            println!("rx1 completed first with {:?}", val);

            return Poll::Ready(());

        }



        if let Poll::Ready(val) = Pin::new(&mut self.rx2).poll(cx) {

            println!("rx2 completed first with {:?}", val);

            return Poll::Ready(());

        }



        Poll::Pending

    }

}



#[tokio::main]

async fn main() {

    let (tx1, rx1) = oneshot::channel();

    let (tx2, rx2) = oneshot::channel();



    // use tx1 and tx2

     tokio::spawn(async {

        let _ = tx1.send("hello beyondma");

    });



    tokio::spawn(async {

        let _ = tx2.send("hi beyondma");

    });

    MySelect {

        rx1,

        rx2,

    }.await;

}

Rust高并发总结

Rust是近些年来随着Serverless一起新兴起的语言,表面上看他像是C,既没有JVM虚拟机也没有GC垃圾回收器,但仔细一瞧他还不是C,Rust特别不信任程序员,力图让Rust编译器把程序中的错误杀死在在生成可执行文件之前的Build阶段。由于没有GC所以Rust当中独创了一套变量的生命周期及借调用机制。开发者必须时刻小心变量的生命周期是否存在问题。

而且Rust难的像火星语言,多路通道在使用之前要clone,带锁的哈希表用之前要先unwrap,种种用法和Java、Go完全不同,但是也正在由于这样严格的使用限制,我们刚刚所提到的Go语言中Gorotine会出现的问题,在Rust中都不会出现,因为Go的那些用法,通通不符合Rust变量生命周期的检查,想编译通过都是不可能完成的任务。

所以Rust很像逍遥派,想入门非常难,但只要能出师,写的程序能通过编译,那你百分百是一位高手,所以这是一门下限很高,上限同样也很高的极致语言。

目前Rust的高并发编程框架最具代表性的就是Tokio,本文开头Future的例子就是基于Tokio框架编写的,这里也不加赘述了。

根据官方的说法每个Rust的Tokio任务只有64字节大小,这比直接通过folk线程去网络请求,效率会提升几个数量级,在高并发框架的帮助下,开发者完全可以做到极限压榨硬件的性能。

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 6.并发编程,总结

    编程思想,模型,设计模式,理论等等,都是交给你一种编程的方法,以后你遇到类似的情况,套用即可.

    changxin7
  • 网络编程与并发编程总结

    缺点:服务端更新后,客户端也得跟着更新,访问多个服务端需要下载对应的软件,占用客户端计算机的硬件资源大。

    GH
  • Go语言并发编程总结

    Golang :不要通过共享内存来通信,而应该通过通信来共享内存。这句风靡在Go社区的话,说的就是 goroutine中的 channel …….

    李海彬
  • Go语言并发编程总结

    Golang :不要通过共享内存来通信,而应该通过通信来共享内存。这句风靡在Go社区的话,说的就是 goroutine中的 channel ....... 他在...

    李海彬
  • Go语言并发编程总结

    Golang :不要通过共享内存来通信,而应该通过通信来共享内存。这句风靡在Go社区的话,说的就是 goroutine中的 channel ....... 他在...

    李海彬
  • Go语言并发编程总结

    Golang :不要通过共享内存来通信,而应该通过通信来共享内存。这句风靡在Go社区的话,说的就是 goroutine中的 channel ....... 他在...

    李海彬
  • java高级进阶|我的并发编程总结路线

    自己在读这篇内容时,觉得当时的语言陈述太简单了吧,删删减减,重新整理一下语言。这篇文章主要包含但不限于java并发编程总结路线图的分享,这篇文章也是对以往自己知...

    码农王同学
  • Java并发编程实战总结 (一)

    首先该场景是一个酒店开房的业务。为了朋友们阅读简单,我把业务都简化了。 业务:开房后会添加一条账单,添加一条房间排期记录,房间排期主要是为了房间使用的时间不冲...

    Johnson木木
  • 并发编程-synchronized关键字大总结

    可重入(可以避免死锁、单个线程可以重复拿到某个锁,锁的粒度是线程而不是调用)、不可中断(其实也就是上面的原子性)

    Wizey
  • Java并发编程的总结和思考

    编写优质的并发代码是一件难度极高的事情。Java语言从第一版本开始内置了对多线程的支持,这一点在当年是非常了不起的,但是当我们对并发编程有了更深刻的认识和更多的...

    大龄老码农-昊然
  • 并发编程的三个核心总结

    上一篇文章这次走进并发的世界,请不要错过 给大家带了并发编程的开胃菜,接下来我们逐步上正餐,在吃正餐之前,我还要引用那首诗词: 「横看成岭侧成峰,远近高低各不同...

    用户1516716
  • 高并发编程-happens-before

    happens-before是Java内存模型中定义的两个操作之间的偏序关系,即如果操作A在操作B之前先发生,那么操作A产生的操作结果,操作B可以观察到,或者说...

    JavaQ
  • 高并发编程系列

    放在静态方法上面,由于静态没有this可以锁定,不需要new 出对象,运用了反射.

    后端码匠
  • 高并发编程-happens-before

    happens-before是JMM最核心的概念,理解happens-before是理解JMM的关键

    小小工匠
  • 关于Java并发编程的总结和思考

    编写优质的并发代码是一件难度极高的事情。Java语言从第一版本开始内置了对多线程的支持,这一点在当年是非常了不起的,但是当我们对并发编程有了更深刻的认识和更多的...

    Java团长
  • java并发编程的艺术——第一章总结

    并发编程的挑战 1.1上下文切换 1.1.1多线程一定快吗 1.1.2测试上下文切换次数和时长 1.1.3如何减少上下文切换 1.1.4减少上下文切换实战   ...

    用户1134788
  • java并发编程的艺术——第四章总结

    第四章并发编程基础   java语言是内置对多线程支持的。   为什么使用多线程:     首先线程是操作系统最小的调度单元,多核心、多个线程可以同时执行,能够...

    用户1134788
  • Java并发编程与高并发之线程并发容器

    1、并发容器及安全共享策略总结,并发容器J.U.C(即java.util.concurrent)。J.U.C同步器AQS。

    别先生
  • Java并发编程与高并发之多线程

    1、线程池,初始化好线程池的实例以后,将要执行的任务丢到线程池里面,等待任务的调度执行。

    别先生

扫码关注云+社区

领取腾讯云代金券