前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >SparkStreaming学习笔记

SparkStreaming学习笔记

作者头像
曼路
发布2018-10-18 15:14:55
1K0
发布2018-10-18 15:14:55
举报
文章被收录于专栏:浪淘沙浪淘沙

一、Spark Streaming基础

  •     1:什么是SparkStreaming?

        (*)Spark Streaming是核心Spark API的扩展,可实现可扩展、高吞吐量、可容错的实时数据流处理。数据可以从诸如Kafka,Flume,Kinesis或TCP套接字等众多来源获取,并且可以使用由高级函数(如map,reduce,join和window)开发的复杂算法进行流数据处理。最后,处理后的数据可以被推送到文件系统,数据库和实时仪表板。而且,您还可以在数据流上应用Spark提供的机器学习和图处理算法。

  •     2:SparkStreaming的内部结构:本质是一个个的RDD(RDD其实是离散流,不连续)

        (*)问题:Spark Streaming是如何处理连续的数据         Spark Streaming将连续的数据流抽象为discretizedstream或DStream。在内部,DStream 由一个RDD序列表示。

  •     3:Demo:NetworkWordCount 单词计数(实时计算)

        一定注意一个问题:必须保证虚拟机的CPU的核数至少为2         原因:(1)一个接收数据               (2)一个处理数据        中文版官网

代码语言:javascript
复制
 http://spark.apachecn.org/
  • 注意:

            当在本地运行一个 Spark Streaming 程序的时候,不要使用 “local” 或者 “local[1]” 作为 master 的 URL. 这两种方法中的任何一个都意味着只有一个线程将用于运行本地任务. 如果你正在使用一个基于接收器(receiver)的输入离散流(input DStream)(例如, sockets ,Kafka ,Flume 等),则该单独的线程将用于运行接收器(receiver),而没有留下任何的线程用于处理接收到的数据. 因此,在本地运行时,总是用 “local[n]” 作为 master URL ,其中的 n > 运行接收器的数量(查看 Spark 属性 来了解怎样去设置 master 的信息).             将逻辑扩展到集群上去运行,分配给 Spark Streaming 应用程序的内核(core)的内核数必须大于接收器(receiver)的数量。否则系统将接收数据,但是无法处理它.

  •  简单实现                                                                                                                                                                                      使用工具:NetCat 实现实时流

        步骤:(1)启动netcat服务器

代码语言:javascript
复制
nc -lk 1234

              (2)启动SparkStreaming的客户端

代码语言:javascript
复制
 bin/run-example streaming.NetworkWordCount bigdata01 1234

        安装nc步骤:             (1)先将已安装的nc删除:

代码语言:javascript
复制
yum erase nc

            (2)下载较低版本的nc的.rpm文件

代码语言:javascript
复制
wget http://vault.centos.org/6.6/os/x86_64/Packages/nc-1.84-22.el6.x86_64.rpm

            (3)安装.rpm文件

代码语言:javascript
复制
 rpm -iUv nc-1.84-22.el6.x86_64.rpm

            执行以上步骤命令后检查nc是否安装好,执行 nc -lk 1234

代码语言:javascript
复制
bin/run-example streaming.NetworkWordCount localhost 1234

    4:开发一个自己的NetworkWordCount程序         注意:相对于Storm来说,Spark Streaming不能用于实时性要求非常高的场景

二、Spark steaming进阶

  •     1:核心对象:StreamingContext

        (*)在Spark中,Spark Core              -> SparkContext                ->抽象RDD                                      Spark Sql                -> SqlContent                    ->抽象DataFrame                                      Spark Streaming      -> StreamingContext        ->抽象DStream         (*)方便管理和操作的统一对象(Spark2.0)SparkSession         (*)StreamingContent创建方式有两种:             (1)通过SparkConf创建

代码语言:javascript
复制
val sparkConf = new SparkConf().setAppName("MyNetworkWordCount").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(3))

            (2)通过现有的SparkContext对象sc来创建

