在实际生产中,我们经常会遇到类似kafka这种流式数据,并且原始数据并不是我们想要的,需要经过一定的逻辑处理转换为我们需要的数据。鉴于这种需求,本文采用NiFi+Spark Streaming的技术方案设计了一种针对各种外部数据源的通用实时采集处理方法。
实时采集处理方案由两部分组成:数据采集、流式处理。数据采集由NiFi中任务流采集外部数据源,并将数据写入指定端口。流式处理由Spark Streaming从NiFi中指定端口读取数据并进行相关的数据转换,然后写入kafka。整个流式采集处理框架如下:
NiFi是一个易于使用、功能强大而且可靠的数据拉取、数据处理和分发系统。NiFi是为数据流设计。它支持高度可配置的指示图的数据路由、转换和系统中介逻辑,支持从多种数据源动态拉取数据,由NSA开源,是Apache顶级项目之一,详情见:https://nifi.apache.org/。
在NiFi中,会根据不同数据源创建对应的模板,然后由模板部署任务流,任务流会采集数据源的数据,然后写入指定端口。针对不同数据源,数据采集方式不一样,例如数据库类型的数据源需要采用记录水位、增量拉取的方式进行采集。为了方便后续数据转换,此处会将数据统一转换为csv格式,例如mongodb的json数据会根据字段平铺展开第一层,object值则序列化为string。
一个最简单的任务流如下:
其中GetFile读取的文件本身就是csv格式,并带表头,如下所示:
id,name,age
1000,name1,20
1001,name2,21
1002,name3,22
UpdateAttribute会设置目标字段名、类型、转换规则,如下所示:
tid|string|.select(df("*"), df("id").cast("string").as("tid"))
tname|string|.select(df("*"), df("name").cast("string").as("tname"))
tage|string|.select(df("*"), df("age").cast("int").as("tage"))
Spark Streaming是构建在Spark上的实时计算框架,是对Spark Core API的一个扩展,它能够实现对流数据进行实时处理,并具有很好的可扩展性、高吞吐量和容错性。
Spark Streaming对接NiFi数据并进行流式处理步骤:
1.初始化context
final SparkConf sparkConf = new SparkConf().setAppName(appName).setMaster(sparkMaster);
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000L));
2.连接nifi中的指定输出端口
SiteToSiteClientConfig config = new SiteToSiteClient.Builder().url(nifiUrl).portName(nifiPort).buildConfig();
final JavaReceiverInputDStream<NiFiDataPacket> packetStream = ssc.receiverStream(new NiFiReceiver(config, StorageLevel.MEMORY_AND_DISK()));
3.读取端口上的流数据、属性
JavaDStream<NifiFeed> ds = packetStream.map(new Function<NiFiDataPacket, NifiFeed>() {
@Override
public NifiFeed call(NiFiDataPacket dataPacket) throws Exception {
return new NifiFeed(new String(dataPacket.getContent()), dataPacket.getAttributes());
}
});
其中NifiFeed是自定义数据结构,用于存储数据、属性。
4.数据转换
ds.foreachRDD(new VoidFunction<JavaRDD<NifiFeed>>() {
@Override
public void call(JavaRDD<NifiFeed> rdd) throws Exception {
rdd.foreachPartition(new VoidFunction<Iterator<NifiFeed>>() {
@Override
public void call(Iterator<NifiFeed> iterator) throws Exception {
try {
while (iterator.hasNext()) {
//TODO:执行数据转换
}
} catch (Exception e) {
//TODO:异常处理
}
}
});
}
});
其中数据转换需要动态执行属性中的代码,这里使用jexl开源库动态执行java代码,详情见:http://commons.apache.org/proper/commons-jexl/index.html。
5.启动服务
ssc.start();
ssc.awaitTermination();
本方案采用NiFi进行采集数据,然后经过Spark Streaming流式处理引擎,将采集的数据进行指定的转换,生成新数据发送到Kafka系统,为后续业务或流程提供,如Kylin流式模型构建。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。