专栏首页魏晓蕾的专栏【Spark】Spark Streaming的程序运行原理及与Kafka的集成

【Spark】Spark Streaming的程序运行原理及与Kafka的集成

1、Spark Streaming介绍

(1)Streaming Streaming:是一种数据传送技术,它把客户机收到的数据变成一个稳定连续的流,源源不断地送出,使用户听到的声音或看到的图象十分平稳,而且用户在整个文件送完之前就可以开始在屏幕上浏览文件。 (2)流式计算系统 Streaming Compute 常用的有三种:Apache Storm、Spark Streaming和Apache Samza。 这三种实时计算系统都是开源的分布式系统,具有低延迟、可扩展和容错性等诸多优点,它们的共同特点在于:允许你在运行数据流代码时,将任务分配到一系列具有容错能力的计算机上并行运行。此外,它们都提供了简单的API来简化底层实现的复杂程度,并可以与批处理和交互式处理相结合。 (3)Spark Streaming Spark Streaming是基于Spark Core的,是Spark Core API的扩展,可以对实时数据流进行可扩展的、高吞吐量的、容错的准实时的微观操作的流式处理。 Spark Streaming是按照批次进行执行的,一个一个批次执行,一个批次处理的数据就是对应时间段收集得到的数据,只有当上一个批次执行完成后,下一个批次才会开始执行。(Spark Core是批量处理,Spark SQL是交互式处理,Spark Streaming是小块小块处理。) (4)Storm/JStorm Storm是完全实时的流式数据处理平台,JStorm是Java语言改写的Storm平台。Storm来一条数据就处理一条数据,对机器性能要求比较高。 Spark Streaming相比于Storm的缺点:Spark Streaming平台中数据的延迟性在高并发、大数据量的情况下比Storm高。 (5)Spark Streaming的数据处理过程 1)数据收集/数据输入 指定从哪儿读取数据,常用的有kafka、flume等。 2)数据处理 使用DStream的相关API进行数据处理操作。 3)结果输出 将结果保存到外部的存储系统中,有如下几类: a)redis、mongodb b)RDBMS c)hbase、hdfs、hive d)kafka 企业中的实时处理应用结构:[Flume ->] Kafka -> Streaming/Storm -> Kafka/HBase (6)常用的应用场景 1)活跃访客的数量 a)每个小时的访客数量 b)最近十五分钟的访客数量 c)到现在为止当天的访客数量(五分钟统计一次) d)访客的地域数量统计 2)广告的点击量统计 3)黑名单的统计 (7)Spark Streaming的程序入口 Spark Streaming的程序入口是StreamingContext,它依赖SparkContext对象,其核心抽象是DStream,可以当做RDD来进行操作。 SparkSQL的程序入口是SQLContext。

2、Spark Streaming运行WordCount实例

(1)启动服务

[beifeng@bigdata-senior ~]$ which nc
/usr/bin/nc
[beifeng@bigdata-senior ~]$ nc -lk 9999

~]$ cd /opt/cdh-5.3.6/hadoop-2.5.0-cdh5.3.6/
hadoop-2.5.0-cdh5.3.6]$ sbin/hadoop-daemon.sh start namenode
hadoop-2.5.0-cdh5.3.6]$ sbin/hadoop-daemon.sh start datanode
~]$ cd /opt/modules/zookeeper-3.4.5/
zookeeper-3.4.5]$ bin/zkServer.sh start
~]$ cd /opt/cdh-5.3.6/kafka_2.10-0.8.2.1/
kafka_2.10-0.8.2.1]$ bin/kafka-server-start.sh config/server.properties
~]$ cd /opt/cdh-5.3.6/hive-0.13.1-cdh5.3.6/
hive-0.13.1-cdh5.3.6]$ bin/hive --service metastore &
~]$ service mysql start
Starting MySQL                                             [  OK  ]

[beifeng@bigdata-senior ~]$ jps
3149 Jps
2599 DataNode
2931 RunJar
2719 QuorumPeerMain
2797 Kafka
2538 NameNode

(2)运行Spark Streaming自带的WordCount程序

~] cd /opt/cdh-5.3.6/spark-1.6.1-bin-2.5.0-cdh5.3.6
spark-1.6.1-bin-2.5.0-cdh5.3.6]$ bin/run-example streaming.NetworkWordCount bigdata-senior.ibeifeng.com 9999

(3)在Spark Shell手写WordCount程序 注意:运行spark-shell的时候master至少给定两个线程,一个做数据收集,一个做数据处理。

