前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >如何使用StreamSets实时采集Kafka中嵌套JSON数据并写入Hive表

如何使用StreamSets实时采集Kafka中嵌套JSON数据并写入Hive表

作者头像
Fayson
发布2018-07-12 15:31:59
4.8K0
发布2018-07-12 15:31:59
举报
文章被收录于专栏:Hadoop实操

1.文档编写目的


在前面的文章Fayson介绍了关于StreamSets的一些文章《如何在CDH中安装和使用StreamSets》、《如何使用StreamSets从MySQL增量更新数据到Hive》、《如何使用StreamSets实现MySQL中变化数据实时写入Kudu》、《如何使用StreamSets实现MySQL中变化数据实时写入HBase》、《如何使用StreamSets实时采集Kafka并入库Kudu》和《如何使用StreamSets实时采集Kafka数据并写入Hive表》,本篇文章Fayson主要介绍如何使用StreamSets实时采集Kafka中嵌套的JSON数据并将采集的数据写入Hive,StreamSets的流程处理如下:

  • 内容概述

1.测试环境准备

2.配置StreamSets

3.创建Pipline及测试

4.总结

  • 测试环境

1.RedHat7.3

2.CM和CDH版本为cdh5.13.3

3.Kafka2.2.0(0.10.0)

4.StreamSets3.3.0

  • 前置条件

1.集群已启用Sentry

2.测试环境准备


1.准备测试的JSON数据

代码语言:javascript
复制
{
  "school": 1,
  "address": 2,
  "no": "page",
  "class": 3,
  "students": [{
    "name": "page1",
    "teacher": "larry",
    "age": 40
  }, {
    "name": "page2",
    "teacher": "larry",
    "age": 50
  }, {
    "name": "page3",
    "teacher": "larry",
    "age": 51
  }]
}

(可左右滑动)

2.为sdc用户授权

由于集群已启用Sentry,所以这里需要为sdc用户授权,否则sdc用户无法向Hive库中创建表及写入数据

3.创建StreamSets的Pipline


1.登录StreamSets,创建一个kafka2hive_json的Pipline

2.在Pipline流程中添加Kafka Consumer作为源并配置Kafka基础信息

配置Kafka相关信息,如Broker、ZK、Group、Topic及Kerberos信息

配置数据格式化方式,写入Kafka的数据为JSON格式,所以这里选择JSON

3.添加JavaScript Evaluator模块,主要用于处理嵌套的JSON数据

编写JSON数据解析代码,将嵌套JSON解析为多个Record,传输给HiveMetadata

解析脚本如下:

代码语言:javascript
复制
for(var i = 0; i < records.length; i++) {
    try {
        var students = records[i].value['students'];
        log.error("---------++++++++------" + students.length);
        for(var j = 0; j< students.length; j++) {
            log.info("============" + students[0].name)
            var newRecord = sdcFunctions.createRecord(true);
            var studentMap = sdcFunctions.createMap(true);
            studentMap.no = records[i].value['no'];
            studentMap.school = records[i].value['school'];
            studentMap.class = records[i].value['class'];
            studentMap.address = records[i].value['address'];
            studentMap.name = students[j].name;
            studentMap.teacher = students[j].teacher;
            studentMap.age = students[j].age;
            newRecord.value = studentMap;
            log.info("-------------" + newRecord.value['school'])
            output.write(newRecord);
        }
    } catch (e) {
        // Send record to error
        error.write(records[i], e);
    }
}

(可左右滑动)

4.添加Hive Metadata中间处理模块,选择对应的CDH版本

配置Hive的JDBC信息

配置Hive的表信息,指定表名和库名

指定数据格式,指定为Avro,选项中有parquet格式,但在后续处理中并不支持parquet格式

5.添加Hadoop FS处理模块,主要用于将HiveMetadata的数据写入HDFS

配置Hadoop FS,配置HDFS URL和是否启用Kerberos认证

配置Hadoop FS的Out Files

注意:勾选“Directory in Header”使HDFS写入数据时使用上一步中Hive Metadata模块传递的目录,“Idle Timeout”主要是用于指定Hadoop FS模块空闲多久则将数据刷到HDFS数据目录。

配置Late Records参数,使用默认参数即可

指定写入到HDFS的数据格式

6.添加Hive Metastore模块,该模块主要用于向Hive库中创建表

配置Hive信息,JDBC访问URL

Hive Metastore的高级配置

7.点击校验流程,如下图所示则说明流程正常

到此为止完成了Kafka数据到Hive的流程配置。

4.流程测试验证


1.启动kafka2hive_json的Pipline,启动成功如下图显示

2.使用Kafka的Producer脚本向kafka_hive_topic生产消息

代码语言:javascript
复制
kafka-console-producer \
--topic kafka_hive_topic \
--broker-list cdh02.fayson.com:9092,cdh03.fayson.com:9092,cdh04.fayson.com:9092 

(可左右滑动)

3.在StreamSets中查看kafka2hive_json的pipline运行情况

4.使用sdc用户登录Hue查看ods_user表数据

将嵌套的JSON数据解析为3条数据插入到ods_user表中。

5.总结


1.在使用StreamSets的Kafka Consumer模块接入Kafka嵌套的JSON数据后,无法直接将数据入库到Hive,需要将嵌套的JSON数据解析,这里可以使用Evaluator模块,StreamSets支持多种语言的Evaluator(如:JavaScprit、Jython、Groovy、Expression及Spark)。

2.由于集群启用了Sentry,StreamSets默认使用sdc用户访问Hive,在想Hive库中创建表时需要为sdc用户授权,否则会报权限异常。

3.在配置Hive的JDBC是,我们需要在JDBC URL后指定user和password否则会报匿名用户无权限访问的问题,注意必须带上password。

4.HDFS模块在接收到HiveMetadata模块的数据后生成的为临时文件,不是立即将数据写入到HDFS,可以通过“Idle Timeout”参数来控制刷新数据到HDFS的频率。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2018-06-18,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Hadoop实操 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 SQL Server
腾讯云数据库 SQL Server (TencentDB for SQL Server)是业界最常用的商用数据库之一,对基于 Windows 架构的应用程序具有完美的支持。TencentDB for SQL Server 拥有微软正版授权,可持续为用户提供最新的功能,避免未授权使用软件的风险。具有即开即用、稳定可靠、安全运行、弹性扩缩等特点。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档