前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >时序数据库Influx-IOx源码学习六-1(数据写入之分区)

时序数据库Influx-IOx源码学习六-1(数据写入之分区)

作者头像
刘涛华
发布2021-04-26 10:51:14
4400
发布2021-04-26 10:51:14
举报

上一章说到如何创建一个数据库,并且数据库的描述信息是如何保存的。详情见:https://my.oschina.net/u/3374539/blog/5025128

这一章记录一下,数据是如何写入并保存的,具体会分为两篇来写:

  • 一篇介绍分区是如何完成的
  • 一篇介绍具体的写入

说到数据写入,必然是需要能够连接到服务器。IOx项目为提供了多种方式可以与服务器进行交互,分别是GrpcHttp基于这两种通信方式,又扩展支持了influxdb2_client以及influxdb_iox_client

基于influxdb_iox_client我写了一个数据写入及查询的示例来观测接口是如何组织的,代码如下:

代码语言:javascript
复制
#[tokio::main]
async fn main() {
    {
        let connection = Builder::default()
            .build("http://127.0.0.1:8081")
            .await
            .unwrap();
        write::Client::new(connection)
            .write("a", r#"myMeasurement,tag1=value1,tag2=value2 fieldKey="123" 1556813561098000000"#)
            .await
            .expect("failed to write data");
    }

    let connection = Builder::default()
        .build("http://127.0.0.1:8081")
        .await
        .unwrap();

    let mut query = flight::Client::new(connection)
        .perform_query("a", "select * from myMeasurement")
        .await
        .expect("query request should work");

    let mut batches = vec![];

    while let Some(data) = query.next().await.expect("valid batches") {
        batches.push(data);
    }

    let format1 = format::QueryOutputFormat::Pretty;
    println!("{}", format1.format(&batches).unwrap());
}


+------------+--------+--------+-------------------------+
| fieldKey   | tag1   | tag2   | time                    |
+------------+--------+--------+-------------------------+
| 123        | value1 | value2 | 2019-05-02 16:12:41.098 |
| 123        | value1 | value2 | 2019-05-02 16:12:41.098 |
| fieldValue | value1 | value2 | 2019-05-02 16:12:41.098 |
| fieldValue | value1 | value2 | 2019-05-02 16:12:41.098 |
| 123        | value1 | value2 | 2019-05-02 16:12:41.098 |
+------------+--------+--------+-------------------------+

因为我多运行了几次,所以能看到数据被重复插入了。

这里还需要说一下的是写入的语句格式可以参见:

[LineProtocol] https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/#data-types-and-format


write::Client中的write方法生成了一个WriteRequest结构,并使用RPC调用远程的write方法。打开src/influxdb_ioxd/rpc/write.rs : 22行可以看到方法的具体实现。

代码语言:javascript
复制
async fn write(
        &self,
        request: tonic::Request<WriteRequest>,
    ) -> Result<tonic::Response<WriteResponse>, tonic::Status> {
        let request = request.into_inner();
        //得到上面在客户端中写入的数据库名字,在上面的例子中传入的"a"
        let db_name = request.db_name;
        //这里得到了写入的LineProtocol
        let lp_data = request.lp_data;
        let lp_chars = lp_data.len();
        //解析LineProtocol的内容
        //示例中的lp会被解析为:
        //measurement: "myMeasurement"
        //tag_set: [("tag1", "value1"), ("tag2", "value2")]
        //field_set: [("fieldKey", "123")]
        //timestamp: 1556813561098000000
        let lines = parse_lines(&lp_data)
            .collect::<Result<Vec<_>, influxdb_line_protocol::Error>>()
            .map_err(|e| FieldViolation {
                field: "lp_data".into(),
                description: format!("Invalid Line Protocol: {}", e),
            })?;

        let lp_line_count = lines.len();
        debug!(%db_name, %lp_chars, lp_line_count, "Writing lines into database");
        //对数据进行保存
        self.server
            .write_lines(&db_name, &lines)
            .await
            .map_err(default_server_error_handler)?;
        //返回成功
        let lines_written = lp_line_count as u64;
        Ok(Response::new(WriteResponse { lines_written }))
    }

继续看self.server.write_lines的执行:

代码语言:javascript
复制
 pub async fn write_lines(&self, db_name: &str, lines: &[ParsedLine<'_>]) -> Result<()> {
        self.require_id()?;
        //验证一下名字,然后拿到之前创建数据库时候在内存中存储的相关信息
        let db_name = DatabaseName::new(db_name).context(InvalidDatabaseName)?;
        let db = self
            .config
            .db(&db_name)
            .context(DatabaseNotFound { db_name: &*db_name })?;
        //这里就开始执行分片相关的策略
        let (sharded_entries, shards) = {
            //读取创建数据库时候配置的分片策略
            let rules = db.rules.read();
            let shard_config = &rules.shard_config;
            //根据数据和shard策略,把逐个数据对应的分区找到
            //写入到一个List<分区标识,List<数据>>这样的结构中
            //具体的结构信息后面看
            let sharded_entries = lines_to_sharded_entries(lines, shard_config.as_ref(), &*rules)
                .context(LineConversion)?;
            //再把所有分区的配置返回给调用者
            let shards = shard_config
                .as_ref()
                .map(|cfg| Arc::clone(&cfg.shards))
                .unwrap_or_default();

            (sharded_entries, shards)
        };

        //根据上面返回的集合进行map方法遍历,写到每个分区中
        futures_util::future::try_join_all(
            sharded_entries
                .into_iter()
                .map(|e| self.write_sharded_entry(&db_name, &db, Arc::clone(&shards), e)),
        )
        .await?;

        Ok(())
    }

这里描述了写入一条数据的主逻辑:数据写入的时候,先把数据划分到具体的分区里(使用List结构存储下所有的分区对应的数据),然后并行的进行数据写入

接下来看,数据是如何进行分区的:

代码语言:javascript
复制
pub fn lines_to_sharded_entries(
    lines: &[ParsedLine<'_>],
    sharder: Option<&impl Sharder>,
    partitioner: &impl Partitioner,
) -> Result<Vec<ShardedEntry>> {
    let default_time = Utc::now();
    let mut sharded_lines = BTreeMap::new();

    //对所有要插入的数据进行遍历
    for line in lines {
        //先找到符合哪个shard
        let shard_id = match &sharder {
            Some(s) => Some(s.shard(line).context(GeneratingShardId)?),
            None => None,
        };
        //再判断属于哪个分区
        let partition_key = partitioner
            .partition_key(line, &default_time)
            .context(GeneratingPartitionKey)?;
        let table = line.series.measurement.as_str();
        //最后存储到一个map中
        //shard-> partition -> table -> List<data> 的映射关系
        sharded_lines
            .entry(shard_id)
            .or_insert_with(BTreeMap::new)
            .entry(partition_key)
            .or_insert_with(BTreeMap::new)
            .entry(table)
            .or_insert_with(Vec::new)
            .push(line);
    }

    let default_time = Utc::now();
    //最后遍历这个map 转换到之前提到的List结构中
    let sharded_entries = sharded_lines
        .into_iter()
        .map(|(shard_id, partitions)| build_sharded_entry(shard_id, partitions, &default_time))
        .collect::<Result<Vec<_>>>()?;
    Ok(sharded_entries)
}

这里理解shard的概念就是一个或者一组机器,称为一个shard,他们负责真正的存储数据。

partition理解为一个个文件夹,在shard上具体的存储路径。

这里看一下是怎样完成shard的划分的:

代码语言:javascript
复制
impl Sharder for ShardConfig {
    fn shard(&self, line: &ParsedLine<'_>) -> Result<ShardId, Error> {
        if let Some(specific_targets) = &self.specific_targets {
            //如果对数据进行匹配,如果符合规则就返回,可以采用当前的shard
            //官方的代码中只实现了根据表名进行shard的策略
            //这个配置似乎只能通过grpc来进行设置,这样好处可能是将来有个什么管理界面能动态修改
            if specific_targets.matcher.match_line(line) {
                return Ok(specific_targets.shard);
            }
        }
        //如果没有配置就使用hash的方式
        //对整条数据进行hash,然后比较机器的hash,找到合适的节点
        //如果没找到,就放在hashring的第一个节点
        //hash算法见后面
        if let Some(hash_ring) = &self.hash_ring {
            return hash_ring
                .shards
                .find(LineHasher { line, hash_ring })
                .context(NoShardsDefined);
        }

        NoShardingRuleMatches {
            line: line.to_string(),
        }
        .fail()
    }
}
//具体的Hash算法,如果全配置的话分的就会特别散,几乎不同测点都放到了不同的地方
impl<'a, 'b, 'c> Hash for LineHasher<'a, 'b, 'c> {
    fn hash<H: Hasher>(&self, state: &mut H) {
        //如果配置了使用table名字就在hash中加入tablename
        if self.hash_ring.table_name {
            self.line.series.measurement.hash(state);
        }
        //然后按照配置的列的值进行hash
        for column in &self.hash_ring.columns {
            if let Some(tag_value) = self.line.tag_value(column) {
                tag_value.hash(state);
            } else if let Some(field_value) = self.line.field_value(column) {
                field_value.to_string().hash(state);t
            }
            state.write_u8(0); // column separator
        }
    }
}

接下来看默认的partition分区方式:

代码语言:javascript
复制
impl Partitioner for PartitionTemplate {
    fn partition_key(&self, line: &ParsedLine<'_>, default_time: &DateTime<Utc>) -> Result<String> {
        let parts: Vec<_> = self
            .parts
            .iter()
             //匹配分区策略,或者是单一的,或者是复合的
             //目前支持基于表、值、时间
             //其余还会支持正则表达式和strftime模式
            .map(|p| match p {
                TemplatePart::Table => line.series.measurement.to_string(),
                TemplatePart::Column(column) => match line.tag_value(&column) {
                    Some(v) => format!("{}_{}", column, v),
                    None => match line.field_value(&column) {
                        Some(v) => format!("{}_{}", column, v),
                        None => "".to_string(),
                    },
                },
                TemplatePart::TimeFormat(format) => match line.timestamp {
                    Some(t) => Utc.timestamp_nanos(t).format(&format).to_string(),
                    None => default_time.format(&format).to_string(),
                },
                _ => unimplemented!(),
            })
            .collect();
        //最后返回一个组合文件名,或者是 a-b-c 或者是一个单一的值
        Ok(parts.join("-"))
    }
}

到这里分区的工作就完成了,下一篇继续分析是怎样写入的。

祝玩儿的开心

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-04-20,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 数据库技术研究 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
数据库
云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档