spark-1.6.1-bin-2.5.0-cdh5.3.6]$ bin/spark-shell --master local[2]
20/01/31 18:31:06 INFO repl.SparkILoop: Created spark context..
Spark context available as sc.
20/01/31 18:31:26 INFO hive.metastore: Trying to connect to metastore with URI thrift://bigdata-senior.ibeifeng.com:9083
20/01/31 18:31:26 INFO hive.metastore: Connected to metastore.
20/01/31 18:31:27 INFO repl.SparkILoop: Created sql context (with Hive support)..
SQL context available as sqlContext.

scala> sc
res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@67565668
scala> import org.apache.spark.streaming._
import org.apache.spark.streaming._
scala> import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.StreamingContext._
scala> val ssc = new StreamingContext(sc, Seconds(10))
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@5483020a
scala> val dstream = ssc.socketTextStream("bigdata-senior.ibeifeng.com", 9999)
dstream: org.apache.spark.streaming.dstream.ReceiverInputDStream[String] = org.apache.spark.streaming.dstream.SocketInputDStream@60f5b1db
scala> val resultDStream = dstream.flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _)
resultDStream: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@2b155889
scala> resultDStream.print()

scala> ssc.start()
scala> ssc.awaitTermination()

(4)关闭服务

[beifeng@bigdata-senior ~]$ jps
2599 DataNode
2931 RunJar
2719 QuorumPeerMain
2797 Kafka
2538 NameNode
4500 Jps
[beifeng@bigdata-senior ~]$ kill -9 2931
[beifeng@bigdata-senior ~]$ jps
2599 DataNode
2719 QuorumPeerMain
2797 Kafka
2538 NameNode
4512 Jps
[beifeng@bigdata-senior ~]$ su root
Password: 
[root@bigdata-senior beifeng]# service mysql stop
Shutting down MySQL..                                      [  OK  ]
[root@bigdata-senior beifeng]# exit
exit
[beifeng@bigdata-senior ~]$ cd /opt/cdh-5.3.6/kafka_2.10-0.8.2.1/
[beifeng@bigdata-senior kafka_2.10-0.8.2.1]$ bin/kafka-server-stop.sh 
[beifeng@bigdata-senior ~]$ cd /opt/modules/zookeeper-3.4.5/
[beifeng@bigdata-senior zookeeper-3.4.5]$ bin/zkServer.sh stop
JMX enabled by default
Using config: /opt/modules/zookeeper-3.4.5/bin/../conf/zoo.cfg
Stopping zookeeper ... STOPPED
[beifeng@bigdata-senior ~]$ cd /opt/cdh-5.3.6/hadoop-2.5.0-cdh5.3.6/
[beifeng@bigdata-senior hadoop-2.5.0-cdh5.3.6]$ sbin/hadoop-daemon.sh stop namenode
stopping namenode
[beifeng@bigdata-senior hadoop-2.5.0-cdh5.3.6]$ sbin/hadoop-daemon.sh stop datanode
stopping datanode
[beifeng@bigdata-senior hadoop-2.5.0-cdh5.3.6]$ jps
4654 Jps

(5)代码实现WordCount程序

package com.ibeifeng.bigdata.spark.app.streaming

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object StreamingWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("streaming-wordcount")
    val sc = SparkContext.getOrCreate(conf)
    /**
      * batchDuration:
      * 给定产生批次的间隔时间,每隔给定的batchDuration就会产生一个批次,产生的批次会先放到一个队列中
      * 后台有一个线程会专门进行批次的执行(每次只执行一个批次)
      * 一般情况下batchDuration设置的时间要比一个批次的执行时间稍微大一点,以避免阻塞
      * 时间类型有:Seconds(),Minutes(),Milliseconds()
      */
    val ssc = new StreamingContext(sc, Seconds(10))
    val dstream = ssc.socketTextStream("bigdata-senior.ibeifeng.com", 9999)
    val resultDStream = dstream.flatMap(_.split(" ")).map(word => {
      (word, 1)
    }).reduceByKey(_ + _)
    resultDStream.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

3、Spark Streaming程序运行原理

(1)数据接收器(Receiver) Receiver接收数据输入,并将输入的数据形成一个block块存储到内存或磁盘中,并进行备份,设置block形成的间隔时间:

spark.streaming.blockInterval:200ms

