前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【Rust投稿】从零实现消息中间件(2)-PARSER

【Rust投稿】从零实现消息中间件(2)-PARSER

作者头像
MikeLoveRust
发布2020-03-05 14:35:20
7240
发布2020-03-05 14:35:20
举报

消息格式

服务器和客户端来往的消息只有三种,分别是订阅(SUB),发布(PUB),推送消息(MSG). 其中前两种是从客户端向服务端推送,最后一种则是服务端向客户端推送.

服务端需要解析的消息格式

pub
代码语言:javascript
复制
PUB <subject> <size>\r\n
<message>\r\n
sub
代码语言:javascript
复制
SUB <subject> <sid>\r\n
SUB <subject> <queue> <sid>\r\n

客户端需要解析的消息格式

MSG
代码语言:javascript
复制
MSG <subject> <sid> <size>\r\n
<message>\r\n

消息格式解析的思路

出于性能考虑,应该考虑如下问题:

  1. 尽可能的避免内存分配
  2. 尽可能的避免内存复制(zero copy)
  3. 不要使用正则表达式去匹配

实现

根据上述原则,我最终选择使用状态机,这是最灵活的方式,虽然代码绍魏复杂一点,但是可以调整以尽可能满足上述三个原则. 注意这里的实现只针对服务端,相关代码都位于我的github

错误处理

错误处理这是在所有的系统中都要处理的事情,这里我先把可能发生的错误都列在这里,然后定义.

代码语言:javascript
复制
#[derive(Debug)]
pub const ERROR_PARSE: i32 = 1;
pub const ERROR_MESSAGE_SIZE_TOO_LARGE: i32 = 2;
pub const ERROR_INVALID_SUBJECT: i32 = 3;
pub const ERROR_SUBSCRIBTION_NOT_FOUND: i32 = 4;
pub const ERROR_CONNECTION_CLOSED: i32 = 5;
pub const ERROR_UNKOWN_ERROR: i32 = 1000;

pub struct NError {
    pub err_code: i32,
}
impl NError {
    pub fn new(err_code: i32) -> Self {
        Self { err_code }
    }
    pub fn error_description(&self) -> &'static str {
        match self.err_code {
            ERROR_PARSE => return "parse error",
            ...
            _ => return "unkown error",
        }
    }
}

状态定义

这里采用的是逐个byte解析的方式. 只处理pub和sub两种消息. 其中sub支持可选的queue来做负载均衡.

代码语言:javascript
复制
#[derive(Debug, Clone)]
enum ParseState {
    OpStart,
    OpS,
    OpSu,
    OpSub,
    OPSubSpace,
    OpSubArg,
    OpP,
    OpPu,
    OpPub, //pub argument
    OpPubSpace,
    OpPubArg,
    OpMsg, //pub message
    OpMsgFull,
}

Parser以及parse结果

在和汉东老师聊的过程中,他和我都认为rust代码写之前最好先定义清楚数据结构. 当然其他语言也需要这样,不过我感觉rust语言如果不限这么做,后续调整起来会更麻烦.

返回结果

parse的结果不外乎四种情况

  1. 出错了
  2. 到目前为止还没有收到完整的消息 比如只收到了SUB SUBJECT ,消息不完整,当然不能处理
  3. 一条PUB消息
  4. 一条SUB消息 rust #[derive(Debug, PartialEq)] pub struct SubArg<'a> { pub subject: &'a str, //为什么是str而不是String,就是为了避免内存分配, pub sid: &'a str, pub queue: Option<&'a str>, } #[derive(Debug, PartialEq)] pub struct PubArg<'a> { pub subject: &'a str, pub size_buf: &'a str, // 1024 字符串形式,避免后续再次转换 pub size: usize, //1024 整数形式 pub msg: &'a [u8], } #[derive(Debug, PartialEq)] pub enum ParseResult<'a> { NoMsg, //buf="sub top.stevenbai.blog" sub消息不完整,我肯定不能处理 Sub(SubArg<'a>), Pub(PubArg<'a>), }
Parser

Parser的定义这个版本我们尽量去满足上述三个原则,但是考虑到第二条zero-copy会让代码中 到处都是if-else,所以暂时先不考虑. 后续我们在做优化的时候会进行benchmark.

代码语言:javascript
复制
/*
这个长度很有关系,必须能够将一个完整的主题以及参数放进去,
所以要限制subject的长度
*/
const BUF_LEN: usize = 512;
pub struct Parser {
    state: ParseState,
    buf: [u8; BUF_LEN], //消息解析缓冲区,如果消息体+消息头不超过512,直接用这个,超过了就必须另分配
    arg_len: usize,
    msg_buf: Option<Vec<u8>>,
    //解析过程中受到新消息,那么 新消息的总长度是msg_total_len,已收到部分应该是msg_len
    msg_total_len: usize,
    msg_len: usize,
    debug: bool,
}

