前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Apache Hudi 入门学习总结

Apache Hudi 入门学习总结

作者头像
IT技术分享社区
发布2022-10-31 16:30:24
1.2K0
发布2022-10-31 16:30:24
举报
文章被收录于专栏:IT技术分享社区IT技术分享社区

前言

学习和使用Hudi近一年了,由于之前忙于工作和学习,没时间总结,现在从头开始总结一下,先从入门开始

Hudi 概念

Apache Hudi 是一个支持插入、更新、删除的增量数据湖处理框架,有两种表类型:COW和MOR,可以自动合并小文件,Hudi自己管理元数据,元数据目录为.hoodie, 具体的概念可以查看官网https://hudi.apache.org/cn/docs/0.9.0/overview

Hudi 学习

  • Hudi 官网 https://hudi.apache.org/cn/docs/0.9.0/overview/(因本人最开始学习时Hudi的版本为0.9.0版本,所以这里列的也是0.9.0的连接)
  • Hudi 官方公众号号:ApacheHudi (Hudi PMC leesf 运营的),自己搜索即可,这里不贴二维码了
  • Github https://github.com/leesf/hudi-resources 这个是Hudi PMC leesf整理的公众号上的文章,PC 浏览器上看比较方便
  • GitHub 源码 https://github.com/apache/hudi 想要深入学习,还是得看源码并多和社区交流

Hudi 安装

只需要将Hudi的jar包放到Spark和Hive对应的路径下,再修改几个配置

Spark

Hudi支持Spark程序读写Hudi表,同时也支持Spark SQL insert/update/delete/merge等

包名:hudi-spark-bundle_2.11-0.9.0.jar 下载地址:https://repo1.maven.org/maven2/org/apache/hudi/hudi-spark-bundle_2.11/0.9.0/hudi-spark-bundle_2.11-0.9.0.jar 包名:hudi-utilities-bundle_2.11-0.9.0.jar 下载地址:https://repo1.maven.org/maven2/org/apache/hudi/hudi-utilities-bundle_2.11/0.9.0/hudi-utilities-bundle_2.11-0.9.0.jar

将hudi-spark-bundle_2.11-0.9.0.jar 和 hudi-utilities-bundle_2.11-0.9.0.jar拷贝到

Hive

Hudi可以将元数据同步到Hive表中,Hive只能用来查询,不能insert/update/delete

包名:hudi-hadoop-mr-bundle-0.9.0.jar 下载地址:https://repo1.maven.org/maven2/org/apache/hudi/hudi-hadoop-mr-bundle/0.9.0/hudi-hadoop-mr-bundle-0.9.0.jar 1、将hudi-hadoop-mr-bundle-0.9.0.jar 拷贝至SPARKHOME/jars,当前版本目录为/usr/hdp/3.1.0.0−78/spark2/jars/版本说明:0.9.0为hudi发行版本,2.11为HDP中Spark对应的scala版本这里提供的是Maven的下载地址,对于其他版本,Maven上可以下载到,当然也可以自己打包¨K25KHudi可以将元数据同步到Hive表中,Hive只能用来查询,不能insert/update/delete包名:hudi−hadoop−mr−bundle−0.9.0.jar下载地址:[https://repo1.maven.org/maven2/org/apache/hudi/hudi−hadoop−mr−bundle/0.9.0/hudi−hadoop−mr−bundle−0.9.0.jar](https://repo1.maven.org/maven2/org/apache/hudi/hudi−utilities−bundle2.11/0.9.0/hudi−utilities−bundle2.11−0.9.0.jar)1、将hudi−hadoop−mr−bundle−0.9.0.jar拷贝至HIVE_HOME/lib,当前版本目录为:/usr/hdp/3.1.0.0-78/hive/lib/ 2、修改hive配置(在hive-site.xml) hive.input.format=org.apache.hudi.hadoop.HoodieParquetInputFormat hive.resultset.use.unique.column.names=false (修改这里的配置是因为如果我们用hudi-utilities-bundle中的工具类HoodieDeltaStreamer,其中的JdbcbasedSchemaProvider解析Hive表Schema时需要设置这个属性,否则解析异常,关于HoodieDeltaStreamer的使用我会单独在另一篇文章中总结) 3、重启hive