一般情况建议将该参数设置为1-2s。 (2)batch批次的产生 每隔batchDuration指定的时间后,产生一个执行批次,这个批次包含了在这个批次时间间隔中接收到的所有数据形成的block块,一般要求batchDuration是blockInterval的整数倍,实质是将这些block块形成一个RDD。 (3)批次对应的RDD Job的执行 一个Job就是一个RDD的Action类型的API触发,一个DStream的执行其实就是RDD的执行,一个Job可以包含多个批次。 (4)DStream DStream底层是由一系列的RDD和时间组成的集合,每个RDD包含了对应批次的数据,DStream的API的调用实际上底层都是RDD的调用。 一个DStream依赖于其他的DStream,DStream存在批次的间隔时间,每隔time interval产生对应批次的RDD,每个DStream中有一个compute的函数供该批次执行。 (5)RDD的销毁 在下下一个批次执行的时候,当前批次的RDD会被销毁,条件是该RDD已经执行过,而且StreamingContext中没有对该RDD有任何依赖,其它DStream对当前RDD也没有任何依赖。 可以考虑通过StreamingContext的remember函数更改RDD的生命周期长度,要求参数比原始的batchDuration大:

ssc.remember(Seconds(60))

(6)Streaming的数据读取 1)Basic Sources 基于StreamingContext的API读取数据。 数据的接收方式只有一种:基于Receiver的数据接收方式,即push的数据接收方式。 2)Advanced Sources 基于外部API读取数据,比如Kafka/Flume等。 数据接收方式有两种(一般都有两种,或者说kafka和flume有两种): a)Use Receiver(使用数据接收器,push方式): 在正常的业务Job之外,启动一个数据接收Job,专门负责将数据保存到当前执行的内存或磁盘中,RDD中保存的其实是block的id。 b)Direct Approach(直接连接方式,poll方式): StreamingContext形成RDD的过程中,RDD中保存的是数据存储的位置信息以及相关参数,直到job被真正运行的时候,会直接从数据产生方根据保存的相关参数获取数据。

4、Spark Streaming和Kafka的集成

Spark Streaming可以和Flume、Kafka、Kinesis等进行集成。 Spark Streaming和Kafka的集成: (1)Use Receiver 基于Kafka的High Level Consumer API进行开发和数据读取,最终执行RDD的Task数量实质上和形成的block数量一致。

KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2)

kafkaParams:Kafka High Level Consumer API的相关配置参数,比如Group Id、Zookeeper的连接信息等等
topics:指定Spark Streaming(Kafka的消费者)获取哪些Topic的数据,以及获取这些数据的线程数量

(2)Direct Approach 基于Simple Consumer API进行开发和数据读取,最终执行RDD的Task数量实质上和Kafka对应分区数量一致。

KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

kafkaParams:指定使用simple consumer api的时候需要使用到的相关参数,主要是metadata.broker.list和auto.offset.reset
topics:指定streaming消费的kafka的topic名称

KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](ssc, kafkaParams, fromOffsets, messageHandler)

kafkaParams:指定使用simple consumer api的时候需要使用到的相关参数,主要是metadata.broker.list
fromOffsets:指定stremaing消费哪些topic的哪些partiton的偏移量是多少
messageHandler:对kafka的数据进行具体的操作,可以从中获取offset、partition、topic、key、message,需要给定一个匿名函数

(3)Spark Streaming和Kafka的优化方式 1)将spark.streaming.blockInterval参数进行调整(最小要求大于50ms),以适应RDD的Task数量 2)对于使用数据接收器的形式,可以考虑使用多个数据接收器,然后使用union API合并dstream 3)Direct形式下,将: a)spark.streaming.backpressure.enabled:false,改成true,表示开启输入数据量动态调整 b)spark.streaming.receiver.maxRate:none,指定每个receiver每一个执行批次中能够接收的数据量大小 c)spark.streaming.kafka.maxRatePerPartition:none, 指定direct模式下,每个partition(也就是每个RDD的Task)最多读取的数据量条数。 (4)Spark Streaming和Kafka集成的Consumer偏移量管理方式 为了当失败或异常产生时,修复bug后,进行恢复机制 1)Use Receiver 只能基于Kafka自带的offset管理机制来管理offset偏移量(间隔性的将offset上传到zk中)(存在着重复消费的问题)。 2)Direct Approach a)在自定义的messageHandler中将获取的偏移量保存到其它第三方的存储系统中,下次恢复的时候,再读取第三方的存储数据进行恢复操作(存在着数据不消费的问题)。 b)使用Spark Streaming的HA恢复机制,会将DStream的相关元数据保存到给定的checkpointdir文件夹中,一般存储位置为hdfs,在下次进行恢复的时候,会自动从checkpointdir文件夹中进行数据恢复操作,一般使用该方式管理direct模式下与kafka集成的offset偏移量。 (5)Spark Streaming HA功能 提供了一种元数据恢复的功能,方便解决一些应用宕机后的恢复问题。

