SparkStreaming窗口操作

黄文辉同学第三篇的总结,大家支持。

  1. 概述

SparkStreaming提供了窗口的计算,它允许你对数据的滑动窗口应用转换。基于窗口的操作会在一个比StreamingContext的批次间隔更长的时间范围内,通过整合多个批次的结果,计算出整个窗口的结果。下图说明了滑动窗口计算。

每个窗口时间滑动过original DStream,落入窗口内的RDD合并并操作,产生windowed DStream的RDDS。上图这种情况,窗口时长为3个时间单位,步长为2个时间单位。因此,这表明任何窗口操作需要指定2个参数。

  1. 窗口长度(window length),窗口的持续时间。
  2. 滑动窗口时间间隔(slide interval),执行基于窗口操作计算的时间间隔。(默认值与批处理间隔时间相等)。

注意,这两个参数必须是源DStream批处理时间间隔的倍数。

SparkStreaming提供一些基于窗口的操作函数,我们来使用window(windowLength,slideInterval)这个函数来表示上图的滑动窗口操作,假设批处理时间间隔为10秒,那么窗口时间为30秒,每隔20秒生成数据。那么函数参数设置为:

// 注:pairs是经过处理的DStream,JavaPairDStream<String, Integer> pairs

pairs.window(Durations.seconds(30), Durations.seconds(20));

下表是一些常见基于窗口的操作:

2.窗口操作分析

下面,通过代码执行这些方法来进行具体分析,代码如下:

SparkConf sparkConf = new SparkConf().setAppName("StreamingWindowJob");
//批处理时间间隔为10秒
JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Durations.seconds(10));
jsc.checkpoint("/test/streaming/checkpoint");//状态转换使用,必须开启检查点
JavaReceiverInputDStream<String> lines = jsc.socketTextStream("localhost", 9996);
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
	public Iterator<String> call(String t) throws Exception {
		return Arrays.asList(t.split(" ")).iterator();
	}
});
JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
	public Tuple2<String, Integer> call(String t) throws Exception {
		return new Tuple2<String, Integer>(t, 1);
	}
});
JavaPairDStream<String, Integer> data;
//window操作  窗口长度为30秒,处理间隔10秒(默认跟批处理时间一致)
data = pairs.window(Durations.seconds(30));
data.print(1000);
jsc.start();
jsc.awaitTermination();
  1. window()操作

根据上面代码,设置data = pairs.window(Durations.seconds(30))时,窗口的操作时间为30秒,处理间隔10秒(默认跟批处理时间一致)。运行结果如下图所示:

说明:数据源的发送时间为每10秒发送一个单词。下面方法的发送规则也一致。

根据设置可知,每个窗口是3个批次,每隔一个批次就对前面3个批次的数据进行一次计算。根据结果,窗口计算流程如下:

  1. 在第一个窗口,index为1,2,3的数据进入窗口,处理完后,index为1的批次离开窗口;
  2. 在第二个窗口中,index为4的数据进入窗口,然后继续进行第二个窗口的计算处理,处理完毕,index为2的数据离开窗口。
  3. 在第二个窗口中,index为5的数据进入窗口,然后继续进行第二个窗口的计算处理,处理完毕,index为3的数据离开窗口。后面的窗口处理流程一直如此循环下去。

如果设置成data = pairs.window(Durations.seconds(30),Durations.seconds(20)),则是每个窗口是3个批次,每隔2个批次就对前面3个批次的数据进行一次计算。此时将会是有2个旧批次数据离开窗口,2个新批次数据进入窗口。运行结果如下图所示:

2)reduceByWindow和reduceByKeyAndWindow操作

使用reduceByWindow()和reduceByKeyAndWindow()我们可以对每个窗口进行聚合操作,下面使用reduceByKeyAndWindow()对每个窗口进行单词统计计算,设置data=pairs. reduceByKeyAndWindow (reduceFunc,Durations.seconds(50),Durations.seconds(20)),其中聚合函数reduceFunc为:

Function2<Integer, Integer, Integer> reduceFunc = new Function2<Integer, Integer, Integer>() {
public Integer call(Integer t1, Integer t2) throws Exception {
                return t1 + t2;
            }
        };

根据上述代码,重新运行结果如下图:

根据运行代码设置可知,每个窗口有5个批次,每隔2个批次就对前面5个批次进行聚合操作,聚合计算的数据仅限于该窗口的数据。

3)reduceByKeyAndWindow高效操作

reduceByKeyAndWindow该计算方法还有一种高效计算形式,通过只考虑新进入窗口的数据和离开窗口的数据,让spark增量计算聚合结果,使计算更加高效。该形式需要提供聚合函数的一个逆函数,比如聚合函数为+,则逆函数为-。下面将通过代码运行来说明该方法的运行方式,

data=pairs. reduceByKeyAndWindow (reduceFunc,invReduceFunc,Durations.seconds(50),Durations.seconds(20)),其中逆函数invReduceFunc为

Function2<Integer, Integer, Integer> invReduceFunc = new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer t1, Integer t2) throws Exception {
                return t1 - t2;
            }
        };