Tez

1、将上述hudi-hadoop-mr-bundle-0.9.0.jar 打到/hdp/apps/${hdp.version}/tez/tez2.tar.gz中 注意:这里的路径是指HDFS路径 2、修改hive配置(在hive-site.xml) hive.tez.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat 3、重启Tez、Hive

关于第一个打包到tez2.tar.gz,我自己写了一个脚本,如下:

代码语言:javascript
复制
jar=$1

sudo rm -r tez_temp
mkdir tez_temp
cd tez_temp
hadoop fs -get /hdp/apps/3.1.0.0-78/tez/tez.tar.gz
mkdir tez
tar -zxvf tez.tar.gz -C tez
mkdir gz
sudo rm -r tez/lib/hudi-hadoop-mr*
cp $jar tez/lib/
cd tez
tar -zcvf  ../gz/tez.tar.gz ./*
hadoop fs -rm -r  /hdp/apps/3.1.0.0-78/tez/tez.tar.gz.back
hadoop fs -mv  /hdp/apps/3.1.0.0-78/tez/tez.tar.gz /hdp/apps/3.1.0.0-78/tez/tez.tar.gz.back
cd ../gz/
hadoop fs -put tez.tar.gz /hdp/apps/3.1.0.0-78/tez/
su - hdfs <<EOF
kinit -kt /etc/security/keytabs/hdfs.headless.keytab hdfs-cluster1@INDATA.COM
hadoop fs -chown hdfs:haoop /hdp/apps/3.1.0.0-78/tez/tez.tar.gz
hadoop fs -chmod 444  /hdp/apps/3.1.0.0-78/tez/tez.tar.gz
hadoop fs -ls /hdp/apps/3.1.0.0-78/tez/
exit
EOF

这个脚本在我自己的环境上是可以正常运行使用的,当然可能因本人水平有限,写的还不够好,不能适用所有环境,可以自行修改,仅做参考

Flink

Hudi也支持Flink,本人目前还不会Flink~,可以参考官网https://hudi.apache.org/cn/docs/0.9.0/flink-quick-start-guide

Hudi 写入

Hudi支持Spark、Flink、Java等多种客户端,本人常用Spark、Java客户端,这俩相比较而言,大家用Spark较多,这里就以Spark代码进行简单的示例总结

Spark 配置参数

代码语言:javascript
复制
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.keygen.ComplexKeyGenerator
import org.apache.spark.sql.SaveMode.{Append, Overwrite}
import org.apache.spark.sql.hudi.command.UuidKeyGenerator
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}


 val spark = SparkSession.builder().
      master("local[*]").
      appName("SparkHudiDemo").
      config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").
      // 扩展Spark SQL,使Spark SQL支持Hudi
      config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension").
      // 支持Hive,本地测试时,注释掉
      //      enableHiveSupport().
      getOrCreate()

写Hudi并同步到Hive表

代码示例:

代码语言:javascript
复制
    val spark = SparkSession.builder().
      master("local[*]").
      appName("SparkHudiDemo").
      config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").
      // 扩展Spark SQL,使Spark SQL支持Hudi
      config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension").
      // 支持Hive,本地测试时,注释掉
      //      enableHiveSupport().
      getOrCreate()

    import spark.implicits._
    val df = Seq((1, "a1", 10, 1000, "2022-05-12")).toDF("id", "name", "value", "ts", "dt")

    val databaseName = "default"
    val tableName1 = "test_hudi_table_1"
    val primaryKey = "id"
    val preCombineField = "ts"
    val partitionField = "dt"
    val tablePath1 = "/tmp/test_hudi_table_1"
    save2HudiSyncHiveWithPrimaryKey(df, databaseName, tableName1, primaryKey, preCombineField, partitionField,
      UPSERT_OPERATION_OPT_VAL, tablePath1, Overwrite)
    spark.read.format("hudi").load(tablePath1).show(false)
    // 删除表
    save2HudiSyncHiveWithPrimaryKey(df, databaseName, tableName1, primaryKey, preCombineField, partitionField,
      DELETE_OPERATION_OPT_VAL, tablePath1, Append)
    spark.read.format("hudi").load(tablePath1).show(false)


  /**
   * 写hudi并同步到hive,有主键,分区字段dt
   *
   */
  def save2HudiSyncHiveWithPrimaryKey(df: DataFrame, databaseName: String, tableName: String, primaryKey: String, preCombineField: String,
                                      partitionField: String, operation: String, tablePath: String, mode: SaveMode): Unit = {
    df.
      write.format("hudi").
      option(RECORDKEY_FIELD.key, primaryKey). // 主键字段
      option(PRECOMBINE_FIELD.key, preCombineField). // 预合并字段
      option(PARTITIONPATH_FIELD.key, partitionField).
      option(TBL_NAME.key, tableName).
      option(KEYGENERATOR_CLASS_NAME.key(), classOf[ComplexKeyGenerator].getName).
      option(OPERATION.key(), operation).
      // 下面的参数和同步hive元数据,查询hive有关
      option(META_SYNC_ENABLED.key, true).
      option(HIVE_USE_JDBC.key, false).
      option(HIVE_DATABASE.key, databaseName).
      option(HIVE_AUTO_CREATE_DATABASE.key, true).
      // 内部表,这里非必须,但是在用saveAsTable时则必须,因为0.9.0有bug,默认外部表
      option(HIVE_CREATE_MANAGED_TABLE.key, true).
      option(HIVE_TABLE.key, tableName).
      option(HIVE_CREATE_MANAGED_TABLE.key, true).
      option(HIVE_STYLE_PARTITIONING.key, true).
      option(HIVE_PARTITION_FIELDS.key, partitionField).
      option(HIVE_PARTITION_EXTRACTOR_CLASS.key, classOf[MultiPartKeysValueExtractor].getName).
      // 为了SparkSQL更新用,0.9.0版本有bug,需要设置这个参数,最新版本已经修复,可以不设置这个参数
      // 详情查看PR:https://github.com/apache/hudi/pull/3745
      option(DataSourceWriteOptions.HIVE_TABLE_SERDE_PROPERTIES.key, s"primaryKey=$primaryKey").
      mode(mode)
      .save(tablePath)
  }

