前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark Streaming 快速入门系列(5) | 还不会DStream转换,一文带你深入了解

Spark Streaming 快速入门系列(5) | 还不会DStream转换,一文带你深入了解

作者头像
不温卜火
发布2020-10-28 17:38:26
8920
发布2020-10-28 17:38:26
举报
文章被收录于专栏:不温卜火

关于转换这方面的一些具体问题,如果想要了解可以点击下列网址进行查看http://spark.apache.org/docs/2.1.1/streaming-programming-guide.html#transformations-on-dstreams

  上图为官网的解释,我们可以翻译为:   与RDD相似,转换允许修改来自输入DStream的数据。DStream支持普通Spark RDD上可用的许多转换。

  除此之外,DStream分为Transformations(转换)和Output Operations(输出)两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()transform()以及各种Window相关的原语。

  • 一些常见的方法

  在DStream转换中,大体可分为无状态转换操作和有状态转换操作两种! 下面就围绕这两个方面进行详细讲解。

一. 无状态转换操作

  无状态转化操作就是把简单的RDD转化操作应用到每个批次上,也就是转化DStream中的每一个RDD。部分无状态转化操作列在了下表中。

  需要记住的是,尽管这些函数看起来像作用在整个流上一样,但事实上每个DStream在内部是由许多RDD(批次)组成,且无状态转化操作是分别应用到每个RDD上的。例如,reduceByKey()会化简每个时间区间中的数据,但不会化简不同区间之间的数据。

  举个例子,在之前的wordcount程序中,我们只会统计几秒内接收到的数据的单词个数,而不会累加。

  无状态转化操作也能在多个DStream间整合数据,不过也是在各个时间区间内。例如,键值对DStream拥有和RDD一样的与连接相关的转化操作,也就是cogroup()join()leftOuterJoin() 等。我们可以在DStream上使用这些操作,这样就对每个批次分别执行了对应的RDD操作。

  我们还可以像在常规的 Spark 中一样使用 DStreamunion() 操作将它和另一个DStream 的内容合并起来,也可以使用StreamingContext.union()来合并多个流。

transform操作

transform 原语允许 DStream上执行任意的RDD-to-RDD函数。

  可以用来执行一些 RDD 操作, 即使这些操作并没有在 SparkStreaming 中暴露出来.

  该函数每一批次调度一次。其实也就是对DStream中的RDD应用转换。

  • 1. 样例源码
代码语言:javascript
复制
package com.buwenbuhuo.spark.streaming.day02
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 *
 * @author 不温卜火
 * @create 2020-08-12 18:45
 * MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 *
 */
object TransformDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("TransformDemo").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(3))
    val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop002", 9999)

    val resultDStream = dstream.transform(rdd => {
      rdd.flatMap(_.split("\\W")).map((_, 1)).reduceByKey(_ + _)
    })
    resultDStream.print


    ssc.start()
    ssc.awaitTermination()
  }

}
  • 2. 运行结果

二. 有状态转换操作

  此部分主要介绍两个有状态的操作

2.1 updateStateByKey

  上图为官方解释,下面为翻译:

updateStateByKey操作允许在使用新信息不断更新状态的同时能够保留他的状态.

需要做两件事情:

  1. 定义状态. 状态可以是任意数据类型
  2. 定义状态更新函数. 指定一个函数, 这个函数负责使用以前的状态和新值来更新状态.

  在每个阶段, Spark 都会在所有已经存在的 key 上使用状态更新函数, 而不管是否有新的数据在.

代码语言:javascript
复制
def updateStateByKey[S: ClassTag](
                 updateFunc: (Seq[V], Option[S]) => Option[S]): DStream[(K, S)]
  • 1. 样例源码
代码语言:javascript
复制
package com.buwenbuhuo.spark.streaming.day02

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 *
 * @author 不温卜火
 * @create 2020-08-12 18:45
 * MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 *
 */
object UpstateByKeyDemo {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("UpstateByKeyDemo")
    val ssc: StreamingContext = new StreamingContext(conf,Seconds(3))
    ssc.checkpoint("ck1")

    ssc
        .socketTextStream("hadoop002",9999)
        .flatMap(_.split("\\W+"))
        .map((_,1))
        .updateStateByKey[Int]((seq:Seq[Int],opt:Option[Int]) =>{
                Some(seq.sum + opt.getOrElse(0))
    })
        .print(1000)


