上一篇文章里,总结了Spark 的两个常用的库(Spark SQL和Spark Streaming),可以点击这里进行回顾。其中,SparkSQL提供了两个API:DataFrame API和DataSet API,我们对比了它们和RDD:
备注:图来自于极客时间
简单总结一下,DataFrame/DataSet的优点在于:
基于以上的想法,Spark在2016年推出了结构化流数据处理的模块 Structured Streaming。它是基于Spark SQL引擎实现的,依靠Structured Streaming,在开发者看来流数据可以像静态数据一样处理,因为引擎会自动更新计算结果。
流处理相比于批处理来说,难点在于如何对不断更新的无边界数据进行建模,先前Spark Streaming就是把流数据按照一定的时间间隔分割成很多个小的数据块进行批处理,Structured Streaming也是类似,在这里,Structured Streaming有3种输出模式:
Structured Streaming模型在处理数据时按事件时间(Event Time)来操作的,比如说一个订单在10:59被创建,11:01才被处理,这里,10:59代表事件时间,11:01代表处理时间(Processing Time)。
这里简单地说些常见的操作:
SparkSession.readStream()返回的 DataStreamReader可以用于创建 流DataFrame,支持多种类型的数据流作为输入,如File、Kafka、socket等等。
socketDataFrame = spark
.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
流DataFrame的查询操作和静态的一样的,请看下面的例子。
有一个不断更新的学生数据流,每个数据代表一名学生,有属性:Name、Age、Height、Grade。
df = … // 这个 DataFrame 代表学校学生的数据流,schema 是{name: string, age: number, height: number, grade: string}
df.select("name").where("age > 10") // 返回年龄大于 10 岁的学生名字列表
df.groupBy("grade").count() // 返回每个年级学生的人数
df.sort_values([‘age’], ascending=False).head(100) // 返回 100 个年龄最大的学生
假设一个数据流中,每一个词语有其产生的时间戳,如何每隔10秒输出过去一分钟内产生的前10热点词呢?
words = ... # 这个 DataFrame 代表词语的数据流,schema 是 { timestamp: Timestamp, word: String}
windowedCounts = words.groupBy(
window(words.timestamp, "1 minute", "10 seconds"),
words.word
).count()
.sort(desc("count"))
.limit(10)
基于词语的生成时间,创建一个时间窗口长度为1分钟,滑动间隔为10秒的window,然后把输入的词语根据window和词语本身聚合,统计每个window内每个词语的数量,选取Top10返回即可。
再举个例子,如果数据产生了延迟,一般也会以事件时间为准:
如应用程序在12:11可以接受到在12:04生成的单词,应用程序应使用12:04(事件时间)而不是12:11(处理时间)来更新窗口的统计数据。
当然数据不可能一直缓存在内存中,上一次我们学习到水印这个说法,就是系统允许一段时间内保存历史的聚合结果,当超出这个时间范围则内清除。
words = ... # streaming DataFrame of schema { timestamp: Timestamp, word: String }
# Group the data by window and word and compute the count of each group
windowedCounts = words \
.withWatermark("timestamp", "10 minutes") \
.groupBy(
window(words.timestamp, "10 minutes", "5 minutes"),
words.word) \
.count()
在上面的例子中,我们定义了10分钟的水印,引擎的最大事件时间10分钟。
当我们完成了各项处理,是时候把结果输出数给别人,这里支持多种方式,如硬盘文件、Kafka、console和内存等。
query = wordCounts
.writeStream
.outputMode("complete")
.format("csv")
.option("path", "path/to/destination/dir")
.start()
query.awaitTermination()
上面我们使用完全模式,把结果写入CSV文件。