前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >时序数据库Influx-IOx源码学习十二(物理计划的执行)

时序数据库Influx-IOx源码学习十二(物理计划的执行)

作者头像
刘涛华
发布2021-05-13 15:04:46
5530
发布2021-05-13 15:04:46
举报

InfluxDB是一个由InfluxData开发的开源时序数据库,专注于海量时序数据的高性能读、写、高效存储与实时分析等,在DB-Engines Ranking时序型数据库排行榜上常年排名第一。 InfluxDB可以说是当之无愧的佼佼者,但 InfluxDB CTO Paul 在 2020/12/10 号在博客中发表一篇名为:Announcing InfluxDB IOx – The Future Core of InfluxDB Built with Rust and Arrow的文章,介绍了一个新项目 InfluxDB IOx,InfluxDB 的下一代时序引擎。 接下来,我将连载对于InfluxDB IOx的源码解析过程,欢迎各位批评指正,联系方式见文章末尾。


上一章介绍了一个SQL是怎样从字符串转换到物理执行计划的,详情见:

https://my.oschina.net/u/3374539/blog/5035628

这一章主要记录一下物理计划是怎样执行的。


在上一篇文章的末尾,我们展示了物理计划之中存储的数据,这些数据代表了当前整个数据库中,能够与用户输入的查询表相关联的所有数据。

对于一般数据库来讲,在物理计划中更应该是指向索引相关的信息,举例来说:select * from table1 ,在物理计划里,应该是要拿到table1的表描述、存储数据的文件路径、文件大小、等等,而不是拿到真实数据。在文章最末尾中,有一段省略的数据,为什么会出现数据呢?其实这是数据库设计的缓存,缓存的数据本来就没有落到磁盘上,所以直接在物理计划中也会持有RBChunk和MBChunk的数据引用。

对于一个过滤而言,会在物理计划中产生对应的信息,展示如下:

代码语言:javascript
复制
select * from myMeasurement where fieldKey like 'value1';