    ssc.start()
    ssc.awaitTermination()
  }
}
  • 2. 运行结果
  • 3. 源码流解析

2.2 window 操作(窗口操作)

  Spark Streaming 也提供了窗口计算, 允许执行转换操作作用在一个窗口内的数据.

  默认情况下, 计算只对一个时间段内的RDD进行, 有了窗口之后, 可以把计算应用到一个指定的窗口内的所有 RDD 上.

  一个窗口可以包含多个时间段. 基于窗口的操作会在一个比StreamingContext的批次间隔更长的时间范围内,通过整合多个批次的结果,计算出整个窗口的结果。

  上图所示, 窗口在 DStream 上每滑动一次, 落在窗口内的那些 RDD会结合在一起, 然后在上面操作产生新的 RDD, 组成了 window DStream.

在上面图的情况下, 操作会至少应用在 3 个数据单元上, 每次滑动 2 个时间单位. 所以, 窗口操作需要 2 个参数:

  1. 窗口长度 – 窗口的持久时间(执行一次持续多少个时间单位)(图中是 3)
  2. 滑动步长 – 窗口操作被执行的间隔(每多少个时间单位执行一次).(图中是 2 )

  注意: 这两个参数必须是源 DStream 的 interval 的倍数.

  • 一些常见的窗口操作 所有这些操作均采用上述两个参数-windowLength和slideInterval。
  • 1. 测试程序源码
代码语言:javascript
复制
package com.buwenbuhuo.spark.streaming.day02.window
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}


/**
 *
 * @author 不温卜火
 * @create 2020-08-12 18:45
 * MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 *
 */
object Window1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Window1").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(3))

    ssc
      .socketTextStream("hadoop002",9999)
      .flatMap(_.split("\\W+"))
      .map((_,1))
        .reduceByKeyAndWindow(_ + _,Seconds(6))
        .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
  • 2. 运行结果

2.3 window的优化操作

  • 1. reduceByKeyAndWindow(reduceFunc: (V, V) => V, windowDuration: Duration)
代码语言:javascript
复制
val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
/*
参数1: reduce 计算规则
参数2: 窗口长度
参数3: 窗口滑动步长. 每隔这么长时间计算一次.
 */
val count: DStream[(String, Int)] =
wordAndOne.reduceByKeyAndWindow((x: Int, y: Int) => x + y,Seconds(15), Seconds(10))
  • 2. reduceByKeyAndWindow(reduceFunc: (V, V) => V, invReduceFunc: (V, V) => V, windowDuration: Duration, slideDuration: Duration) 比没有invReduceFunc高效. 会利用旧值来进行计算.

invReduceFunc: (V, V) => V 窗口移动了, 上一个窗口和新的窗口会有重叠部分, 重叠部分的值可以不用重复计算了. 第一个参数就是新的值, 第二个参数是旧的值.

代码语言:javascript
复制
ssc.sparkContext.setCheckpointDir("hdfs://hadoop002:9000/checkpoint")
val count: DStream[(String, Int)] =
    wordAndOne.reduceByKeyAndWindow((x: Int, y: Int) => x + y,(x: Int, y: Int) => x - y,Seconds(15), Seconds(10))
  • 3. window(windowLength, slideInterval) 基于对源 DStream 窗化的批次进行计算返回一个新的 Dstream
代码语言:javascript
复制
package com.buwenbuhuo.spark.streaming.day02.window

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}


/**
 *
 * @author 不温卜火
 * @create 2020-08-12 22:45
 * MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 *
 */
object Window2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Window2").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(3))
    
    ssc.checkpoint("ck3")
    ssc
      .socketTextStream("hadoop002",9999)
      .window(Seconds(9),Seconds(6))
      .flatMap(_.split("\\W+"))
      .map((_,1))
        .reduceByKeyAndWindow(_ + _,Seconds(6))
        .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
  • 4. countByWindow(windowLength, slideInterval) 返回一个滑动窗口计数流中的元素的个数。
  • 5. countByValueAndWindow(windowLength, slideInterval, [numTasks]) 对(K,V)对的DStream调用,返回(K,Long)对的新DStream,其中每个key的的对象的v是其在滑动窗口中频率。如上,可配置reduce任务数量。

  本次的分享就到这里了

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020/08/20 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一. 无状态转换操作
    • transform操作
    • 二. 有状态转换操作
      • 2.1 updateStateByKey
        • 2.2 window 操作(窗口操作)
          • 2.3 window的优化操作
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档