温馨提示:如果使用电脑查看图片不清晰,可以使用手机打开文章单击文中的图片放大查看高清原图。
Fayson的github: https://github.com/fayson/cdhproject
提示:代码块部分可以左右滑动查看噢
1
文档编写目的
Fayson在前面写过多篇StreamSets的文章,本篇文章主要介绍通过StreamSets实时的方式读取本地的数据文件,通过解析处理将文件中的内容写入到Kudu中。在进行本篇文章学习前你还需要了解:
《如何在CDH中安装和使用StreamSets》
1.测试环境准备
2.准备测试数据
3.配置StreamSets
4.流程测试及数据验证
1.RedHat7.4
2.CM和CDH版本为6.1.0
3.Kudu 1.8.0
2
测试环境准备
1.通过Hue使用Impala创建一个Kudu表,创建脚本如下:
CREATE TABLE user_info_kudu (
id STRING COMPRESSION snappy,
name STRING COMPRESSION snappy,
sex STRING COMPRESSION snappy,
city STRING COMPRESSION snappy,
occupation STRING COMPRESSION snappy,
mobile_phone_num STRING COMPRESSION snappy,
fix_phone_num STRING COMPRESSION snappy,
bank_name STRING COMPRESSION snappy,
address STRING COMPRESSION snappy,
marriage STRING COMPRESSION snappy,
child_num INT COMPRESSION snappy,
PRIMARY KEY (id)
)
PARTITION BY HASH PARTITIONS 16
STORED AS KUDU
TBLPROPERTIES ('kudu.master_addresses'='master,hadoop13'
);
在创建Kudu表的时候增加了kudu.master的配置参数,如果Impala中未集成kudu则需要增加该参数,集成方式如下:
2.准备测试数据文件
[root@hadoop13 data]# cat user_infoaa.txt
411025200708151236,濮敬才,1,竹山县,生产工作、运输工作和部分体力劳动者,13702734056,15103111241,广州银行48,台东东二路21号-20-7,0,2
653000199408254560,人思巧,0,怀化,商业工作人员,15305590235,15306212544,广州银行17,台东东二路21号-20-7,0,0
500000195305076075,詹致,1,商丘,企事业单位的负责人,13507721161,15105419035,广州银行81,台东东二路21号-20-2,0,4
130522198207211990,和东,1,阳泉,商业工作人员,13205104083,13105301541,广州银行6,台东东二路21号-2-9,0,0
准备了两个数据文件共100条测试数据,数据的id是唯一的。
3.在StreamSets服务所在节点上创建一个/data1/tmp的数据目录,用于配置StreamSets的采集目录
3
创建Pipline
1.登录StreamSets,创建一个directory2kudu的Pipline
2.在Pipline流程中添加Directory作为源并配置基础信息
3.配置Kafka相关信息,如Broker、ZK及Topic
配置采集的数据目录及文件读取方式
配置数据格式化方式,由于数据文件是以“,”分割因此选择CSV方式
Root Field Type选择为List,为会每行数据转化成List<Map<String, String>>格式的数据。
4.配置数据解析模块,这里选择使用“JavaScript Evaluator”
在JavaScript配置项选择处理数据的方式为Batch by Batch
配置数据解析代码,在Script配置项增加如下代码片段
for(var i = 0; i < records.length; i++) {
try {
var info = records[i];
var newRecord = sdcFunctions.createRecord(true);
var userInfoMap = sdcFunctions.createMap(true);
userInfoMap.id = info.value[0]['value'];
userInfoMap.name = info.value[1]['value'];
userInfoMap.sex = info.value[2]['value'];
userInfoMap.city = info.value[3]['value'];
userInfoMap.occupation = info.value[4]['value'];
userInfoMap.tel = info.value[5]['value'];
userInfoMap.fixPhoneNum = info.value[5]['value'];
userInfoMap.bankName = info.value[7]['value'];
userInfoMap.address = info.value[8]['value'];
userInfoMap.marriage = info.value[9]['value'];
userInfoMap.childNum = info.value[10]['value'];
newRecord.value = userInfoMap;
output.write(newRecord);
} catch (e) {
// Send record to error
error.write(records[i], e);
}
}
5.添加Kudu模块及配置基本信息
6.配置Kudu的Master、Table、Operation等
Kudu Masters:可以配置多个,多个地址以“,”分割
Table Name:如果使用Impala创建的Kudu表则需要添加impala::前缀
Field to Column Mapping:配置Json中key与Kudu表的column的映射关系,如果字段名称一致则不需要配置。
Default Opertation:设置操作类型如:insert、upsert、delete
4
流程测试验证
1.启动的directory2kudu,启动成功如下图显示
2.向/data1/tmp目录下拷贝一个准备好的数据文件
可以看到Pipline监控数据的变化,采集到50条数据
user_info_kudu表数据显示有50条记录
3.再次向/data1/tmp目录拷贝一个数据文件
可以看到Pipline监控数据的变化,采集到100条数据
user_info_kudu表数据显示有100条记录
入库的数据总条数
5
总结
1.通过StreamSets可以方便的监听指定的数据目录进行数据采集,可以在Directory模块上配置文件的过滤规则、采集频率以及数据的格式化方式。
2.StreamSets的Directory模块会将数据文件的数据以行为单位解析传输,通过List或着Map的方式封装
3.通过Process提供的JavaScript Evaluator模块来进行数据解析转换为能Kudu接收大数据格式。
提示:代码块部分可以左右滑动查看噢
为天地立心,为生民立命,为往圣继绝学,为万世开太平。 温馨提示:如果使用电脑查看图片不清晰,可以使用手机打开文章单击文中的图片放大查看高清原图。
推荐关注Hadoop实操,第一时间,分享更多Hadoop干货,欢迎转发和分享。