前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >SparkStreaming窗口操作

SparkStreaming窗口操作

作者头像
大数据和云计算技术
发布2018-03-08 15:13:07
2.5K0
发布2018-03-08 15:13:07
举报

黄文辉同学第三篇的总结,大家支持。

  1. 概述

SparkStreaming提供了窗口的计算,它允许你对数据的滑动窗口应用转换。基于窗口的操作会在一个比StreamingContext的批次间隔更长的时间范围内,通过整合多个批次的结果,计算出整个窗口的结果。下图说明了滑动窗口计算。

每个窗口时间滑动过original DStream,落入窗口内的RDD合并并操作,产生windowed DStream的RDDS。上图这种情况,窗口时长为3个时间单位,步长为2个时间单位。因此,这表明任何窗口操作需要指定2个参数。

  1. 窗口长度(window length),窗口的持续时间。
  2. 滑动窗口时间间隔(slide interval),执行基于窗口操作计算的时间间隔。(默认值与批处理间隔时间相等)。

注意,这两个参数必须是源DStream批处理时间间隔的倍数。

SparkStreaming提供一些基于窗口的操作函数,我们来使用window(windowLength,slideInterval)这个函数来表示上图的滑动窗口操作,假设批处理时间间隔为10秒,那么窗口时间为30秒,每隔20秒生成数据。那么函数参数设置为:

// 注:pairs是经过处理的DStream,JavaPairDStream<String, Integer> pairs

代码语言:javascript
复制
pairs.window(Durations.seconds(30), Durations.seconds(20));

下表是一些常见基于窗口的操作:

2.窗口操作分析

下面,通过代码执行这些方法来进行具体分析,代码如下:

代码语言:javascript
复制
SparkConf sparkConf = new SparkConf().setAppName("StreamingWindowJob");
//批处理时间间隔为10秒
JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Durations.seconds(10));
jsc.checkpoint("/test/streaming/checkpoint");//状态转换使用,必须开启检查点
JavaReceiverInputDStream<String> lines = jsc.socketTextStream("localhost", 9996);
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
	public Iterator<String> call(String t) throws Exception {
		return Arrays.asList(t.split(" ")).iterator();
	}
});
JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
	public Tuple2<String, Integer> call(String t) throws Exception {
		return new Tuple2<String, Integer>(t, 1);
	}
});
JavaPairDStream<String, Integer> data;
//window操作  窗口长度为30秒,处理间隔10秒(默认跟批处理时间一致)
data = pairs.window(Durations.seconds(30));
data.print(1000);
jsc.start();
jsc.awaitTermination();
  1. window()操作

根据上面代码,设置data = pairs.window(Durations.seconds(30))时,窗口的操作时间为30秒,处理间隔10秒(默认跟批处理时间一致)。运行结果如下图所示:

说明:数据源的发送时间为每10秒发送一个单词。下面方法的发送规则也一致。

根据设置可知,每个窗口是3个批次,每隔一个批次就对前面3个批次的数据进行一次计算。根据结果,窗口计算流程如下:

  1. 在第一个窗口,index为1,2,3的数据进入窗口,处理完后,index为1的批次离开窗口;
  2. 在第二个窗口中,index为4的数据进入窗口,然后继续进行第二个窗口的计算处理,处理完毕,index为2的数据离开窗口。
  3. 在第二个窗口中,index为5的数据进入窗口,然后继续进行第二个窗口的计算处理,处理完毕,index为3的数据离开窗口。后面的窗口处理流程一直如此循环下去。

如果设置成data = pairs.window(Durations.seconds(30),Durations.seconds(20)),则是每个窗口是3个批次,每隔2个批次就对前面3个批次的数据进行一次计算。此时将会是有2个旧批次数据离开窗口,2个新批次数据进入窗口。运行结果如下图所示:

2)reduceByWindow和reduceByKeyAndWindow操作

