前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >大数据开发之Flink连接Hive

大数据开发之Flink连接Hive

作者头像
码客说
发布2022-11-22 16:26:35
1.7K0
发布2022-11-22 16:26:35
举报
文章被收录于专栏:码客码客

前言

本文使用环境版本

  • Hive:2.3.9
  • Flink:flink-1.12.7-bin-scala_2.12

使用代码连接到 Hive

Hive 需要开启元数据服务

代码语言:javascript
复制
nohup hive --service metastore >/dev/null 2>&1 &

需要将配置了hive.metastore.uris的配置文件复制到项目resources路径下

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>hive.metastore.warehouse.dir</name>
    <value>/user/hive_remote/warehouse</value>
  </property>
  <property>
    <name>hive.metastore.local</name>
    <value>false</value>
  </property>
  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://192.168.7.101:9083</value>
  </property>
</configuration>

依赖

代码语言:javascript
复制
<properties>
  <maven.compiler.source>8</maven.compiler.source>
  <maven.compiler.target>8</maven.compiler.target>
  <flink.version>1.12.7</flink.version>
  <scala.version>2.12.15</scala.version>
  <hadoop.version>2.7.7</hadoop.version>
  <scala.binary.version>2.12</scala.binary.version>
</properties>
<dependencies>
  <!-- Flink Dependency -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>

  <!-- Hive Dependency -->
  <dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>${hive.version}</version>
  </dependency>
</dependencies>

调用代码

代码语言:javascript
复制
import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect, TableEnvironment, TableResult}
import org.apache.flink.table.catalog.hive.HiveCatalog
import org.apache.flink.types.Row
import org.apache.flink.util.CloseableIterator

object HiveTest {
  def main(args: Array[String]): Unit = {
    val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().build()
    val tableEnv: TableEnvironment = TableEnvironment.create(settings)

    val name:String = "myhive"
    val defaultDataBase:String = "default"
    val hiveConfDir :String = "/data/tools/bigdata/apache-hive-2.3.9-bin/conf"

    val hive = new HiveCatalog(name, defaultDataBase, hiveConfDir )
    tableEnv.registerCatalog("myhive", hive) // 注册Catalog
    tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
    tableEnv.useCatalog("myhive") // 使用注册的Catalog ,不使用的话查不到数据
    tableEnv.useDatabase("mydb") // 设置要查询的数据库
    tableEnv.executeSql("show tables").print()
    
  }
}

获取数据

代码语言:javascript
复制
TableResult result;
String SelectTables_sql ="select * from test.testdata";
result = tableEnv.executeSql(SelectTables_sql);
result.print();

Flink SQL Cli集成Hive

环境变量

Apache Hive 是基于 Hadoop 之上构建的, 所以还需要 Hadoop 的依赖,配置好HADOOP_CLASSPATH即可。这一点非常重要,否则在使用FlinkSQL Cli查询Hive中的表时,会报如下错误:

java.lang.ClassNotFoundException: org.apache.hadoop.mapred.JobConf

添加HADOOP_CLASSPATH

代码语言:javascript
复制
vi /etc/profile.d/hadoop.sh

内容

代码语言:javascript
复制
#HADOOP_HOME
export HADOOP_HOME=/data/tools/bigdata/hadoop-2.7.7
export YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin
export HADOOP_CLASSPATH=`hadoop classpath`

配置生效

代码语言:javascript
复制
source /etc/profile

测试

代码语言:javascript
复制
echo $HADOOP_CLASSPATH

添加Jar

Flink1.12集成Hive只需要在Flink的lib中添加如下三个jar包

以Hive2.3.9为例,分别为:

flink-sql-connector-hive-2.3.6_2.12-1.14.6.jar

https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-2.3.6_2.12/1.14.6/

flink-connector-hive_2.12-1.12.7.jar

https://repo1.maven.org/maven2/org/apache/flink/flink-connector-hive_2.12/1.12.7/

hive-exec-2.3.9.jar

Hive安装路径下的lib文件夹

Flink 支持以下 Hive 版本。

1.0.x(1.0.0、1.0.1),1.1.x(1.1.0、1.1.1),1.2.x(1.2.0、1.2.1、1.2.2)

2.0.x(2.0.0、2.0.1),2.1.x(2.1.0、2.1.1),2.2.0,2.3.x(2.3.0、2.3.1、2.3.2、2.3.4、2.3.5、2.3.6)

3.1.x(3.1.0、3.1.1、3.1.2)

配置

Hive下配置

hive-site.xml

代码语言:javascript
复制
<property>
    <name>hive.metastore.uris</name>
    <value>thrift://192.168.7.101:9083</value>
</property>

Flink下配置

sql-client-defaults.yaml

该文件时Flink SQL Cli启动时使用的配置文件,该文件位于Flink安装目录的conf/文件夹下,具体的配置如下,主要是配置catalog:

代码语言:javascript
复制
catalogs: [] # empty list
# A typical catalog definition looks like:
#  - name: myhive
#    type: hive
#    hive-conf-dir: /opt/hive_conf/
#    default-database: ...

修改为

代码语言:javascript
复制
catalogs: 
  - name: myhive
    type: hive
    hive-conf-dir: /data/tools/bigdata/apache-hive-2.3.9-bin/conf
    default-database: default

开启元数据服务元数据服务

Hive 需要开启元数据服务

代码语言:javascript
复制
nohup hive --service metastore >/dev/null 2>&1 &

Hive中创建表

代码语言:javascript
复制
create table t_user(id int,name string,password string);
INSERT INTO t_user VALUES (1,'Zhang San', '123456');
select * from t_user;

退出

代码语言:javascript
复制
exit;

Flink中操作Hive中的表