代码说明:本地测试需要把同步Hive的代码部分注释掉,因为同步Hive需要连接Hive metaStore 服务器spark-shell里可以跑完整的代码,可以成功同步Hive,0.9.0版本同步Hive时会抛出一个关闭Hive的异常,这个可以忽略,这是该版本的一个bug,虽然有异常但是已同步成功,最新版本已经修复该bug,具体可以查看PR:https://github.com/apache/hudi/pull/3364

读Hudi

Spark 读取如上述代码示例:

代码语言:javascript
复制
spark.read.format("hudi").load(tablePath1).show(false)

结果:

代码语言:javascript
复制
+-------------------+--------------------+------------------+----------------------+------------------------------------------------------------------------+---+----+-----+----+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name                                                       |id |name|value|ts  |dt        |
+-------------------+--------------------+------------------+----------------------+------------------------------------------------------------------------+---+----+-----+----+----------+
|20220512101542     |20220512101542_0_1  |id:1              |2022-05-12            |38c1ff87-8bc9-404c-8d2c-84f720e8133c-0_0-20-12004_20220512101542.parquet|1  |a1  |10   |1000|2022-05-12|
+-------------------+--------------------+------------------+----------------------+------------------------------------------------------------------------+---+----+-----+----+----------+

可以看到多了几个Hudi元数据字段其中_hoodie_record_key为Hudi主键,如果设置了RECORDKEY_FIELD,比如这里的ID,那么_hoodie_record_key是根据我们设置字段生成的,默认不是复合主键,这里代码示例改为了复合主键,具体配置为

代码语言:javascript
复制
option(KEYGENERATOR_CLASS_NAME.key(), classOf[ComplexKeyGenerator].getCanonicalName).

