前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >时序数据库Influx-IOx源码学习五(创建数据库)

时序数据库Influx-IOx源码学习五(创建数据库)

作者头像
刘涛华
发布2021-04-26 10:50:53
5900
发布2021-04-26 10:50:53
举报

上篇介绍到:InfluxDB-IOx的Run命令启动过程,详情见:https://my.oschina.net/u/3374539/blog/5021654

这章记录一下Database create命令的执行过程。


在第三章命令行中介绍了,所有的子命令都有一个独立的参数或配置称为subcommand

代码语言:javascript
复制
enum Command {
    Convert { // 省略 ...},
    Meta {// 省略 ...},
    Database(commands::database::Config),
    Run(Box<commands::run::Config>),
    Stats(commands::stats::Config),
    Server(commands::server::Config),
    Writer(commands::writer::Config),
    Operation(commands::operations::Config),
}

这章我们打开看一眼commands::database下的config包含了什么。

代码语言:javascript
复制
pub struct Config {
    #[structopt(subcommand)]
    command: Command,
}
//见名知意,基本猜测一下就行了,慢慢使用到再回来看
enum Command {
    Create(Create),
    List(List),
    Get(Get),
    Write(Write),
    Query(Query),
    Chunk(chunk::Config),
    Partition(partition::Config),
}

先来看一下create命令的执行。

代码语言:javascript
复制
Command::Create(command) => {
            //创建一个grpc的client
            let mut client = management::Client::new(connection);
            //设置基本的配置项
            let rules = DatabaseRules {
                //数据库名字
                name: command.name,
                //内存的各种配置,包含缓存大小,时间等等
                lifecycle_rules: Some(LifecycleRules {
                    //省略。。
                }),
                //设置分区的策略
                partition_template: Some(PartitionTemplate {
                    //省略。。
                }),

                 //其它都填充default
                ..Default::default()
            };
            //使用配置信息创建数据库,这里是生成了一个CreateDatabaseRequest去调用了远程服务器的方法
            client.create_database(rules).await?;

            println!("Ok");
        }

在上一章中提到了grpc的启动,这里就涉及到了之前提到的grpc的框架tonic,在tonic中使用#[tonic::async_trait]了标记一个服务器端的实现开始。我在ide中搜索,可以在src/influxdb_ioxd/rpc/management.rs:50行中找到ManagementService相关的实现。

有关tonic更多的资料请阅读:https://github.com/hyperium/tonic

代码语言:javascript
复制
#[tonic::async_trait]
impl<M> management_service_server::ManagementService for ManagementService<M>
where
    M: ConnectionManager + Send + Sync + Debug + 'static,
{
    //省略其它方法。。。

 async fn create_database(
        &self,
        //这里就是接收CreateDatabaseRequest的请求
        request: Request<CreateDatabaseRequest>,
    ) -> Result<Response<CreateDatabaseResponse>, Status> {

         //对数据进行一下校验,然后获得在上面配置的rules规则
        let rules: DatabaseRules = request
            .into_inner()
            .rules
            .ok_or_else(|| FieldViolation::required(""))
            .and_then(TryInto::try_into)
            .map_err(|e| e.scope("rules"))?;

        //这里就是在第三章中提到的server_id,如果没配置就会报错了
        let server_id = match self.server.require_id().ok() {
            Some(id) => id,
            None => return Err(NotFound::default().into()),
        };
        //这里就是真正的去创建,在下面继续跟踪
        match self.server.create_database(rules, server_id).await {
            Ok(_) => Ok(Response::new(CreateDatabaseResponse {})),
            Err(Error::DatabaseAlreadyExists { db_name }) => {
                return Err(AlreadyExists {
                    resource_type: "database".to_string(),
                    resource_name: db_name,
                    ..Default::default()
                }
                .into())
            }
            Err(e) => Err(default_server_error_handler(e)),
        }
    }
}

接下来要继续查看数据库真正的被创建出来,我读到这里存在一个问题,文件格式是什么样子的?

代码语言:javascript
复制
pub async fn create_database(&self, rules: DatabaseRules, server_id: NonZeroU32) -> Result<()> {
        //检查server_id
        self.require_id()?;
        //把数据库名字存储到内存中,最终保存到一个btreemap中
        let db_reservation = self.config.create_db(rules)?;
        //对数据进行持久化保存
        self.persist_database_rules(db_reservation.rules().clone())
            .await?;
        //启动数据库后台线程,在内存中写入数据库状态
        db_reservation.commit(server_id, Arc::clone(&self.store), Arc::clone(&self.exec));

        Ok(())
    }

来解答上面的疑问,文件是怎样持久化、格式是什么样子的。

代码语言:javascript
复制
pub async fn persist_database_rules<'a>(&self, rules: DatabaseRules) -> Result<()> {
        //生成一个新的数据库路径
        let location = object_store_path_for_database_config(&self.root_path()?, &rules.name);
        //序列化DatabaseRules这个pb到byte流
        let mut data = BytesMut::new();
        rules.encode(&mut data).context(ErrorSerializing)?;
        let len = data.len();
        let stream_data = std::io::Result::Ok(data.freeze());
        //将pb的内容进行存储
        self.store
            .put(
                &location,
                futures::stream::once(async move { stream_data }),
                Some(len),
            )
            .await
            .context(StoreError)?;
        Ok(())
    }

这里调用了rules.encode()转换到pb的格式,这里是rust语言的一个方法,实现了From特性的,就得到了一个into的方法,如:impl From<DatabaseRules> for management::DatabaseRules.

到这里数据库的一个描述文件rules.pb就被写入到磁盘中了,路径是启动命令中指定的--data-dir参数路径 + --writer-id + 数据库名字。

例如,我的启动和创建命令为:

代码语言:javascript
复制
./influxdb_iox run --writer-id 1 --object-store file --data-dir ~/influxtest/
./influxdb_iox database create test

那么得到的路径就为:~/influxtest/1/test/rules.pb. 之后可以运行一个pb的脚本来反查rules.pb中的数据内容,如下:

代码语言:javascript
复制
$ ./scripts/prototxt decode influxdata.iox.management.v1.DatabaseRules \
    < ~/influxtest/1/test/rules.pb

influxdata/iox/management/v1/service.proto:6:1: warning: Import google/protobuf/field_mask.proto is unused.
name: "test"
partition_template {
  parts {
    time: "%Y-%m-%d %H:00:00"
  }
}
lifecycle_rules {
  mutable_linger_seconds: 300
  mutable_size_threshold: 10485760
  buffer_size_soft: 52428800
  buffer_size_hard: 104857600
  sort_order {
    order: ORDER_ASC
    created_at_time {
    }
  }
}

看到这里已经知道整个生成过程及文件内容。

祝玩儿的开心。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-04-19,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 数据库技术研究 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
数据库
云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档