SparkStreaming窗口操作

黄文辉同学第三篇的总结,大家支持。

  1. 概述

SparkStreaming提供了窗口的计算,它允许你对数据的滑动窗口应用转换。基于窗口的操作会在一个比StreamingContext的批次间隔更长的时间范围内,通过整合多个批次的结果,计算出整个窗口的结果。下图说明了滑动窗口计算。

每个窗口时间滑动过original DStream,落入窗口内的RDD合并并操作,产生windowed DStream的RDDS。上图这种情况,窗口时长为3个时间单位,步长为2个时间单位。因此,这表明任何窗口操作需要指定2个参数。

  1. 窗口长度(window length),窗口的持续时间。
  2. 滑动窗口时间间隔(slide interval),执行基于窗口操作计算的时间间隔。(默认值与批处理间隔时间相等)。

注意,这两个参数必须是源DStream批处理时间间隔的倍数。

SparkStreaming提供一些基于窗口的操作函数,我们来使用window(windowLength,slideInterval)这个函数来表示上图的滑动窗口操作,假设批处理时间间隔为10秒,那么窗口时间为30秒,每隔20秒生成数据。那么函数参数设置为:

// 注:pairs是经过处理的DStream,JavaPairDStream<String, Integer> pairs

pairs.window(Durations.seconds(30), Durations.seconds(20));

下表是一些常见基于窗口的操作:

2.窗口操作分析

下面,通过代码执行这些方法来进行具体分析,代码如下:

SparkConf sparkConf = new SparkConf().setAppName("StreamingWindowJob");
//批处理时间间隔为10秒
JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Durations.seconds(10));
jsc.checkpoint("/test/streaming/checkpoint");//状态转换使用,必须开启检查点
JavaReceiverInputDStream<String> lines = jsc.socketTextStream("localhost", 9996);
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
	public Iterator<String> call(String t) throws Exception {
		return Arrays.asList(t.split(" ")).iterator();
	}
});
JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
	public Tuple2<String, Integer> call(String t) throws Exception {
		return new Tuple2<String, Integer>(t, 1);
	}
});
JavaPairDStream<String, Integer> data;
//window操作  窗口长度为30秒,处理间隔10秒(默认跟批处理时间一致)
data = pairs.window(Durations.seconds(30));
data.print(1000);
jsc.start();
jsc.awaitTermination();
  1. window()操作

根据上面代码,设置data = pairs.window(Durations.seconds(30))时,窗口的操作时间为30秒,处理间隔10秒(默认跟批处理时间一致)。运行结果如下图所示:

说明:数据源的发送时间为每10秒发送一个单词。下面方法的发送规则也一致。

根据设置可知,每个窗口是3个批次,每隔一个批次就对前面3个批次的数据进行一次计算。根据结果,窗口计算流程如下:

  1. 在第一个窗口,index为1,2,3的数据进入窗口,处理完后,index为1的批次离开窗口;
  2. 在第二个窗口中,index为4的数据进入窗口,然后继续进行第二个窗口的计算处理,处理完毕,index为2的数据离开窗口。
  3. 在第二个窗口中,index为5的数据进入窗口,然后继续进行第二个窗口的计算处理,处理完毕,index为3的数据离开窗口。后面的窗口处理流程一直如此循环下去。

如果设置成data = pairs.window(Durations.seconds(30),Durations.seconds(20)),则是每个窗口是3个批次,每隔2个批次就对前面3个批次的数据进行一次计算。此时将会是有2个旧批次数据离开窗口,2个新批次数据进入窗口。运行结果如下图所示:

2)reduceByWindow和reduceByKeyAndWindow操作

使用reduceByWindow()和reduceByKeyAndWindow()我们可以对每个窗口进行聚合操作,下面使用reduceByKeyAndWindow()对每个窗口进行单词统计计算,设置data=pairs. reduceByKeyAndWindow (reduceFunc,Durations.seconds(50),Durations.seconds(20)),其中聚合函数reduceFunc为:

Function2<Integer, Integer, Integer> reduceFunc = new Function2<Integer, Integer, Integer>() {
public Integer call(Integer t1, Integer t2) throws Exception {
                return t1 + t2;
            }
        };

根据上述代码,重新运行结果如下图:

根据运行代码设置可知,每个窗口有5个批次,每隔2个批次就对前面5个批次进行聚合操作,聚合计算的数据仅限于该窗口的数据。

3)reduceByKeyAndWindow高效操作

reduceByKeyAndWindow该计算方法还有一种高效计算形式,通过只考虑新进入窗口的数据和离开窗口的数据,让spark增量计算聚合结果,使计算更加高效。该形式需要提供聚合函数的一个逆函数,比如聚合函数为+,则逆函数为-。下面将通过代码运行来说明该方法的运行方式,

data=pairs. reduceByKeyAndWindow (reduceFunc,invReduceFunc,Durations.seconds(50),Durations.seconds(20)),其中逆函数invReduceFunc为

