前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >时序数据库Influx-IOx源码学习八(Chunk持久化)

时序数据库Influx-IOx源码学习八(Chunk持久化)

作者头像
刘涛华
发布2021-04-28 11:09:22
3510
发布2021-04-28 11:09:22
举报

InfluxDB是一个由InfluxData开发的开源时序数据库,专注于海量时序数据的高性能读、写、高效存储与实时分析等,在DB-Engines Ranking时序型数据库排行榜上常年排名第一。 InfluxDB可以说是当之无愧的佼佼者,但 InfluxDB CTO Paul 在 2020/12/10 号在博客中发表一篇名为:Announcing InfluxDB IOx – The Future Core of InfluxDB Built with Rust and Arrow的文章,介绍了一个新项目 InfluxDB IOx,InfluxDB 的下一代时序引擎。 接下来,我将连载对于InfluxDB IOx的源码解析过程,欢迎各位批评指正,联系方式见文章末尾。


上一章介绍了Chunk是怎样被管理的,以及各个阶段的操作。详情见: https://my.oschina.net/u/3374539/blog/5029926

这一章记录一下Chunk是怎样持久化的。

代码语言:javascript
复制
ChunkState::Moved(_) if would_write => {
                    let partition_key = chunk_guard.key().to_string();
                    let table_name = chunk_guard.table_name().to_string();
                    let chunk_id = chunk_guard.id();
                    std::mem::drop(chunk_guard);
                    write_active = true;
                    //处于Moved状态下的Chunk会调用write_to_object_store方法进行持久化
                    self.write_to_object_store(partition_key, table_name, chunk_id);
                }

//write_to_object_store实际调用到write_chunk_to_object_store_in_background方法来进行持久化
pub fn write_chunk_to_object_store_in_background(
        self: &Arc<Self>,
        partition_key: String,
        table_name: String,
        chunk_id: u32,
    ) -> TaskTracker<Job> {
        //获取数据库名称
        let name = self.rules.read().name.clone();
        //新建一个后台任务的管理器,用来记录db中都在执行哪些任务及状态,
        let (tracker, registration) = self.jobs.register(Job::WriteChunk {
            db_name: name.to_string(),
            partition_key: partition_key.clone(),
            table_name: table_name.clone(),
            chunk_id,
        });

        let captured = Arc::clone(&self);
        //异步写入
        let task = async move {
            let result = captured
                //真正的写入方法
                .write_chunk_to_object_store(&partition_key, &table_name, chunk_id)
                .await;
            if let Err(e) = result {
                info!(?e, %name, %partition_key, %chunk_id, "background task error loading object store chunk");
                return Err(e);
            }
            Ok(())
        };

        tokio::spawn(task.track(registration));

        tracker
    }

后面的方法有点儿长,希望能够耐心观看。。

代码语言:javascript
复制
  pub async fn write_chunk_to_object_store(
        &self,
        partition_key: &str,
        table_name: &str,
        chunk_id: u32,
    ) -> Result<Arc<DbChunk>> {
        //从catalog中取回chunk
        let chunk = {
            //先找partition
            let partition =
                self.catalog
                    .valid_partition(partition_key)
                    .context(LoadingChunkToParquet {
                        partition_key,
                        table_name,
                        chunk_id,
                    })?;
            let partition = partition.read();
            //从partition里根据表名和chunk_id拿到chunk
            partition
                .chunk(table_name, chunk_id)
                .context(LoadingChunkToParquet {
                    partition_key,
                    table_name,
                    chunk_id,
                })?
        };


        let rb_chunk = {
            //先加写锁
            let mut chunk = chunk.write();
            //修改Chunk的状态为WritingToObjectStore
            chunk
                .set_writing_to_object_store()
                .context(LoadingChunkToParquet {
                    partition_key,
                    table_name,
                    chunk_id,
                })?
        };

        //获取所有Chunk下所有表的Statistics信息
        let table_stats = rb_chunk.table_summaries();

        //创建一个parquet Chunk,这个在上一章里有提到各种Chunk类型
        let mut parquet_chunk = Chunk::new(
            partition_key.to_string(),
            chunk_id,
            //用来统计parquet占用的内存
            self.memory_registries.parquet.as_ref(),
        );
        //创建一个Storage结构,使用的是启动数据库时候指定的存储类型,这个在第3章里有提到
        let storage = Storage::new(
            Arc::clone(&self.store),
            self.server_id,
            self.rules.read().name.to_string(),
        );
        //遍历所有表的统计数据
        for stats in table_stats {
            //构建一个空的查询,也就是 select * from table,不加where
            let predicate = read_buffer::Predicate::default();

            //从rb_chunk筛选数据, Selection::All代表所有列,predicate代表没有where条件
            //意思就是 `stats` 指向的单个表内的所有数据
            let read_results = rb_chunk
                .read_filter(stats.name.as_str(), predicate, Selection::All)
                .context(ReadBufferChunkError {
                    table_name,
                    chunk_id,
                })?;
            //再拿出来schema信息,因为arrow是分开存的,所以需要拿两次
            let arrow_schema: ArrowSchemaRef = rb_chunk
                .read_filter_table_schema(stats.name.as_str(), Selection::All)
                .context(ReadBufferChunkSchemaError {
                    table_name,
                    chunk_id,
                })?
                .into();
            //再拿出来这个表里的最大最小的时间
            //这个是从readBuffer::Column::from里完成的最大最小时间统计
            //也就是当从mutbuffer转移到readbuffer的时候
            let time_range = rb_chunk.table_time_range(stats.name.as_str()).context(
                ReadBufferChunkTimestampError {
                    table_name,
                    chunk_id,
                },
            )?;
            //创建一个ReadFilterResultsStream
            //官方文档里面说的是这是一个转变ReadFilterResults为异步流的适配器
            let stream: SendableRecordBatchStream = Box::pin(
                streams::ReadFilterResultsStream::new(read_results, Arc::clone(&arrow_schema)),
            );

            // 写到持久化存储当中
            let path = storage
                .write_to_object_store(
                    partition_key.to_string(),
                    chunk_id,
                    stats.name.to_string(),
                    stream,
                )
                .await
                .context(WritingToObjectStore)?;

            // 这里就是把写入parquet的摘要信息存储在内存中
            let schema = Arc::clone(&arrow_schema)
                .try_into()
                .context(SchemaConversion)?;
            let table_time_range = time_range.map(|(start, end)| TimestampRange::new(start, end));
            parquet_chunk.add_table(stats, path, schema, table_time_range);
        }

        //对`catlog::chunk`加写锁,然后更新这个chunk的状态为WrittenToObjectStore
        let mut chunk = chunk.write();
        let parquet_chunk = Arc::clone(&Arc::new(parquet_chunk));
        chunk
            .set_written_to_object_store(parquet_chunk)
            .context(LoadingChunkToParquet {
                partition_key,
                table_name,
                chunk_id,
            })?;
        //包装`catlog::chunk`为`ParquetChunk`
        Ok(DbChunk::snapshot(&chunk))
    }

