前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【Rust投稿】从零实现消息中间件(6)-CLIENT

【Rust投稿】从零实现消息中间件(6)-CLIENT

作者头像
MikeLoveRust
发布2020-03-11 13:49:22
7430
发布2020-03-11 13:49:22
举报
文章被收录于专栏:Rust语言学习交流

功能设计

client实现功能相对比较单一,就是能够向服务器pub消息,然后就会说订阅消息,订阅的主题收到消息以后能够得到通知.因此总结起来就是下面三个功能:

  1. 提供pub接口
  2. 提供sub接口
  3. 处理sub后收到的消息

数据结构定义

提供给用户的接口是上面的三个, 为了实现这三个接口,client一定要有的就是writer以及handler. 而sid则是因为一个client可以有多个sub,每一个sub要有唯一的id,主要是编号用. stop则是为了client正常关闭使用.

代码语言:javascript
复制
#[derive(Debug)]
pub struct Client {
    addr: String,
    writer: Arc<Mutex<WriteHalf<TcpStream>>>,
    pub stop: oneshot::Sender<()>,
    sid: u64,
    handler: Arc<Mutex<HashMap<String,  
                  mpsc::UnboundedSender<Vec<u8>>>>>,
}

接口-connect

connect的功能非常直白就是创建和服务器的连接,同时后台会启动一个任务来处理tcp连接,主要是接收消息.

代码语言:javascript
复制
 pub async fn connect(addr: &str) -> std::io::Result<Client> {}

接口-pub_message

向服务器发布一条pub消息

代码语言:javascript
复制
pub async fn pub_message(&mut self, 
  subject: &str, 
  msg: &[u8]) 
     -> std::io::Result<()> {}

接口-sub_message

向服务器发布一条sub消息,然后等待服务器推送相关消息. 需要说明的是这里的参数subjectqueue完全没有必要使用String,&str即可. 这应该是rust的一个bug,在1.41和nightly 1.43都是编译不过去的.所以退而求其次,使用了String.

代码语言:javascript
复制
    //sub消息格式为SUB subject {queue} {sid}\r\n
    pub async fn sub_message(
        &mut self,
        subject: String,
        queue: Option<String>,
        handler: MessageHandler,
    ) -> std::io::Result<()> {}

receive_task

receive_task主要是做消息的接收,解析,以及将消息派发给合适的handler. 这个其实是本模块最复杂的地方,总体上比较直观. 主要有以下两点

  1. 使用futures::select这个宏来辅助实现同时监控多个future
  2. TcpStream如果read到size为0,说明连接已经关闭,无需继续
代码语言:javascript
复制
    async fn receive_task(
        mut reader: ReadHalf<TcpStream>,
        stop: oneshot::Receiver<()>,
        handler: Arc<Mutex<HashMap<String,
                 mpsc::UnboundedSender<Vec<u8>>>>>,
        writer: Arc<Mutex<WriteHalf<TcpStream>>>,
    ) 

API的使用

pub

代码语言:javascript
复制
 c.pub_message("test", format!("hello{}", i).as_bytes())
            .await?;

sub

代码语言:javascript
复制
 c.sub_message(
        "test".into(),
        None,
        Box::new(move |msg| {
            println!("recevied:{}", unsafe { std::str::from_utf8_unchecked(msg) });
            Ok(())
        }),
    )

代码实现

代码语言:javascript
复制
type MessageHandler = Box<dyn FnMut(&[u8]) -> std::result::Result<(), ()> + Sync + Send>;
//#[derive(Debug)]
pub struct Client {
    addr: String,
    writer: Arc<Mutex<WriteHalf<TcpStream>>>,
    pub stop: Option<oneshot::Sender<()>>,
    sid: u64,
    handler: Arc<Mutex<HashMap<String, MessageHandler>>>,
}

