0604-6.1.0-如何使用StreamSets实时采集指定数据目录文件并写入库Kudu

温馨提示:如果使用电脑查看图片不清晰,可以使用手机打开文章单击文中的图片放大查看高清原图。

Fayson的github: https://github.com/fayson/cdhproject

提示:代码块部分可以左右滑动查看噢

1

文档编写目的

Fayson在前面写过多篇StreamSets的文章,本篇文章主要介绍通过StreamSets实时的方式读取本地的数据文件,通过解析处理将文件中的内容写入到Kudu中。在进行本篇文章学习前你还需要了解:

《如何在CDH中安装和使用StreamSets》

  • 内容概述

1.测试环境准备

2.准备测试数据

3.配置StreamSets

4.流程测试及数据验证

  • 测试环境

1.RedHat7.4

2.CM和CDH版本为6.1.0

3.Kudu 1.8.0

2

测试环境准备

1.通过Hue使用Impala创建一个Kudu表,创建脚本如下:

CREATE TABLE user_info_kudu (
  id STRING COMPRESSION snappy,
  name STRING COMPRESSION snappy,
  sex STRING COMPRESSION snappy,
  city STRING COMPRESSION snappy,
  occupation STRING COMPRESSION snappy,
  mobile_phone_num STRING COMPRESSION snappy,
  fix_phone_num STRING COMPRESSION snappy,
  bank_name STRING COMPRESSION snappy,
  address STRING COMPRESSION snappy,
  marriage STRING COMPRESSION snappy,
  child_num INT COMPRESSION snappy,
  PRIMARY KEY (id)
)
  PARTITION BY HASH PARTITIONS 16
STORED AS KUDU
  TBLPROPERTIES ('kudu.master_addresses'='master,hadoop13'
);

在创建Kudu表的时候增加了kudu.master的配置参数,如果Impala中未集成kudu则需要增加该参数,集成方式如下:

2.准备测试数据文件

[root@hadoop13 data]# cat user_infoaa.txt 
411025200708151236,濮敬才,1,竹山县,生产工作、运输工作和部分体力劳动者,13702734056,15103111241,广州银行48,台东东二路21号-20-7,0,2
653000199408254560,人思巧,0,怀化,商业工作人员,15305590235,15306212544,广州银行17,台东东二路21号-20-7,0,0
500000195305076075,詹致,1,商丘,企事业单位的负责人,13507721161,15105419035,广州银行81,台东东二路21号-20-2,0,4
130522198207211990,和东,1,阳泉,商业工作人员,13205104083,13105301541,广州银行6,台东东二路21号-2-9,0,0

准备了两个数据文件共100条测试数据,数据的id是唯一的。

3.在StreamSets服务所在节点上创建一个/data1/tmp的数据目录,用于配置StreamSets的采集目录

3

创建Pipline

1.登录StreamSets,创建一个directory2kudu的Pipline

2.在Pipline流程中添加Directory作为源并配置基础信息

3.配置Kafka相关信息,如Broker、ZK及Topic

配置采集的数据目录及文件读取方式

配置数据格式化方式,由于数据文件是以“,”分割因此选择CSV方式

Root Field Type选择为List,为会每行数据转化成List<Map<String, String>>格式的数据。

4.配置数据解析模块,这里选择使用“JavaScript Evaluator”

在JavaScript配置项选择处理数据的方式为Batch by Batch

配置数据解析代码,在Script配置项增加如下代码片段

for(var i = 0; i < records.length; i++) {
    try {
        var info = records[i];
        var newRecord = sdcFunctions.createRecord(true);
        var userInfoMap = sdcFunctions.createMap(true);
        userInfoMap.id = info.value[0]['value'];
        userInfoMap.name = info.value[1]['value'];
        userInfoMap.sex = info.value[2]['value'];
        userInfoMap.city = info.value[3]['value'];
        userInfoMap.occupation = info.value[4]['value'];
        userInfoMap.tel = info.value[5]['value'];
        userInfoMap.fixPhoneNum = info.value[5]['value'];
        userInfoMap.bankName = info.value[7]['value'];
        userInfoMap.address = info.value[8]['value'];
        userInfoMap.marriage = info.value[9]['value'];
        userInfoMap.childNum = info.value[10]['value'];
        newRecord.value = userInfoMap;
        output.write(newRecord);
    } catch (e) {
        // Send record to error
        error.write(records[i], e);
    }
}

5.添加Kudu模块及配置基本信息

6.配置Kudu的Master、Table、Operation等

Kudu Masters:可以配置多个,多个地址以“,”分割

Table Name:如果使用Impala创建的Kudu表则需要添加impala::前缀

Field to Column Mapping:配置Json中key与Kudu表的column的映射关系,如果字段名称一致则不需要配置。

Default Opertation:设置操作类型如:insert、upsert、delete

4

流程测试验证

1.启动的directory2kudu,启动成功如下图显示

2.向/data1/tmp目录下拷贝一个准备好的数据文件

可以看到Pipline监控数据的变化,采集到50条数据

user_info_kudu表数据显示有50条记录

3.再次向/data1/tmp目录拷贝一个数据文件

可以看到Pipline监控数据的变化,采集到100条数据

user_info_kudu表数据显示有100条记录

入库的数据总条数

5

总结

1.通过StreamSets可以方便的监听指定的数据目录进行数据采集,可以在Directory模块上配置文件的过滤规则、采集频率以及数据的格式化方式。

2.StreamSets的Directory模块会将数据文件的数据以行为单位解析传输,通过List或着Map的方式封装

3.通过Process提供的JavaScript Evaluator模块来进行数据解析转换为能Kudu接收大数据格式。

提示:代码块部分可以左右滑动查看噢

为天地立心,为生民立命,为往圣继绝学,为万世开太平。 温馨提示:如果使用电脑查看图片不清晰,可以使用手机打开文章单击文中的图片放大查看高清原图。

推荐关注Hadoop实操,第一时间,分享更多Hadoop干货,欢迎转发和分享。

原文发布于微信公众号 - Hadoop实操(gh_c4c535955d0f)

原文发表时间:2019-04-18

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

扫码关注云+社区

领取腾讯云代金券