如何使用StreamSets实时采集Kafka中嵌套JSON数据并写入Hive表

1.文档编写目的


在前面的文章Fayson介绍了关于StreamSets的一些文章《如何在CDH中安装和使用StreamSets》、《如何使用StreamSets从MySQL增量更新数据到Hive》、《如何使用StreamSets实现MySQL中变化数据实时写入Kudu》、《如何使用StreamSets实现MySQL中变化数据实时写入HBase》、《如何使用StreamSets实时采集Kafka并入库Kudu》和《如何使用StreamSets实时采集Kafka数据并写入Hive表》,本篇文章Fayson主要介绍如何使用StreamSets实时采集Kafka中嵌套的JSON数据并将采集的数据写入Hive,StreamSets的流程处理如下:

  • 内容概述

1.测试环境准备

2.配置StreamSets

3.创建Pipline及测试

4.总结

  • 测试环境

1.RedHat7.3

2.CM和CDH版本为cdh5.13.3

3.Kafka2.2.0(0.10.0)

4.StreamSets3.3.0

  • 前置条件

1.集群已启用Sentry

2.测试环境准备


1.准备测试的JSON数据

{
  "school": 1,
  "address": 2,
  "no": "page",
  "class": 3,
  "students": [{
    "name": "page1",
    "teacher": "larry",
    "age": 40
  }, {
    "name": "page2",
    "teacher": "larry",
    "age": 50
  }, {
    "name": "page3",
    "teacher": "larry",
    "age": 51
  }]
}

(可左右滑动)

2.为sdc用户授权

由于集群已启用Sentry,所以这里需要为sdc用户授权,否则sdc用户无法向Hive库中创建表及写入数据

3.创建StreamSets的Pipline


1.登录StreamSets,创建一个kafka2hive_json的Pipline

2.在Pipline流程中添加Kafka Consumer作为源并配置Kafka基础信息

配置Kafka相关信息,如Broker、ZK、Group、Topic及Kerberos信息

配置数据格式化方式,写入Kafka的数据为JSON格式,所以这里选择JSON

3.添加JavaScript Evaluator模块,主要用于处理嵌套的JSON数据

编写JSON数据解析代码,将嵌套JSON解析为多个Record,传输给HiveMetadata

解析脚本如下:

for(var i = 0; i < records.length; i++) {
    try {
        var students = records[i].value['students'];
        log.error("---------++++++++------" + students.length);
        for(var j = 0; j< students.length; j++) {
            log.info("============" + students[0].name)
            var newRecord = sdcFunctions.createRecord(true);
            var studentMap = sdcFunctions.createMap(true);
            studentMap.no = records[i].value['no'];
            studentMap.school = records[i].value['school'];
            studentMap.class = records[i].value['class'];
            studentMap.address = records[i].value['address'];
            studentMap.name = students[j].name;
            studentMap.teacher = students[j].teacher;
            studentMap.age = students[j].age;
            newRecord.value = studentMap;
            log.info("-------------" + newRecord.value['school'])
            output.write(newRecord);
        }
    } catch (e) {
        // Send record to error
        error.write(records[i], e);
    }
}

(可左右滑动)

4.添加Hive Metadata中间处理模块,选择对应的CDH版本

配置Hive的JDBC信息

配置Hive的表信息,指定表名和库名

指定数据格式,指定为Avro,选项中有parquet格式,但在后续处理中并不支持parquet格式

5.添加Hadoop FS处理模块,主要用于将HiveMetadata的数据写入HDFS

配置Hadoop FS,配置HDFS URL和是否启用Kerberos认证

配置Hadoop FS的Out Files

注意:勾选“Directory in Header”使HDFS写入数据时使用上一步中Hive Metadata模块传递的目录,“Idle Timeout”主要是用于指定Hadoop FS模块空闲多久则将数据刷到HDFS数据目录。

配置Late Records参数,使用默认参数即可

指定写入到HDFS的数据格式

6.添加Hive Metastore模块,该模块主要用于向Hive库中创建表

配置Hive信息,JDBC访问URL

Hive Metastore的高级配置

7.点击校验流程,如下图所示则说明流程正常

到此为止完成了Kafka数据到Hive的流程配置。

4.流程测试验证


1.启动kafka2hive_json的Pipline,启动成功如下图显示

2.使用Kafka的Producer脚本向kafka_hive_topic生产消息

kafka-console-producer \
--topic kafka_hive_topic \
--broker-list cdh02.fayson.com:9092,cdh03.fayson.com:9092,cdh04.fayson.com:9092 

(可左右滑动)

3.在StreamSets中查看kafka2hive_json的pipline运行情况

4.使用sdc用户登录Hue查看ods_user表数据

将嵌套的JSON数据解析为3条数据插入到ods_user表中。

5.总结


1.在使用StreamSets的Kafka Consumer模块接入Kafka嵌套的JSON数据后,无法直接将数据入库到Hive,需要将嵌套的JSON数据解析,这里可以使用Evaluator模块,StreamSets支持多种语言的Evaluator(如:JavaScprit、Jython、Groovy、Expression及Spark)。

2.由于集群启用了Sentry,StreamSets默认使用sdc用户访问Hive,在想Hive库中创建表时需要为sdc用户授权,否则会报权限异常。

3.在配置Hive的JDBC是,我们需要在JDBC URL后指定user和password否则会报匿名用户无权限访问的问题,注意必须带上password。

4.HDFS模块在接收到HiveMetadata模块的数据后生成的为临时文件,不是立即将数据写入到HDFS,可以通过“Idle Timeout”参数来控制刷新数据到HDFS的频率。

原文发布于微信公众号 - Hadoop实操(gh_c4c535955d0f)

原文发表时间:2018-06-18

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

扫码关注云+社区

领取腾讯云代金券