前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >hudi 模式演化

hudi 模式演化

作者头像
从大数据到人工智能
发布2022-01-19 08:01:25
4370
发布2022-01-19 08:01:25
举报
文章被收录于专栏:大数据-BigData

模式演化是数据管理的一个非常重要的方面。 Hudi支持常见的模式演变场景,比如添加一个空字段或提升一个字段的数据类型,开箱即用。 此外,该模式可以跨引擎查询,如Presto、Hive和Spark SQL。 下表总结了与不同Hudi表类型兼容的模式更改类型。

Schema Change

COW

MOR

Remarks

Add a new nullable column at root level at the end

Yes

Yes

Yes means that a write with evolved schema succeeds and a read following the write succeeds to read entire dataset.

Add a new nullable column to inner struct (at the end)

Yes

Yes

Add a new complex type field with default (map and array)

Yes

Yes

Add a new nullable column and change the ordering of fields

No

No

Write succeeds but read fails if the write with evolved schema updated only some of the base files but not all. Currently, Hudi does not maintain a schema registry with history of changes across base files. Nevertheless, if the upsert touched all base files then the read will succeed.

Add a custom nullable Hudi meta column, e.g. _hoodie_meta_col

Yes

Yes

Promote datatype from int to long for a field at root level

Yes

Yes

For other types, Hudi supports promotion as specified in Avro schema resolution.

Promote datatype from int to long for a nested field

Yes

Yes

Promote datatype from int to long for a complex type (value of map or array)

Yes

Yes

Add a new non-nullable column at root level at the end

No

No

In case of MOR table with Spark data source, write succeeds but read fails. As a workaround, you can make the field nullable.

Add a new non-nullable column to inner struct (at the end)

No

No

Change datatype from long to int for a nested field

No

No

Change datatype from long to int for a complex type (value of map or array)

No

No

让我们通过一个示例来演示Hudi中的模式演化支持。 在下面的示例中,我们将添加一个新的字符串字段,并将字段的数据类型从int改为long。

代码语言:javascript
复制
Welcome to
    ____              __
    / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
    /___/ .__/\_,_/_/ /_/\_\   version 3.1.2
    /_/

    Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 1.8.0_292)
    Type in expressions to have them evaluated.
    Type :help for more information.

scala> import org.apache.hudi.QuickstartUtils._
import org.apache.hudi.QuickstartUtils._

scala> import scala.collection.JavaConversions._
import scala.collection.JavaConversions._

scala> import org.apache.spark.sql.SaveMode._
import org.apache.spark.sql.SaveMode._

scala> import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceReadOptions._

scala> import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.DataSourceWriteOptions._

scala> import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.config.HoodieWriteConfig._

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> val tableName = "hudi_trips_cow"
    tableName: String = hudi_trips_cow
scala> val basePath = "file:///tmp/hudi_trips_cow"
    basePath: String = file:///tmp/hudi_trips_cow
scala> val schema = StructType( Array(
    | StructField("rowId", StringType,true),
    | StructField("partitionId", StringType,true),
    | StructField("preComb", LongType,true),
    | StructField("name", StringType,true),
    | StructField("versionId", StringType,true),
    | StructField("intToLong", IntegerType,true)
    | ))
    schema: org.apache.spark.sql.types.StructType = StructType(StructField(rowId,StringType,true), StructField(partitionId,StringType,true), StructField(preComb,LongType,true), StructField(name,StringType,true), StructField(versionId,StringType,true), StructField(intToLong,IntegerType,true))

scala> val data1 = Seq(Row("row_1", "part_0", 0L, "bob", "v_0", 0),
    |                Row("row_2", "part_0", 0L, "john", "v_0", 0),
    |                Row("row_3", "part_0", 0L, "tom", "v_0", 0))
    data1: Seq[org.apache.spark.sql.Row] = List([row_1,part_0,0,bob,v_0,0], [row_2,part_0,0,john,v_0,0], [row_3,part_0,0,tom,v_0,0])

scala> var dfFromData1 = spark.createDataFrame(data1, schema)
scala> dfFromData1.write.format("hudi").
    |   options(getQuickstartWriteConfigs).
    |   option(PRECOMBINE_FIELD_OPT_KEY.key, "preComb").
    |   option(RECORDKEY_FIELD_OPT_KEY.key, "rowId").
    |   option(PARTITIONPATH_FIELD_OPT_KEY.key, "partitionId").
    |   option("hoodie.index.type","SIMPLE").
    |   option(TABLE_NAME.key, tableName).
    |   mode(Overwrite).
    |   save(basePath)

scala> var tripsSnapshotDF1 = spark.read.format("hudi").load(basePath + "/*/*")
    tripsSnapshotDF1: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 9 more fields]

scala> tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot")

