首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

Flink on Hive构建流批一体数仓

Flink使用HiveCatalog可以通过或者的方式来处理Hive中的表。这就意味着Flink既可以作为Hive的一个批处理引擎,也可以通过流处理的方式来读写Hive中的表,从而为实时数仓的应用和流批一体的落地实践奠定了坚实的基础。本文将以Flink1.12为例,介绍Flink集成Hive的另外一个非常重要的方面——Hive维表JOIN(Temporal Table Join)与Flink读写Hive表的方式。以下是全文,希望本文对你有所帮助。

Flink写入Hive表

Flink支持以批处理(Batch)流处理(Streaming)的方式写入Hive表。当以批处理的方式写入Hive表时,只有当写入作业结束时,才可以看到写入的数据。批处理的方式写入支持append模式和overwrite模式

批处理模式写入

向非分区表写入数据

向分区表写入数据

流处理模式写入

流式写入Hive表,不支持**Insert overwrite **方式,否则报如下错误:

下面的示例是将kafka的数据流式写入Hive的分区表

关于Hive表的一些属性解释:

partition.time-extractor.timestamp-pattern

默认值:(none)

解释:分区时间抽取器,与 DDL 中的分区字段保持一致,如果是按天分区,则可以是$dt,如果是按年(year)月(month)日(day)时(hour)进行分区,则该属性值为:$year-$month-$day $hour:00:00,如果是按天时进行分区,则该属性值为:$day $hour:00:00;

sink.partition-commit.trigger

process-time:不需要时间提取器和水位线,当当前时间大于分区创建时间 + sink.partition-commit.delay 中定义的时间,提交分区;

partition-time:需要 Source 表中定义 watermark,当 watermark > 提取到的分区时间 +sink.partition-commit.delay 中定义的时间,提交分区;

默认值:process-time

解释:分区触发器类型,可选process-time 或partition-time

sink.partition-commit.delay

默认值:0S

解释:分区提交的延时时间,如果是按天分区,则该属性的值为:1d,如果是按小时分区,则该属性值为1h;

metastore:添加分区的元数据信息,仅Hive表支持该值配置

success-file:在表的存储路径下添加一个文件

默认值:(none)

解释:提交分区的策略,用于通知下游的应用该分区已经完成了写入,也就是说该分区的数据可以被访问读取。可选的值如下:

可以同时配置上面的两个值,比如metastore,success-file

执行流式写入Hive表

同时查看Hive表的分区数据:

尖叫提示:

1.Flink读取Hive表默认使用的是batch模式,如果要使用流式读取Hive表,需要而外指定一些参数,见下文。

2.只有在完成 Checkpoint 之后,文件才会从 In-progress 状态变成 Finish 状态,同时生成文件,所以,Flink流式写入Hive表需要开启并配置 Checkpoint。对于Flink SQL Client而言,需要在flink-conf.yaml中开启CheckPoint,配置内容为:

state.backend: filesystem

state.savepoints.dir: hdfs://kms-1:8020/flink-savepoints

Flink读取Hive表

Flink支持以批处理(Batch)和流处理(Streaming)的方式读取Hive中的表。批处理的方式与Hive的本身查询类似,即只在提交查询的时刻查询一次Hive表。流处理的方式将会持续地监控Hive表,并且会增量地提取新的数据。默认情况下,Flink是以批处理的方式读取Hive表

关于流式读取Hive表,Flink既支持分区表又支持非分区表。对于分区表而言,Flink将会监控新产生的分区数据,并以增量的方式读取这些数据。对于非分区表,Flink会监控Hive表存储路径文件夹里面的新文件,并以增量的方式读取新的数据。

Flink读取Hive表可以配置一下参数:

streaming-source.enable

默认值:false

解释:是否开启流式读取 Hive 表,默认不开启。

默认值:all

解释:配置读取Hive的分区,包括两种方式:all和latest。all意味着读取所有分区的数据,latest表示只读取最新的分区数据。值得注意的是,latest方式只能用于开启了流式读取Hive表,并用于维表JOIN的场景。

streaming-source.monitor-interval

默认值:None

解释:持续监控Hive表分区或者文件的时间间隔。值得注意的是,当以流的方式读取Hive表时,该参数的默认值是1m,即1分钟。当temporal join时,默认的值是60m,即1小时。另外,该参数配置不宜过短 ,最短是1 个小时,因为目前的实现是每个 task 都会查询 metastore,高频的查可能会对metastore 产生过大的压力。

