DLC Hudi currently supports real-time ingestion of external data into the lake with Spark Streaming and Flink.

Spark Streaming Real-Time Write

DLC supports deploying Spark Streaming jobs. Using a DLC Spark job to write to DLC Hudi tables is recommended.
Code example
kafkaDF.writeStream
  .option("checkpointLocation", "cosn://<cos_bucket>/spark_hudi/spark_ck")
  .trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS))
  .queryName("write hudi")
  .foreachBatch((batchDF: DataFrame, _: Long) => {
    batchDF.write
      .mode(SaveMode.Append)
      .format("hudi")
      .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
      .option("hoodie.datasource.write.precombine.field", "ts")
      .option("hoodie.datasource.write.recordkey.field", "uuid")
      .option("hoodie.datasource.write.partitionpath.field", "partitionpath")
      .option("hoodie.datasource.write.table.name", "hudi_mor")
      .save("cosn://<cos_bucket>/spark_hudi/hudi_mor")
  }).start().awaitTermination()
Flink Real-Time Write

In addition to the recommended DLC Spark job, you can also use Tencent Cloud Stream Compute Service (Oceanus) to write to DLC Hudi in real time; see the Oceanus product documentation for details. If you want to write to a DLC Hudi table with a self-built Flink program, you can refer to the following code example:
// Prepare the Flink streaming Table API execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings
        .newInstance()
        .inStreamingMode()
        .build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
// Define the Kafka source table
tableEnv.executeSql("CREATE TABLE tbl_kafka (\n" +
        "\tuuid STRING,\n" +
        "\trider STRING,\n" +
        "\tdriver STRING,\n" +
        "\tbegin_lat DOUBLE,\n" +
        "\tbegin_lon DOUBLE,\n" +
        "\tend_lat DOUBLE,\n" +
        "\tend_lon DOUBLE,\n" +
        "\tfare DOUBLE,\n" +
        "\tpartitionpath STRING,\n" +
        "\tts BIGINT\n" +
        ") WITH (\n" +
        "  'connector' = 'kafka',\n" +
        "  'topic' = 'hudi_source',\n" +
        "  'properties.bootstrap.servers' = '<kafka_server>:9092',\n" +
        "  'properties.group.id' = 'test-group-10001',\n" +
        "  'scan.startup.mode' = 'latest-offset',\n" +
        "  'format' = 'json'\n" +
        ")");
// Define the Hudi target table
tableEnv.executeSql("CREATE TABLE hudi_cow (\n" +
        "uuid STRING PRIMARY KEY NOT ENFORCED,\n" +
        "rider STRING,\n" +
        "driver STRING,\n" +
        "begin_lat DOUBLE,\n" +
        "begin_lon DOUBLE,\n" +
        "end_lat DOUBLE,\n" +
        "end_lon DOUBLE,\n" +
        "fare DOUBLE,\n" +
        "partitionpath STRING,\n" +
        "ts BIGINT\n" +
        ") " +
        "PARTITIONED BY (partitionpath) " +
        "WITH (\n" +
        "  'connector' = 'hudi',\n" +
        "  'path' = 'cosn://<cos_bucket>/flink_hudi/hudi_cow',\n" +
        "  'fs.cosn.impl' = 'org.apache.hadoop.fs.CosFileSystem',\n" +
        "  'fs.AbstractFileSystem.cosn.impl' = 'org.apache.hadoop.fs.CosN',\n" +
        "  'fs.cosn.bucket.region' = 'ap-chongqing',\n" +
        "  'fs.cosn.credentials.provider' = 'org.apache.hadoop.fs.auth.SimpleCredentialProvider',\n" +
        "  'fs.cosn.userinfo.secretId' = '<secretId>',\n" +
        "  'fs.cosn.userinfo.secretKey' = '<secretKey>',\n" +
        "  'table.type' = 'COPY_ON_WRITE',\n" +
        "  'write.operation' = 'upsert',\n" +
        "  'hoodie.datasource.write.recordkey.field' = 'uuid',\n" +
        "  'write.precombine.field' = 'ts',\n" +
        "  'write.tasks' = '1'\n" +
        ")");
// Write into the Hudi table with Flink SQL
tableEnv.executeSql("INSERT INTO hudi_cow SELECT uuid, rider, driver, begin_lat, begin_lon, end_lat, end_lon, fare, partitionpath, ts FROM tbl_kafka");

Related Configuration

Common Write Configuration
Parameter | Default | Description
hoodie.datasource.write.table.name | - | Name of the Hudi table being written.
hoodie.datasource.write.table.type | COPY_ON_WRITE | Hudi table type. Once the table type has been set, this parameter must not be changed afterwards. Valid values: COPY_ON_WRITE, MERGE_ON_READ.
hoodie.datasource.write.operation | upsert | Operation type used when writing to the Hudi table. Currently supported: upsert, delete, insert, bulk_insert, insert_overwrite, insert_overwrite_table.
hoodie.datasource.write.recordkey.field | uuid | Record key of the Hudi table; a Hudi table requires a unique primary key.
hoodie.datasource.write.partitionpath.field | - | Partition key. Used together with hoodie.datasource.write.keygenerator.class, it can cover different partitioning scenarios.
hoodie.datasource.write.hive_style_partitioning | false | Whether the partition layout is kept consistent with Hive. Setting this to true is recommended.
hoodie.datasource.write.precombine.field | ts | Field used to merge and deduplicate rows that share the same key before writing.
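To make the options above concrete, here is a minimal batch-write sketch that combines them. The DataFrame df, the table name hudi_demo, the COS path, and the column names are assumptions for illustration; only the option keys come from the table above.

import org.apache.spark.sql.SaveMode

// Minimal sketch combining the common write options listed above.
df.write
  .format("hudi")
  .mode(SaveMode.Append)
  .option("hoodie.datasource.write.table.name", "hudi_demo")              // target Hudi table name
  .option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")          // fixed once the table has been created
  .option("hoodie.datasource.write.operation", "upsert")                  // upsert / insert / bulk_insert / ...
  .option("hoodie.datasource.write.recordkey.field", "uuid")              // unique record key
  .option("hoodie.datasource.write.partitionpath.field", "partitionpath") // partition key
  .option("hoodie.datasource.write.hive_style_partitioning", "true")      // Hive-style partition layout (recommended)
  .option("hoodie.datasource.write.precombine.field", "ts")               // deduplicate rows sharing the same key
  .save("cosn://<cos_bucket>/spark_hudi/hudi_demo")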

Compaction Configuration
Compaction merges the base and log files of an MOR table. For a Merge-On-Read table, data is stored in columnar Parquet files and row-based Avro files; updates are first recorded in delta log files and then merged into new versions of the columnar files by synchronous or asynchronous compaction. Merge-On-Read tables reduce ingestion latency; the recommended pattern is to generate compaction schedule plans synchronously and execute them asynchronously (see the sketch after the table below).
Parameter | Default | Description
hoodie.compact.schedule.inline | false | Whether to generate a compaction plan after each commit. Setting this to true is recommended.
hoodie.compact.inline | false | Whether to run compaction inline after a transaction completes. Even when enabled, compaction is not necessarily triggered on every commit; the trigger strategy below is evaluated first.
hoodie.compact.inline.trigger.strategy | NUM_COMMITS | Compaction trigger strategy. Valid values: NUM_COMMITS (trigger by number of commits), TIME_ELAPSED (trigger by elapsed time), NUM_AND_TIME (trigger when both the commit-count and time conditions are met), NUM_OR_TIME (trigger when either condition is met).
hoodie.compact.inline.max.delta.commits | 5 | Number of delta commits after which compaction is triggered. Effective with the NUM_COMMITS, NUM_AND_TIME, and NUM_OR_TIME strategies.
hoodie.compact.inline.max.delta.seconds | 60 * 60 (1 hour) | Elapsed time in seconds after which compaction is triggered. Effective with the TIME_ELAPSED, NUM_AND_TIME, and NUM_OR_TIME strategies.
hoodie.parquet.small.file.limit | 104857600 (100 MB) | Files smaller than this value are treated as small files, and new data is preferentially written into them.
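As a rough illustration of the recommended pattern (schedule compaction plans with the writer, run the actual compaction asynchronously), the options below could be added to the Spark Streaming write shown earlier. This is a minimal sketch; the concrete values such as the commit count and table path are assumptions rather than required settings.

// Minimal sketch: generate compaction plans on commit, but leave execution to an
// asynchronous compaction process instead of the streaming write path itself.
batchDF.write
  .mode(SaveMode.Append)
  .format("hudi")
  .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
  .option("hoodie.compact.schedule.inline", "true")                 // create a compaction plan after the commit
  .option("hoodie.compact.inline", "false")                         // do not run compaction inline
  .option("hoodie.compact.inline.trigger.strategy", "NUM_COMMITS")  // trigger by number of delta commits
  .option("hoodie.compact.inline.max.delta.commits", "5")           // plan compaction every 5 delta commits
  .save("cosn://<cos_bucket>/spark_hudi/hudi_mor")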

Single-Table Concurrent Write Control

If only one client is writing to a Hudi table, write conflicts cannot occur. In practice, however, multiple clients may write at the same time, for example several streaming programs writing into the same Hudi table; the resulting write conflicts cause task failures. This situation is called concurrent writing. To handle it, you can use DLC Metastore to implement optimistic-lock-based concurrent writes.
Enable the concurrent write mechanism:
hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.cleaner.policy.failed.writes=LAZY
Set the concurrency lock provider to DLC Metastore:
hoodie.write.lock.provider=org.apache.hudi.hive.transaction.lock.HiveMetastoreBasedLockProvider
hoodie.write.lock.hivemetastore.database=<database_name>
hoodie.write.lock.hivemetastore.table=<table_name>
DLC Spark MultiWriter example:
kafkaDF.writeStream
  .option("checkpointLocation", "cosn://<cos_bucket>/spark_hudi/spark_ck/writer2")
  .trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS))
  .queryName("write hudi")
  .foreachBatch((batchDF: DataFrame, _: Long) => {
    batchDF.write
      .mode(SaveMode.Append)
      .format("hudi")
      .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
      .option("hoodie.datasource.write.precombine.field", "ts")
      .option("hoodie.datasource.write.recordkey.field", "uuid")
      .option("hoodie.datasource.write.partitionpath.field", "partitionpath")
      .option("hoodie.datasource.write.table.name", "multi_writer")
      .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
      .option("hoodie.cleaner.policy.failed.writes", "LAZY")
      .option("hoodie.write.lock.provider", "org.apache.hudi.hive.transaction.lock.HiveMetastoreBasedLockProvider")
      .option("hoodie.write.lock.hivemetastore.database", "spark_hudi")
      .option("hoodie.write.lock.hivemetastore.table", "multi_writer")
      .save("cosn://<cos_bucket>/spark_hudi/multi_writer")
  }).start().awaitTermination()
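The example above corresponds to one of the concurrent writers (note the writer2 checkpoint path). As a sketch, another writer of the same table would keep the table path, record key, and lock options identical and differ only in its source DataFrame, checkpointLocation, and query name; otherKafkaDF and the writer1 paths below are hypothetical names used purely for illustration.

// Hypothetical additional concurrent writer: same Hudi table and lock settings,
// but its own checkpoint location and query name.
otherKafkaDF.writeStream
  .option("checkpointLocation", "cosn://<cos_bucket>/spark_hudi/spark_ck/writer1")
  .trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS))
  .queryName("write hudi writer1")
  .foreachBatch((batchDF: DataFrame, _: Long) => {
    batchDF.write
      .mode(SaveMode.Append)
      .format("hudi")
      .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
      .option("hoodie.datasource.write.precombine.field", "ts")
      .option("hoodie.datasource.write.recordkey.field", "uuid")
      .option("hoodie.datasource.write.partitionpath.field", "partitionpath")
      .option("hoodie.datasource.write.table.name", "multi_writer")
      .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
      .option("hoodie.cleaner.policy.failed.writes", "LAZY")
      .option("hoodie.write.lock.provider", "org.apache.hudi.hive.transaction.lock.HiveMetastoreBasedLockProvider")
      .option("hoodie.write.lock.hivemetastore.database", "spark_hudi")
      .option("hoodie.write.lock.hivemetastore.table", "multi_writer")
      .save("cosn://<cos_bucket>/spark_hudi/multi_writer")
  }).start().awaitTermination()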