关于转换这方面的一些具体问题,如果想要了解可以点击下列网址进行查看: http://spark.apache.org/docs/2.1.1/streaming-programming-guide.html#transformations-on-dstreams
上图为官网的解释,我们可以翻译为: 与RDD相似,转换允许修改来自输入DStream的数据。DStream支持普通Spark RDD上可用的许多转换。
除此之外,DStream分为Transformations
(转换)和Output Operations
(输出)两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()
、transform()
以及各种Window相关的原语。
在DStream转换中,大体可分为无状态转换操作和有状态转换操作两种! 下面就围绕这两个方面进行详细讲解。
无状态转化操作就是把简单的RDD转化操作应用到每个批次上,也就是转化DStream中的每一个RDD。部分无状态转化操作列在了下表中。
需要记住的是,尽管这些函数看起来像作用在整个流上一样,但事实上每个DStream
在内部是由许多RDD
(批次)组成,且无状态转化操作是分别应用到每个RDD上的。例如,reduceByKey()
会化简每个时间区间中的数据,但不会化简不同区间之间的数据。
举个例子,在之前的wordcount程序中,我们只会统计几秒内接收到的数据的单词个数,而不会累加。
无状态转化操作也能在多个DStream
间整合数据,不过也是在各个时间区间内。例如,键值对DStream
拥有和RDD
一样的与连接相关的转化操作,也就是cogroup()
、join()
、leftOuterJoin()
等。我们可以在DStream
上使用这些操作,这样就对每个批次分别执行了对应的RDD操作。
我们还可以像在常规的 Spark 中一样使用 DStream
的union()
操作将它和另一个DStream
的内容合并起来,也可以使用StreamingContext.union()
来合并多个流。
transform
操作 transform
原语允许 DStream
上执行任意的RDD-to-RDD
函数。
可以用来执行一些 RDD 操作, 即使这些操作并没有在 SparkStreaming 中暴露出来.
该函数每一批次调度一次。其实也就是对DStream中的RDD应用转换。
package com.buwenbuhuo.spark.streaming.day02
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
*
* @author 不温卜火
* @create 2020-08-12 18:45
* MyCSDN : https://buwenbuhuo.blog.csdn.net/
*
*/
object TransformDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("TransformDemo").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(3))
val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop002", 9999)
val resultDStream = dstream.transform(rdd => {
rdd.flatMap(_.split("\\W")).map((_, 1)).reduceByKey(_ + _)
})
resultDStream.print
ssc.start()
ssc.awaitTermination()
}
}
此部分主要介绍两个有状态的操作
上图为官方解释,下面为翻译:
updateStateByKey
操作允许在使用新信息不断更新状态的同时能够保留他的状态.
需要做两件事情:
在每个阶段, Spark 都会在所有已经存在的 key 上使用状态更新函数, 而不管是否有新的数据在.
def updateStateByKey[S: ClassTag](
updateFunc: (Seq[V], Option[S]) => Option[S]): DStream[(K, S)]
package com.buwenbuhuo.spark.streaming.day02
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
*
* @author 不温卜火
* @create 2020-08-12 18:45
* MyCSDN : https://buwenbuhuo.blog.csdn.net/
*
*/
object UpstateByKeyDemo {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("UpstateByKeyDemo")
val ssc: StreamingContext = new StreamingContext(conf,Seconds(3))
ssc.checkpoint("ck1")
ssc
.socketTextStream("hadoop002",9999)
.flatMap(_.split("\\W+"))
.map((_,1))
.updateStateByKey[Int]((seq:Seq[Int],opt:Option[Int]) =>{
Some(seq.sum + opt.getOrElse(0))
})
.print(1000)
ssc.start()
ssc.awaitTermination()
}
}
Spark Streaming 也提供了窗口计算, 允许执行转换操作作用在一个窗口内的数据.
默认情况下, 计算只对一个时间段内的RDD进行, 有了窗口之后, 可以把计算应用到一个指定的窗口内的所有 RDD 上.
一个窗口可以包含多个时间段. 基于窗口的操作会在一个比StreamingContext的批次间隔更长的时间范围内,通过整合多个批次的结果,计算出整个窗口的结果。
上图所示, 窗口在 DStream 上每滑动一次, 落在窗口内的那些 RDD会结合在一起, 然后在上面操作产生新的 RDD, 组成了 window DStream.
在上面图的情况下, 操作会至少应用在 3 个数据单元上, 每次滑动 2 个时间单位. 所以, 窗口操作需要 2 个参数:
注意: 这两个参数必须是源 DStream 的 interval 的倍数.
package com.buwenbuhuo.spark.streaming.day02.window
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
*
* @author 不温卜火
* @create 2020-08-12 18:45
* MyCSDN : https://buwenbuhuo.blog.csdn.net/
*
*/
object Window1 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Window1").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(3))
ssc
.socketTextStream("hadoop002",9999)
.flatMap(_.split("\\W+"))
.map((_,1))
.reduceByKeyAndWindow(_ + _,Seconds(6))
.print()
ssc.start()
ssc.awaitTermination()
}
}
reduceByKeyAndWindow(reduceFunc: (V, V) => V, windowDuration: Duration)
val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
/*
参数1: reduce 计算规则
参数2: 窗口长度
参数3: 窗口滑动步长. 每隔这么长时间计算一次.
*/
val count: DStream[(String, Int)] =
wordAndOne.reduceByKeyAndWindow((x: Int, y: Int) => x + y,Seconds(15), Seconds(10))
reduceByKeyAndWindow(reduceFunc: (V, V) => V, invReduceFunc: (V, V) => V, windowDuration: Duration, slideDuration: Duration)
比没有invReduceFunc高效. 会利用旧值来进行计算.invReduceFunc: (V, V) => V 窗口移动了, 上一个窗口和新的窗口会有重叠部分, 重叠部分的值可以不用重复计算了. 第一个参数就是新的值, 第二个参数是旧的值.
ssc.sparkContext.setCheckpointDir("hdfs://hadoop002:9000/checkpoint")
val count: DStream[(String, Int)] =
wordAndOne.reduceByKeyAndWindow((x: Int, y: Int) => x + y,(x: Int, y: Int) => x - y,Seconds(15), Seconds(10))
window(windowLength, slideInterval)
基于对源 DStream 窗化的批次进行计算返回一个新的 Dstreampackage com.buwenbuhuo.spark.streaming.day02.window
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
*
* @author 不温卜火
* @create 2020-08-12 22:45
* MyCSDN : https://buwenbuhuo.blog.csdn.net/
*
*/
object Window2 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Window2").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(3))
ssc.checkpoint("ck3")
ssc
.socketTextStream("hadoop002",9999)
.window(Seconds(9),Seconds(6))
.flatMap(_.split("\\W+"))
.map((_,1))
.reduceByKeyAndWindow(_ + _,Seconds(6))
.print()
ssc.start()
ssc.awaitTermination()
}
}
countByWindow(windowLength, slideInterval)
返回一个滑动窗口计数流中的元素的个数。
countByValueAndWindow(windowLength, slideInterval, [numTasks])
对(K,V)对的DStream调用,返回(K,Long)对的新DStream,其中每个key的的对象的v是其在滑动窗口中频率。如上,可配置reduce任务数量。
本次的分享就到这里了