前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >时序数据库Influx-IOx源码学习七(Chunk的生命周期)

时序数据库Influx-IOx源码学习七(Chunk的生命周期)

作者头像
刘涛华
发布2021-04-26 10:51:59
4780
发布2021-04-26 10:51:59
举报

InfluxDB是一个由InfluxData开发的开源时序数据库,专注于海量时序数据的高性能读、写、高效存储与实时分析等,在DB-Engines Ranking时序型数据库排行榜上常年排名第一。 InfluxDB可以说是当之无愧的佼佼者,但 InfluxDB CTO Paul 在 2020/12/10 号在博客中发表一篇名为:Announcing InfluxDB IOx – The Future Core of InfluxDB Built with Rust and Arrow的文章,介绍了一个新项目 InfluxDB IOx,InfluxDB 的下一代时序引擎。 接下来,我将连载对于InfluxDB IOx的源码解析过程,欢迎各位批评指正,联系方式见文章末尾。


上一章介绍了数据从客户端写入到服务器端的内存中的整个过程。详情见: https://my.oschina.net/u/3374539/blog/5027429

这一章记录一下数据库中数据管理单元Chunk的生命周期。


在开篇,先介绍一下一个Chunk拥有的生命周期:

代码语言:javascript
复制
//这里需要注意,这些变体里的Chunk结构都是不相同的
//也就是有内存数据拷贝的工作
pub enum ChunkState {
    //内部移动数据时候用的
    Invalid,
    //可以写入
    Open(MBChunk),
    //还能继续写入,但很快会被关闭
    Closing(MBChunk),
    //已经不能写入了,准备移动到readbuffer
    Moving(Arc<MBChunk>),
    //已经被移动到了read buffer
    Moved(Arc<ReadBufferChunk>),
    //准备写入持久化存储
    WritingToObjectStore(Arc<ReadBufferChunk>),
    //写入持久化存储完成
    WrittenToObjectStore(Arc<ReadBufferChunk>, Arc<ParquetChunk>),
}

在第五章中有提到,在Create Database之后,会启动一个后台线程。

该后台线程完成了部分对Chunk的管理功能,通过理解这个后台线程,能够基本理解Chunk的所有生命周期。

代码语言:javascript
复制
    //后台线程的方法入口,在创建完成数据库后,就会调用到这个方法
    pub async fn background_worker(
        self: &Arc<Self>,
        shutdown: tokio_util::sync::CancellationToken,
    ) {
        //创建一个定时器,周期性的执行
        let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1));
        let mut lifecycle_manager = LifecycleManager::new(Arc::clone(&self));
        //没有收到停止服务器时候的信号就一直执行,1秒一次
        while !shutdown.is_cancelled() {
            //记录执行的次数,每次加1,Ordering::Relaxed代表的单线程里的原子操作
            self.worker_iterations.fetch_add(1, Ordering::Relaxed);
            //进入生命周期的管理
            lifecycle_manager.check_for_work();
            //收到不同信号之后的处理方法
            tokio::select! {
                _ = interval.tick() => {},
                _ = shutdown.cancelled() => break
            }
        }

        info!("finished background worker");
    }

前方高能,请注意:

代码语言:javascript
复制
fn check_for_work(&mut self, now: DateTime<Utc>) {
        //获取创建数据库的时候,对于Chunk的相关配置
        let rules = self.rules();
        //根据配置的排序规则,获取出内存里所有的chunk
        let chunks = self.chunks(&rules.sort_order);

        let mut buffer_size = 0;

        //判断是不是有其他的任务正在执行,move我理解针对于read buffer,write对于持久化
        let mut move_active = self.is_move_active();
        let mut write_active = self.is_write_active();

         //遍历所有块,检查哪些块可以被持久化
        for chunk in &chunks {
            //获取当前chunk的锁
            let chunk_guard = chunk.upgradable_read();
            //获取chunk占用的内存大小
            buffer_size += Self::chunk_size(&*chunk_guard);
            //没有移动任务并且Chunk里最后的写入时间比较老
            let would_move = !move_active && can_move(&rules, &*chunk_guard, now);
            //没有写出任务,并且开启了持久化
            let would_write = !write_active && rules.persist;

            //判断chunk的生命周期
            match chunk_guard.state() {
                 //属于open状态,并且是需要移动的(上面的逻辑里有展示什么是需要移动的)
                 //这里我理解就是相当于实时写入时候的一个补充方案
                 //试想,如果一个chunk一直不写入数据,可能有一年了,查询都不再用这些数据了,内存却被一直占用
                ChunkState::Open(_) if would_move => {
                    let mut chunk_guard = RwLockUpgradableReadGuard::upgrade(chunk_guard);
                    //切换状态到closing
                    chunk_guard.set_closing().expect("cannot close open chunk");
                    let partition_key = chunk_guard.key().to_string();
                    let chunk_id = chunk_guard.id();
                    std::mem::drop(chunk_guard);
                    move_active = true;
                    //移动到read_buffer,变为不可写入状态(启动了一个异步的线程,后面看)
                    self.move_to_read_buffer(partition_key, chunk_id);
                }
                //这里有几种情况,同样会在别处触发为closing
                //例如:chunk大小超过了设置的可变内存大小的时候
                ChunkState::Closing(_) if would_move => {
                    let partition_key = chunk_guard.key().to_string();
                    let chunk_id = chunk_guard.id();
                    std::mem::drop(chunk_guard);
                    move_active = true;
                    //移动到read_buffer
                    self.move_to_read_buffer(partition_key, chunk_id);
                }
                //已经被挪动到readbuffer中的
                ChunkState::Moved(_) if would_write => {
                    let partition_key = chunk_guard.key().to_string();
                    let chunk_id = chunk_guard.id();
                    std::mem::drop(chunk_guard);
                    write_active = true;
                    //写入到对象存储
                    self.write_to_object_store(partition_key, chunk_id);
                }
                _ => {}
            }
        }
        //这里是主要检查内存限制的逻辑,当所有chunk的大小超过限制的时候就要清理Chunk
        if let Some(soft_limit) = rules.buffer_size_soft {
            let mut chunks = chunks.iter();

            while buffer_size > soft_limit.get() {
                match chunks.next() {
                    Some(chunk) => {
                        //获取读锁
                        let chunk_guard = chunk.read();
                        //如果配置了可以清理未持久化数据,那么处在read_buffer里的数据也会被清理
                        //一定会清理已经被持久化到对象存储上的数据
                        if (rules.drop_non_persisted
                            && matches!(chunk_guard.state(), ChunkState::Moved(_)))
                            || matches!(chunk_guard.state(), ChunkState::WrittenToObjectStore(_, _))
                        {
                            let partition_key = chunk_guard.key().to_string();
                            let chunk_id = chunk_guard.id();
                            buffer_size =
                                buffer_size.saturating_sub(Self::chunk_size(&*chunk_guard));
                            std::mem::drop(chunk_guard);
                            //真真正正的删除逻辑后面看
                            self.drop_chunk(partition_key, chunk_id)
                        }
                    }
                    //没有什么可以释放的了
                    None => {
                        warn!(db_name=self.db_name(), soft_limit, buffer_size,
                              "soft limited exceeded, but no chunks found that can be evicted. Check lifecycle rules");
                        break;
                    }
                }
            }
        }
    }