使用reduceByWindow()和reduceByKeyAndWindow()我们可以对每个窗口进行聚合操作,下面使用reduceByKeyAndWindow()对每个窗口进行单词统计计算,设置data=pairs. reduceByKeyAndWindow (reduceFunc,Durations.seconds(50),Durations.seconds(20)),其中聚合函数reduceFunc为:

代码语言:javascript
复制
Function2<Integer, Integer, Integer> reduceFunc = new Function2<Integer, Integer, Integer>() {
public Integer call(Integer t1, Integer t2) throws Exception {
                return t1 + t2;
            }
        };

根据上述代码,重新运行结果如下图:

根据运行代码设置可知,每个窗口有5个批次,每隔2个批次就对前面5个批次进行聚合操作,聚合计算的数据仅限于该窗口的数据。

3)reduceByKeyAndWindow高效操作

reduceByKeyAndWindow该计算方法还有一种高效计算形式,通过只考虑新进入窗口的数据和离开窗口的数据,让spark增量计算聚合结果,使计算更加高效。该形式需要提供聚合函数的一个逆函数,比如聚合函数为+,则逆函数为-。下面将通过代码运行来说明该方法的运行方式,

data=pairs. reduceByKeyAndWindow (reduceFunc,invReduceFunc,Durations.seconds(50),Durations.seconds(20)),其中逆函数invReduceFunc为

代码语言:javascript
复制
Function2<Integer, Integer, Integer> invReduceFunc = new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer t1, Integer t2) throws Exception {
                return t1 - t2;
            }
        };

运行结果如下图所示:

从运行结果中可以分析,每个窗口有5个批次,每隔2个批次就对前面5个批次进行聚合操作,计算流程如下:

  1. index为2,3,4,5,6这5个批次的数据进入第一个窗口(红色窗口),进行聚合计算,聚合结果如上图红色箭头指向的数据集。
  2. 第一个窗口计算完成后,index为2,3的数据离开窗口,index为7,8的数据进入第二个窗口(蓝色窗口),然后进行第二个窗口聚合计算,得到第二窗口结果集(蓝色箭头指向)。根据第一窗口结果集跟第二窗口结果集对比,因为index为2,3的数据(即单词为spark和java)离开窗口,所以这两个数据根据逆函数进行计算,分别减1,得出单词spark数量为2-1=1,单词java数量为1-1=0。index为4,5,6的数据是共有批次数据,可以复用这几个批次数据。而index为7,8的数据(即单词hive和hbase)进入窗口,这两个批次数据进行聚合函数操作,即单词hive和hbase数量分别加1。所以得出第二窗口结果集。
  3. 第三窗口的计算形式跟第二窗口的计算一致。

所以,根据结果和计算流程可以知道,使用这种方式运行可以复用两个窗口共有的批次数据,计算增加进入窗口的数据,和使用逆函数减去离开窗口的数据。对于较大窗口,使用逆函数这种计算方式可以大大提高执行效率。

根据上图可知,当数据退出窗口后,有些单词的统计数为0,对于这种情况,可以添加过滤函数进行过滤。代码为

data = pairs.reduceByKeyAndWindow(reduceFunc, invReduceFunc, duration_windowLength, duration_slideInterval, 2, filterFunc);

其中2为numPartitions,过滤函数filterFunc为:

代码语言:javascript
复制
Function<Tuple2<String, Integer>, Boolean> filterFunc = new Function<Tuple2<String, Integer>, Boolean>() {
            public Boolean call(Tuple2<String, Integer> t1) throws Exception {
                return t1._2 == 0 ? false : true;
            }
        };

运行结果就可以吧统计数为0的过滤掉,运行结果如下图所示:

问题:

使用reduceByKeyAndWindow这个方法当选择逆函数做法时,再加入过滤函数后,这种计算方式跟不使用逆函数计算方式相比,其执行效率是怎样?

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2016-12-24,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据和云计算技术 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档