使用 SDK 数据写入

最近更新时间:2025-07-03 10:30:56

我的收藏

概述

数据湖计算 DLC 提供用户通过 Java SDK 数据导入。

应用场景

通过 Java SDK 将源数据库的增量变更同步到 DLC 原生表,完成源数据入湖。
适用场景:
1. 需要实时处理数据流的业务场景。
2. 熟悉 Java 并需要自定义逻辑处理的开发人员。
使用限制:
1. 不建议并发场景使用。如果使用并发场景,最大并发不超过20。
2. 数据查询存在分钟级延迟。
3. 使用环境通常为 JVM 业务环境,不建议在 spark 和 flink 等分布式框架中使用。

前置条件

正确开通 DLC,已完成用户权限配置,开通托管存储。
正确创建 DLC 数据库。
正确配置 DLC 数据库数据优化,详细配置请参见开启数据优化

操作步骤

步骤1:下载依赖 Jar

请手动下载依赖 SDK jar 包 dlc-bridage.jar

步骤2:编写 Java 代码


package org.example;
// 建表 create table ingest_stream(id int,name string);

import com.gotocompany.depot.config.DLCSinkConfig;
import com.gotocompany.depot.dlc.DLCSinkFactory;
import com.gotocompany.depot.dlc.models.SinkRecord;
import com.gotocompany.depot.dlc.models.SinkRecordPool;
import com.tencent.dlc.Configuration;
import com.tencent.dlc.DlcClient;
import com.tencent.dlc.RowStream;

import java.io.IOException;
import java.sql.SQLException;

public class RealtimeStreamDemo {
public static void main(String[] args) throws Exception {

String secretId ="";
String secretKey ="";
String endpoint = "dlc.tencentcloudapi.com";
String database ="db1";
String region ="ap-chongqing";
String table="table";
DlcClient client = DlcClient.newBuilder()
.endpoint(endpoint)
.secretId(secretId)
.secretKey(secretKey)
.region(region)
.build();

Configuration config = new Configuration();
config.set("SINK_DLC_COMMIT_INTERVAL_MS","10000");
config.set("SINK_DLC_DATABASE_NAME",database);
config.set("SINK_DLC_TABLE_NAME",table);
config.set("SINK_DLC_CLIENT_ID",secretId);
config.set("SINK_DLC_AUTO_COMMIT_ENABLED","false");
DLCSinkConfig sinkConfig = config.buildSinkConfig(region,secretId,secretKey,false);
DLCSinkFactory dlcSinkFactory = new DLCSinkFactory(sinkConfig,null);
dlcSinkFactory.init();
// 直接通过 Lambda 定义任务
RowStream stream = client.newRealtimeStreamBuilder()
.operate(RowStream.RealTimeOperate.INSERT_ONLY)
.sinkFactory(dlcSinkFactory)
.build();
taskRun(stream);
}

private static void taskRun(RowStream stream) throws IOException, SQLException {
for (int t = 0; t < 100; t++) {
SinkRecord row = SinkRecordPool.borrowObject();
row.setRow("name", "sss");
row.setRow("age", 12);
stream.apply(row);
SinkRecordPool.returnObject(row);
}
stream.flush();
}


}
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>


<dependency>
<groupId>com.tencent.cloud.dlc</groupId>
<artifactId>dlc-bridage</artifactId>
<version>1.0</version>
</dependency>

</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>

步骤3:DLC 新建目标表

新建目标表详情可参见 DLC 原生表操作配置

步骤4:执行 SDK 程序

登录 DLC 控制台,单击数据探索,查询目标表数据。