这里面看起来有点儿绕,不容易理解的就是chunk.set_written_to_object_store这种方法。

因为Rust中enum是存在变种的,所以基于这种特性,虽然都是Chunk,但是存储的内容变化了。

代码语言:javascript
复制
pub enum ChunkState {
    ....省略
    //这里就是mutbuffer里的chunk
    Moving(Arc<MBChunk>),
    //这里就变成存储的readbuffer的chunk结构
    Moved(Arc<ReadBufferChunk>),
    //这里又开始存储ParquetChunk结构
    WrittenToObjectStore(Arc<ReadBufferChunk>, Arc<ParquetChunk>),
}

还需要继续查看storage.write_to_object_store这个逻辑,这里涉及到了从memarrow结构转为Parquet结构,就不在文章中展示了,使用的是arrowArrowWriter直接转换的。

代码语言:javascript
复制
    //这里直接跳跃到ObjectStore的put方法里,来看怎么组织的写入
    async fn put<S>(&self, location: &Self::Path, bytes: S, length: Option<usize>) -> Result<()>
    where
        S: Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
    {
        use ObjectStoreIntegration::*;
        //匹配启动时候配置的存储方式,转到真正的实现去,这里只看文件的
        match (&self.0, location) {
             ...省略

           //文件存储 
            (File(file), path::Path::File(location)) => file
                .put(location, bytes, length)
                .await
                .context(FileObjectStoreError)?,
            _ => unreachable!(),
        }

        Ok(())
    }

//为File实现了ObjectStoreApi trait,相当于文件存储时候的实际实现
async fn put<S>(&self, location: &Self::Path, bytes: S, length: Option<usize>) -> Result<()>
    where
        S: Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
    {
        //读取之前ReadFilterResultsStream里的所有数据到content里
        let content = bytes
            .map_ok(|b| bytes::BytesMut::from(&b[..]))
            .try_concat()
            .await
            .context(UnableToStreamDataIntoMemory)?;
        //这里就是一个验证长度否则报错DataDoesNotMatchLength。宏编程,不用关注
        if let Some(length) = length {
            ensure!(
                content.len() == length,
                DataDoesNotMatchLength {
                    actual: content.len(),
                    expected: length,
                }
            );
        }
        //获取文件路径,就是启动时候配置的根路径加上数据路径
        let path = self.path(location);
        //创建这个文件出来
        let mut file = match fs::File::create(&path).await {
            Ok(f) => f,
            //如果是没有找到父路径,那就从新创建一次
            Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
                let parent = path
                    .parent()
                    .context(UnableToCreateFile { path: &path, err })?;
                fs::create_dir_all(&parent)
                    .await
                    .context(UnableToCreateDir { path: parent })?;

                match fs::File::create(&path).await {
                    Ok(f) => f,
                    Err(err) => return UnableToCreateFile { path, err }.fail(),
                }
            }
            //否则就失败了
            Err(err) => return UnableToCreateFile { path, err }.fail(),
        };
        //这里就是拷贝所有数据到这个文件中去
        tokio::io::copy(&mut &content[..], &mut file)
            .await
            .context(UnableToCopyDataToFile)?;
        //大功告成
        Ok(())
    }

这个写入的逻辑比较庞大了,但是基本也能捋清楚。

  1. 先写入mutBuffer,写到一定大小会关闭
  2. 异步线程来监控是不是该关掉mutBuffer
  3. 生命周期的转换,然后开始写入readBuffer
  4. 之后开始异步的写入持久化存储
  5. 检查内存是不是需要清理readbuffer

大概就这些。源代码中还有很多逻辑没有完成,比如WAL。先整体看完流程再回来看遗漏的,留给Influx写更多完整逻辑的时间。

祝玩儿的开心。


本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-04-25,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 数据库技术研究 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
对象存储
对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档