这里主要为了和SparkSQL保持一致,因为SparkSQL默认的为复合主键,如果不一致,那么upsert/delete时会有问题

默认情况RECORDKEY_FIELD是必须设置的,RECORDKEY_FIELD的默认值为uuid,如果不设置,则会去找uuid,因为schema里没有uuid,那么会报错

Hive

在服务器上运行示例代码是可以成功同步到Hive表的,我们看一下Hive表情况:

代码语言:javascript
复制
show create table test_hudi_table_1;

下面是Hive Hudi表的建表语句,和普通的Hive表的建表语句的区别可以自己比较,其中SERDEPROPERTIES里的内容是为了SparkSQL用的,可以看到这里包含了'primaryKey'='id',在0.9.0版本,Spark SQL获取Hudi的主键字段是根据Hive表里这里的'primaryKey'获取的,如果没有这个属性,那么 Spark SQL认为该表不是主键表,则不能进行update等操作

代码语言:javascript
复制
+----------------------------------------------------+
|                   createtab_stmt                   |
+----------------------------------------------------+
| CREATE TABLE `test_hudi_table_1`(                  |
|   `_hoodie_commit_time` string,                    |
|   `_hoodie_commit_seqno` string,                   |
|   `_hoodie_record_key` string,                     |
|   `_hoodie_partition_path` string,                 |
|   `_hoodie_file_name` string,                      |
|   `id` int,                                        |
|   `name` string,                                   |
|   `value` int,                                     |
|   `ts` int)                                        |
| PARTITIONED BY (                                   |
|   `dt` string)                                     |
| ROW FORMAT SERDE                                   |
|   'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'  |
| WITH SERDEPROPERTIES (                             |
|   'hoodie.query.as.ro.table'='false',              |
|   'path'='/tmp/test_hudi_table_1',                  |
|   'primaryKey'='id')                               |
| STORED AS INPUTFORMAT                              |
|   'org.apache.hudi.hadoop.HoodieParquetInputFormat'  |
| OUTPUTFORMAT                                       |
|   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' |
| LOCATION                                           |
|   'hdfs://cluster1/tmp/test_hudi_table_1'           |
| TBLPROPERTIES (                                    |
|   'last_commit_time_sync'='20220512101500',        |
|   'spark.sql.sources.provider'='hudi',             |
|   'spark.sql.sources.schema.numPartCols'='1',      |
|   'spark.sql.sources.schema.numParts'='1',         |
|   'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"integer","nullable":false,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"value","type":"integer","nullable":false,"metadata":{}},{"name":"ts","type":"integer","nullable":false,"metadata":{}},{"name":"dt","type":"string","nullable":true,"metadata":{}}]}',  |
|   'spark.sql.sources.schema.partCol.0'='dt',       |
|   'transient_lastDdlTime'='1652320902')            |
+----------------------------------------------------+

Hive查询Hudi表:

代码语言:javascript
复制
select * from test_hudi_table_1;
代码语言:javascript
复制
+----------------------+-----------------------+---------------------+-------------------------+---------------------------------------------------------------------------+-----+-------+--------+-------+-------------+--+
| _hoodie_commit_time  | _hoodie_commit_seqno  | _hoodie_record_key  | _hoodie_partition_path  |                             _hoodie_file_name                             | id  | name  | value  |  ts   |     dt      |
+----------------------+-----------------------+---------------------+-------------------------+---------------------------------------------------------------------------+-----+-------+--------+-------+-------------+--+
| 20220513150854       | 20220513150854_0_1    | id:1                | dt=2022-05-12           | dd4ef080-97b6-4046-a337-abb01e26943e-0_0-21-12005_20220513150854.parquet  | 1   | a1    | 10     | 1000  | 2022-05-12  |
+----------------------+-----------------------+---------------------+-------------------------+---------------------------------------------------------------------------+-----+-------+--------+-------+-------------+--+

Hive是可以查询Hudi表的,但是不能update/delete,要想使用update/delete等语句,只能使用Spark SQL,另外Hive可以增量查询。关于如何使用Hudi Spark SQL和Hive的增量查询,这里不展开描述,以后会单独写