scala> spark.sql("desc hudi_trips_snapshot").show()
    +--------------------+---------+-------+
    |            col_name|data_type|comment|
    +--------------------+---------+-------+
    | _hoodie_commit_time|   string|   null|
    |_hoodie_commit_seqno|   string|   null|
    |  _hoodie_record_key|   string|   null|
    |_hoodie_partition...|   string|   null|
    |   _hoodie_file_name|   string|   null|
    |               rowId|   string|   null|
    |         partitionId|   string|   null|
    |             preComb|   bigint|   null|
    |                name|   string|   null|
    |           versionId|   string|   null|
    |           intToLong|      int|   null|
    +--------------------+---------+-------+

scala> spark.sql("select rowId, partitionId, preComb, name, versionId, intToLong from hudi_trips_snapshot").show()
    +-----+-----------+-------+----+---------+---------+
    |rowId|partitionId|preComb|name|versionId|intToLong|
    +-----+-----------+-------+----+---------+---------+
    |row_3|     part_0|      0| tom|      v_0|        0|
    |row_2|     part_0|      0|john|      v_0|        0|
    |row_1|     part_0|      0| bob|      v_0|        0|
    +-----+-----------+-------+----+---------+---------+

// In the new schema, we are going to add a String field and 
// change the datatype `intToLong` field from  int to long.
scala> val newSchema = StructType( Array(
    | StructField("rowId", StringType,true),
    | StructField("partitionId", StringType,true),
    | StructField("preComb", LongType,true),
    | StructField("name", StringType,true),
    | StructField("versionId", StringType,true),
    | StructField("intToLong", LongType,true),
    | StructField("newField", StringType,true)
    | ))
    newSchema: org.apache.spark.sql.types.StructType = StructType(StructField(rowId,StringType,true), StructField(partitionId,StringType,true), StructField(preComb,LongType,true), StructField(name,StringType,true), StructField(versionId,StringType,true), StructField(intToLong,LongType,true), StructField(newField,StringType,true))

scala> val data2 = Seq(Row("row_2", "part_0", 5L, "john", "v_3", 3L, "newField_1"),
    |                Row("row_5", "part_0", 5L, "maroon", "v_2", 2L, "newField_1"),
    |                Row("row_9", "part_0", 5L, "michael", "v_2", 2L, "newField_1"))
    data2: Seq[org.apache.spark.sql.Row] = List([row_2,part_0,5,john,v_3,3,newField_1], [row_5,part_0,5,maroon,v_2,2,newField_1], [row_9,part_0,5,michael,v_2,2,newField_1])

scala> var dfFromData2 = spark.createDataFrame(data2, newSchema)
scala> dfFromData2.write.format("hudi").
    |   options(getQuickstartWriteConfigs).
    |   option(PRECOMBINE_FIELD_OPT_KEY.key, "preComb").
    |   option(RECORDKEY_FIELD_OPT_KEY.key, "rowId").
    |   option(PARTITIONPATH_FIELD_OPT_KEY.key, "partitionId").
    |   option("hoodie.index.type","SIMPLE").
    |   option(TABLE_NAME.key, tableName).
    |   mode(Append).
    |   save(basePath)

scala> var tripsSnapshotDF2 = spark.read.format("hudi").load(basePath + "/*/*")
    tripsSnapshotDF2: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 10 more fields]

scala> tripsSnapshotDF2.createOrReplaceTempView("hudi_trips_snapshot")

scala> spark.sql("desc hudi_trips_snapshot").show()
    +--------------------+---------+-------+
    |            col_name|data_type|comment|
    +--------------------+---------+-------+
    | _hoodie_commit_time|   string|   null|
    |_hoodie_commit_seqno|   string|   null|
    |  _hoodie_record_key|   string|   null|
    |_hoodie_partition...|   string|   null|
    |   _hoodie_file_name|   string|   null|
    |               rowId|   string|   null|
    |         partitionId|   string|   null|
    |             preComb|   bigint|   null|
    |                name|   string|   null|
    |           versionId|   string|   null|
    |           intToLong|   bigint|   null|
    |            newField|   string|   null|
    +--------------------+---------+-------+

scala> spark.sql("select rowId, partitionId, preComb, name, versionId, intToLong, newField from hudi_trips_snapshot").show()
    +-----+-----------+-------+-------+---------+---------+----------+
    |rowId|partitionId|preComb|   name|versionId|intToLong|  newField|
    +-----+-----------+-------+-------+---------+---------+----------+
    |row_3|     part_0|      0|    tom|      v_0|        0|      null|
    |row_2|     part_0|      5|   john|      v_3|        3|newField_1|
    |row_1|     part_0|      0|    bob|      v_0|        0|      null|
    |row_5|     part_0|      5| maroon|      v_2|        2|newField_1|
    |row_9|     part_0|      5|michael|      v_2|        2|newField_1|
    +-----+-----------+-------+-------+---------+---------+----------+
Copy

本文为从大数据到人工智能博主「xiaozhch5」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。

原文链接:https://cloud.tencent.com/developer/article/1936496

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021-11-,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档