input: FilterExec { predicate: BinaryExpr { left: Column { name: "fieldKey" }, op: Like, right: Literal { value: Utf8("value1") } }

接下来看物理计划的执行代码:

代码语言:javascript
复制
pub async fn collect(plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>> {
    match plan.output_partitioning().partition_count() {
        0 => Ok(vec![]),
        //单一块的时候直接取出数据
        1 => {
            let it = plan.execute(0).await?;
            common::collect(it).await
        }
        //多个数据块的时候就需要进行合并数据
        _ => {
            let plan = MergeExec::new(plan.clone());
            assert_eq!(1, plan.output_partitioning().partition_count());
            //这里分为了两步execute 和 collect
            common::collect(plan.execute(0).await?).await
        }
    }
}

接下来看plan.execute方法:

代码语言:javascript
复制
async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
       。。。省略
      tokio::spawn(async move {
           //这里的input就代表了上面展示的filter的input或者是数据的input
          let mut stream = match input.execute(part_i).await {
              Err(e) => {
                  let arrow_error = ArrowError::ExternalError(Box::new(e));
                  sender.send(Err(arrow_error)).await.ok();
                  return;
              }
              Ok(stream) => stream,
          };
          //计划执行完成之后返回一个stream,这里就是一直next获取完
          while let Some(item) = stream.next().await {
              sender.send(item).await.ok();
          }
      });
      。。。省略
}

上面的input代表了以下这么多东西:

上面展示的为datafusion框架里的Plan,也就是通用sql都需要实现的功能,下面是iox项目中实现的Plan是完成数据获取的。

Plan之间的关系是嵌套的,想象一下上一章的大图,比如coalesceBatchesExec里可能还会包含filter,主要就是描述整个sql语句中都出现了什么。所有出现的plan就会对数据进行一次全面的过滤。

姑且不看过滤的细节,只看获取数据的部分(ExecutionPlan for IOxReadFilterNode)。

代码语言:javascript
复制
async fn execute(
        &self,
        partition: usize,
    ) -> datafusion::error::Result<SendableRecordBatchStream> {
        //因为在前面物理计划中得到了所有列,这里拿出列的名字
        let fields = self.schema.fields();
        let selection_cols = fields.iter().map(|f| f.name() as &str).collect::<Vec<_>>();
        //多个分区的时候可以根据分区号拿出chunk信息
        let ChunkInfo {
            chunk,
            chunk_table_schema,
        } = &self.chunk_and_infos[partition];

        //过滤出来列名字对应的arrow的filed,这里就存在不对应的问题,假如用户输入了ABC,但是chunk_table_schema中并不存在,这里就会是一个空
        let selection_cols = restrict_selection(selection_cols, &chunk_table_schema);
        let selection = Selection::Some(&selection_cols);
        //使用predicate过滤一次,但是我调试的时候一直是空的,也就是查询出所有数据。
        let stream = chunk
            .read_filter(&self.table_name, &self.predicate, selection)
            .map_err(|e| {
                DataFusionError::Execution(format!(
                    "Error creating scan for table {} chunk {}: {}",
                    self.table_name,
                    chunk.id(),
                    e
                ))
            })?;
        //这里使用SchemaAdapterStream的结构来填充空值列
        let adapter = SchemaAdapterStream::try_new(stream, Arc::clone(&self.schema))
            .map_err(|e| DataFusionError::Internal(e.to_string()))?;

        Ok(Box::pin(adapter))
    }

这个SchemaAdapterStream在代码中给了一个特别形象的描述:

代码语言:javascript
复制
///
///                       ┌────────────────┐                         ┌─────────────────────────┐
///                       │ ┌─────┐┌─────┐ │                         │ ┌─────┐┌──────┐┌─────┐  │
///                       │ │  A  ││  C  │ │                         │ │  A  ││  B   ││  C  │  │
///                       │ │  -  ││  -  │ │                         │ │  -  ││  -   ││  -  │  │
/// ┌──────────────┐      │ │  1  ││ 10  │ │     ┌──────────────┐    │ │  1  ││ NULL ││ 10  │  │
/// │    Input     │      │ │  2  ││ 20  │ │     │   Adapter    │    │ │  2  ││ NULL ││ 20  │  │
/// │    Stream    ├────▶ │ │  3  ││ 30  │ │────▶│    Stream    ├───▶│ │  3  ││ NULL ││ 30  │  │
/// └──────────────┘      │ │  4  ││ 40  │ │     └──────────────┘    │ │  4  ││ NULL ││ 40  │  │
///                       │ └─────┘└─────┘ │                         │ └─────┘└──────┘└─────┘  │
///                       │                │                         │                         │
///                       │  Record Batch  │                         │      Record Batch       │
///                       └────────────────┘                         └─────────────────────────┘
///

接下来看如何实现数据查找的:

代码语言:javascript
复制
fn read_filter(
        &self,
        table_name: &str,
        predicate: &Predicate,
        selection: Selection<'_>,
    ) -> Result<SendableRecordBatchStream, Self::Error> {
         //chunk存在变体,这里就是先判断是什么chunk,有三种MB,RB,ParquetFile
        match self {
            //还是在写入阶段的buffer,暂时不支持查询条件
            Self::MutableBuffer { chunk, .. } => {
                if !predicate.is_empty() {
                    return InternalPredicateNotSupported {
                        predicate: predicate.clone(),
                    }
                    .fail();
                }
                let batch = chunk
                    .read_filter(table_name, selection)
                    .context(MutableBufferChunk)?;

                Ok(Box::pin(MemoryStream::new(vec![batch])))
            }
            //不可写阶段的buffer,对数据进行过滤
            Self::ReadBuffer { chunk, .. } => {
                let rb_predicate =
                    to_read_buffer_predicate(&predicate).context(PredicateConversion)?;
                //读取数据并过滤
                let read_results = chunk
                    .read_filter(table_name, rb_predicate, selection)
                    .context(ReadBufferChunkError {
                        chunk_id: chunk.id(),
                    })?;
                //读取schema信息并过滤
                let schema = chunk
                    .read_filter_table_schema(table_name, selection)
                    .context(ReadBufferChunkError {
                        chunk_id: chunk.id(),
                    })?;
                //ReadFilterResultsStream是对不同的chunk类型实现的读取接口
                Ok(Box::pin(ReadFilterResultsStream::new(
                    read_results,
                    schema.into(),
                )))
            }
            //Parquet同理
            Self::ParquetFile { chunk, .. } => chunk
                .read_filter(table_name, predicate, selection)
                .context(ParquetFileChunkError {
                    chunk_id: chunk.id(),
                }),
        }
    }

数据到了这里就会按照你选择的表名、列名,将数据全部查询出来了。在代码中的predicate,一直是空的,暂时不确定是如何填充的,后面再看。

数据从这里全部查询出来之后,会返回给datafusion框架,继续按照开头写到的过滤器进行过滤,就是遍历一遍数据判断大于、小于或者like等等。

好了查询就先写到这里。

祝玩儿的开心!!

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-05-12,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 数据库技术研究 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
对象存储
对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档