代码语言:javascript
复制
 val sc = new SparkContext(....)
 val ssc = new StreamingContext(sc, Seconds(3))
  1.         请务必记住以下几点:
    1. 一旦一个StreamingContextt开始运作,就不能设置或添加新的流计算。
    2. 一旦一个上下文被停止,它将无法重新启动。
    3. 同一时刻,一个JVM中只能有一个StreamingContext处于活动状态。
    4. StreamingContext上的stop()方法也会停止SparkContext。 要仅停止StreamingContext(保持SparkContext活跃),请将stop() 方法的可选参数stopSparkContext设置为false。
    5. 只要前一个StreamingContext在下一个StreamingContext被创建之前停止(不停止SparkContext),SparkContext就可以被重用来创建多个StreamingContext。
  •        2:核心概念:DStream离散流-》RDD

        (*)本质:将连续的数据变成不 连续的RDD-》DStream

  •     3:DStream离散流的算子:Transformation和Action

        (*)tranform(func)                 通过RDD-to-RDD函数作用于源DStream中的各个RDD,可以是任意的RDD操作,从而返回一个新的RDD                 相当于map操作

                举例:在NetworkWordCount中,也可以使用transform来生成元组对         (*)UpdateStateByKey(func) -》在原来的状态上进行更新,需要设置检查点             操作允许不断用新信息更新它的同时保持任意状态。             定义状态-状态可以是任何的数据类型             定义状态更新函数-怎样利用更新前的状态和从输入流里面获取的新值更新状态             重写NetworkWordCount程序,累计每个单词出现的频率(注意:累计)

  •     4:窗口操作

        定义窗口:(1)窗口的长度 (2)滑动举例          举例:NetWorkwordCount,每隔8秒,把过去30秒产生的字符串进行单词计数                 (1)窗口的长度  30秒                 (2)滑动间隔(每次滑动的时间长度)                 原因是:滑动的距离,必须是采样时间的整数倍

  •     5:输入:接收器(基本数据源)

        (*)Socket接收             //创建一个离散流,DStream代表输入的数据流

代码语言:javascript
复制
            val lines: ReceiverInputDStream[String] = ssc.socketTextStream("bigdata01", 5678)

        (*)文件流 类似于——>Flume

