本文主要介绍如何从客户端导入本地的数据。
Doris 提供多种数据导入方案,可以针对不同的数据源进行选择不同的数据导入方式。
数据源 | 导入方式 |
---|---|
对象存储(s3),HDFS | 使用Broker导入数据 |
本地文件 | 导入本地数据 |
Kafka | 订阅Kafka数据 |
Mysql、PostgreSQL,Oracle,SQLServer | 通过外部表同步数据 |
通过JDBC导入 | 使用JDBC同步数据 |
导入JSON格式数据 | JSON格式数据导入 |
导入方式名称 | 使用方式 |
---|---|
Spark Load | 通过Spark导入外部数据 |
Broker Load | 通过Broker导入外部存储数据 |
Stream Load | 流式导入数据(本地文件及内存数据) |
Routine Load | 导入Kafka数据 |
Insert Into | 外部表通过INSERT方式导入数据 |
S3 Load | S3协议的对象存储数据导入 |
MySQL Load | MySQL客户端导入本地数据 |
不同的导入方式支持的数据格式略有不同。
导入方式 | 支持的格式 |
---|---|
Broker Load | parquet、orc、csv、gzip |
Stream Load | csv、json、parquet、orc |
Routine Load | csv、json |
MySQL Load | csv |
Doris 的每一个导入作业,不论是使用 Broker Load 进行批量导入,还是使用 INSERT 语句进行单条导入,都是一个完整的事务操作。导入事务可以保证一批次内的数据原子生效,不会出现部分数据写入的情况。
同时,一个导入作业都会有一个 Label。这个 Label 是在一个数据库(Database)下唯一的,用于唯一标识一个导入作业。Label 可以由用户指定,部分导入功能也会由系统自动生成。
Label 是用于保证对应的导入作业,仅能成功导入一次。一个被成功导入的 Label,再次使用时,会被拒绝并报错 Label already used。通过这个机制,可以在 Doris 侧做到 At-Most-Once 语义。如果结合上游系统的 At-Least-Once 语义,则可以实现导入数据的 Exactly-Once 语义。
关于原子性保证的最佳实践,可以参阅 导入事务和原子性。
导入方式分为同步和异步。对于同步导入方式,返回结果即表示导入成功还是失败。而对于异步导入方式,返回成功仅代表作业提交成功,不代表数据导入成功,需要使用对应的命令查看导入作业的运行状态。
目前Doris支持两种从本地导入数据的模式:
Stream Load 用于将本地文件导入到 Doris 中。
Stream load 是一个同步的导入方式,用户通过发送 HTTP 协议发送请求将本地文件或数据流导入到 Doris 中。Stream load 同步执行导入并返回导入结果。用户可直接通过请求的返回体判断本次导入是否成功。
Stream load 主要适用于导入本地文件,或通过程序导入数据流中的数据。
相比于直接使用 curl 的单并发导入,更推荐使用 专用导入工具 Doris Streamloader 该工具是一款用于将数据导入 Doris 数据库的专用客户端工具,可以提供 多并发导入 的功能,降低大数据量导入的耗时。拥有以下功能:
不同于其他命令的提交方式,Stream Load 是通过 HTTP 协议与 Doris 进行连接交互的。
该方式中涉及 HOST:PORT 应为 HTTP 协议端口。
本文文档我们以 curl 命令为例演示如何进行数据导入。
文档最后,我们给出一个使用 Java 导入数据的代码示例
Stream Load 的请求体如下:
PUT /api/{db}/{table}/_stream_load
1. 创建一张表
通过 CREATE TABLE 命令在demo创建一张表用于存储待导入的数据。具体的导入方式请查阅 CREATE TABLE 命令手册。示例如下:
CREATE TABLE IF NOT EXISTS load_local_file_test
(
id INT,
age TINYINT,
name VARCHAR(50)
)
unique key(id)
DISTRIBUTED BY HASH(id) BUCKETS 3;
2. 导入数据
执行以下 curl 命令导入本地文件:
curl -u user:passwd -H "label:load_local_file_test" -T /path/to/local/demo.txt http://host:port/api/demo/load_local_file_test/_stream_load
关于 Stream Load 命令的更多高级操作,请参阅 Stream Load 命令文档。
3. 等待导入结果
Stream Load 命令是同步命令,返回成功即表示导入成功。如果导入数据较大,可能需要较长的等待时间。示例如下:
{
"TxnId": 1003,
"Label": "load_local_file_test",
"Status": "Success",
"Message": "OK",
"NumberTotalRows": 1000000,
"NumberLoadedRows": 1000000,
"NumberFilteredRows": 1,
"NumberUnselectedRows": 0,
"LoadBytes": 40888898,
"LoadTimeMs": 2144,
"BeginTxnTimeMs": 1,
"StreamLoadPutTimeMs": 2,
"ReadDataTimeMs": 325,
"WriteDataTimeMs": 1933,
"CommitAndPublishTimeMs": 106,
"ErrorURL": "http://192.168.1.1:8042/api/_load_error_log?file=__shard_0/error_log_insert_stmt_db18266d4d9b4ee5-abb00ddd64bdf005_db18266d4d9b4ee5_abb00ddd64bdf005"
}
导入建议 Stream Load 只能导入本地文件。 建议一个导入请求的数据量控制在 1 - 2 GB 以内。如果有大量本地文件,可以分批并发提交。
Java 代码示例 这里通过一个简单的 JAVA 示例来执行 Stream Load:
package demo.doris;
import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.FileEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
/*
这是一个 Doris Stream Load 示例,需要依赖
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.13</version>
</dependency>
*/
public class DorisStreamLoader {
//可以选择填写 FE 地址以及 FE 的 http_port,但须保证客户端和 BE 节点的连通性。
private final static String HOST = "your_host";
private final static int PORT = 8040;
private final static String DATABASE = "db1"; // 要导入的数据库
private final static String TABLE = "tbl1"; // 要导入的表
private final static String USER = "root"; // Doris 用户名
private final static String PASSWD = ""; // Doris 密码
private final static String LOAD_FILE_NAME = "/path/to/1.txt"; // 要导入的本地文件路径
private final static String loadUrl = String.format("http://%s:%s/api/%s/%s/_stream_load",
HOST, PORT, DATABASE, TABLE);
private final static HttpClientBuilder httpClientBuilder = HttpClients
.custom()
.setRedirectStrategy(new DefaultRedirectStrategy() {
@Override
protected boolean isRedirectable(String method) {
// 如果连接目标是 FE,则需要处理 307 redirect。
return true;
}
});
public void load(File file) throws Exception {
try (CloseableHttpClient client = httpClientBuilder.build()) {
HttpPut put = new HttpPut(loadUrl);
put.setHeader(HttpHeaders.EXPECT, "100-continue");
put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(USER, PASSWD));
// 可以在 Header 中设置 stream load 相关属性,这里我们设置 label 和 column_separator。
put.setHeader("label","label1");
put.setHeader("column_separator",",");
// 设置导入文件。
// 这里也可以使用 StringEntity 来传输任意数据。
FileEntity entity = new FileEntity(file);
put.setEntity(entity);
try (CloseableHttpResponse response = client.execute(put)) {
String loadResult = "";
if (response.getEntity() != null) {
loadResult = EntityUtils.toString(response.getEntity());
}
final int statusCode = response.getStatusLine().getStatusCode();
if (statusCode != 200) {
throw new IOException(
String.format("Stream load failed. status: %s load result: %s", statusCode, loadResult));
}
System.out.println("Get load result: " + loadResult);
}
}
}
private String basicAuthHeader(String username, String password) {
final String tobeEncode = username + ":" + password;
byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));
return "Basic " + new String(encoded);
}
public static void main(String[] args) throws Exception{
DorisStreamLoader loader = new DorisStreamLoader();
File file = new File(LOAD_FILE_NAME);
loader.load(file);
}
}
注意:这里 http client 的版本要是4.5.13
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.13</version>
</dependency>
该语句兼容MySQL标准的LOAD DATA语法,方便用户导入本地数据,并降低学习成本。
MySQL Load 同步执行导入并返回导入结果。用户可直接通过SQL返回信息判断本次导入是否成功。
MySQL Load 主要适用于导入客户端本地文件,或通过程序导入数据流中的数据。
MySQL Load和Stream Load功能相似, 都是导入本地文件到Doris集群中, 因此MySQL Load实现复用了StreamLoad的基础导入能力:
1. 创建一张表
通过 CREATE TABLE 命令在demo创建一张表用于存储待导入的数据
CREATE TABLE IF NOT EXISTS load_local_file_test
(
id INT,
age TINYINT,
name VARCHAR(50)
)
unique key(id)
DISTRIBUTED BY HASH(id) BUCKETS 3;
2. 导入数据 在MySQL客户端下执行以下 SQL 命令导入本地文件:
LOAD DATA
LOCAL
INFILE '/path/to/local/demo.txt'
INTO TABLE demo.load_local_file_test
关于 MySQL Load 命令的更多高级操作,请参阅 MySQL Load 命令文档。
3. 等待导入结果
MySQL Load 命令是同步命令,返回成功即表示导入成功。如果导入数据较大,可能需要较长的等待时间。示例如下:
Query OK, 1 row affected (0.17 sec)
Records: 1 Deleted: 0 Skipped: 0 Warnings: 0
4. 导入建议
智能化实时数据处理与分析
增强数据质量和数据治理
优化性能和降低成本
综上所述,实时数仓与机器学习、大模型的结合将推动数据处理和分析的智能化、高效化和安全化发展。这将为企业提供更准确、更及时的业务洞察和决策支持,助力企业在快速变化的市场环境中保持竞争优势