首先启动FlinkSQL Cli,命令如下:

代码语言:javascript
复制
$FLINK_HOME/bin/sql-client.sh embedded

接下来,我们可以查看注册的catalog

代码语言:javascript
复制
show catalogs;

结果

default_catalog myhive

使用注册的myhive catalog

代码语言:javascript
复制
use catalog myhive;

FlinkSQL操作Hive中的表,比如查询,写入数据。

代码语言:javascript
复制
show tables;
select * from t_user;

退出

代码语言:javascript
复制
exit;

运行报错

java.lang.RuntimeException: The Yarn application application_1667981758965_0021 doesn’t run anymore

修改yarn-site.xml配置文件,原因是可能内存超过虚拟内存的限制,所以需要对yarn进行虚拟内存限制修正,将如下两个配置改为false

代码语言:javascript
复制
<property>
  <!--pmem指的是默认检查物理内存,容器使用的物理内存不能超过我们限定的内存大小,因为我们上面设置了所有容器能够使用的最大内存数量,超出这个内存限制,任务就会被kill掉-->
  <name>yarn.nodemanager.pmem-check-enabled</name>
  <value>false</value>
</property>
<property>
  <!--vmem指的是默认检查虚拟内存,容器使用的虚拟内存不能超过我们设置的虚拟内存大小-->
  <name>yarn.nodemanager.vmem-check-enabled</name>
  <value>false</value>
</property>

Flink SQL

Show

代码语言:javascript
复制
-- 列出catalog
SHOW CATALOGS;
-- 列出数据库
SHOW DATABASES;
--列出表
SHOW TABLES;
--列出函数
SHOW FUNCTIONS;
-- 列出所有激活的 module
SHOW MODULES;

create

CREATE 语句用于向当前或指定的 Catalog 中注册表、视图或函数。注册后的表、视图和函数可以在 SQL 查询中使用。

代码语言:javascript
复制
CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
  (
    { <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n]
    [ <watermark_definition> ]
    [ <table_constraint> ][ , ...n]
  )
  [COMMENT table_comment]
  [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
  WITH (key1=val1, key2=val2, ...)
  [ LIKE source_table [( <like_options> )] ]


-- 例如
CREATE TABLE Orders_with_watermark (
    `user` BIGINT,
    product STRING,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND 
) WITH (
    'connector' = 'kafka',
    'scan.startup.mode' = 'latest-offset'
);

drop

DROP 语句可用于删除指定的 catalog,也可用于从当前或指定的 Catalog 中删除一个已经注册的表、视图或函数。

代码语言:javascript
复制
--删除表
DROP TABLE [IF EXISTS] [catalog_name.][db_name.]table_name
--删除数据库
DROP DATABASE [IF EXISTS] [catalog_name.]db_name [ (RESTRICT | CASCADE) ]
--删除视图
DROP [TEMPORARY] VIEW  [IF EXISTS] [catalog_name.][db_name.]view_name
--删除函数
DROP [TEMPORARY|TEMPORARY SYSTEM] FUNCTION [IF EXISTS] [catalog_name.][db_name.]function_name;

alter

ALTER 语句用于修改一个已经在 Catalog 中注册的表、视图或函数定义。

代码语言:javascript
复制
--修改表名
ALTER TABLE [catalog_name.][db_name.]table_name RENAME TO new_table_name
--设置或修改表属性
ALTER TABLE [catalog_name.][db_name.]table_name SET (key1=val1, key2=val2, ...)
--修改视图名
ALTER VIEW [catalog_name.][db_name.]view_name RENAME TO new_view_name
--在数据库中设置一个或多个属性。若个别属性已经在数据库中设定,将会使用新值覆盖旧值。
ALTER DATABASE [catalog_name.]db_name SET (key1=val1, key2=val2, ...)

insert

INSERT 语句用来向表中添加行(INTO是追加,OVERWRITE是覆盖)

代码语言:javascript
复制
-- 1. 插入别的表的数据
INSERT { INTO | OVERWRITE } [catalog_name.][db_name.]table_name [PARTITION part_spec] select_statement

-- 2. 将值插入表中 
INSERT { INTO | OVERWRITE } [catalog_name.][db_name.]table_name VALUES [values_row , values_row ...]



-- 追加行到该静态分区中 (date='2019-8-30', country='China')
INSERT INTO country_page_view PARTITION (date='2019-8-30', country='China')
  SELECT user, cnt FROM page_view_source;

-- 追加行到分区 (date, country) 中,其中 date 是静态分区 '2019-8-30';country 是动态分区,其值由每一行动态决定
INSERT INTO country_page_view PARTITION (date='2019-8-30')
  SELECT user, cnt, country FROM page_view_source;

-- 覆盖行到静态分区 (date='2019-8-30', country='China')
INSERT OVERWRITE country_page_view PARTITION (date='2019-8-30', country='China')
  SELECT user, cnt FROM page_view_source;

-- 覆盖行到分区 (date, country) 中,其中 date 是静态分区 '2019-8-30';country 是动态分区,其值由每一行动态决定
INSERT OVERWRITE country_page_view PARTITION (date='2019-8-30')
  SELECT user, cnt, country FROM page_view_source;
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022-11-15,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 使用代码连接到 Hive
    • 依赖
      • 调用代码
      • Flink SQL Cli集成Hive
        • 环境变量
          • 添加Jar
            • 配置
              • Hive下配置
              • Flink下配置
            • 开启元数据服务元数据服务
              • Hive中创建表
                • Flink中操作Hive中的表
                • Flink SQL
                  • Show
                    • create
                      • drop
                        • alter
                          • insert
                          相关产品与服务
                          大数据
                          全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
                          领券
                          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档