代码语言:javascript
复制
package demo

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FileStreaming {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local[2]").setAppName("FileStreaming")
    val ssc = new StreamingContext(conf,Seconds(2))

    //从本地目录中读取数据:如果有新文件产生,就会读取进来
    val lines = ssc.textFileStream("d:\\dowload\\spark123")

    //打印结果
    lines.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
  •     6:输出

        (*)Mysql             举例:WordCount         (*)Redis         (*)HDFS         (*)HBase

  •     7;集成Spark Sql:使用Sql语句方式处理流式数据

        举例:WordCount

代码语言:javascript
复制
//使用Spark SQL来查询Spark Streaming处理的数据
    words.foreachRDD { rdd =>
      //使用单列模式,创建SparkSession对象
      val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()

      import spark.implicits._
      // 将RDD[String]转换为DataFrame
      val wordsDataFrame = rdd.toDF("word")

      // 创建临时视图
      wordsDataFrame.createOrReplaceTempView("words")

      // 执行SQL
      val wordCountsDataFrame =   spark.sql("select word, count(*) as total from words group by word")
      wordCountsDataFrame.show()
    }

    8:缓存和持久化

          与RDD类似,DStreams还允许开发人员将流数据保留在内存中。也就是说,在DStream上调用persist() 方法会自动将该DStream的每个RDD保留在内存中     9:检查点

          流数据处理程序通常都是全天候运行,因此必须对应用中逻辑无关的故障(例如,系统故障,JVM崩溃等)具有弹性。为了实现这一特性,Spark Streaming需要checkpoint足够的信息到容错存储系统,以便可以从故障中恢复。

四、性能优化

1、减少批数据的执行时间

在Spark中有几个优化可以减少批处理的时间:

  • 数据接收的并行水平

通过网络(如kafka,flume,socket等)接收数据需要这些数据反序列化并被保存到Spark中。如果数据接收成为系统的瓶颈,就要考虑并行地接收数据。注意,每个输入DStream创建一个receiver(运行在worker机器上)接收单个数据流。创建多个输入DStream并配置它们可以从源中接收不同分区的数据流,从而实现多数据流接收。例如,接收两个topic数据的单个输入DStream可以被切分为两个kafka输入流,每个接收一个topic。这将在两个worker上运行两个receiver,因此允许数据并行接收,提高整体的吞吐量。多个DStream可以被合并生成单个DStream,这样运用在单个输入DStream的transformation操作可以运用在合并的DStream上。

  • 数据处理的并行水平

如果运行在计算stage上的并发任务数不足够大,就不会充分利用集群的资源。默认的并发任务数通过配置属性来确定spark.default.parallelism

  • 数据序列化

可以通过改变序列化格式来减少数据序列化的开销。在流式传输的情况下,有两种类型的数据会被序列化:

  1. 输入数据
  2. 由流操作生成的持久RDD

在上述两种情况下,使用Kryo序列化格式可以减少CPU和内存开销。

2、设置正确的批容量

为了Spark Streaming应用程序能够在集群中稳定运行,系统应该能够以足够的速度处理接收的数据(即处理速度应该大于或等于接收数据的速度)。这可以通过流的网络UI观察得到。批处理时间应该小于批间隔时间。

根据流计算的性质,批间隔时间可能显著的影响数据处理速率,这个速率可以通过应用程序维持。可以考虑WordCountNetwork这个例子,对于一个特定的数据处理速率,系统可能可以每2秒打印一次单词计数(批间隔时间为2秒),但无法每500毫秒打印一次单词计数。所以,为了在生产环境中维持期望的数据处理速率,就应该设置合适的批间隔时间(即批数据的容量)。

找出正确的批容量的一个好的办法是用一个保守的批间隔时间(5-10,秒)和低数据速率来测试你的应用程序。

3、内存调优

在这一节,我们重点介绍几个强烈推荐的自定义选项,它们可以减少Spark Streaming应用程序垃圾回收的相关暂停,获得更稳定的批处理时间。

  1. Default persistence level of DStreams:和RDDs不同的是,默认的持久化级别是序列化数据到内存中(DStream是StorageLevel.MEMORY_ONLY_SER,RDD是StorageLevel.MEMORY_ONLY)。即使保存数据为序列化形态会增加序列化/反序列化的开销,但是可以明显的减少垃圾回收的暂停。
  2. Clearing persistent RDDs:默认情况下,通过Spark内置策略(LUR),Spark Streaming生成的持久化RDD将会从内存中清理掉。如果spark.cleaner.ttl已经设置了,比这个时间存在更老的持久化RDD将会被定时的清理掉。正如前面提到的那样,这个值需要根据Spark Streaming应用程序的操作小心设置。然而,可以设置配置选项spark.streaming.unpersist为true来更智能的去持久化(unpersist)RDD。这个配置使系统找出那些不需要经常保有的RDD,然后去持久化它们。这可以减少Spark RDD的内存使用,也可能改善垃圾回收的行为。
  3. Concurrent garbage collector:使用并发的标记-清除垃圾回收可以进一步减少垃圾回收的暂停时间。尽管并发的垃圾回收会减少系统的整体吞吐量,但是仍然推荐使用它以获得更稳定的批处理时间。
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018年10月14日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、Spark Streaming基础
  • 二、Spark steaming进阶
  • 四、性能优化
    • 1、减少批数据的执行时间
    • 2、设置正确的批容量
    • 3、内存调优
    相关产品与服务
    数据库
    云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档