impl Client {
    //1. 建立到服务器的连接
    //2. 启动后台任务
    pub async fn connect(addr: &str) -> std::io::Result<Client> {
        let conn = TcpStream::connect(addr).await?;
        let (reader, writer) = tokio::io::split(conn);
        let (tx, rx) = tokio::sync::oneshot::channel();
        let c = Client {
            addr: addr.into(),
            writer: Arc::new(Mutex::new(writer)),
            stop: Some(tx),
            sid: 0,
            handler: Arc::new(Default::default()),
        };
        let handler = c.handler.clone();
        let writer = c.writer.clone();
        /*
        tokio::spawn 可以认为和go语言中的
        go func(){}()
        */
        tokio::spawn(async move {
            Self::receive_task(reader, rx, handler, writer).await;
        });
        Ok(c)
    }
    /*
    从服务器接收pub消息
    然后推送给相关的订阅方。
    */
    async fn receive_task(
        mut reader: ReadHalf<TcpStream>,
        stop: oneshot::Receiver<()>,
        handler: Arc<Mutex<HashMap<String, MessageHandler>>>,
        writer: Arc<Mutex<WriteHalf<TcpStream>>>,
    ) {
        let mut buf = [0 as u8; 512];
        let mut parser = Parser::new();
        use futures::*;
        let mut stop = stop.fuse();
        loop {
            select! {
                _=stop=>{
                    println!("client closed");
                    return;
                }
                r = reader.read(&mut buf[..]).fuse()=>{
                     let n = {
                        match r {
                            Err(e) => {
                                println!("read err {}", e);
                                let _ = writer.lock().await.shutdown().await;
                                return;
                            }
                            Ok(n) => n,
                        }
                    };
                    if n == 0 {
                        //EOF,说明对方关闭了连接
                        return;
                    }
                    let mut buf2 = &buf[..n];
                    loop {
                        let r = parser.parse(buf2);
                        let (r, n) = match r {
                            Err(e) => {
                                println!("parse err {}", e);
                                let _ = writer.lock().await.shutdown().await;
                                return;
                            }
                            Ok(r) => r,
                        };
                        //                println!("receive msg {:?}", r);
                        match r {
                            ParseResult::NoMsg => {
                                break;
                            }
                            ParseResult::MsgArg(msg) => {
                                Self::process_message(msg, &handler).await;
                                parser.clear_msg_buf();
                            }
                        }
                        //缓冲区处理完毕
                        if n == buf.len() {
                            break;
                        }
                        buf2 = &buf2[n..]
                    }
                }
            }
        }
    }
    /*
    根据消息的subject,找到订阅方,
    然后推送给他们
    */
    pub async fn process_message(
        msg: MsgArg<'_>,
        handler: &Arc<Mutex<HashMap<String, MessageHandler>>>,
    ) {
        //        println!("broadcast msg {}", msg.subject);
        let mut handler = handler.lock().await;
        let h = handler.get_mut(msg.subject);
        if let Some(h) = h {
            let _ = h(msg.msg);
        }
    }
    //pub消息格式为PUB subject size\r\n{message}
    pub async fn pub_message(&self, subject: &str, msg: &[u8]) -> std::io::Result<()> {
        let mut writer = self.writer.lock().await;
        let m = format!("PUB {} {}\r\n", subject, msg.len());
        let _ = writer.write_all(m.as_bytes()).await;
        let _ = writer.write_all(msg).await;
        writer.write_all("\r\n".as_bytes()).await
    }

    //sub消息格式为SUB subject {queue} {sid}\r\n
    //可能由于rustc的bug,导致如果subject是&str,则会报错E0700,暂时使用String来替代
    pub async fn sub_message(
        &mut self,
        subject: String,
        queue: Option<String>,
        handler: MessageHandler,
    ) -> std::io::Result<()> {
        self.sid += 1;
        let mut writer = self.writer.lock().await;
        let m = if let Some(queue) = queue {
            format!("SUB {} {} {}\r\n", subject.as_str(), queue, self.sid)
        } else {
            format!("SUB {} {}\r\n", subject.as_str(), self.sid)
        };
        self.handler.lock().await.insert(subject, handler);
        writer.write_all(m.as_bytes()).await
    }
    pub fn close(&mut self) {
        if let Some(stop) = self.stop.take() {
            if let Err(e) = stop.send(()) {
                println!("stop err {:?}", e);
            }
        }
    }
}

其他

相关代码都在我的github rnats 欢迎围观

https://github.com/nkbai/learnrustbynats

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-03-09,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Rust语言学习交流 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 功能设计
  • 数据结构定义
    • 接口-connect
      • 接口-pub_message
        • 接口-sub_message
          • receive_task
          • API的使用
            • pub
              • sub
              • 代码实现
              • 其他
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档