前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >0911-7.1.7-如何在CDP集群使用Flink SQL Client并与Hive集成

0911-7.1.7-如何在CDP集群使用Flink SQL Client并与Hive集成

作者头像
Fayson
发布2023-12-04 10:27:55
3410
发布2023-12-04 10:27:55
举报
文章被收录于专栏:Hadoop实操Hadoop实操

1 文档概述

在前面Fayson介绍了《0876-7.1.7-如何在CDP中部署Flink1.14》,同时Flink也提供了SQL Client的能力,可以通过一种简单的方式来编写、调试和提交程序到Flink集群,而无需编写一行Java或Scala代码。本篇文章主要介绍如何在CDP集群中使用Flink SQL Client与Hive集成。Flink与Hive的集成,主要有如下两个目的:

首先,可以利用Hive的Metastore作为一个持久目录和Flink的HiveCatalog来跨会话存储Flink特定的元数据。例如:用户可以使用HiveCatalog将Kafka和ElasticSearch表存储在HiveMetastore中,然后在SQL查询中重复使用。

其次,Flink可以作为读写Hive的替代引擎。

  • • 测试环境
    1. 1. CM7.4.4和CDP7.1.7
    2. 2. 操作系统Redhat7.6
    3. 3. Flink1.14.0-csa1.6.1.0
    4. 4. 集群未启用Kerberos

2 与Hive集成说明及依赖准备

1.Flink支持的Hive版本如下:

注意:Hive不同版本与Flink的集成有不同的功能差异,是Hive本身支持的问题,目前CDP中的Hive版本为3.1.3000,并不在当前的支持列表中。

  • • 1.2及更高版本支持Hive内置函数
  • • 3.1及更高版本支持列约束(即PRIMARY KEY和NOT NULL)
  • • 1.2.0及更高版本支持更改表统计信息
  • • 1.2.0及更高版本支持DATE列统计信息
  • • 2.0.x不支持写入ORC表

2.Hive与Flink的集成需要引入额外的依赖包,可用使用官方提供的可用依赖包,也可以自己通过引入独立的依赖实现

  • • 当前Flink官网提供的可用的依赖包如下

注意:当前官方提供的Hive3的依赖版本与CDP7.1.7中Hive版本不一致,并且经过测试也是不可用的。

  • • 引入独立的依赖包(如下列表简单列了几个版本,具体参考官网)

3.官方提供可以执行的依赖包并不能很好的适配CDP,只能通过第二种方式下载独立的依赖实现与Hive的集成

  • • 从Cloudera官方的Maven库下载flink-connector-hive依赖包
代码语言:javascript
复制
https://mvnrepository.com/artifact/org.apache.flink/flink-connector-hive_2.12/1.14.0-csa1.6.0.0

Maven依赖引入方式:

代码语言:javascript
复制
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hive_2.12</artifactId>
    <version>1.14.0-csa1.6.0.0</version>
    <scope>provided</scope>
</dependency>

4.将下载的依赖包上传至CDP集群有Flink Gateway角色的/opt/cloudera/iceberg目录下

代码语言:javascript
复制
mkdir -p /opt/cloudera/iceberg

5.上述还提到了hive-exec及其他依赖包均在集群中获取,具体路径如下:

代码语言:javascript
复制
/opt/cloudera/parcels/CDH/lib/hive/lib/hive-exec.jar
/opt/cloudera/parcels/CDH/lib/hadoop/client/hadoop-mapreduce-client-core.jar
/opt/cloudera/parcels/CDH/lib/hive/lib/libfb303-0.9.3.jar
/opt/cloudera/parcels/CDH/jars/antlr-runtime-3.5.2.jar

Flink与Hive的集成,找到了依赖的Jar包后,可以将上述依赖的jar包拷贝至Flink的安装目录/opt/cloudera/parcels/FLINK/lib/flink/lib/(需要拷贝至集群所有节点),可以在客户端命令行启动时通过-j的方式引入。

代码语言:javascript
复制
flink-sql-client embedded \
  -j /opt/cloudera/iceberg/iceberg-flink-runtime-1.14-0.13.1.jar \
  -j /opt/cloudera/iceberg/flink-connector-hive_2.12-1.14.0-csa1.6.0.0.jar \
  -j /opt/cloudera/parcels/CDH/lib/hadoop/client/hadoop-mapreduce-client-core.jar \
  -j /opt/cloudera/parcels/CDH/lib/hive/lib/libfb303-0.9.3.jar \
  -j /opt/cloudera/parcels/CDH/lib/hive/lib/hive-exec.jar \
  shell

3 Flink与Hive集成验证

1.在命令行执行执行如下脚本启动Flink SQL Client

代码语言:javascript
复制
export HADOOP_USER_NAME=hive
flink-sql-client embedded \
  -j /opt/cloudera/iceberg/flink-connector-hive_2.12-1.14.0-csa1.6.0.0.jar \
  -j /opt/cloudera/parcels/CDH/lib/hadoop/client/hadoop-mapreduce-client-core.jar \
  -j /opt/cloudera/parcels/CDH/lib/hive/lib/libfb303-0.9.3.jar \
  -j /opt/cloudera/parcels/CDH/lib/hive/lib/hive-exec.jar \
  shell

2.在命令行执行如下命令设置结果显示方式及执行模式

代码语言:javascript
复制
SET 'sql-client.execution.result-mode' = 'tableau';
SET 'execution.runtime-mode' = 'batch';

此处为了便于显示,采用批量的方式执行以及表格的方式显示。

3.执行如下命令,创建一个Hive的Catalog

代码语言:javascript
复制
CREATE CATALOG myhive WITH (
    'type' = 'hive',
    'default-database' = 'default',
    'hive-conf-dir' = '/etc/hive/conf'
);

