(*)Spark Streaming是核心Spark API的扩展,可实现可扩展、高吞吐量、可容错的实时数据流处理。数据可以从诸如Kafka,Flume,Kinesis或TCP套接字等众多来源获取,并且可以使用由高级函数(如map,reduce,join和window)开发的复杂算法进行流数据处理。最后,处理后的数据可以被推送到文件系统,数据库和实时仪表板。而且,您还可以在数据流上应用Spark提供的机器学习和图处理算法。
(*)问题:Spark Streaming是如何处理连续的数据 Spark Streaming将连续的数据流抽象为discretizedstream或DStream。在内部,DStream 由一个RDD序列表示。
一定注意一个问题:必须保证虚拟机的CPU的核数至少为2 原因:(1)一个接收数据 (2)一个处理数据 中文版官网
http://spark.apachecn.org/
当在本地运行一个 Spark Streaming 程序的时候,不要使用 “local” 或者 “local[1]” 作为 master 的 URL. 这两种方法中的任何一个都意味着只有一个线程将用于运行本地任务. 如果你正在使用一个基于接收器(receiver)的输入离散流(input DStream)(例如, sockets ,Kafka ,Flume 等),则该单独的线程将用于运行接收器(receiver),而没有留下任何的线程用于处理接收到的数据. 因此,在本地运行时,总是用 “local[n]” 作为 master URL ,其中的 n > 运行接收器的数量(查看 Spark 属性 来了解怎样去设置 master 的信息). 将逻辑扩展到集群上去运行,分配给 Spark Streaming 应用程序的内核(core)的内核数必须大于接收器(receiver)的数量。否则系统将接收数据,但是无法处理它.
步骤:(1)启动netcat服务器
nc -lk 1234
(2)启动SparkStreaming的客户端
bin/run-example streaming.NetworkWordCount bigdata01 1234
安装nc步骤: (1)先将已安装的nc删除:
yum erase nc
(2)下载较低版本的nc的.rpm文件
wget http://vault.centos.org/6.6/os/x86_64/Packages/nc-1.84-22.el6.x86_64.rpm
(3)安装.rpm文件
rpm -iUv nc-1.84-22.el6.x86_64.rpm
执行以上步骤命令后检查nc是否安装好,执行 nc -lk 1234
bin/run-example streaming.NetworkWordCount localhost 1234
4:开发一个自己的NetworkWordCount程序 注意:相对于Storm来说,Spark Streaming不能用于实时性要求非常高的场景
(*)在Spark中,Spark Core -> SparkContext ->抽象RDD Spark Sql -> SqlContent ->抽象DataFrame Spark Streaming -> StreamingContext ->抽象DStream (*)方便管理和操作的统一对象(Spark2.0)SparkSession (*)StreamingContent创建方式有两种: (1)通过SparkConf创建
val sparkConf = new SparkConf().setAppName("MyNetworkWordCount").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(3))
(2)通过现有的SparkContext对象sc来创建
val sc = new SparkContext(....)
val ssc = new StreamingContext(sc, Seconds(3))
(*)本质:将连续的数据变成不 连续的RDD-》DStream
(*)tranform(func) 通过RDD-to-RDD函数作用于源DStream中的各个RDD,可以是任意的RDD操作,从而返回一个新的RDD 相当于map操作
举例:在NetworkWordCount中,也可以使用transform来生成元组对 (*)UpdateStateByKey(func) -》在原来的状态上进行更新,需要设置检查点 操作允许不断用新信息更新它的同时保持任意状态。 定义状态-状态可以是任何的数据类型 定义状态更新函数-怎样利用更新前的状态和从输入流里面获取的新值更新状态 重写NetworkWordCount程序,累计每个单词出现的频率(注意:累计)
定义窗口:(1)窗口的长度 (2)滑动举例 举例:NetWorkwordCount,每隔8秒,把过去30秒产生的字符串进行单词计数 (1)窗口的长度 30秒 (2)滑动间隔(每次滑动的时间长度) 原因是:滑动的距离,必须是采样时间的整数倍
(*)Socket接收 //创建一个离散流,DStream代表输入的数据流
val lines: ReceiverInputDStream[String] = ssc.socketTextStream("bigdata01", 5678)
(*)文件流 类似于——>Flume
package demo
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object FileStreaming {
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local[2]").setAppName("FileStreaming")
val ssc = new StreamingContext(conf,Seconds(2))
//从本地目录中读取数据:如果有新文件产生,就会读取进来
val lines = ssc.textFileStream("d:\\dowload\\spark123")
//打印结果
lines.print()
ssc.start()
ssc.awaitTermination()
}
}
(*)Mysql 举例:WordCount (*)Redis (*)HDFS (*)HBase
举例:WordCount
//使用Spark SQL来查询Spark Streaming处理的数据
words.foreachRDD { rdd =>
//使用单列模式,创建SparkSession对象
val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
import spark.implicits._
// 将RDD[String]转换为DataFrame
val wordsDataFrame = rdd.toDF("word")
// 创建临时视图
wordsDataFrame.createOrReplaceTempView("words")
// 执行SQL
val wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word")
wordCountsDataFrame.show()
}
8:缓存和持久化
与RDD类似,DStreams还允许开发人员将流数据保留在内存中。也就是说,在DStream上调用persist() 方法会自动将该DStream的每个RDD保留在内存中 9:检查点
流数据处理程序通常都是全天候运行,因此必须对应用中逻辑无关的故障(例如,系统故障,JVM崩溃等)具有弹性。为了实现这一特性,Spark Streaming需要checkpoint足够的信息到容错存储系统,以便可以从故障中恢复。
在Spark中有几个优化可以减少批处理的时间:
通过网络(如kafka,flume,socket等)接收数据需要这些数据反序列化并被保存到Spark中。如果数据接收成为系统的瓶颈,就要考虑并行地接收数据。注意,每个输入DStream创建一个receiver(运行在worker机器上)接收单个数据流。创建多个输入DStream并配置它们可以从源中接收不同分区的数据流,从而实现多数据流接收。例如,接收两个topic数据的单个输入DStream可以被切分为两个kafka输入流,每个接收一个topic。这将在两个worker上运行两个receiver,因此允许数据并行接收,提高整体的吞吐量。多个DStream可以被合并生成单个DStream,这样运用在单个输入DStream的transformation操作可以运用在合并的DStream上。
如果运行在计算stage上的并发任务数不足够大,就不会充分利用集群的资源。默认的并发任务数通过配置属性来确定spark.default.parallelism。
可以通过改变序列化格式来减少数据序列化的开销。在流式传输的情况下,有两种类型的数据会被序列化:
在上述两种情况下,使用Kryo序列化格式可以减少CPU和内存开销。
为了Spark Streaming应用程序能够在集群中稳定运行,系统应该能够以足够的速度处理接收的数据(即处理速度应该大于或等于接收数据的速度)。这可以通过流的网络UI观察得到。批处理时间应该小于批间隔时间。
根据流计算的性质,批间隔时间可能显著的影响数据处理速率,这个速率可以通过应用程序维持。可以考虑WordCountNetwork这个例子,对于一个特定的数据处理速率,系统可能可以每2秒打印一次单词计数(批间隔时间为2秒),但无法每500毫秒打印一次单词计数。所以,为了在生产环境中维持期望的数据处理速率,就应该设置合适的批间隔时间(即批数据的容量)。
找出正确的批容量的一个好的办法是用一个保守的批间隔时间(5-10,秒)和低数据速率来测试你的应用程序。
在这一节,我们重点介绍几个强烈推荐的自定义选项,它们可以减少Spark Streaming应用程序垃圾回收的相关暂停,获得更稳定的批处理时间。