配置项说明

这里只说明几个比较重要的配置,其他相关的配置可以看官网和源码

  • RECORDKEY_FIELD:默认情况RECORDKEY_FIELD是必须设置的,RECORDKEY_FIELD的默认值为uuid,如果不设置,则会去找uuid,因为schema里没有uuid,那么会报错。另外Hudi0.9.0支持非主键Hudi表,只需要配置 option(KEYGENERATOR_CLASS_NAME.key, classOf[UuidKeyGenerator].getName).即可,但是在后面的版本已经不支持了
  • KEYGENERATOR_CLASS_NAME:默认值为SimpleKeyGenerator,默认不支持复合主键,默认情况下上述_hoodie_record_key的内容为1,而不是id:1,而SparkSQL的默认值为SqlKeyGenerator,该类是ComplexKeyGenerator的子类:
代码语言:javascript
复制
class SqlKeyGenerator(props: TypedProperties) extends ComplexKeyGenerator(props)

也就是本示例所使用的的复合主键类,当使用SimpleKeyGeneratorComplexKeyGenerator同时upsert一个表时,那么会生成两条记录,因为_hoodie_record_key的内容不一样,所以一张表的 KEYGENERATOR_CLASS_NAME必须保证一致(父类和子类也是一致的)

  • PRECOMBINE_FIELD: 预合并字段,默认值:ts,想详细了解预合并可以参考我的另外两篇博客https://dongkelun.com/2021/07/10/hudiPreCombinedField/和https://dongkelun.com/2021/11/30/hudiPreCombineField2/ upsert时,预合并是必须的,如果我们的表里没有预合并字段,或者不想使用预合并,不设置的话是会抛异常的,因为默认去找ts字段,找不到则跑异常,那么我们可以将预合并字段设置为主键字段
  • PARTITIONPATH_FIELD: Hudi的分区字段,默认值partitionpath,对于没有分区的表,我们需要将该字段设置为空字符串option(PARTITIONPATH_FIELD.key, ""),否则可能会因找不到默认值partitionpath而抛异常。最新版本已经去掉分区字段默认值,详情可见:https://github.com/apache/hudi/pull/4195
  • OPERATION: Hudi的写操作类型,默认值为UPSERT_OPERATION_OPT_VAL即upsert,Hudi支持多种操作类型 如:upsert、insert、bulk_insert、delete等,具体每个版本支持哪些操作类型,可以查看官网或源码,可以根据自己的需求选择选择操作类型。本代码展示了upsert成功后,又删除成功。

下面的参数和同步hive元数据,查询hive有关

  • META_SYNC_ENABLED: 默认为false,不同步Hive,要想同步Hive可以将该值设为true,另外也可以设置HIVE_SYNC_ENABLED进行同步Hive,作用差不多,至于区别,这里不详细解说
  • HIVE_USE_JDBC: 是否使用jdbc同步hive,默认为true,如果使用jdbc,那么需要设置HIVE_URLHIVE_USERHIVE_PASS等配置,因为url和ip有关,每个环境不一样,用起来比较麻烦,所以这里不采用,另外因为实际使用是和Hive绑定的,可以直接使用HMS进行同步,使用起来比较方便,改为false后默认使用HMS同步Hive,具体逻辑可以看Hudi Hive 同步模块的源码,这里不展开
  • HIVE_STYLE_PARTITIONING: 是否使用Hive格式的分区路径,默认为false,如果设置为true,那么分区路径格式为 =,在这里为dt=2022-05-12,默认情况下只有即2022-05-12,因为我们常用Hive表查询Hudi所以,这里设置为true
  • HIVE_CREATE_MANAGED_TABLE: 同步Hive建表时是否为内部表,默认为false,使用saveAsTable(实际调用的Hudi Spark SQL CTAS)建表时0.9.0版本有,本应该为内部表,但还是为外部表,可以通过设置这个参数修正,最新版本已修复,详情可见PR:https://github.com/apache/hudi/pull/3146
  • HIVE_TABLE_SERDE_PROPERTIES: 同步到Hive表SERDEPROPERTIES,为了Hudi Spark SQL 使用,在0.9.0版本,Spark SQL获取Hudi的主键字段是根据Hive表里这里的'primaryKey'获取的,如果没有这个属性,那么Spark SQL认为该表不是主键表,则不能进行update等操作,而默认情况同步Hive时没有将主键字段同步过去,最新版本已经不需要设置该属性了。相关PR:https://github.com/apache/hudi/pull/3745 这个PR添加了支持HIVE_CREATE_MANAGED_TABLE配置,但是CTAS依旧有bug,代码里的虽然判断表类型是否为内部表,并添加到options中,但是最后并没有将options用到最终写Hudi的参数中。另一个PR:https://github.com/apache/hudi/pull/3998 该PR的主要目的不是为了解决这个bug,但是附带解决了这个问题,因为options最终被正确传到写Hudi的参数中了

