前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >flink教程-flink 1.11 使用sql将流式数据写入hive

flink教程-flink 1.11 使用sql将流式数据写入hive

作者头像
大数据技术与应用实战
发布2020-09-15 14:29:40
2.4K0
发布2020-09-15 14:29:40
举报
  • 修改hive配置
  • 案例讲解
    • 引入相关的pom
    • 构造hive catalog
    • 创建hive表
    • 将流数据插入hive,
  • 遇到的坑
    • 问题详解
    • 修改方案

修改hive配置

上一篇介绍了使用sql将流式数据写入文件系统,这次我们来介绍下使用sql将文件写入hive,对于如果想写入已经存在的hive表,则至少需要添加以下两个属性. 写入hive底层还是和写入文件系统一样的,所以对于其他具体的配置参考上一篇.

代码语言:javascript
复制
alter table table_name set TBLPROPERTIES ('is_generic'='false'); 

alter table table_name set TBLPROPERTIES ('sink.partition-commit.policy.kind'='metastore'); 


//如果想使用eventtime分区
alter table table_name set TBLPROPERTIES ('sink.partition-commit.trigger'='partition-time'); 

案例讲解

下面我们讲解一下,如何使用java程序来构建一个flink程序来写入hive。

引入相关的pom

代码语言:javascript
复制
      <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>3.1.2</version>
        </dependency>

构造hive catalog

代码语言:javascript
复制
  //构造hive catalog
  String name = "myhive";
  String defaultDatabase = "default";
  String hiveConfDir = "/Users/user/work/hive/conf"; // a local path
  String version = "3.1.2";

  HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
  tEnv.registerCatalog("myhive", hive);
  tEnv.useCatalog("myhive");
  tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
  tEnv.useDatabase("db1");

创建hive表

如果目前系统中没有存在相应的hive表,可以通过在程序中执行相应的DDL建表语句来建表,如果已经存在了,就把这段代码省略,使用上面的hive命令修改现有表,添加相应的属性。

代码语言:javascript
复制
CREATE EXTERNAL TABLE `fs_table`(
  `user_id` string, 
  `order_amount` double)
PARTITIONED BY ( 
  `dt` string, 
  `h` string, 
  `m` string)
stored as ORC 
TBLPROPERTIES (
  'sink.partition-commit.policy.kind'='metastore',
  'partition.time-extractor.timestamp-pattern'='$dt $h:$m:00'
)

将流数据插入hive,

代码语言:javascript
复制
 String insertSql = "insert into  fs_table SELECT userId, amount, " +
                     " DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') FROM users";
  tEnv.executeSql(insertSql);

完整的代码请参考:

https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/connectors/sql/StreamingWriteHive.java

遇到的坑

问题详解

对于如上的程序和sql,如果配置了是使用eventtime,在此程序中配置了'sink.partition-commit.trigger'='partition-time',最后发现程序没法提交分区。

分析了一下源码,问题是出在了这个方法,org.apache.flink.table.filesystem.stream.PartitionTimeCommitTigger#committablePartitions。先贴上代码:

代码语言:javascript
复制
 @Override
 public List<String> committablePartitions(long checkpointId) {
  if (!watermarks.containsKey(checkpointId)) {
   throw new IllegalArgumentException(String.format(
     "Checkpoint(%d) has not been snapshot. The watermark information is: %s.",
     checkpointId, watermarks));
  }

  long watermark = watermarks.get(checkpointId);
  watermarks.headMap(checkpointId, true).clear();

  List<String> needCommit = new ArrayList<>();
  Iterator<String> iter = pendingPartitions.iterator();
  while (iter.hasNext()) {
   String partition = iter.next();
   //通过分区的值抽取分区的时间.
   LocalDateTime partTime = extractor.extract(
     partitionKeys, extractPartitionValues(new Path(partition)));
   //判断水印是否大于分区创建时间+延迟时间
   if (watermark > toMills(partTime) + commitDelay) {
    needCommit.add(partition);
    iter.remove();
   }
  }
  return needCommit;
 }

系统通过分区值来抽取相应的分区创建时间,然后进行比对,比如我们设置的pattern是 h:$m:00 , 某一时刻我们正在往 /2020-07-06/18/20/ 这个分区下写数据,那么程序根据分区值,得到的pattern将会是2020-07-06 18:20:00,这个值在sql中是根据DATA_FORMAT函数获取的。

这个值是带有时区的, 也是我想要的, 比如我们的时区设置为东八区,2020-07-06 18:20:00这个时间是东八区的时间,换成标准UTC时间是减去八个小时,也就是2020-07-06 10:20:00,而源码中的toMills函数在处理这个东八区的时间时,并没有任何加入任何时区的处理,把这个其实应该是东八区的时间当做了UTC时间来处理,这样计算出来的值就比实际值大8小时,导致一直没有触发分区的提交。

如果我们在数据源构造的分区是UTC时间,也就是不带分区的时间,那么这个逻辑就是没有问题的,但是这样又不符合我们的实际情况,比如对于分区2020-07-06 18:20:00,我希望我的分区肯定是东八区的时间,而不是比东八区小8个小时的UTC时间2020-07-06 10:20:00。

所以针对上述情况,有两种解决方案,一种是自定义一个分区抽取类,第二,就是修改源码,改一下现在的缺省的时间分区抽取类。我个人认为修改一下缺省类更好理解,因为目前写入文件和hive这块配置和概念有点多,我不想太增加过多的配置来增加用户的难度,应该尽可能的用缺省值就能使程序很好的运行。

我们看下flink中的StreamingFileSink类,构造分区桶的时候默认是使用的DateTimeBucketAssigner,其构造分区路径就是带有时区概念的,默认就用的是本地时区。

代码语言:javascript
复制
public DateTimeBucketAssigner(String formatString) {
  this(formatString, ZoneId.systemDefault());
 }

修改方案

这个问题,也不知道算不算一个bug,我给官方提交了一个ISSUE,但是官方没有采纳,不过我觉得不符合我的习惯,所以我对这个功能进行了修改,让partition.time-extractor.timestamp-pattern提取的partiiton是带有时区的,默认情况下是本地时区。如果是非本地时区,可以指定时区,通过参数partition.time-extractor.time-zone来指定,我们可以通下面的代码获取有效的时区。

代码语言:javascript
复制
 Set<String> zoneIds = ZoneId.getAvailableZoneIds();
 zoneIds.stream().forEach(System.out::println);

比如我们东八区默认使用 Asia/Shanghai。

我基于社区的flink的tag release-1.11.0-rc4,我改了一下代码 将代码放到了github上。https://github.com/zhangjun0x01/flink/tree/release-1.11.0-rc

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-07-09,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据技术与应用实战 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 修改hive配置
  • 案例讲解
    • 引入相关的pom
      • 构造hive catalog
        • 创建hive表
          • 将流数据插入hive,
          • 遇到的坑
            • 问题详解
              • 修改方案
              相关产品与服务
              大数据
              全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档