这里基本看清楚了Chunk的周期:

  1. 在写入时候,如果没有Chunk就会open一个,并处在open状态。
  2. 如果写入超过了一些限制,就会被标记为closing;如果数据时间超过了配置的时间,也会被标记为closing。标记为closing的会添加一个后台进程,准备将Chunk移动到read_buffer中。
  3. 后台任务启动后,会标记为moving状态,此时禁止Chunk再写入任何数据。
  4. 一旦移动完成,会被标记为moved
  5. 程序会对moved状态下的Chunk开始进行持久化。
  6. 扫描任务会不断判断内存使用是否超过了限制,如果超过限制,会清理已经持久化的Chunk。如果配置了drop_non_persisted,会把read_buffer中未持久化的也删除掉。

然后继续看程序是怎样将一个chunk移动到read_buffer的,因为篇幅的影响,将会在下一篇介绍数据是怎样真正写入到持久化存储当中的。

代码语言:javascript
复制
pub async fn load_chunk_to_read_buffer(
        &self,
        partition_key: &str,
        chunk_id: u32,
    ) -> Result<Arc<DbChunk>> {
        //根据partition_key及chunk_id获取内存中存储的Chunk
        let chunk = {
            let partition = self
                .catalog
                .valid_partition(partition_key)
                .context(LoadingChunk {
                    partition_key,
                    chunk_id,
                })?;
            let partition = partition.read();

            partition.chunk(chunk_id).context(LoadingChunk {
                partition_key,
                chunk_id,
            })?
        };

        //设置当前的Chunk为Moving状态
        let mb_chunk = {
            let mut chunk = chunk.write();
            chunk.set_moving().context(LoadingChunk {
                partition_key,
                chunk_id,
            })?
        };
        info!(%partition_key, %chunk_id, "chunk marked MOVING, loading tables into read buffer");
        let mut batches = Vec::new();
        //这里是拿到Chunk中每个Cloumn的统计信息,分别是min,max,count
        let table_stats = mb_chunk.table_summaries();

        //从新创建一个ReadBufferChunk,后面准备把所有数据都拷贝到这里
        //还需要告诉内存管理这里新申请了多少空间
        let rb_chunk =
            ReadBufferChunk::new_with_memory_tracker(chunk_id, &self.memory_registries.read_buffer);

        for stats in table_stats {
            //把内存中的数据,全部重新拷贝一次,转换为arrow格式
            mb_chunk
                .table_to_arrow(&mut batches, &stats.name, Selection::All)
                //这里应该是还没有写完,如果出现错误,这个Chunk该怎么处理?
                .expect("Loading chunk to mutable buffer");
            //循环拷贝
            for batch in batches.drain(..) {
                rb_chunk.upsert_table(&stats.name, batch)
            }
        }

        let mut chunk = chunk.write();
        //更新写入缓存里的Chunk为Moved状态,同时Chunk内容修改为了ReadBuffer的Chunk
        //对于Chunk的结构后面看
        chunk.set_moved(Arc::new(rb_chunk)).context(LoadingChunk {
            partition_key,
            chunk_id,
        })?;

        //工作全部都完成了,调用做快照的方法,方法里什么都没做,返回新Chunk的一个Arc指针
        Ok(DbChunk::snapshot(&chunk))
    }

到这里基本清楚了整个Chunk的工作方式,因为Chunk这个名字被代码中重复使用到了,所以特意在文章末尾说一下都有什么Chunk

代码语言:javascript
复制
//主要是存储一个数据块的描述信息,名字、最后写入时间等
Server::db::catalog::chunk
//数据从客户端直接写入的内存块
mutable_buffer::chunk
//在moving时候拷贝的新数据块,arrow结构
read_buffer::chunk
//parquet对应的chunk
parquet_file::chunk
//query模块下对PartitionChunk重新命名了一下
//对于相同的partition key的数据抽象的行为
query -> type Chunk: PartitionChunk;
//实现PartitionChunk定义的方法,对不同位置下的chunk的操作
//如ParquetFile、MutableBuffer等
server::db::chunk

好了就到这里,希望你也学到了很多

祝玩儿的开心

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-04-23,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 数据库技术研究 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
数据库
云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档