运行结果如下图所示:

从运行结果中可以分析,每个窗口有5个批次,每隔2个批次就对前面5个批次进行聚合操作,计算流程如下:

  1. index为2,3,4,5,6这5个批次的数据进入第一个窗口(红色窗口),进行聚合计算,聚合结果如上图红色箭头指向的数据集。
  2. 第一个窗口计算完成后,index为2,3的数据离开窗口,index为7,8的数据进入第二个窗口(蓝色窗口),然后进行第二个窗口聚合计算,得到第二窗口结果集(蓝色箭头指向)。根据第一窗口结果集跟第二窗口结果集对比,因为index为2,3的数据(即单词为spark和java)离开窗口,所以这两个数据根据逆函数进行计算,分别减1,得出单词spark数量为2-1=1,单词java数量为1-1=0。index为4,5,6的数据是共有批次数据,可以复用这几个批次数据。而index为7,8的数据(即单词hive和hbase)进入窗口,这两个批次数据进行聚合函数操作,即单词hive和hbase数量分别加1。所以得出第二窗口结果集。
  3. 第三窗口的计算形式跟第二窗口的计算一致。

所以,根据结果和计算流程可以知道,使用这种方式运行可以复用两个窗口共有的批次数据,计算增加进入窗口的数据,和使用逆函数减去离开窗口的数据。对于较大窗口,使用逆函数这种计算方式可以大大提高执行效率。

根据上图可知,当数据退出窗口后,有些单词的统计数为0,对于这种情况,可以添加过滤函数进行过滤。代码为

data = pairs.reduceByKeyAndWindow(reduceFunc, invReduceFunc, duration_windowLength, duration_slideInterval, 2, filterFunc);

其中2为numPartitions,过滤函数filterFunc为:

Function<Tuple2<String, Integer>, Boolean> filterFunc = new Function<Tuple2<String, Integer>, Boolean>() {
            public Boolean call(Tuple2<String, Integer> t1) throws Exception {
                return t1._2 == 0 ? false : true;
            }
        };

运行结果就可以吧统计数为0的过滤掉,运行结果如下图所示:

问题:

使用reduceByKeyAndWindow这个方法当选择逆函数做法时,再加入过滤函数后,这种计算方式跟不使用逆函数计算方式相比,其执行效率是怎样?

原文发布于微信公众号 - 大数据和云计算技术(jiezhu2007)

原文发表时间:2016-12-24

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏京东技术

服务器端的图像处理 | 请召唤ImageMagick助你解忧

在客户端我们可以用 PhotoShop 等 GUI 工具处理静态图片或者动态 GIF 图片,不过在服务器端对于 WEB 应用程序要处理图片格式转换,缩放裁剪,翻...

931
来自专栏Python数据科学

Seaborn从零开始学习教程(二)

在Seaborn的使用中,是可以针对数据类型而选择合适的颜色,并且使用选择的颜色进行可视化,节省了大量的可视化的颜色调整工作。

972
来自专栏hightopo

基于HTML5和WebGL的3D网络拓扑结构图

1453
来自专栏Java与Android技术栈

图像中二维码的检测和定位

所谓开操作是指先腐蚀后膨胀的操作。在之前的文章二值图像分析:案例实战(文本分离+硬币计数)曾经介绍过开操作的用途。

1003
来自专栏腾讯AlloyTeam的专栏

教你用 webgl 快速创建一个小世界

Webgl的魅力在于可以创造一个自己的3D世界,但相比较canvas2D来说,除了物体的移动旋转变换完全依赖矩阵增加了复杂度,就连生成一个物体都变得很复杂……这...

1.5K0
来自专栏Python中文社区

基于matplotlib的2D/3D抽象网格和能量曲线绘制程序

專 欄 ❈PytLab,Python 中文社区专栏作者。主要从事科学计算与高性能计算领域的应用,主要语言为Python,C,C++。熟悉数值算法(最优化方法,蒙...

2177
来自专栏落影的专栏

OpenGL ES学习阶段性总结

前言 最近观看下面这本书有感,结合之前的学习,对OpenGL的知识进行回顾。 ? 概念 帧缓存:接收渲染结果的缓冲区,为GPU指定存储渲染结果的区域。 ...

3808
来自专栏欧阳大哥的轮子

iOS的一种基于服务器下发的动态布局方案(一)

栅格布局MyGridLayout是MyLayout布局体系里面的第八种布局。这是一种将布局约束设置和视图分离的布局方式,就像HTML中的标签元素和css样式可以...

893
来自专栏前端说吧

echarts - 特殊需求实现代码汇总之【线图】篇

继7月24的echarts-柱图配置汇总后,echarts特殊配置连载第四篇 之 线图终于也被我这个懒家伙放出来了!

1183
来自专栏HT

基于HTML5和WebGL的3D网络拓扑结构图

现在,3D模型已经用于各种不同的领域。在医疗行业使用它们制作器官的精确模型;电影行业将它们用于活动的人物、物体以及现实电影;视频游戏产业将它们作为计算机与视频游...

2265

扫码关注云+社区