本文主要记录电力行业客户数据湖技术架构演进遇到的问题,已有架构为FlinkSQL实时写入Hudi、Hive on Spark查询,现在准备引入FlinkSQL增删改查Hudi的流程,逐步去掉Hive on Spark流程。欢迎关注微信公众号:大数据从业者
原因很简单,Hive官方文档早已宣布3.x(最终版本3.1.3)不再维护更新、强烈建议用户升级到4.x继续维护的版本。而且4.x版本鼓励用户使用Hive on Tez引擎,Hive on Spark引擎相关代码已剔除。Hive官方文档介绍如下:
Hive源码剔除Hive on Spark代码PR于2022年已经合入master,如下:
客户反馈根据Hudi官方文档Flink Quick Start章节update data部分时候遇到异常,详细堆栈如下:
Flink SQL> UPDATE hudi_table SET fare = 25.0 WHERE uuid = '334e26e9-8355-45cc-97c6-c31daf0df330';
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.catalog.exceptions.TableNotPartitionedException: Table default.hudi_table in catalog myhive is not partitioned.
如果使用Flink默认memory catalog,并不会遇到上述问题。而使用Hive catalog上述问题必现。
https://hudi.apache.org/docs/flink-quick-start-guide
从现象看明显属于Flink Hive catalog问题,通过搜索Flink源码找到上述异常属于TableNotPartitionedException类:
public class TableNotPartitionedException extends Exception {
private static final String MSG = "Table %s in catalog %s is not partitioned.";
public TableNotPartitionedException(String catalogName, ObjectPath tablePath) {
this(catalogName, tablePath, null);
}
public TableNotPartitionedException(String catalogName, ObjectPath tablePath, Throwable cause) {
super(String.format(MSG, tablePath.getFullName(), catalogName), cause);
}
}
该类被GenericInMemoryCatalog和HiveCatalog两种catalog使用,当对分区表操作的时候,如果检查发现没有分区,则抛出该异常。GenericInMemoryCatalog这里不再介绍。HiveCatalog的检查流程如下:
private void ensurePartitionedTable(ObjectPath tablePath, Table hiveTable)
throws TableNotPartitionedException {
if (!isTablePartitioned(hiveTable)) {
throw new TableNotPartitionedException(getName(), tablePath);
}
}
private static boolean isTablePartitioned(Table hiveTable) {
return hiveTable.getPartitionKeysSize() != 0;
}
其中,hiveTable属于Hive Metastore Table类对象,getPartitionKeysSize方法访问metastore发现分区数为0,则抛出异常。那么问题来了!难道FlinkSQL创建Hudi分区表的分区信息,没同步写入到metastore数据库中?很显然答案:是。通过beeline执行show create table可以佐证这一点,如下:
从上图可以看到,分区信息只是被同步到表属性当中,并没有同步到表PARTITIONED BY信息。所以,导致getPartitionKeysSize=0!
解决思路很清晰,要么修正同步到Hive的分区信息,要么修改判断分区表的规则!我选择后者,因为Hudi在Hdfs上的数据目录中已经包含字段schema信息和分区信息,所以并不需要FlinkSQL操作Hudi时候重复进行任何判断动作。
修改方法很暴力:注释掉HiveCatalog中判断是否为分区表的逻辑,如下:
上述修改已经提交到我的Flink分支,有需要请自行参考修改使用:
https://github.com/felixzh2020/flink/tree/apache/release-1.18.1
按照上述方式修改以后,构建环境进行验证。考虑到此次修改与Flink、Hudi版本无关,这里我选择自建环境验证,相应组件版本信息如下:
Hadoop | 3.2.3 |
---|---|
Hive | 3.1.3 |
Hudi | 1.0.0 |
Flink | 1.18.1 |
使用我的Flink分支源码编译打包,如下:
git clone -b apache/release-1.18.1 https://github.com/felixzh2020/flink.git
mvn clean package -DskipTests -Dfast -Dhadoop.version=3.2.3 -Dhive.version=3.1.3 -Pscala-2.12 -T 1C
拷贝flink-connectors/flink-sql-connector-hive-3.1.3/target/flink-sql-connector-hive-3.1.3_2.12-1.18.1.jar到flink/lib
使用我的Hudi分支源码(已经适配Hadoop3.2.3、Flink1.18.1)编译打包,如下:
git clone -b apache/release-1.18.1 https://github.com/felixzh2020/flink.git
mvn clean package -DskipTests -Dmaven.test.skip=true -Dfast -Dscala-2.12 -Dflink1.18 -Pflink-bundle-shade-hive3 -Drat.skip=true -Dcheckstyle.ski
拷贝packaging/hudi-flink-bundle/target/hudi-flink1.18-bundle-1.0.0-beta2.jar到flink/lib
通过sql-client.sh创建Hive Catalog,如下:
CREATE CATALOG myhive WITH (
'type' = 'hive',
'hive-conf-dir' = '/home/myHadoopCluster/apache-hive-3.1.3-bin/conf'
);
use catalog myhive;
然后,根据Hudi官方文档Flink Quick Start章节依次操作如下:
1.创建MOR分区表
2.插入数据行
3.查询表数据
4.更新表、查询表
5.删除行、查询表
本文主要记录电力行业客户数据湖技术架构演进路线,随着Hive on Spark功能废弃,考虑引入FlinkSQL操作Hudi表。后续会考虑继续引入SparkSQL与OLAP(Presto/Trino/Openlookeng)等操作Hudi表。