前往小程序,Get更优阅读体验!
立即前往
发布
社区首页 >专栏 >电力行业数据湖方案:Flink基于HiveCatalog增删改查Hudi实践攻略及TableNotPartitioned异常修复

电力行业数据湖方案:Flink基于HiveCatalog增删改查Hudi实践攻略及TableNotPartitioned异常修复

作者头像
用户9421738
发布2024-11-04 10:32:44
发布2024-11-04 10:32:44
10600
代码可运行
举报
文章被收录于专栏:大数据从业者
运行总次数:0
代码可运行

前言

本文主要记录电力行业客户数据湖技术架构演进遇到的问题,已有架构为FlinkSQL实时写入Hudi、Hive on Spark查询,现在准备引入FlinkSQL增删改查Hudi的流程,逐步去掉Hive on Spark流程。欢迎关注微信公众号:大数据从业者

原因很简单,Hive官方文档早已宣布3.x(最终版本3.1.3)不再维护更新、强烈建议用户升级到4.x继续维护的版本。而且4.x版本鼓励用户使用Hive on Tez引擎,Hive on Spark引擎相关代码已剔除。Hive官方文档介绍如下:

Hive源码剔除Hive on Spark代码PR于2022年已经合入master,如下:

问题描述

客户反馈根据Hudi官方文档Flink Quick Start章节update data部分时候遇到异常,详细堆栈如下:

代码语言:javascript
代码运行次数:0
复制
Flink SQL> UPDATE hudi_table SET fare = 25.0 WHERE uuid = '334e26e9-8355-45cc-97c6-c31daf0df330';

[ERROR] Could not execute SQL statement. Reason:

org.apache.flink.table.catalog.exceptions.TableNotPartitionedException: Table default.hudi_table in catalog myhive is not partitioned.

如果使用Flink默认memory catalog,并不会遇到上述问题。而使用Hive catalog上述问题必现。

代码语言:javascript
代码运行次数:0
复制
https://hudi.apache.org/docs/flink-quick-start-guide             

问题分析

从现象看明显属于Flink Hive catalog问题,通过搜索Flink源码找到上述异常属于TableNotPartitionedException类:

代码语言:javascript
代码运行次数:0
复制
public class TableNotPartitionedException extends Exception {          
         
    private static final String MSG = "Table %s in catalog %s is not partitioned.";          
         
    public TableNotPartitionedException(String catalogName, ObjectPath tablePath) {          
        this(catalogName, tablePath, null);          
    }          
         
    public TableNotPartitionedException(String catalogName, ObjectPath tablePath, Throwable cause) {          
        super(String.format(MSG, tablePath.getFullName(), catalogName), cause);          
    }          
}

该类被GenericInMemoryCatalog和HiveCatalog两种catalog使用,当对分区表操作的时候,如果检查发现没有分区,则抛出该异常。GenericInMemoryCatalog这里不再介绍。HiveCatalog的检查流程如下:

代码语言:javascript
代码运行次数:0
复制
private void ensurePartitionedTable(ObjectPath tablePath, Table hiveTable)          
        throws TableNotPartitionedException {          
    if (!isTablePartitioned(hiveTable)) {          
        throw new TableNotPartitionedException(getName(), tablePath);          
    }          
}
private static boolean isTablePartitioned(Table hiveTable) {          
    return hiveTable.getPartitionKeysSize() != 0;          
}

其中,hiveTable属于Hive Metastore Table类对象,getPartitionKeysSize方法访问metastore发现分区数为0,则抛出异常。那么问题来了!难道FlinkSQL创建Hudi分区表的分区信息,没同步写入到metastore数据库中?很显然答案:是。通过beeline执行show create table可以佐证这一点,如下:

从上图可以看到,分区信息只是被同步到表属性当中,并没有同步到表PARTITIONED BY信息。所以,导致getPartitionKeysSize=0!

解决思路很清晰,要么修正同步到Hive的分区信息,要么修改判断分区表的规则!我选择后者,因为Hudi在Hdfs上的数据目录中已经包含字段schema信息和分区信息,所以并不需要FlinkSQL操作Hudi时候重复进行任何判断动作。

修改方法很暴力:注释掉HiveCatalog中判断是否为分区表的逻辑,如下:

上述修改已经提交到我的Flink分支,有需要请自行参考修改使用:

代码语言:javascript
代码运行次数:0
复制
https://github.com/felixzh2020/flink/tree/apache/release-1.18.1  

实践验证

按照上述方式修改以后,构建环境进行验证。考虑到此次修改与Flink、Hudi版本无关,这里我选择自建环境验证,相应组件版本信息如下:

Hadoop

3.2.3

Hive

3.1.3

Hudi

1.0.0

Flink

1.18.1

使用我的Flink分支源码编译打包,如下:

代码语言:javascript
代码运行次数:0
复制
git clone -b apache/release-1.18.1  https://github.com/felixzh2020/flink.git
mvn clean package -DskipTests -Dfast -Dhadoop.version=3.2.3 -Dhive.version=3.1.3 -Pscala-2.12 -T 1C

拷贝flink-connectors/flink-sql-connector-hive-3.1.3/target/flink-sql-connector-hive-3.1.3_2.12-1.18.1.jar到flink/lib

使用我的Hudi分支源码(已经适配Hadoop3.2.3、Flink1.18.1)编译打包,如下:

代码语言:javascript
代码运行次数:0
复制
git clone -b apache/release-1.18.1  https://github.com/felixzh2020/flink.git
mvn clean package -DskipTests -Dmaven.test.skip=true -Dfast -Dscala-2.12 -Dflink1.18 -Pflink-bundle-shade-hive3 -Drat.skip=true -Dcheckstyle.ski

拷贝packaging/hudi-flink-bundle/target/hudi-flink1.18-bundle-1.0.0-beta2.jar到flink/lib

通过sql-client.sh创建Hive Catalog,如下:

代码语言:javascript
代码运行次数:0
复制
CREATE CATALOG myhive WITH (    
   'type' = 'hive',
   'hive-conf-dir' = '/home/myHadoopCluster/apache-hive-3.1.3-bin/conf'
 );

use catalog myhive;

然后,根据Hudi官方文档Flink Quick Start章节依次操作如下:

1.创建MOR分区表

2.插入数据行

3.查询表数据

4.更新表、查询表

5.删除行、查询表

总结

本文主要记录电力行业客户数据湖技术架构演进路线,随着Hive on Spark功能废弃,考虑引入FlinkSQL操作Hudi表。后续会考虑继续引入SparkSQL与OLAP(Presto/Trino/Openlookeng)等操作Hudi表。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2024-11-02,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据从业者 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 问题描述
  • 问题分析
  • 实践验证
  • 总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档