前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >时序数据库Influx-IOx源码学习九(查询主流程)

时序数据库Influx-IOx源码学习九(查询主流程)

作者头像
刘涛华
发布2021-04-29 15:23:01
5810
发布2021-04-29 15:23:01
举报

InfluxDB是一个由InfluxData开发的开源时序数据库,专注于海量时序数据的高性能读、写、高效存储与实时分析等,在DB-Engines Ranking时序型数据库排行榜上常年排名第一。 InfluxDB可以说是当之无愧的佼佼者,但 InfluxDB CTO Paul 在 2020/12/10 号在博客中发表一篇名为:Announcing InfluxDB IOx – The Future Core of InfluxDB Built with Rust and Arrow的文章,介绍了一个新项目 InfluxDB IOx,InfluxDB 的下一代时序引擎。 接下来,我将连载对于InfluxDB IOx的源码解析过程,欢迎各位批评指正,联系方式见文章末尾。


上一篇粗略的总结了写入的基本流程,详情见:

https://my.oschina.net/u/3374539/blog/5033469

这一篇记录一下查询的主要流程。


在第六章中,写了一个查询示例,如下:

代码语言:javascript
复制
 let mut query = flight::Client::new(connection)
        .perform_query("databaseName", "select * from myMeasurement")
        .await
        .expect("query request should work");

其中connection,代表的建立了一个Grpc的连接。perform_query代表执行查询,其中第一个参数是数据库名字,第二个参数是要执行查询的sql语句。这个perform_query是封装了一下调用协议,然后调用了服务器端的do_get方法,do_get方法在服务器的src/influxdb_ioxd/rpc/flight.rs:139行可以找到,如下:

代码语言:javascript
复制
async fn do_get(
        &self,
        //这个Ticket里就是保存的perform_query方法中封装的json数据
        request: Request<Ticket>,
    ) -> Result<Response<Self::DoGetStream>, tonic::Status> {
        //这里就是把json还原回来
        let ticket = request.into_inner();
        let json_str = String::from_utf8(ticket.ticket.to_vec()).context(InvalidTicket {
            ticket: ticket.ticket,
        })?;
        //反序列化成了ReadInfo结构
        let read_info: ReadInfo =
            serde_json::from_str(&json_str).context(InvalidQuery { query: &json_str })?;
        //拿到客户端设置的数据库名字
        let database = DatabaseName::new(&read_info.database_name).context(InvalidDatabaseName)?;
        //从内存中查找是否存在这个database名字,如果不存在就会报DatabaseNotFound错误回去
        //这里就是创建数据库的时候写入到内存里的
        //同时还应该记得iox的数据库必须一个节点创建一次。。hhhhha
        let db = self.server.db(&database).context(DatabaseNotFound {
            database_name: &read_info.database_name,
        })?;
        //这个是拿到之前创建数据库时候设置的线程池,可以回去参考第五章
        let executor = db.executor();
        //这里是创建出sql语句对应的physical_plan,后面再看
        let physical_plan = Planner::new(Arc::clone(&executor))
            .sql(db, &read_info.sql_query)
            .await
            .context(Planning)?;

        //使用线程异步的执行查询
        let results = executor
             //复制一下执行时候需要用到的信息
            .new_context()
             //真正的去执行
            .collect(Arc::clone(&physical_plan))
            .await
            .map_err(|e| Box::new(e) as _)
            .context(Query {
                database_name: &read_info.database_name,
            })?;

        //在写入的章节里应该知道了在RBChunk里面存储的是Arrow格式的。
        //在这个方法中就是调用arrow_flight工具包的方法,先把schema序列化到flight_buffer中
        let options = arrow::ipc::writer::IpcWriteOptions::default();
        let schema = physical_plan.schema();
        let schema_flight_data =
            arrow_flight::utils::flight_data_from_arrow_schema(schema.as_ref(), &options);

        let mut flights: Vec<Result<FlightData, tonic::Status>> = vec![Ok(schema_flight_data)];
        //上面得到的结果集,这里进行遍历,封装为要返回的数据结构
        let mut batches: Vec<Result<FlightData, tonic::Status>> = results
            .iter()
            //这个是为了给下面flight_data_from_arrow_batch这个方法打补丁用的
            //因为这个方法即便对于切片类型的batch也是盲目的序列化所有数据
            .map(optimize_record_batch)
            .collect::<Result<Vec<_>, Error>>()?
            .iter()
            //这里就是一条一条的把数据序列化到缓冲区里
            .flat_map(|batch| {
                let (flight_dictionaries, flight_batch) =
                    arrow_flight::utils::flight_data_from_arrow_batch(&batch, &options);
                //把数据包装在Result中
                flight_dictionaries
                    .into_iter()
                    .chain(std::iter::once(flight_batch))
                    .map(Ok)
            })
            .collect();

        //前面是schema,后面是数据
        flights.append(&mut batches);
        //返回一个数据的异步stream,有可能调用一次next就会释放一次cpu?
        let output = futures::stream::iter(flights);
        //数据以flight形式发送到了客户端,客户端先读取schema再读取数据。
        Ok(Response::new(Box::pin(output) as Self::DoGetStream))
    }

这里基本上是整个查询的主逻辑:

  • 异步的将sql转换为plan。
  • 异步的去执行plan并返回结果和结果所对应的schema信息。
  • 将返回的arrow数据封装到flights格式中。
  • 通过Grpc返回

这一篇就到这里吧,下几章准备记录一下:

  1. sql是怎么被执行的
  2. 查询中都经历了什么
  3. 等等。。。

祝玩儿的开心。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-04-27,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 数据库技术研究 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
数据库
云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档