其他Hive相关的配置参数不一一解释,可自行查看官网

hoodie.properties

.hoodie目录下有表属性文件.hoodie.properties,内容为:

代码语言:javascript
复制
hoodie.table.precombine.field=ts
hoodie.table.partition.fields=dt
hoodie.table.type=COPY_ON_WRITE
hoodie.archivelog.folder=archived
hoodie.populate.meta.fields=true
hoodie.timeline.layout.version=1
hoodie.table.version=2
hoodie.table.recordkey.fields=id
hoodie.table.base.file.format=PARQUET
hoodie.table.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator
hoodie.table.name=test_hudi_table_1

新版本在该属性文件里增加了很多属性,如HIVE_STYLE_PARTITIONINGhoodie.datasource.write.hive_style_partitioning,增加属性便于使表的属性前后保持统一

非主键表

如上面配置项说明所示,Hudi0.9.0版本支持非主键表,对于纯insert的表有用,这里进行简单的代码示例

代码语言:javascript
复制
    val tableName2 = "test_hudi_table_2"
    val tablePath2 = "/tmp/test_hudi_table_2"
    save2HudiWithNoPrimaryKey(df, tableName2, tablePath2)
    spark.read.format("hudi").load(tablePath2).show(false)

  /**
   * 非主键表,非分区表
   */
  def save2HudiWithNoPrimaryKey(df: DataFrame, tableName: String, tablePath: String): Unit = {
    df.
      write.format("hudi").
      option(KEYGENERATOR_CLASS_NAME.key, classOf[UuidKeyGenerator].getName).
      option(RECORDKEY_FIELD.key, "").
      option(PARTITIONPATH_FIELD.key, "").
      option(TBL_NAME.key, tableName).
      option(OPERATION.key(), INSERT_OPERATION_OPT_VAL).
      mode(Overwrite).
      save(tablePath)
  }

结果:

代码语言:javascript
复制
+-------------------+--------------------+------------------------------------+----------------------+------------------------------------------------------------------------+---+----+-----+----+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key                  |_hoodie_partition_path|_hoodie_file_name                                                       |id |name|value|ts  |dt        |
+-------------------+--------------------+------------------------------------+----------------------+------------------------------------------------------------------------+---+----+-----+----+----------+
|20220512145525     |20220512145525_0_1  |7263eac1-51f6-42eb-834d-bb5dfe13708e|                      |4fe619f1-58b1-4f58-94e6-002f9f5f5155-0_0-20-12004_20220512145525.parquet|1  |a1  |10   |1000|2022-05-12|
+-------------------+--------------------+------------------------------------+----------------------+------------------------------------------------------------------------+---+----+-----+----+----------+

可以看到Hudi的主键为uuid,_hoodie_partition_path为空,即非主键非分区表

备注:insert默认是会随机更新的(如果是主键表,大家可以将程序改为主键表,自行测试),随机指某些情况下,这和Hudi合并小文件有关,原理这里不详细解释,可以自行查看源码(以后可能会单独总结一篇相关的文章,和Hudi写文件、合并文件有关)。 要想是insert操作不更新,可以使用以下配置:

