SparkStreaming窗口操作

黄文辉同学第三篇的总结,大家支持。

  1. 概述

SparkStreaming提供了窗口的计算,它允许你对数据的滑动窗口应用转换。基于窗口的操作会在一个比StreamingContext的批次间隔更长的时间范围内,通过整合多个批次的结果,计算出整个窗口的结果。下图说明了滑动窗口计算。

每个窗口时间滑动过original DStream,落入窗口内的RDD合并并操作,产生windowed DStream的RDDS。上图这种情况,窗口时长为3个时间单位,步长为2个时间单位。因此,这表明任何窗口操作需要指定2个参数。

  1. 窗口长度(window length),窗口的持续时间。
  2. 滑动窗口时间间隔(slide interval),执行基于窗口操作计算的时间间隔。(默认值与批处理间隔时间相等)。

注意,这两个参数必须是源DStream批处理时间间隔的倍数。

SparkStreaming提供一些基于窗口的操作函数,我们来使用window(windowLength,slideInterval)这个函数来表示上图的滑动窗口操作,假设批处理时间间隔为10秒,那么窗口时间为30秒,每隔20秒生成数据。那么函数参数设置为:

// 注:pairs是经过处理的DStream,JavaPairDStream<String, Integer> pairs

pairs.window(Durations.seconds(30), Durations.seconds(20));

下表是一些常见基于窗口的操作:

2.窗口操作分析

下面,通过代码执行这些方法来进行具体分析,代码如下:

SparkConf sparkConf = new SparkConf().setAppName("StreamingWindowJob");
//批处理时间间隔为10秒
JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Durations.seconds(10));
jsc.checkpoint("/test/streaming/checkpoint");//状态转换使用,必须开启检查点
JavaReceiverInputDStream<String> lines = jsc.socketTextStream("localhost", 9996);
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
	public Iterator<String> call(String t) throws Exception {
		return Arrays.asList(t.split(" ")).iterator();
	}
});
JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
	public Tuple2<String, Integer> call(String t) throws Exception {
		return new Tuple2<String, Integer>(t, 1);
	}
});
JavaPairDStream<String, Integer> data;
//window操作  窗口长度为30秒,处理间隔10秒(默认跟批处理时间一致)
data = pairs.window(Durations.seconds(30));
data.print(1000);
jsc.start();
jsc.awaitTermination();
  1. window()操作

根据上面代码,设置data = pairs.window(Durations.seconds(30))时,窗口的操作时间为30秒,处理间隔10秒(默认跟批处理时间一致)。运行结果如下图所示:

说明:数据源的发送时间为每10秒发送一个单词。下面方法的发送规则也一致。

根据设置可知,每个窗口是3个批次,每隔一个批次就对前面3个批次的数据进行一次计算。根据结果,窗口计算流程如下:

  1. 在第一个窗口,index为1,2,3的数据进入窗口,处理完后,index为1的批次离开窗口;
  2. 在第二个窗口中,index为4的数据进入窗口,然后继续进行第二个窗口的计算处理,处理完毕,index为2的数据离开窗口。
  3. 在第二个窗口中,index为5的数据进入窗口,然后继续进行第二个窗口的计算处理,处理完毕,index为3的数据离开窗口。后面的窗口处理流程一直如此循环下去。

如果设置成data = pairs.window(Durations.seconds(30),Durations.seconds(20)),则是每个窗口是3个批次,每隔2个批次就对前面3个批次的数据进行一次计算。此时将会是有2个旧批次数据离开窗口,2个新批次数据进入窗口。运行结果如下图所示:

2)reduceByWindow和reduceByKeyAndWindow操作

使用reduceByWindow()和reduceByKeyAndWindow()我们可以对每个窗口进行聚合操作,下面使用reduceByKeyAndWindow()对每个窗口进行单词统计计算,设置data=pairs. reduceByKeyAndWindow (reduceFunc,Durations.seconds(50),Durations.seconds(20)),其中聚合函数reduceFunc为:

Function2<Integer, Integer, Integer> reduceFunc = new Function2<Integer, Integer, Integer>() {
public Integer call(Integer t1, Integer t2) throws Exception {
                return t1 + t2;
            }
        };

根据上述代码,重新运行结果如下图:

根据运行代码设置可知,每个窗口有5个批次,每隔2个批次就对前面5个批次进行聚合操作,聚合计算的数据仅限于该窗口的数据。

3)reduceByKeyAndWindow高效操作

reduceByKeyAndWindow该计算方法还有一种高效计算形式,通过只考虑新进入窗口的数据和离开窗口的数据,让spark增量计算聚合结果,使计算更加高效。该形式需要提供聚合函数的一个逆函数,比如聚合函数为+,则逆函数为-。下面将通过代码运行来说明该方法的运行方式,

data=pairs. reduceByKeyAndWindow (reduceFunc,invReduceFunc,Durations.seconds(50),Durations.seconds(20)),其中逆函数invReduceFunc为

