StreamLoad 导入方式是 Doris 提供的 HTTP 连接导入方式,适用于大部分的导入场景。
无论是文件、数据流,还是日志、ETL 数据等,都可以借助 StreamLoad 的方式快速入库,同时 Flink-Doris-Connector、Spark-Doris-Connector、SeaTunnel-Doris-Connector、Hop-Doris-Connector 等各类数据流转连接器底层实现,也皆由 StreamLoad 完成。
StreamLoad 可以直接读取 CSV、JSON、Parquet、ORC 等格式的文件,也可以直接在内存中完成数据解析和推送,可以不经过落盘动作完成数据 Load。
在实际开发过程中,使用 StreamLoad 做导入的代码植入还是有一定复杂度的,想进一步规范的用好这种方式更是具有一定挑战性,所以本篇将通过阐述 Doris-StreamLoad-SDK 的设计思路,再辅以一个 Maven 样例工程和一个 SpringBoot 简易导入项目作为 Demo,全面阐释 SDK 设计原理与使用最佳实践。
当前已申请个人 Maven 组织 io.github.freeoneplus
上传打包好的 StreamLoad-Core Jar 包,后续将视情况申请归为 Apache Doris 子项目,以 org.apache.doris
组织名义发布。
SDK源码:https://github.com/FreeOnePlus/doris-streamload-sdk 若对原理无兴趣的同学,可直接跳至最佳实践学习如何引入并使用。
StreamLoad-Core 代码目录如下:
├── src/main/java/org/apache/doris
│ └── streamload
│ └── core
│ ├── IConvertor.java // 数据处理接口
│ ├── StreamLoad.java // 核心类
│ ├── exception
│ │ └── StreamLoadException.java // 异常类
│ ├── input
│ │ └── StreamLoadInputStream.java // 导入数据类型实现类
│ └── params
│ ├── DorisContentParams.java // 连接 Doris 的参数 Bean 类
│ ├── FormatType.java // 可导入数据类型枚举类
│ ├── StreamLoadParams.java // StreamLoad 导入参数类
│ └── StreamLoadResult.java // StreamLoad 返回值封装类
└── pom.xml // POM 文件
整个代码目录比较简单,主要目的是便于使用者可以快速引入,减少封装规范成本,同时增强代码复用性。
下面我们主要介绍一下最主要的三个类的实现逻辑,为了降低学习成本,不贴具体实现代码,只以文字形式描述每个类做了哪些事情:
1.StreamLoad 核心类
public class StreamLoad {
// 构建一个 HttpClientBuilder,且在构建时处理 307 转发诉求
private final HttpClientBuilder httpClientBuilder = HttpClients();
// 构建一个 Gson 对象,且 Gson 对象预制了小驼峰命名转下划线命名的转义策略
private final Gson gson = new GsonBuilder();
public StreamLoad(IConvertor convertor){
// 有参构造器,指定必须传值数据处理接口实现类
}
private String basicAuthHeader(String username, String password) {
// 使用 Base64 做连接 doris 的用户名和密码的加密方法,返回加密密文
}
public StreamLoadResult run(Object data
, DorisContentParams dorisContentParams
, StreamLoadParams streamLoadParams) throws StreamLoadException {
// 根据传入的不同 StreamLoad-Format,选择使用不同 Convertor 数据处理逻辑方法
// 数据处理完成后,交由 doLoad 方法进行数据导入
}
private StreamLoadResult doLoad(Object data
, DorisContentParams dorisContentParams
, StreamLoadParams streamLoadParams) throws StreamLoadException {
// 创建 HttpPut 请求,传入 StreamLoad 导入参数,且处理导入时的 inputStream 数据流
// 将传值好的 Put 请求发送,并接收返回值,且封装为返回实体
// 处理在导入数据时的各类异常
}
}
2.IConvertor 接口
public interface IConvertor {
// 需要实现 Format 为 Csv 时的数据处理逻辑
String convertorToCsv(Object input);
// 需要实现 Format 为 CsvWithNames 时的数据处理逻辑
String convertorToCsvWithNames(Object input);
// 需要实现 Format 为 CsvWithNamesAndTypes 时的数据处理逻辑
String convertorToCsvWithNamesAndTypes(Object input);
// 需要实现 Format 为 Json 时的数据处理逻辑
String convertorToJson(Object input);
}
3.StreamLoadInputStream 类
public class StreamLoadInputStream extends InputStream {
private InputStream innerStream; // 内部InputStream对象
private byte[] buffer; // 用于存储临时数据的缓冲区
private int position; // 缓冲区中的当前位置
private int limit; // 缓冲区中有效数据的上限
// 接收字节数组的构造函数
public StreamLoadInputStream(byte[] data) {}
// 接收字符串的构造函数
public StreamLoadInputStream(String data) {}
// 接收整数的构造函数
public StreamLoadInputStream(int data) {}
// 接收浮点数的构造函数
public StreamLoadInputStream(float data) {}
// 接收布尔值的构造函数
public StreamLoadInputStream(boolean data) {}
// 接收File对象的构造函数
public StreamLoadInputStream(File file) throws IOException {}
// 接收URL对象的构造函数
public StreamLoadInputStream(URL url) throws IOException {}
// 通用读取方法
@Override
public int read() throws IOException {}
// 重写read(byte[] b, int off, int len)方法
@Override
public int read(byte[] b, int off, int len) throws IOException {}
// 重写available()方法
@Override
public int available() throws IOException {}
// 关闭流的方法
@Override
public void close() throws IOException {}
}
该项目更多为简易演示使用代码,仅为帮助理解 SDK 如何使用所用,不作为实际生产使用范本。
https://github.com/FreeOnePlus/doris-streamload-sdk/tree/maven-demo
public class StreamLoadDemo {
// Doris-StreamLoad 连接属性对象
private DorisContentParams dorisContentParams;
// 数据转换实现类
private IConvertor convertor;
// 无参构造初始化两个对象
public StreamLoadDemo() {
this.dorisContentParams = getDorisContentParams();
this.convertor = getConvertor();
}
// 创建一个连接对象并赋值
// FE/BE Host、FE/BE HttpPort、Database、Table、Username、Password
private DorisContentParams getDorisContentParams() {
return new DorisContentParams(
"127.0.0.1", 8030, "demo", "app_log", "root", ""
);
}
// 根据业务逻辑自定义实现数据转换类,数据将使用该转换类完成格式化转换
private IConvertor getConvertor() {
return new IConvertor() {
public String convertorToCsv(Object input) {
List<String> dataList = (List<String>) input;
String data = dataList.stream().collect(Collectors.joining("\n"));
return data;
}
public String convertorToCsvWithNames(Object input) {
return null;
}
public String convertorToCsvWithNamesAndTypes(Object input) {
return null;
}
public String convertorToJson(Object input) {
String data = (String) input;
return data;
}
};
}
// JSON 格式导入示例
public StreamLoadResult loadJsonData() {
// 数据体
String jsonStr = "{\"id\":1,\"name\":\"张三\",\"age\":17}";
try {
// 创建 StreamLoad 对象,并将实现的数据转换接口类传入
StreamLoad streamLoad = new StreamLoad(convertor);
// 运行 run 方法,执行 数据转换->数据加载->返回结果值 的运行流程
// 其中使用 Builder 构造器构造 StreamLoad 任务的运行参数对象并传入参数
StreamLoadResult streamLoadResult = streamLoad.run(jsonStr, dorisContentParams
, new StreamLoadParams.Builder()
.setFormat("json")
.build()
);
return streamLoadResult;
} catch (StreamLoadException e) {
throw new RuntimeException(e);
}
}
// CSV 格式导入示例
public StreamLoadResult loadCsvData() {
// 数据体
List<String> csvList = new ArrayList<>();
csvList.add("2,李四,19");
csvList.add("3,赵六,20");
try {
// 创建 StreamLoad 对象,并将实现的数据转换接口类传入
StreamLoad streamLoad = new StreamLoad(convertor);
// 运行 run 方法,执行 数据转换->数据加载->返回结果值 的运行流程
// 其中使用 Builder 构造器构造 StreamLoad 任务的运行参数对象并传入参数
StreamLoadResult streamLoadResult = streamLoad.run(csvList, dorisContentParams
, new StreamLoadParams.Builder()
.setFormat("csv")
.setColumnSeparator(",")
.build()
);
return streamLoadResult;
} catch (StreamLoadException e) {
throw new RuntimeException(e);
}
}
public static void main(String[] args) {
// 创建一个 Demo 演示类实体对象
StreamLoadDemo streamLoadDemo = new StreamLoadDemo();
// 执行 Csv 导入示例
StreamLoadResult csvResult = streamLoadDemo.loadCsvData();
System.out.println(csvResult.toString());
// 执行 Json 导入示例
StreamLoadResult jsonResult = streamLoadDemo.loadJsonData();
System.out.println(jsonResult.toString());
}
}
对应表结构:
CREATE TABLE `app_log` (
`id` bigint(20) NULL COMMENT '用户ID',
`name` varchar(255) NULL COMMENT '用户名',
`age` int NULL COMMENT '用户年龄'
) UNIQUE KEY (`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
<dependency>
<groupId>io.github.freeoneplus</groupId>
<artifactId>streamload-core</artifactId>
<version>0.1.0</version>
</dependency>
该项目更贴合于实际生产环境,但也仅为生产环境演示DEMO。
https://github.com/FreeOnePlus/doris-streamload-sdk/tree/springboot-demo
项目代码结构如下:
└─src/main/java/com/doris
└─streamload
├─ Application.java // SpringBoot 启动类
└─demo
├─beans
│ ├─DataBean.java // 数据封装 Bean 类
│ └─DataValueEnums.java // 数据制造枚举类
├─conf
│ └─DorisConfig.java // 连接 Doris 的 Conf 类
├─controller
│ └─DataLoadController.java // 数据导入 Web Controller 类
└─services
├─ DataLoadService.java // 数据业务逻辑处理接口
└─impl
├─ DataLoadServiceImpl.java // 数据业务逻辑处理实现类
└─convertor
├─ IConvertorImpl.java // Convertor 数据处理逻辑实现类 I
└─ MyConvertorImpl.java // Convertor 数据处理逻辑实现类 II
使用 SpringBoot 模拟一个简易的标准 Web 项目,该项目的代码实现逻辑如下:
/insert/json
为制造 JSON 类型的数据进行灌入,传入变量为 dataSize
,数据类型为 INT 整形。/insert/csv
为制造 CSV 类型的数据进行灌入,传入变量为 dataSize
,数据类型为 INT 整形。整体工程运行逻辑如下:
Services/DataLoadServiceImpl/sourceData()
造数方法制造数据<dependency>
<groupId>io.github.freeoneplus</groupId>
<artifactId>streamload-core</artifactId>
<version>0.1.0</version>
</dependency>
引入后即可直接使用 StreamLoad-SDK 中已编译好的方法和类,一共需要以下几步:
@Override
public String sinkDataWithJSON(int dataSize) {
try {
StreamLoadResult streamLoadResult;
ArrayList<DataBean> dataList = new ArrayList<>();
long beginTime = System.currentTimeMillis();
// 1. 根据 DataSize 进行循环
for (int i = 0; i < dataSize; i++) {
// 2. 运行生成数据的方法,得到一个随机的数据对象
DataBean dataBean = sourceData();
dataList.add(dataBean);
long lastTime = System.currentTimeMillis();
// 3. 控制攒批数量,满足非第一次循环且满足每 10000 条、或者到最后一条数据、或者间隔时间超过10s三个条件其中之一,提交一次
if ((i != 0 && i % 10000 == 0) || (i == dataSize - 1) || lastTime - beginTime >= 10000) {
// 4. 创建实现了 IConvertor 接口的实现类
IConvertor testConvertor = new TestConvertorImpl();
// 5. 创建 Doris 连接参数对象,并传入必须的六个参数
DorisContentParams dorisContentParams = new DorisContentParams(
"127.0.0.1", 8030, "demo", "app_log", "root", "Password"
);
// 6. 提交数据
// 7. 创建 StreamLoad 对象,并传入已实现的接口类
StreamLoad streamLoad = new StreamLoad(testConvertor);
// 8. 使用 Builder 构建 StreamLoadParams 对象,并传入 StreamLoad 参数
StreamLoadParams streamLoadParams = new StreamLoadParams.Builder()
.setFormat("json")
.setFuzzyParse("true")
.setStripOuterArray("true")
.build();
// 9. 执行 run 方法,传入【数据体、Doris 连接参数对象、StreamLoad 参数对象】
streamLoadResult = streamLoad.run(
dataList
, dorisContentParams
, streamLoadParams);
// 10. 处理返回结果值
System.out.println(streamLoadResult.toString());
// 11. 清空单批次导入后的 List 集合,避免数据重复
dataList.clear();
beginTime = lastTime;
}
}
} catch (StreamLoadException e) {
// 这里需要处理运行 StreamLoad 时异常情况
}
return "load data is success!";
}
根据自己业务逻辑,接收并转换传入的数据集或者数据对象,转换加工为指定的数据类型格式
@Override
public String convertorToJson(Object input) {
// Java 中 Bean 类的成员变量应为小驼峰命名规则,而在数据库中应为下划线命名规则
// Gson 提供了默认的由小驼峰可转换为下划线命名规则的策略,根据该策略构建一个 Gson 对象可减少转换工作量
Gson gson = new GsonBuilder()
.setFieldNamingPolicy(FieldNamingPolicy.LOWER_CASE_WITH_UNDERSCORES)
.create();
// 判断传入对象集合类型是否合规
if (input instanceof List) {
// 强转为指定集合类型
List list = (List) input;
// 判断集合内是否为空,若不为空则判断存在的数据类型是否合规
if (list.size() > 0 && list.get(0) instanceof DataBean){
// 两层都合规则强转为指定的集合格式
List<DataBean> dataBeanList = (List<DataBean>) input;
// 执行 JSON 转换方法,转换为指定 JSON 类型
return gson.toJson(dataBeanList);
}
return null;
}
return null;
}
StreamLoad.run() 方法运行结束后,会得到 StreamLoadResult 结果集,根据结果集的不同 Status,处理不同异常情况,提升业务代码健壮性。
同时应处理 StreamLoad-SDK 抛出的 StreamLoadException,避免运行过程中出现异常后无法自动终止及时报错导致资源浪费和业务不能正常运行。
本篇介绍了如何使用 Apache Doris StreamLoad SDK 进行数据导入开发,并辅以一个 Maven 项目和一个 SpringBoot 项目的 DEMO 作为演示程序。
最大的目的是为了让使用 StreamLoad 开发的同学减轻规范化开发的工作量,只需要引入 Jar 包即可通过几步快速完成 StreamLoad 的攒批导入。
当前着重实现了最常用的 JSON 和 CSV 导入格式,后续版本将陆续推出 Arrow、ORC、Parquet 等各类文件格式,手动/自动化并发导入能力,甚至 File、URL 等类型的数据接入能力,欢迎大家提交 PR 一起共建该子项目。
同时将基于该 SDK 构建更多的各类导入场景例子作为实际开发参考对象,以便于不怎么熟悉 Doris 数据导入的开发同学也可以快速正确的导入数据~
本文分享自 Apache Doris 补习班 微信公众号,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文参与 腾讯云自媒体同步曝光计划 ,欢迎热爱写作的你一起参与!