消息解析

有了这些定义以后,真正的消息解析过程就会清晰很多.

parse 函数的定义
代码语言:javascript
复制
 /**
    对收到的字节序列进行解析,解析完毕后得到pub或者sub消息,
    同时有可能没有消息或者缓冲区里面还有其他消息
    返回结果中的usize指的是消耗了缓冲区中多少字节
    */
    pub fn parse(&mut self, buf: &[u8]) -> Result<(ParseResult, usize)>
parse函数的使用
代码语言:javascript
复制
    fn test_sub2() {
        let mut p = Parser::new();
        let mut buf = "SUB subject 1\r\nSUB subject2 2\r\n".as_bytes();
        loop {
            let r = p.parse(buf);
            assert!(!r.is_err());
            let r = r.unwrap();
            buf = &buf[r.1..];
            match r.0 {
                ParseResult::Sub(sub) => {
                    println!("sub.subect={}", sub.subject);
                }
                _ => panic!(),
            }
            if buf.len() == 0 {
                break;
            }
        }
    }
完整parse的实现
代码语言:javascript
复制
impl Parser {
    pub fn new() -> Self {
        Self {
            state: ParseState::OpStart,
            buf: [0; BUF_LEN],
            arg_len: 0,
            msg_buf: None,
            msg_total_len: 0,
            msg_len: 0,
            debug: true,
        }
    }
    /**
    对收到的字节序列进行解析,解析完毕后得到pub或者sub消息,
    同时有可能没有消息或者缓冲区里面还有其他消息
    */
    pub fn parse(&mut self, buf: &[u8]) -> Result<(ParseResult, usize)> {
        let mut b;
        let mut i = 0;
        if self.debug {
            println!(
                "parse string:{},state={:?}",
                unsafe { std::str::from_utf8_unchecked(buf) },
                self.state
            );
        }
        while i < buf.len() {
            use ParseState::*;
            b = buf[i] as char;
            //            println!("state={:?},b={}", self.state, b);
            match self.state {
                OpStart => match b {
                    'S' => self.state = OpS,
                    'P' => self.state = OpP,
                    _ => parse_error!(),
                },
                OpS => match b {
                    'U' => self.state = OpSu,
                    _ => parse_error!(),
                },
                OpSu => match b {
                    'B' => self.state = OpSub,
                    _ => parse_error!(),
                },
                OpSub => match b {
                    //sub stevenbai.top 3 是ok的,但是substevenbai.top 3就不允许
                    ' ' | '\t' => self.state = OPSubSpace,
                    _ => parse_error!(),
                },
                OPSubSpace => match b {
                    ' ' | '\t' => {}
                    _ => {
                        self.state = OpSubArg;
                        self.arg_len = 0;
                        continue;
                    }
                },
                OpSubArg => match b {
                    '\r' => {}
                    '\n' => {
                        self.state = OpStart;
                        let r = self.process_sub()?;
                        return Ok((r, i + 1));
                    }
                    _ => {
                        self.add_arg(b as u8)?;
                    }
                },
                OpP => match b {
                    'U' => self.state = OpPu,
                    _ => parse_error!(),
                },
                OpPu => match b {
                    'B' => self.state = OpPub,
                    _ => parse_error!(),
                },
                OpPub => match b {
                    ' ' | '\t' => self.state = OpPubSpace,
                    _ => parse_error!(),
                },
                OpPubSpace => match b {
                    ' ' | '\t' => {}
                    _ => {
                        self.state = OpPubArg;
                        self.arg_len = 0;
                        continue;
                    }
                },
                OpPubArg => match b {
                    '\r' => {}
                    '\n' => {
                        //PUB top.stevenbai 5\r\n
                        self.state = OpMsg;
                        let size = self.get_message_size()?;
                        if size == 0 || size > 1 * 1024 * 1024 {
                            //消息体长度不应该超过1M,防止Dos攻击
                            return Err(NError::new(ERROR_MESSAGE_SIZE_TOO_LARGE));
                        }
                        if size + self.arg_len > BUF_LEN {
                            self.msg_buf = Some(Vec::with_capacity(size));
                        }
                        self.msg_total_len = size;
                    }
                    _ => {
                        self.add_arg(b as u8)?;
                    }
                },
                OpMsg => {
                    //涉及消息长度
                    if self.msg_len < self.msg_total_len {
                        self.add_msg(b as u8);
                    } else {
                        self.state = OpMsgFull;
                    }
                }
                OpMsgFull => match b {
                    '\r' => {}
                    '\n' => {
                        self.state = OpStart;
                        let r = self.process_msg()?;
                        return Ok((r, i + 1));
                    }
                    _ => {
                        parse_error!();
                    }
                },
                //                _ => panic!("unkown state {:?}", self.state),
            }
            i += 1;
        }
        Ok((ParseResult::NoMsg, 0))
    }
    //一种是消息体比较短,可以直接放在buf中,无需另外分配内存
    //另一种是消息体很长,无法放在buf中,额外分配了msg_buf空间
    fn add_msg(&mut self, b: u8) {
        if let Some(buf) = self.msg_buf.as_mut() {
            buf.push(b);
        } else {
            //消息体比较短的情况
            if self.arg_len + self.msg_total_len > BUF_LEN {
                panic!("message should allocate space");
            }
            self.buf[self.arg_len + self.msg_len] = b;
        }
        self.msg_len += 1;
    }
    fn add_arg(&mut self, b: u8) -> Result<()> {
        //太长的subject
        if self.arg_len >= self.buf.len() {
            parse_error!();
        }
        self.buf[self.arg_len] = b;
        self.arg_len += 1;
        Ok(())
    }
    //解析缓冲区中的形如stevenbai.top queue 3
    fn process_sub(&self) -> Result<ParseResult> {
        let buf = &self.buf[0..self.arg_len];
        //有可能客户端恶意发送一些无效的utf8字符,这会导致错误.
        let ss = unsafe { std::str::from_utf8_unchecked(buf) };
        let mut arg_buf = [""; 3]; //如果没有queue,长度就是2,否则长度是3
        let mut arg_len = 0;
        for s in ss.split(' ') {
            if s.len() == 0 {
                continue;
            }
            if arg_len >= 3 {
                parse_error!();
            }
            arg_buf[arg_len] = s;
            arg_len += 1;
        }
        let mut sub_arg = SubArg {
            subject: "",
            sid: "",
            queue: None,
        };
        sub_arg.subject = arg_buf[0];
        //长度为2时不包含queue,为3包含queue,其他都说明格式错误
        match arg_len {
            2 => {
                sub_arg.sid = arg_buf[1];
            }
            3 => {
                sub_arg.sid = arg_buf[2];
                sub_arg.queue = Some(arg_buf[1]);
            }
            _ => parse_error!(),
        }
        Ok(ParseResult::Sub(sub_arg))
    }
    //解析缓冲区中以及msg_buf中的形如stevenbai.top 5hello
    fn process_msg(&self) -> Result<ParseResult> {
        let msg = if self.msg_buf.is_some() {
            self.msg_buf.as_ref().unwrap().as_slice()
        } else {
            &self.buf[self.arg_len..self.arg_len + self.msg_total_len]
        };
        let mut arg_buf = [""; 2];
        let mut arg_len = 0;
        let ss = unsafe { std::str::from_utf8_unchecked(&self.buf[0..self.arg_len]) };
        for s in ss.split(' ') {
            if s.len() == 0 {
                continue;
            }
            if arg_len >= 2 {
                parse_error!()
            }
            arg_buf[arg_len] = s;
            arg_len += 1;
        }
        let pub_arg = PubArg {
            subject: arg_buf[0],
            size_buf: arg_buf[1],
            size: self.msg_total_len,
            msg,
        };
        Ok(ParseResult::Pub(pub_arg))
    }
    pub fn clear_msg_buf(&mut self) {
        self.msg_buf = None;
        self.msg_len = 0;
        self.msg_total_len = 0;
    }
    //从接收到的pub消息中提前解析出来消息的长度
    fn get_message_size(&self) -> Result<usize> {
        //缓冲区中形如top.stevenbai.top 5
        let arg_buf = &self.buf[0..self.arg_len];
        let pos = arg_buf
            .iter()
            .rev()
            .position(|b| *b == ' ' as u8 || *b == '\t' as u8);
        if pos.is_none() {
            parse_error!();
        }
        let pos = pos.unwrap();
        let size_buf = &arg_buf[arg_buf.len() - pos..];
        let szb = unsafe { std::str::from_utf8_unchecked(size_buf) };
        szb.parse::<usize>().map_err(|_| NError::new(ERROR_PARSE))
    }
}

其他

在实现Parser的过程中,我们展示了Rust中enum的使用,错误的处理,字符串的处理等常见的问题. 接下来我会继续实现另一个组件订阅的管理.

相关代码都在我的github rnats 欢迎围观

https://github.com/nkbai/learnrustbynats

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-03-03,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Rust语言学习交流 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 消息格式
    • 服务端需要解析的消息格式
      • pub
      • sub
    • 客户端需要解析的消息格式
      • MSG
  • 消息格式解析的思路
  • 实现
    • 错误处理
      • 状态定义
        • 返回结果
        • Parser
        • parse 函数的定义
        • parse函数的使用
        • 完整parse的实现
    相关产品与服务
    负载均衡
    负载均衡(Cloud Load Balancer,CLB)提供安全快捷的流量分发服务,访问流量经由 CLB 可以自动分配到云中的多台后端服务器上,扩展系统的服务能力并消除单点故障。负载均衡支持亿级连接和千万级并发,可轻松应对大流量访问,满足业务需求。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档