代码语言:javascript
复制
hoodie.merge.allow.duplicate.on.inserts = true

相关PR:https://github.com/apache/hudi/pull/3644,这个PR是在Java客户端支持这个参数的,Spark客户端本身(在这之前)就支持这个参数

saveAsTable

利用saveAsTable写Hudi并同步Hive,实际最终调用的是Spark SQL CTAS(CreateHoodieTableAsSelectCommand) CTAS 先用的insert into(InsertIntoHoodieTableCommand),再建表,默认insert,这里展示怎么配置参数使用bulk_insert,并且不使用预合并,这对于转化没有重复数据的历史表时很有用。 insert into SQL 默认是insert,配置一些参数就可以使用upsert/bulk_insert,具体可以看InsertIntoHoodieTableCommand源码

代码语言:javascript
复制
    val tableName3 = "test_hudi_table_3"
    save2HudiWithSaveAsTable(df, databaseName, tableName3, primaryKey)
    spark.table(tableName3).show()

  def save2HudiWithSaveAsTable(df: DataFrame, databaseName: String, tableName: String, primaryKey: String): Unit = {
    df.
      write.format("hudi").
      option(RECORDKEY_FIELD.key(), primaryKey).
      // 不需要预合并,所以设置为primaryKey
      // 当insert/bulk_insert等操作,并且关闭了相关参数,则不需要设置
      // SparkSQL中如果没有显示配置预合并字段,则默认将预合并字段设置为schema的最后一个字段
      // 如果为默认值的话,则可能会报null异常,所以设置为主键
      // `PRECOMBINE_FIELD.key -> tableSchema.fields.last.name`
      // 相关issue:https://github.com/apache/hudi/issues/4131
      option(PRECOMBINE_FIELD.key(), primaryKey).
      option(DataSourceWriteOptions.HIVE_TABLE_SERDE_PROPERTIES.key, s"primaryKey=$primaryKey").
      option(TBL_NAME.key(), tableName).
      option(HIVE_CREATE_MANAGED_TABLE.key, true).
      // 关闭预合并,虽然默认值为false,但是0.9.0版本SparkSQL,当有主键时,设置为了true
      option(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key, false).
      // 使用bulk_insert
      option(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key, true).
      // 这里虽然为Overwrite,但是Hudi CTAS要求目录必须为空,否则会报验证错误
      mode(Overwrite).
      saveAsTable(s"$databaseName.$tableName")
  }

这段代码本地是可以直接跑通的,结果为:

代码语言:javascript
复制
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+-----+----+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| id|name|value|  ts|        dt|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+-----+----+----------+
|     20220512154039|  20220512154039_0_1|              id:1|                      |de3c99a2-3858-462...|  1|  a1|   10|1000|2022-05-12|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+-----+----+----------+

本地测试并没有同步到Hive表,因为并没有开启enableHiveSupport()(本地验证时,注释掉这个配置),当在服务器上运行时,则可以成功同步到Hive表,可以自己试试,用saveAsTable的好处是,很多配置比如同步Hive都在Hudi Spark SQL的源码里配置了,所以配置较少。CTAS也有一些限制,比如不能覆盖写,不如save(path)灵活

代码

完整代码地址:https://gitee.com/dongkelun/spark-hudi/blob/master/src/main/scala/com/dkl/blog/hudi/SparkHudiDemo.scala 备注:以后可能因重构地址有所变动

总结

本文对Hudi安装、读写进行了简单的总结,因为精力原因写的可能没有很全面,希望对刚入门Hudi的同学有所帮助,后面会继续总结Hudi Spark SQL 等其他方面的知识。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2022-10-23,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 小明互联网技术分享社区 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • Hudi 概念
  • Hudi 学习
  • Hudi 安装
    • Spark
      • Hive
        • Tez
          • Flink
          • Hudi 写入
            • Spark 配置参数
              • 写Hudi并同步到Hive表
                • 读Hudi
                  • Hive
                  • 配置项说明
                  • hoodie.properties
                  • 非主键表
                  • saveAsTable
                  • 代码
                  • 总结
                  领券
                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档