前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >2021年大数据Flink(三十八):​​​​​​​Table与SQL ​​​​​​案例五 FlinkSQL整合Hive

2021年大数据Flink(三十八):​​​​​​​Table与SQL ​​​​​​案例五 FlinkSQL整合Hive

作者头像
Lansonli
发布2021-10-11 14:48:43
7700
发布2021-10-11 14:48:43
举报
文章被收录于专栏:Lansonli技术博客Lansonli技术博客

案例五 FlinkSQL整合Hive

介绍

Apache Flink 1.12 Documentation: Hive

Flink集成Hive之快速入门--以Flink1.12为例 - 知乎

使用Hive构建数据仓库已经成为了比较普遍的一种解决方案。目前,一些比较常见的大数据处理引擎,都无一例外兼容Hive。Flink从1.9开始支持集成Hive,不过1.9版本为beta版,不推荐在生产环境中使用。在Flink1.10版本中,标志着对 Blink的整合宣告完成,对 Hive 的集成也达到了生产级别的要求。值得注意的是,不同版本的Flink对于Hive的集成有所差异,接下来将以最新的Flink1.12版本为例,实现Flink集成Hive

集成Hive的基本方式

Flink 与 Hive 的集成主要体现在以下两个方面:

  • 持久化元数据

Flink利用 Hive 的 MetaStore 作为持久化的 Catalog,我们可通过HiveCatalog将不同会话中的 Flink 元数据存储到 Hive Metastore 中。例如,我们可以使用HiveCatalog将其 Kafka的数据源表存储在 Hive Metastore 中,这样该表的元数据信息会被持久化到Hive的MetaStore对应的元数据库中,在后续的 SQL 查询中,我们可以重复使用它们。

  • 利用 Flink 来读写 Hive 的表

Flink打通了与Hive的集成,如同使用SparkSQL或者Impala操作Hive中的数据一样,我们可以使用Flink直接读写Hive中的表。

HiveCatalog的设计提供了与 Hive 良好的兼容性,用户可以”开箱即用”的访问其已有的 Hive表。不需要修改现有的 Hive Metastore,也不需要更改表的数据位置或分区。

​​​​​​​准备工作

1.添加hadoop_classpath

vim /etc/profile

增加如下配置

export HADOOP_CLASSPATH=`hadoop classpath`

刷新配置

source /etc/profile

2.下载jar并上传至flink/lib目录

Apache Flink 1.12 Documentation: Hive

3.修改hive配置

代码语言:javascript
复制
vim /export/server/hive/conf/hive-site.xml
代码语言:javascript
复制
<property>

        <name>hive.metastore.uris</name>

        <value>thrift://node3:9083</value>

</property>

4.启动hive元数据服务

代码语言:javascript
复制
nohup /export/server/hive/bin/hive --service metastore &

​​​​​​​SQL CLI

1.修改flinksql配置

代码语言:javascript
复制
vim /export/server/flink/conf/sql-client-defaults.yaml

增加如下配置

代码语言:javascript
复制
catalogs:

   - name: myhive

     type: hive

     hive-conf-dir: /export/server/hive/conf

     default-database: default

2.启动flink集群

代码语言:javascript
复制
/export/server/flink/bin/start-cluster.sh

3.启动flink-sql客户端

代码语言:javascript
复制
/export/server/flink/bin/sql-client.sh embedded

4.执行sql:

代码语言:javascript
复制
show catalogs;

use catalog myhive;

show tables;

select * from person;

​​​​​​​代码演示

代码语言:javascript
复制
package cn.it.extend;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.hive.HiveCatalog;

/**
 * Author lanson
 * Desc
 */
public class HiveDemo {
    public static void main(String[] args){
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        String name            = "myhive";
        String defaultDatabase = "default";
        String hiveConfDir = "./conf";

        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
        //注册catalog
        tableEnv.registerCatalog("myhive", hive);
        //使用注册的catalog
        tableEnv.useCatalog("myhive");

        //向Hive表中写入数据
        String insertSQL = "insert into person select * from person";
        TableResult result = tableEnv.executeSql(insertSQL);

        System.out.println(result.getJobClient().get().getJobStatus());
    }
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021-05-02 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 案例五 FlinkSQL整合Hive
    • 介绍
      • 集成Hive的基本方式
        • ​​​​​​​准备工作
          • 1.添加hadoop_classpath
          • 2.下载jar并上传至flink/lib目录
          • 3.修改hive配置
          • 4.启动hive元数据服务
        • ​​​​​​​SQL CLI
          • 1.修改flinksql配置
          • 2.启动flink集群
          • 3.启动flink-sql客户端
          • 4.执行sql:
        • ​​​​​​​代码演示
        相关产品与服务
        大数据
        全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档