Function2<Integer, Integer, Integer> invReduceFunc = new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer t1, Integer t2) throws Exception {
                return t1 - t2;
            }
        };

运行结果如下图所示:

从运行结果中可以分析,每个窗口有5个批次,每隔2个批次就对前面5个批次进行聚合操作,计算流程如下:

  1. index为2,3,4,5,6这5个批次的数据进入第一个窗口(红色窗口),进行聚合计算,聚合结果如上图红色箭头指向的数据集。
  2. 第一个窗口计算完成后,index为2,3的数据离开窗口,index为7,8的数据进入第二个窗口(蓝色窗口),然后进行第二个窗口聚合计算,得到第二窗口结果集(蓝色箭头指向)。根据第一窗口结果集跟第二窗口结果集对比,因为index为2,3的数据(即单词为spark和java)离开窗口,所以这两个数据根据逆函数进行计算,分别减1,得出单词spark数量为2-1=1,单词java数量为1-1=0。index为4,5,6的数据是共有批次数据,可以复用这几个批次数据。而index为7,8的数据(即单词hive和hbase)进入窗口,这两个批次数据进行聚合函数操作,即单词hive和hbase数量分别加1。所以得出第二窗口结果集。
  3. 第三窗口的计算形式跟第二窗口的计算一致。

所以,根据结果和计算流程可以知道,使用这种方式运行可以复用两个窗口共有的批次数据,计算增加进入窗口的数据,和使用逆函数减去离开窗口的数据。对于较大窗口,使用逆函数这种计算方式可以大大提高执行效率。

根据上图可知,当数据退出窗口后,有些单词的统计数为0,对于这种情况,可以添加过滤函数进行过滤。代码为

data = pairs.reduceByKeyAndWindow(reduceFunc, invReduceFunc, duration_windowLength, duration_slideInterval, 2, filterFunc);

其中2为numPartitions,过滤函数filterFunc为:

Function<Tuple2<String, Integer>, Boolean> filterFunc = new Function<Tuple2<String, Integer>, Boolean>() {
            public Boolean call(Tuple2<String, Integer> t1) throws Exception {
                return t1._2 == 0 ? false : true;
            }
        };

运行结果就可以吧统计数为0的过滤掉,运行结果如下图所示:

问题:

使用reduceByKeyAndWindow这个方法当选择逆函数做法时,再加入过滤函数后,这种计算方式跟不使用逆函数计算方式相比,其执行效率是怎样?

原文发布于微信公众号 - 大数据和云计算技术(jiezhu2007)

原文发表时间:2016-12-24

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Java技术

Java多线程编程-(16)-无锁CAS操作以及Java中Atomic并发包的“18罗汉”

通过上面的学习,我们应该很清楚的知道了在多线程并发情况下如何保证数据的安全性和一致性的两种主要方法:一种是加锁,另一种是使用ThreadLocal。锁是一种以时...

663
来自专栏java 成神之路

java内存模型

3167
来自专栏Java进阶之路

Java分布式缓存框架Ehcache 使用(二)

1465
来自专栏祝威廉

自定义Spark Partitioner提升es-hadoop Bulk效率

之前写过一篇文章,如何提高ElasticSearch 索引速度。除了对ES本身的优化以外,我现在大体思路是尽量将逻辑外移到Spark上,Spark的分布式计算能...

903
来自专栏纯洁的微笑

jvm系列(十一):Java 8-从持久代到metaspace

译者 梅小西,原文出处:http://blog.csdn.net/wang8118/article/details/45765869 Java 8介绍了一些新语...

3516
来自专栏Linyb极客之路

2016年阿里java面试题分享

(1)自我介绍 (2)JVM如何加载一个类的过程,双亲委派模型中有哪些方法? (3)HashMap如何实现的? (4)HashMap和Concurrent H...

3448
来自专栏CSDN技术头条

Java 平台反应式编程(Reactive Programming)入门

反应式编程(Reactive Programming)对有些人来说可能相对陌生一点。反应式编程是一套完整的编程体系,既有其指导思想,又有相应的框架和库的支持,并...

8176
来自专栏java一日一条

如何在 Java 中正确使用 wait, notify 和 notifyAll – 以生产者消费者模型为例

wait, notify 和 notifyAll,这些在多线程中被经常用到的保留关键字,在实际开发的时候很多时候却并没有被大家重视。本文对这些关键字的使用进行了...

332
来自专栏互联网杂技

丁点而内存知识

在C和C++语言开发中,指针、内存一直是学习的重点。因为C语言作为一种偏底层的中低级语言,提供了大量的内存直接操作的方法,这一方面使程序的灵活度最大化,同时也为...

3134
来自专栏java一日一条

如何在 Java 中正确使用 wait, notify 和 notifyAll – 以生产者消费者模型为例

wait, notify 和 notifyAll,这些在多线程中被经常用到的保留关键字,在实际开发的时候很多时候却并没有被大家重视。本文对这些关键字的使用进行了...

171

扫描关注云+社区