概述
数据湖计算 DLC 提供用户通过 Java SDK 数据导入。
应用场景
通过 Java SDK 将源数据库的增量变更同步到 DLC 原生表,完成源数据入湖。
适用场景:
1. 需要实时处理数据流的业务场景。
2. 熟悉 Java 并需要自定义逻辑处理的开发人员。
使用限制:
1. 不建议并发场景使用。如果使用并发场景,最大并发不超过20。
2. 数据查询存在分钟级延迟。
3. 使用环境通常为 JVM 业务环境,不建议在 spark 和 flink 等分布式框架中使用。
前置条件
正确开通 DLC,已完成用户权限配置,开通托管存储。
正确创建 DLC 数据库。
正确配置 DLC 数据库数据优化,详细配置请参见开启数据优化。
操作步骤
步骤1:下载依赖 Jar
步骤2:编写 Java 代码
package org.example;// 建表 create table ingest_stream(id int,name string);import com.gotocompany.depot.config.DLCSinkConfig;import com.gotocompany.depot.dlc.DLCSinkFactory;import com.gotocompany.depot.dlc.models.SinkRecord;import com.gotocompany.depot.dlc.models.SinkRecordPool;import com.tencent.dlc.Configuration;import com.tencent.dlc.DlcClient;import com.tencent.dlc.RowStream;import java.io.IOException;import java.sql.SQLException;public class RealtimeStreamDemo {public static void main(String[] args) throws Exception {String secretId ="";String secretKey ="";String endpoint = "dlc.tencentcloudapi.com";String database ="db1";String region ="ap-chongqing";String table="table";DlcClient client = DlcClient.newBuilder().endpoint(endpoint).secretId(secretId).secretKey(secretKey).region(region).build();Configuration config = new Configuration();config.set("SINK_DLC_COMMIT_INTERVAL_MS","10000");config.set("SINK_DLC_DATABASE_NAME",database);config.set("SINK_DLC_TABLE_NAME",table);config.set("SINK_DLC_CLIENT_ID",secretId);config.set("SINK_DLC_AUTO_COMMIT_ENABLED","false");DLCSinkConfig sinkConfig = config.buildSinkConfig(region,secretId,secretKey,false);DLCSinkFactory dlcSinkFactory = new DLCSinkFactory(sinkConfig,null);dlcSinkFactory.init();// 直接通过 Lambda 定义任务RowStream stream = client.newRealtimeStreamBuilder().operate(RowStream.RealTimeOperate.INSERT_ONLY).sinkFactory(dlcSinkFactory).build();taskRun(stream);}private static void taskRun(RowStream stream) throws IOException, SQLException {for (int t = 0; t < 100; t++) {SinkRecord row = SinkRecordPool.borrowObject();row.setRow("name", "sss");row.setRow("age", 12);stream.apply(row);SinkRecordPool.returnObject(row);}stream.flush();}}
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>3.8.1</version><scope>test</scope></dependency><dependency><groupId>com.tencent.cloud.dlc</groupId><artifactId>dlc-bridage</artifactId><version>1.0</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>8</source><target>8</target></configuration></plugin></plugins></build></project>
步骤3:DLC 新建目标表
步骤4:执行 SDK 程序