streaming-source.partition-order

默认值:partition-name

解释:streaming source的分区顺序。默认的是partition-name,表示使用默认分区名称顺序加载最新分区,也是推荐使用的方式。除此之外还有两种方式,分别为:create-time和partition-time。其中create-time表示使用分区文件创建时间顺序。partition-time表示使用分区时间顺序。指的注意的是,对于非分区表,该参数的默认值为:create-time

streaming-source.consume-start-offset

默认值:None

解释:流式读取Hive表的起始偏移量。

partition.time-extractor.kind

默认值:default

分区时间提取器类型。用于从分区中提取时间,支持default和自定义。如果使用default,则需要通过参数配置时间戳提取的正则表达式。

在 SQL Client 中需要显示地开启 SQL Hint 功能

使用SQLHint流式查询Hive表

Hive维表JOIN

Flink 1.12 支持了 Hive 最新的分区作为时态表的功能,可以通过 SQL 的方式直接关联 Hive 分区表的最新分区,并且会自动监听最新的 Hive 分区,当监控到新的分区后,会自动地做维表数据的全量替换。

Flink支持的是processing-time的temporal join,也就是说总是与最新版本的时态表进行JOIN。另外,Flink既支持非分区表的temporal join,又支持分区表的temporal join。对于分区表而言,Flink会监听Hive表的最新分区数据。值得注意的是,Flink尚不支持 event-time temporal join。

Temporal Join最新分区

对于一张随着时间变化的Hive分区表,Flink可以读取该表的数据作为一个无界流。如果Hive分区表的每个分区都包含全量的数据,那么每个分区将做为一个时态表的版本数据,即将最新的分区数据作为一个全量维表数据。值得注意的是,该功能特点仅支持Flink的STREAMING模式。

使用 Hive 最新分区作为 Tempmoral table 之前,需要设置必要的两个参数:

除此之外还有一些其他的参数,关于参数的解释见上面的分析。我们在使用Hive维表的时候,既可以在创建Hive表时指定具体的参数,也可以使用SQL Hint的方式动态指定参数。一个Hive维表的创建模板如下:

有了上面的Hive维表,我们就可以使用该维表与Kafka的实时流数据进行JOIN,得到相应的宽表数据。

除了在定义Hive维表时指定相关的参数,我们还可以通过SQL Hint的方式动态指定相关的参数,具体方式如下:

Temporal Join最新表

对于Hive的非分区表,当使用temporal join时,整个Hive表会被缓存到Slot内存中,然后根据流中的数据对应的key与其进行匹配。使用最新的Hive表进行temporal join不需要进行额外的配置,我们只需要配置一个Hive表缓存的TTL时间,该时间的作用是:当缓存过期时,就会重新扫描Hive表并加载最新的数据。

默认值:60min

解释:表示缓存时间。由于 Hive 维表会把维表所有数据缓存在 TM 的内存中,当维表数据量很大时,很容易造成 OOM。当然TTL的时间也不能太短,因为会频繁地加载数据,从而影响性能。

尖叫提示:

1.每一个子任务都需要缓存一份维表的全量数据,一定要确保TM的task Slot 大小能够容纳维表的数据量;

3.当缓存的维表数据需要重新刷新时,目前的做法是将整个表进行加载,因此不能够将新数据与旧数据区分开来。

Hive维表JOIN示例

假设维表的数据是通过批处理的方式(比如每天)装载至Hive中,而Kafka中的事实流数据需要与该维表进行JOIN,从而构建一个宽表数据,这个时候就可以使用Hive的维表JOIN。

创建一张kafka数据源表,实时流

创建一张Hive维表

关联Hive维表的最新数据

使用SQL Hint方式,关联非分区的Hive维表:

总结

本文以最新版本的Flink1.12为例,介绍了Flink读写Hive的不同方式,并对每种方式给出了相应的使用示例。在实际应用中,通常有将实时数据流与 Hive 维表 join 来构造宽表的需求,Flink提供了Hive维表JOIN,可以简化用户使用的复杂度。本文在最后详细说明了Flink进行Hive维表JOIN的基本步骤以及使用示例,希望对你有所帮助。

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20210106A01H1Y00?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券