查看已经创建的Catalog

代码语言:javascript
复制
show CATALOGS;

4.进入到创建的myhive的Catalog并查看表

代码语言:javascript
复制
use catalog myhive;
show tables;

此处看到的表与Hive中的表一致,也是相应的hive表。

5.在命令行执行SQL语句查询表数据

代码语言:javascript
复制
select * from test;

与Hive中查询的数据一致

6.执行一个SQL Count的操作

代码语言:javascript
复制
select count(*) from test;

4 异常处理

1.在命令行运行Flink的wordcount示例时,当作业运行结束后有如下异常日志输出

代码语言:javascript
复制
Exception in thread "Thread-5" java.lang.IllegalStateException: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration 'classloader.check-leaked-classloader'.
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.ensureInner(FlinkUserCodeClassLoaders.java:164)
    at 

解决方法:

在Cloudera Manager中为Flink Gateway角色的flink-conf.yaml中增加如下配置:

代码语言:javascript
复制
classloader.check-leaked-classloader: false

2.在命令行Flink的WordCount示例时会有如下大量异常日志输出

代码语言:javascript
复制
2022-04-13 08:37:50,368 ERROR org.apache.flink.shaded.curator4.org.apache.curator.ConnectionState [] - Authentication failed
2022-04-13 08:37:50,399 ERROR org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.EnsembleTracker [] - Invalid config event received: {server.1=cdp03.fayson.com:3181:4181:participant, version=0, server.3=cdp02.fayson.com:3181:4181:participant, server.2=cdp01.fayson.com:3181:4181:participant}

解决方案:

在Cloudera Manager中为Flink Gateway角色的log4j.properties和log4j-cli.properties中增加如下配置:

代码语言:javascript
复制
logger.curator.name = org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.EnsembleTracker
logger.curator.level = OFF

问题分析:

上述出现的异常日志,主要是因为Flink中引入了Curator依赖包,该依赖包在处理Zookeeper的消息时,收到的信息中携带了”{}”,导致数据解析出现异常,目前该异常并不影响服务的使用(https://issues.apache.org/jira/browse/CURATOR-526),在Curator5.2之后版本修复,在修复的代码中可以看到只是将日志的级别从log.error调整为log.debug,参考https://github.com/apache/curator/pull/382

尝试将5.2版本修复后的类,打包到flink-shaded-zookeeper-3.5.5.7.1.7.0-551.jar包的org/apache/flink/shaded/curator4/org/apache/curator/framework/imps目录下,但在启动跑作业时失败,提示异常日志如下:

代码语言:javascript
复制
Caused by: java.lang.NoClassDefFoundError: org/apache/flink/shaded/curator4/org/apache/curator/framework/imps/EnsembleTracker (wrong name: org/apache/curator/framework/imps/EnsembleTracker)

这里可以考虑重新编译,暂未尝试而是通过过滤该ERROR日志的方式解决。

3.在Flink与Hive集成后,运行SQL代码时报大量的异常日志

代码语言:javascript
复制
2022-04-13 08:58:24,505 WARN  org.apache.flink.streaming.api.operators.collect.CollectResultFetcher [] - An exception occurred when fetching query results
java.util.concurrent.ExecutionException: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find Flink job (ef7f994a08f57141fafd18481d13ab85)
    at 

问题分析: 通过分析Flink作业的jobmanager的日志可以看到

通过日志可以看到报错的jobid(ef7f994a08f57141fafd18481d13ab85)实际上是在对应的JobMaster 停止以后收到的请求,因此才会出现该错误。通过报错的日志段,查找对应的源码

可以看到获取结果这块是一个while的循环,在不停的从jobmaster获取结果,这里少了对JobMaster关闭状态的判断,或者少了sleep等待,while的循环导致jobmaster还未完全结束,又来了一次新的请求导致。

解决方案: 在CM的FLink服务中将log的日志级别调整为ERROR,具体配置如下:

代码语言:javascript
复制
logger.flink-collect.name = org.apache.flink.streaming.api.operators.collect.CollectResultFetcher
logger.flink-collect.level = ERROR

5 总结

1.官方提供的flink-connector-hive依赖包并不能与CDP的Hive集成,需要使用Cloudera提供的flink-connector-hive_2.12-1.14.0-csa1.6.0.0.jar集成。

2.在Flink SQL Client中创建的Hive Catalog在当前会话有效,在会话重新启动后则需要再次创建。

3.在FLink的Gateway节点必须部署Hive On Tez的Gateway,否则在创建Catalog时会找不到Hive Metastore相关的配置信息(如Metastore URI以及Warehouse的HDFS路径)。

4.在加入了antlr-runtime-3.5.2.jar依赖后,并不能通过设置'table.sql-dialect' = 'hive'使用Hive方言。

5.在未添加hadoop-mapreduce-client-core.jar依赖时,在SQL Client中执行SQL会卡主。

6.在SQL Client下运行Flink作业只支持Per-Job Mode不支持Session Mode模式。

7.通过Flink SQL向表中插入数据后,生成的Flink作业无法自动结束,一直处于运行状态,实际数据已写入表中。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2023-11-29,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Hadoop实操 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1 文档概述
  • 2 与Hive集成说明及依赖准备
  • 3 Flink与Hive集成验证
  • 4 异常处理
相关产品与服务
专用宿主机
专用宿主机(CVM Dedicated Host,CDH)提供用户独享的物理服务器资源,满足您资源独享、资源物理隔离、安全、合规需求。专用宿主机搭载了腾讯云虚拟化系统,购买之后,您可在其上灵活创建、管理多个自定义规格的云服务器实例,自主规划物理资源的使用。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档