1.引言:为什么选择 TDengine 与 Rust?
TDengine 是一款专为物联网、车联网、工业互联网等时序数据场景优化设计的开源时序数据库,支持高并发写入、高效查询及流式计算,通过“一个数据采集点一张表”与“超级表”的概念显著提升性能。
Rust 作为一门系统级编程语言,近年来在数据库、嵌入式系统、分布式服务等领域迅速崛起,以其内存安全、高性能著称,与 TDengine 的高效特性天然契合,适合构建高可靠、高性能的数据处理系统。
2.TDengine Rust 连接器的设计与架构
TDengine Rust 连接器的主要目标是提供一个高效、安全且易于使用的接口,让开发者能够通过 Rust 语言与 TDengine 数据库进行高效的交互。连接器的设计充分考虑了 Rust 语言的优势,如内存安全、并发处理和高性能,同时确保与 TDengine 数据库之间的通信可靠且高效。
主要模块间的架构如下图:
解释:
taos-query 模块作为核心,负责定义公共接口和数据结构。
taos-optin 模块和 taos-ws 模块分别实现了这些公共接口,负责原生连接和 WebSocket 连接的业务逻辑实现。
taos 模块封装了 taos-optin 和 taos-ws 模块中的实现,同时将 taos-query 模块所定义的公共接口和数据结构予以暴露。
最终,用户只需依赖 taos 模块,即可轻松使用整个系统,而无需了解内部复杂的实现细节。
3.快速入门:从安装到代码开发
在 Cargo.toml 中添加以下内容:
[dependencies] anyhow = "1.0.96" chrono = "0.4.39" serde = "1.0.217" taos = "0.12.3" tokio = "1.43.0"
代码演示,数据写入与查询:
use chrono::{DateTime, Local}; use taos::*; #[tokio::main] async fn main() -> anyhow::Result<()> { // Establish native connection let dsn = "taos://localhost:6030"; let taos = TaosBuilder::from_dsn(dsn)?.build().await?; let db = "power"; // Prepare the database taos.exec_many([ format!("DROP DATABASE IF EXISTS {db}"), format!("CREATE DATABASE {db}"), format!("USE {db}"), ]) .await?; let inserted = taos.exec_many([ // Create a super table "CREATE STABLE meters(ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) \ TAGS (group_id INT, location varchar(20))", // Create a subtable "CREATE TABLE d0 USING meters TAGS(0, 'Beijing Chaoyang')", // Write one record at a time "INSERT INTO d0 VALUES(now, 10.15, 217, 0.33)", // Write Null values "INSERT INTO d0 VALUES(now, NULL, NULL, NULL)", // Automatically create table on insert "INSERT INTO d1 USING meters TAGS(1, 'Beijing Haidian') VALUES(now, 10.1, 119, 0.32)", // Write multiple records at once "INSERT INTO d1 VALUES(now+1s, 10.22, 120, 0.33) (now+2s, 11.23, 121, 0.34) (now+3s, 12.23, 118, 0.32)", ]).await?; assert_eq!(inserted, 6); let mut result = taos.query("SELECT * FROM meters").await?; for field in result.fields() { println!("Get the field: {}", field.name()); } println!(); // Query option 1, using row stream. let mut rows = result.rows(); while let Some(row) = rows.try_next().await? { for (name, value) in row { println!("Get the value of {}: {}", name, value); } println!() } // Query option 2, deserialize using serde. #[derive(Debug, serde::Deserialize)] #[allow(dead_code)] struct Record { // Deserialize timestamp to chrono::DateTime<Local> ts: DateTime<Local>, // Deserialize float to f32 current: Option<f32>, // Deserialize int to i32 voltage: Option<i32>, // Deserialize float to f32 phase: Option<f32>, // Deserialize int to i32 group_id: i32, // Deserialize varchar to String location: String, } let records: Vec<Record> = taos .query("SELECT * FROM meters") .await? .deserialize() .try_collect() .await?; println!("Get records: {:?}", records); Ok(()) }
4.高级功能与最佳实践
4.1 连接池优化:提升高并发性能
频繁创建和销毁数据库连接会带来显著的开销,尤其是在高并发场景(如物联网设备高频上报数据)中。通过连接池复用连接,可减少 TCP 握手、认证等重复操作,提升整体吞吐量。
代码示例:
use taos::*; #[tokio::main] async fn main() -> anyhow::Result<()> { // Create a connection pool let dsn = "taos://localhost:6030"; let pool = TaosBuilder::from_dsn(dsn)?.pool()?; let taos = pool.get().await?; let db = "power"; // Prepare the database taos.exec_many([ format!("DROP DATABASE IF EXISTS {db}"), format!("CREATE DATABASE {db}"), format!("USE {db}"), ]) .await?; let inserted = taos.exec_many([ // Create a super table "CREATE STABLE meters(ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) \ TAGS (group_id INT, location varchar(20))", // Create a subtable "CREATE TABLE d0 USING meters TAGS(0, 'Beijing Chaoyang')", // Write one record at a time "INSERT INTO d0 VALUES(now, 10.15, 217, 0.33)", // Write Null values "INSERT INTO d0 VALUES(now, NULL, NULL, NULL)", // Automatically create table on insert "INSERT INTO d1 USING meters TAGS(1, 'Beijing Haidian') VALUES(now, 10.1, 119, 0.32)", // Write multiple records at once "INSERT INTO d1 VALUES(now+1s, 10.22, 120, 0.33) (now+2s, 11.23, 121, 0.34) (now+3s, 12.23, 118, 0.32)", ]).await?; assert_eq!(inserted, 6); Ok(()) }
4.2 原生连接与 WebSocket 连接
对于原生连接和 WebSocket 连接这两种方式,除了建立连接所使用的 DSN 不同外,其余接口调用没有差异。
代码示例:
use taos::*; #[tokio::main] async fn main() -> anyhow::Result<()> { // let dsn = "taosws://localhost:6041"; // WebSocket connection let dsn = "taos://localhost:6030"; // Native connection let _taos = TaosBuilder::from_dsn(dsn)?.build().await?; Ok(()) }
4.3 同步接口和异步接口
TDengine Rust 连接器的接口分为同步接口和异步接口。通常情况下,同步接口基于异步接口实现,二者的方法签名,除了异步接口多了 async 关键字外,基本一致。
异步接口的代码示例可查阅第 3 章节,同步接口的代码示例如下:
use taos::sync::*; fn main() -> anyhow::Result<()> { // Establish native connection let dsn = "taos://localhost:6030"; let taos = TaosBuilder::from_dsn(dsn)?.build()?; let db = "power"; // Prepare the database taos.exec_many([ format!("DROP DATABASE IF EXISTS {db}"), format!("CREATE DATABASE {db}"), format!("USE {db}"), ])?; let inserted = taos.exec_many([ // Create a super table "CREATE STABLE meters(ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) \ TAGS (group_id INT, location varchar(20))", // Create a subtable "CREATE TABLE d0 USING meters TAGS(0, 'Beijing Chaoyang')", // Write one record at a time "INSERT INTO d0 VALUES(now, 10.15, 217, 0.33)", // Write Null values "INSERT INTO d0 VALUES(now, NULL, NULL, NULL)", // Automatically create table on insert "INSERT INTO d1 USING meters TAGS(1, 'Beijing Haidian') VALUES(now, 10.1, 119, 0.32)", // Write multiple records at once "INSERT INTO d1 VALUES(now+1s, 10.22, 120, 0.33) (now+2s, 11.23, 121, 0.34) (now+3s, 12.23, 118, 0.32)", ])?; assert_eq!(inserted, 6); Ok(()) }
5.结语:开源社区的星辰大海
TDengine Rust 连接器的成长史,是一个典型的技术民主化故事——从社区萌芽到官方支持,从边缘工具到核心生态。它证明了两个事实:
开源协作的效率:在数据库领域,社区贡献者的“需求反哺”比闭门造车更能击中痛点。
Rust 的生态潜力:作为系统级编程语言的 Rust,正在时序数据处理这样的垂直领域开辟新战场。
我们期待你的参与,欢迎提交 PR,一起推动 TDengine Rust 连接器的进步,携手开创更加美好的开源未来!
6.附录
https://docs.taosdata.com/
https://github.com/taosdata/taos-connector-rust
领取专属 10元无门槛券
私享最新 技术干货