In this section, we will cover ways to ingest new changes from external sources, or even other Hudi tables, using the DeltaStreamer tool, as well as speeding up large Spark jobs via upserts using the Hudi datasource. Such tables can then be queried using various query engines.
UPSERT: This is the default operation, where the input records are first tagged as inserts or updates by looking up the index. The records are ultimately written after running heuristics to determine how best to pack them onto storage and optimize things like file sizing. This operation is recommended for use cases like database change capture, where the input almost certainly contains updates. The target table will never show duplicates.
INSERT: This operation is very similar to upsert in terms of heuristics/file sizing, but completely skips the index lookup step. Thus, it can be much faster than upserts for use cases like log de-duplication (in conjunction with the option to filter duplicates mentioned below). It also suits use cases where the table can tolerate duplicates but only needs Hudi's transactional writes, incremental pull, and storage management capabilities.
BULK_INSERT: Both upsert and insert operations keep the input records in memory to speed up storage heuristics computations (among other things), and thus can be cumbersome for the initial load/bootstrap of a Hudi table. BULK_INSERT provides the same semantics as insert, while implementing a sort-based data writing algorithm that scales very well to initial loads of several hundred TBs. However, it only does a best-effort job at sizing files, rather than guaranteeing file sizes like inserts/upserts do. Selecting among these operations is sketched below.
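As a minimal sketch (not taken verbatim from the Hudi docs; the DataFrame inputDF, tableName and basePath are placeholders), the operation is selected via DataSourceWriteOptions.OPERATION_OPT_KEY when writing through the Spark datasource described later in this section, while DeltaStreamer exposes the same choice through its --op flag:

```java
// Sketch: explicitly choose the write operation (the default is upsert).
// bulk_insert is typically used for the initial load, upsert afterwards.
inputDF.write()
  .format("org.apache.hudi")
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY(), DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL())
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key")
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "partition")
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "timestamp")
  .option(HoodieWriteConfig.TABLE_NAME, tableName)
  .mode(SaveMode.Append)
  .save(basePath);
```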
The HoodieDeltaStreamer utility (part of hudi-utilities-bundle) provides ways to ingest from different sources such as DFS or Kafka, with the following capabilities.
The command line options describe these capabilities in more detail:
[hoodie]$ spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls packaging/hudi-utilities-bundle/target/hudi-utilities-bundle-*.jar` --help
Usage: <main class> [options]
Options:
--checkpoint
Resume Delta Streamer from this checkpoint.
--commit-on-errors
Commit even when some records failed to be written
Default: false
--compact-scheduling-minshare
Minshare for compaction as defined in
https://spark.apache.org/docs/latest/job-scheduling
Default: 0
--compact-scheduling-weight
Scheduling weight for compaction as defined in
https://spark.apache.org/docs/latest/job-scheduling
Default: 1
--continuous
Delta Streamer runs in continuous mode running source-fetch -> Transform
-> Hudi Write in loop
Default: false
--delta-sync-scheduling-minshare
Minshare for delta sync as defined in
https://spark.apache.org/docs/latest/job-scheduling
Default: 0
--delta-sync-scheduling-weight
Scheduling weight for delta sync as defined in
https://spark.apache.org/docs/latest/job-scheduling
Default: 1
--disable-compaction
Compaction is enabled for MoR table by default. This flag disables it
Default: false
--enable-hive-sync
Enable syncing to hive
Default: false
--filter-dupes
Should duplicate records from source be dropped/filtered out before
insert/bulk-insert
Default: false
--help, -h
--hoodie-conf
Any configuration that can be set in the properties file (using the CLI
parameter "--propsFilePath") can also be passed command line using this
parameter
Default: []
--max-pending-compactions
Maximum number of outstanding inflight/requested compactions. Delta Sync
will not happen unless outstanding compactions is less than this number
Default: 5
--min-sync-interval-seconds
the min sync interval of each sync in continuous mode
Default: 0
--op
Takes one of these values : UPSERT (default), INSERT (use when input is
purely new data/inserts to gain speed)
Default: UPSERT
Possible Values: [UPSERT, INSERT, BULK_INSERT]
--payload-class
subclass of HoodieRecordPayload, that works off a GenericRecord.
Implement your own, if you want to do something other than overwriting
existing value
Default: org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
--props
path to properties file on localfs or dfs, with configurations for
hoodie client, schema provider, key generator and data source. For
hoodie client props, sane defaults are used, but recommend use to
provide basic things like metrics endpoints, hive configs etc. For
sources, refer to individual classes, for supported properties.
Default: file:///Users/vinoth/bin/hoodie/src/test/resources/delta-streamer-config/dfs-source.properties
--schemaprovider-class
subclass of org.apache.hudi.utilities.schema.SchemaProvider to attach
schemas to input & target table data, built in options:
org.apache.hudi.utilities.schema.FilebasedSchemaProvider.Source (See
org.apache.hudi.utilities.sources.Source) implementation can implement
their own SchemaProvider. For Sources that return Dataset<Row>, the
schema is obtained implicitly. However, this CLI option allows
overriding the schemaprovider returned by Source.
--source-class
Subclass of org.apache.hudi.utilities.sources to read data. Built-in
options: org.apache.hudi.utilities.sources.{JsonDFSSource (default),
AvroDFSSource, JsonKafkaSource, AvroKafkaSource, HiveIncrPullSource}
Default: org.apache.hudi.utilities.sources.JsonDFSSource
--source-limit
Maximum amount of data to read from source. Default: No limit For e.g:
DFS-Source => max bytes to read, Kafka-Source => max events to read
Default: 9223372036854775807
--source-ordering-field
Field within source record to decide how to break ties between records
with same key in input data. Default: 'ts' holding unix timestamp of
record
Default: ts
--spark-master
spark master to use.
Default: local[2]
* --table-type
Type of table. COPY_ON_WRITE (or) MERGE_ON_READ
* --target-base-path
base path for the target hoodie table. (Will be created if did not exist
first time around. If exists, expected to be a hoodie table)
* --target-table
name of the target table in Hive
--transformer-class
subclass of org.apache.hudi.utilities.transform.Transformer. Allows
transforming raw source Dataset to a target Dataset (conforming to
target schema) before writing. Default : Not set. E:g -
org.apache.hudi.utilities.transform.SqlQueryBasedTransformer (which
allows a SQL query templated to be passed as a transformation function)
The tool takes a hierarchically composed properties file and has pluggable interfaces for extracting data, generating keys and providing schemas. Sample configs for ingesting from Kafka and DFS are provided under hudi-utilities/src/test/resources/delta-streamer-config.
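For illustration, here is a hedged sketch of what such a properties file might contain for the Kafka/Avro example below (the key names are standard Hudi/DeltaStreamer configs; the field names and URLs are placeholders, not copied from the bundled kafka-source.properties):

```properties
# Sketch of a DeltaStreamer properties file for a Kafka + schema registry source
# (values below are illustrative placeholders).
hoodie.datasource.write.recordkey.field=impressionid
hoodie.datasource.write.partitionpath.field=userid
# Schema provider: fetch the Avro schema from the Confluent schema registry
hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/impressions-value/versions/latest
# Kafka source
hoodie.deltastreamer.source.kafka.topic=impressions
bootstrap.servers=localhost:9092
auto.offset.reset=earliest
schema.registry.url=http://localhost:8081
```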
For example: once you have Confluent Kafka and the schema registry up and running, produce some test data using the impressions schema (impressions.avro, provided by the schema-registry repo):
[confluent-5.0.0]$ bin/ksql-datagen schema=../impressions.avro format=avro topic=impressions key=impressionid
Then ingest it as follows.
[hoodie]$ spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls packaging/hudi-utilities-bundle/target/hudi-utilities-bundle-*.jar` \
  --props file://${PWD}/hudi-utilities/src/test/resources/delta-streamer-config/kafka-source.properties \
  --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
  --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
  --source-ordering-field impresssiontime \
  --target-base-path file:///tmp/hudi-deltastreamer-op \
  --target-table uber.impressions \
  --op BULK_INSERT
In some cases, you may want to migrate your existing table into Hudi beforehand. Please refer to the migration guide.
HoodieMultiTableDeltaStreamer, a wrapper on top of HoodieDeltaStreamer, enables users to ingest multiple tables into Hudi datasets in a single go. Currently it only supports sequential processing of the tables to be ingested and the COPY_ON_WRITE storage type. Its command line options are very similar to HoodieDeltaStreamer, with the only exception that you need to provide table-wise configs in separate files inside a dedicated config folder. The following command line options are introduced:
* --config-folder
the path to the folder which contains all the table wise config files
--base-path-prefix
this is added to enable users to create all the hudi datasets for related tables under one path in FS. The datasets are then created under the path - <base_path_prefix>/<database>/<table_to_be_ingested>. However you can override the paths for every table by setting the property hoodie.deltastreamer.ingestion.targetBasePath
The following properties need to be set correctly to ingest data using HoodieMultiTableDeltaStreamer (a sketch of how they fit together follows the list).
hoodie.deltastreamer.ingestion.tablesToBeIngested
comma separated names of tables to be ingested in the format <database>.<table>, for example db1.table1,db1.table2
hoodie.deltastreamer.ingestion.targetBasePath
if you wish to ingest a particular table in a separate path, you can mention that path here
hoodie.deltastreamer.ingestion.<database>.<table>.configFile
path to the config file in dedicated config folder which contains table overridden properties for the particular table to be ingested.
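A hedged sketch of these properties in use (the database/table names and paths are illustrative placeholders):

```properties
# Sketch: top-level properties for HoodieMultiTableDeltaStreamer
hoodie.deltastreamer.ingestion.tablesToBeIngested=db1.table1,db1.table2
# Per-table override files living inside the folder passed via --config-folder
hoodie.deltastreamer.ingestion.db1.table1.configFile=file:///tmp/hudi-ingestion-config/table1_config.properties
hoodie.deltastreamer.ingestion.db1.table2.configFile=file:///tmp/hudi-ingestion-config/table2_config.properties

# Inside table2_config.properties: route this table to its own base path
# instead of <base_path_prefix>/db1/table2
hoodie.deltastreamer.ingestion.targetBasePath=file:///tmp/custom/path/table2
```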
Sample config files for the table-wise overridden properties can be found under hudi-utilities/src/test/resources/delta-streamer-config. The command to run HoodieMultiTableDeltaStreamer is also similar to running HoodieDeltaStreamer:
[hoodie]$ spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer `ls packaging/hudi-utilities-bundle/target/hudi-utilities-bundle-*.jar` \
  --props file://${PWD}/hudi-utilities/src/test/resources/delta-streamer-config/kafka-source.properties \
  --config-folder file:///tmp/hudi-ingestion-config \
  --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
  --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
  --source-ordering-field impresssiontime \
  --base-path-prefix file:///tmp/hudi-deltastreamer-op \
  --target-table uber.impressions \
  --op BULK_INSERT
For detailed information on how to configure and use HoodieMultiTableDeltaStreamer, please refer to the blog section.
The hudi-spark module provides the DataSource API to write (and read) a Spark DataFrame into (and from) a Hudi table. There are a number of options available:
HoodieWriteConfig:
TABLE_NAME (Required)
DataSourceWriteOptions:
RECORDKEY_FIELD_OPT_KEY (Required): Primary key field(s). Record keys uniquely identify a record/row within each partition. If you want global uniqueness, there are two options: you can either make the dataset non-partitioned, or leverage a Global index to ensure record keys are unique irrespective of the partition path. Record keys can be a single column or refer to multiple columns. The KEYGENERATOR_CLASS_OPT_KEY property should be set accordingly, based on whether it is a simple or complex key. For example: "col1" for a simple field, "col1,col2,col3,etc" for a complex field. Nested fields can be specified using the dot notation, e.g.: a.b.c.
Default value: "uuid"
PARTITIONPATH_FIELD_OPT_KEY (Required): Columns used for partitioning the table. To prevent partitioning, provide an empty string as the value, e.g.: "". Specify partitioning/no-partitioning using KEYGENERATOR_CLASS_OPT_KEY. If the partition path needs to be URL-encoded, you can set URL_ENCODE_PARTITIONING_OPT_KEY. If syncing to Hive, also specify it using HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY.
Default value: "partitionpath"
PRECOMBINE_FIELD_OPT_KEY (Required): When two records within the same batch have the same key value, the record with the largest value for the specified field will be chosen. If you are using the default payload OverwriteWithLatestAvroPayload for HoodieRecordPayload (WRITE_PAYLOAD_CLASS), an incoming record will always take precedence over the record in storage, ignoring this PRECOMBINE_FIELD_OPT_KEY.
Default value: "ts"
OPERATION_OPT_KEY: The write operation to use.
Available values:
UPSERT_OPERATION_OPT_VAL (default), BULK_INSERT_OPERATION_OPT_VAL, INSERT_OPERATION_OPT_VAL, DELETE_OPERATION_OPT_VAL
TABLE_TYPE_OPT_KEY: The type of table to write to. Note: after the initial table creation, this value must stay consistent when writing to (updating) the table using the Spark SaveMode.Append mode.
Available values:
COW_TABLE_TYPE_OPT_VAL (default), MOR_TABLE_TYPE_OPT_VAL
KEYGENERATOR_CLASS_OPT_KEY: Refer to the Key Generation section below.
HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY: If using Hive, specify whether the table should or should not be partitioned.
Available values:
classOf[SlashEncodedDayPartitionValueExtractor].getCanonicalName (default), classOf[MultiPartKeysValueExtractor].getCanonicalName, classOf[TimestampBasedKeyGenerator].getCanonicalName, classOf[NonPartitionedExtractor].getCanonicalName, classOf[GlobalDeleteKeyGenerator].getCanonicalName (to be used when OPERATION_OPT_KEY is set to DELETE_OPERATION_OPT_VAL)
Example: Upsert a DataFrame, specifying the necessary field names: recordKey => _row_key, partitionPath => partition, precombineKey => timestamp
inputDF.write()
.format("org.apache.hudi")
.options(clientOpts) //Where clientOpts is of type Map[String, String]. clientOpts can include any other options necessary.
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key")
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "partition")
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "timestamp")
.option(HoodieWriteConfig.TABLE_NAME, tableName)
.mode(SaveMode.Append)
.save(basePath);
The hudi-flink module defines the Flink SQL connector for both the Hudi source and sink. There are a number of options available for the sink table:
Option Name | Required | Default | Remarks |
---|---|---|---|
path | Y | N/A | Base path for the target hoodie table. The path would be created if it does not exist, otherwise a hudi table expects to be initialized successfully |
table.type | N | COPY_ON_WRITE | Type of table to write. COPY_ON_WRITE (or) MERGE_ON_READ |
write.operation | N | upsert | The write operation, that this write should do (insert or upsert is supported) |
write.precombine.field | N | ts | Field used in preCombining before actual write. When two records have the same key value, we will pick the one with the largest value for the precombine field, determined by Object.compareTo(..) |
write.payload.class | N | OverwriteWithLatestAvroPayload.class | Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting. This will render any value set for the option in-effective |
write.insert.drop.duplicates | N | false | Flag to indicate whether to drop duplicates upon insert. By default insert will accept duplicates, to gain extra performance |
write.ignore.failed | N | true | Flag to indicate whether to ignore any non-exception error (e.g. writestatus error) within a checkpoint batch. By default true (in favor of streaming progressing over data integrity) |
hoodie.datasource.write.recordkey.field | N | uuid | Record key field. Value to be used as the recordKey component of HoodieKey. Actual value will be obtained by invoking .toString() on the field value. Nested fields can be specified using the dot notation eg: a.b.c |
hoodie.datasource.write.keygenerator.class | N | SimpleAvroKeyGenerator.class | Key generator class that will extract the key out of the incoming record |
write.tasks | N | 4 | Parallelism of tasks that do actual write, default is 4 |
write.batch.size.MB | N | 128 | Batch buffer size in MB to flush data into the underneath filesystem |
If the table type is MERGE_ON_READ, you can also specify the asynchronous compaction strategy through options:
Option Name | Required | Default | Remarks |
---|---|---|---|
compaction.async.enabled | N | true | Async Compaction, enabled by default for MOR |
compaction.trigger.strategy | N | num_commits | Strategy to trigger compaction, options are ‘num_commits’: trigger compaction when reach N delta commits; ‘time_elapsed’: trigger compaction when time elapsed > N seconds since last compaction; ‘num_and_time’: trigger compaction when both NUM_COMMITS and TIME_ELAPSED are satisfied; ‘num_or_time’: trigger compaction when NUM_COMMITS or TIME_ELAPSED is satisfied. Default is ‘num_commits’ |
compaction.delta_commits | N | 5 | Max delta commits needed to trigger compaction, default 5 commits |
compaction.delta_seconds | N | 3600 | Max delta seconds time needed to trigger compaction, default 1 hour |
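For context, here is a hedged sketch of how such a sink table could be declared in Flink SQL before running the INSERT shown next (the table name, schema, path and option values are illustrative; the option keys come from the tables above):

```sql
-- Sketch: declare a Hudi sink table (schema and path are placeholders).
CREATE TABLE hudi_table (
  uuid VARCHAR(20),
  name VARCHAR(10),
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'file:///tmp/hudi_table',
  -- COPY_ON_WRITE is the default; MERGE_ON_READ enables the compaction options above
  'table.type' = 'MERGE_ON_READ',
  'write.operation' = 'upsert',
  'write.precombine.field' = 'ts',
  'hoodie.datasource.write.recordkey.field' = 'uuid'
);
```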
You can write the data using the SQL INSERT INTO statement:
INSERT INTO hudi_table select ... from ...;
Note: INSERT OVERWRITE is not supported yet, but it is on the roadmap.
Hudi maintains hoodie keys (record key + partition path) to uniquely identify a particular record. The key generator class will extract these out of the incoming record. Both of the tools above have configs to specify the hoodie.datasource.write.keygenerator.class property. For DeltaStreamer this comes from the properties file specified via --props, while the DataSource writer takes this config directly using DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY(). The default value for this config is SimpleKeyGenerator. Note: a custom key generator class can also be written and provided here. The primary key columns should be provided via the RECORDKEY_FIELD_OPT_KEY option.
Hudi currently supports different combinations of record keys and partition paths: simple and composite record keys combined with simple, timestamp-based, or no partitioning, as described below.
The CustomKeyGenerator.java class (part of the hudi-spark module) provides solid support for generating all of the types of hoodie keys listed above. All you need to do is supply values for the following properties to create your desired keys:
hoodie.datasource.write.recordkey.field
hoodie.datasource.write.partitionpath.field
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.CustomKeyGenerator
To use a composite record key, you need to provide comma-separated fields like:
hoodie.datasource.write.recordkey.field=field1,field2
This will create your record key in the format field1:value1,field2:value2 and so on; otherwise you specify only one field in the case of a simple record key. The CustomKeyGenerator class defines an enum PartitionKeyType for configuring partition paths. It can take two possible values: SIMPLE and TIMESTAMP. For partitioned tables, the value for the hoodie.datasource.write.partitionpath.field property needs to be provided in the format field1:PartitionKeyType1,field2:PartitionKeyType2 and so on. For example, if you want to create a partition path using two fields, country and date, where the latter has timestamp-based values that need to be customized into a given format, you can specify the following:
hoodie.datasource.write.partitionpath.field=country:SIMPLE,date:TIMESTAMP
This will create the partition path in the format <country_name>/<date> or country=<country_name>/date=<date>, depending on whether you want hive-style partitioning or not.
The TimestampBasedKeyGenerator class defines the following properties, which can be used to customize timestamp-based partition paths; a combined example sketch follows the list.
hoodie.deltastreamer.keygen.timebased.timestamp.type
This defines the type of the value that your field contains. It can be in string format or epoch format, for example
hoodie.deltastreamer.keygen.timebased.timestamp.scalar.time.unit
This defines the granularity of your field, whether it contains the values in seconds or milliseconds
hoodie.deltastreamer.keygen.timebased.input.dateformat
This defines the custom format in which the values are present in your field, for example yyyy/MM/dd
hoodie.deltastreamer.keygen.timebased.output.dateformat
This defines the custom format in which you want the partition paths to be created, for example dt=yyyyMMdd
hoodie.deltastreamer.keygen.timebased.timezone
This defines the timezone which the timestamp based values belong to
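A hedged sketch of how these properties could be combined for the country/date example above (the values, in particular the timestamp type and timezone, are illustrative and depend on your data):

```properties
# Sketch: timestamp-based partitioning for the "date" field (illustrative values).
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.CustomKeyGenerator
hoodie.datasource.write.partitionpath.field=country:SIMPLE,date:TIMESTAMP
hoodie.deltastreamer.keygen.timebased.timestamp.type=DATE_STRING
hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy/MM/dd
hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyyMMdd
hoodie.deltastreamer.keygen.timebased.timezone=GMT
```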
When the key generator class is CustomKeyGenerator, non-partitioned tables can be handled simply by leaving the property blank, for example:
hoodie.datasource.write.partitionpath.field=
For those on Hudi versions earlier than 0.6.0, you can use the dedicated key generator classes directly to cover these use cases, as sketched below.
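As a hedged illustration (verify the exact class and package names against your release), a dedicated key generator can be configured directly instead of CustomKeyGenerator, for example:

```properties
# Sketch: pick a dedicated key generator class instead of CustomKeyGenerator.
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator
# Other classes mentioned in this document include SimpleKeyGenerator,
# TimestampBasedKeyGenerator and GlobalDeleteKeyGenerator.
```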
Both of the tools above support syncing the table's latest schema to the Hive metastore, such that queries can pick up new columns and partitions. In case it is preferable to run this from the command line or in an independent JVM, Hudi provides a HiveSyncTool, which can be invoked as below once you have built the hudi-hive module. The following shows how to sync the table written by the above Datasource Writer to the Hive metastore.
cd hudi-hive
./run_sync_tool.sh --jdbc-url jdbc:hive2://hiveserver:10000 --user hive --pass hive --partitioned-by partition --base-path <basePath> --database default --table <tableName>
Starting with Hudi 0.5.1, the read-optimized view of merge-on-read tables is suffixed with '_ro' by default. For backwards compatibility with older Hudi versions, an optional HiveSyncConfig flag, --skip-ro-suffix, is provided to turn off the '_ro' suffix if needed. Explore other hive sync options using the following command:
cd hudi-hive
./run_sync_tool.sh
[hudi-hive]$ ./run_sync_tool.sh --help
Hudi supports two types of deletes on data stored in Hudi tables, by allowing users to specify a different record payload implementation. For more info, refer to Deletion support in Hudi.
Example: using hard delete method 2, delete all records from the table that are present in the dataset deleteDF:
deleteDF // dataframe containing just records to be deleted
.write().format("org.apache.hudi")
.option(...) // Add HUDI options like record-key, partition-path and others as needed for your setup
// specify record_key, partition_key, precombine_fieldkey & usual params
.option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY(), "org.apache.hudi.EmptyHoodieRecordPayload")
// finish the write as in the upsert example above
.mode(SaveMode.Append)
.save(basePath);
Hudi also performs several key storage management functions on the data stored in a Hudi table. A key aspect of storing data on DFS is managing file sizes and counts, and reclaiming storage space. For example, HDFS is infamous for its handling of small files, which puts memory/RPC pressure on the NameNode and can potentially destabilize the entire cluster. In general, query engines provide much better performance on adequately sized columnar files, since they can effectively amortize the cost of obtaining column statistics and the like. Even on some cloud data stores, there is often a cost to listing directories with a large number of small files.
Here are some ways to efficiently manage the storage of your Hudi tables; an illustrative configuration sketch follows this list.
The small file handling feature in Hudi profiles incoming workloads and distributes inserts to existing file groups, instead of creating new file groups that can lead to small files.
The Cleaner can be configured to clean up older file slices, more or less aggressively depending on the maximum time queries are expected to run and the lookback needed for incremental pulls.
Users can also tune the size of the base/parquet files, log files and the expected compression ratio, so that a sufficient number of inserts are grouped into the same file group, eventually producing well-sized base files.
Intelligently tuning the bulk insert parallelism can again produce right-sized initial file groups. This is in fact critical, since once a file group is created it cannot be deleted, only expanded as explained before.
For update-heavy workloads, the merge-on-read table provides a nice mechanism for ingesting quickly into smaller files and then later merging them into larger base files via compaction.
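As a hedged sketch (the keys are standard Hudi write configs; the values are purely illustrative and should be tuned per workload), these levers map to configuration such as:

```properties
# Sketch: storage management knobs (illustrative values, not recommendations).
# Small file handling: route inserts into file groups whose base file is below this size
hoodie.parquet.small.file.limit=104857600
# Target size for base/parquet files
hoodie.parquet.max.file.size=125829120
# Expected compression ratio of log data to parquet data (merge-on-read tables)
hoodie.logfile.to.parquet.compression.ratio=0.35
# How many commits the Cleaner retains (bounds query runtime / incremental-pull lookback)
hoodie.cleaner.commits.retained=10
# Bulk insert parallelism drives the sizing of the initial file groups
hoodie.bulkinsert.shuffle.parallelism=200
```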
This article is an original post by "xiaozhch5", a blogger at 从大数据到人工智能 (From Big Data to AI), and is licensed under the CC 4.0 BY-SA agreement. Please include a link to the original article and this notice when reposting.