5、Spark Streaming的常用API

(1)Transform API 直接操作DStream中的当前Job或当前批次对应的RDD,来替换DStream的操作。 (2)数据输出 1)直接调用DStream的相关API进行数据输出。

dstream.saveAs####

2)通过foreachRDD API将DStream当前批次的数据操作转换为RDD的数据操作,和transform API类似,区别在于foreachRDD API没有数据返回。 a)调用rdd的saveAsXXX相关API输出数据。 b)调用rdd的foreachPartition API自定义数据输出代码进行数据输出。 c)将rdd转换为dataframe进行dataframe数据输出。 (3)updateStateByKey updateStateByKey一般情况下和Spark Streaming的HA一起使用,应用于需要累加数据的场景。updateStateByKey必须给定checkpoint的文件夹路径。 (4)reduceByKeyAndWindow 窗口函数 对每三秒钟的数据执行一次滑动窗口计算,这3秒内的3个RDD会被聚合起来进行处理,然后过了两秒钟,又会对最近三秒内的数据执行滑动窗口计算。所以每个滑动窗口操作,都必须指定两个参数,窗口长度以及滑动间隔,而且这两个参数值都必须是batch间隔的整数倍。

val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int, b:Int) => (a+b), Seconds(30), Seconds(10))

应用场景:计算最近一段时间的数据。 窗口函数相关API必须给定checkpoint的文件夹路径。

  1. Spark Streaming用于实时统计,基本的API或者RDD相关的API就可以实现。
  2. updateStateByKey用于实时累加统计。
  3. reduceByKeyAndWindow窗口函数用于最近一段时间的统计。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 【NDN实验】Consumer-Producer API for Named Data Networking 学习笔记

    版权声明:本文为博主原创文章,转载请注明出处。 https://blog.csdn.net/gongxifacai_believe/artic...

    魏晓蕾
  • ResNet: Deep Residual Learning for Image Recognition (2015) 全文翻译

    Kaiming He,Xiangyu Zhang,Shaoqing Ren,Jian Sun (Microsoft Research {kahe, v-xia...

    魏晓蕾
  • 【ResNeXt】Aggregated Residual Transformations

    Saining Xie 1 Ross Girshick 2 Piotr Dollar 2 Zhuowen Tu 1 Kaiming He 2 1 UC San...

    魏晓蕾
  • Intel-analytics三大深度学习开源库: DL应用直接用于Spark或Hadoop集群

    【新智元导读】本文带来Github上账号为intel-analytics发布的三大深度学习库的介绍。 BigDL 什么是BigDL? BigDL是一个基于Apa...

    新智元
  • Mac iterm2 配色方案

    首先我们下载的 iTem2 这个软件,比 Mac 自带的终端更加强大。直接官网 http://iterm2.com/ 下载并安装即可。

    s_在路上
  • Pornhub 年终总结,揭秘了全人类的性生活

    就在 2019 年 12 月 11 日,全球最大的“学习”网站 Pornhub 发布来第七个年度报告,这份报告将会给大家解读,在即将过去的 2019 年中,全球...

    叫我龙总
  • 如何应对大数据分析工程师面试Spark考察,看这一篇就够了

    可以说Spark几乎是企业搭建大数据平台必备组件,作为数据分析工程师在工作中执行程序、调试程序、查询数据都会和Spark打交道,所以对Spark知识的考察也就顺...

    AI科技大本营
  • 员工Emp表的增删改查实现!本实验室目的是为了了解模板页的好处!

    细节;id 还是 i -----------------------;;半个笑死

    静心物语313
  • 重温数据结构系列随笔:数据结构的基本概念

    现在项目已经踏上正轨,有不少时间可以用来学习,昨晚发现柜子里那本大学时候啃过无数遍的(数据结构 C语言版),那真的无限感叹啊,初恋女友啊,大学回忆啊都涌上心头。...

    逸鹏
  • Arm面向物联网可信芯片-CortexM23 M33

    Nuvoton’s NuMicro M2351 Cortex M23 MCU 指纹应用设计 1,ARM将ARMV8-A上的TrustZone技术移植到ARMv8...

    安智客

扫码关注云+社区

领取腾讯云代金券