Function2<Integer, Integer, Integer> invReduceFunc = new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer t1, Integer t2) throws Exception {
                return t1 - t2;
            }
        };

运行结果如下图所示:

从运行结果中可以分析,每个窗口有5个批次,每隔2个批次就对前面5个批次进行聚合操作,计算流程如下:

  1. index为2,3,4,5,6这5个批次的数据进入第一个窗口(红色窗口),进行聚合计算,聚合结果如上图红色箭头指向的数据集。
  2. 第一个窗口计算完成后,index为2,3的数据离开窗口,index为7,8的数据进入第二个窗口(蓝色窗口),然后进行第二个窗口聚合计算,得到第二窗口结果集(蓝色箭头指向)。根据第一窗口结果集跟第二窗口结果集对比,因为index为2,3的数据(即单词为spark和java)离开窗口,所以这两个数据根据逆函数进行计算,分别减1,得出单词spark数量为2-1=1,单词java数量为1-1=0。index为4,5,6的数据是共有批次数据,可以复用这几个批次数据。而index为7,8的数据(即单词hive和hbase)进入窗口,这两个批次数据进行聚合函数操作,即单词hive和hbase数量分别加1。所以得出第二窗口结果集。
  3. 第三窗口的计算形式跟第二窗口的计算一致。

所以,根据结果和计算流程可以知道,使用这种方式运行可以复用两个窗口共有的批次数据,计算增加进入窗口的数据,和使用逆函数减去离开窗口的数据。对于较大窗口,使用逆函数这种计算方式可以大大提高执行效率。

根据上图可知,当数据退出窗口后,有些单词的统计数为0,对于这种情况,可以添加过滤函数进行过滤。代码为

data = pairs.reduceByKeyAndWindow(reduceFunc, invReduceFunc, duration_windowLength, duration_slideInterval, 2, filterFunc);

其中2为numPartitions,过滤函数filterFunc为:

Function<Tuple2<String, Integer>, Boolean> filterFunc = new Function<Tuple2<String, Integer>, Boolean>() {
            public Boolean call(Tuple2<String, Integer> t1) throws Exception {
                return t1._2 == 0 ? false : true;
            }
        };

运行结果就可以吧统计数为0的过滤掉,运行结果如下图所示:

问题:

使用reduceByKeyAndWindow这个方法当选择逆函数做法时,再加入过滤函数后,这种计算方式跟不使用逆函数计算方式相比,其执行效率是怎样?

原文发布于微信公众号 - 大数据和云计算技术(jiezhu2007)

原文发表时间:2016-12-24

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏AI研习社

Github 项目推荐 | 用 Keras 实现的神经网络机器翻译

本库是用 Keras 实现的神经网络机器翻译,查阅库文件请访问: https://nmt-keras.readthedocs.io/ Github 页面: ht...

36012
来自专栏SIGAI学习与实践平台

编写基于TensorFlow的应用之构建数据pipeline

本文主要以MNIST数据集为例介绍TFRecords文件如何制作以及加载使用。所讲内容可以在SIGAI 在线编程功能中的sharedata/intro_to_t...

942
来自专栏流柯技术学院

jmeter参数化随机取值实现

jmeter能用来做参数化的组件有几个,但是都没有随机取值的功能,遇到随机取值的需求怎么办呢?

1002
来自专栏量子位

TensorFlow 1.2正式发布,新增Python 3.6支持

王小新 编译整理 量子位 出品 | 公众号 QbitAI TensorFlow 1.2.0今日正式发布。 主要功能和改进点: 在Windows系统下新增对Pyt...

3314
来自专栏MixLab科技+设计实验室

自己动手做一个识别手写数字的web应用01

最近在深入地学习keras,发现网上各种教程都是教你怎么训练模型的,很少有问题提到如何把训练好的模型部署为后端服务,为web及app提供服务。 于是,我决定把学...

3908
来自专栏目标检测和深度学习

Github 项目推荐 | 用 Keras 实现的神经网络机器翻译

本库是用 Keras 实现的神经网络机器翻译,查阅库文件请访问: https://nmt-keras.readthedocs.io/ Github 页面: ht...

3677
来自专栏杨熹的专栏

TensorFlow-6-TensorBoard 可视化学习

学习资料: https://www.tensorflow.org/get_started/summaries_and_tensorboard 中文翻译: h...

3145
来自专栏Small Code

【Python】Numpy 中的 shuffle VS permutation

有时候我们会有随机打乱一个数组的需求,例如训练时随机打乱样本,我们可以使用 numpy.random.shuffle() 或者 numpy.random.per...

28411
来自专栏目标检测和深度学习

深度学习工程模板:简化加载数据、构建网络、训练模型和预测样本的流程

注意:支持在训练中调用callbacks,额外添加模型存储、TensorBoard、FPR度量等。

944
来自专栏debugeeker的专栏

《coredump问题原理探究》windows版3.1节函数桢

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/xuzhina/article/detai...

492

扫码关注云+社区