
Structured Streaming将实时数据视为一张正在不断添加数据的表。
可以把流计算等同于在一个静态表上的批处理查询,进行增量运算。 在无界表上对输入的查询将生成结果表,系统每隔一定的周期会触发对无界表的计算并且更新结果。
类别 | Spark | Structured |
|---|---|---|
数据源 | DStream,本质上是RDD | DF数据框 |
处理数据 | 只能处理静态数据 | 能够处理数据流 |
实时性 | 秒级响应 | 毫秒级响应 |
# StructuredNetWordCount.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import split
from pyspark.sql.functions import explode
# 创建SparkSession对象
if __name__ == "__main__":
spark = SparkSession.builder.appName("StructuredNetworkCount").getOrCreate()
spark.sparkContext.setLogLevel("WARN")
# 创建输入数据源
lines = spark.readStream.formaat("socket").option("host", "localhost").option("port", 9999).load()
# 定义流计算过程
words = lines.select(explode(split(lines.value, " ")).alias("word"))
wordsCounts = words.groupBy("word").count()
# 启动流计算并且输出结果
query = wordCounts.writeStream.outputMode("complete").format("console").trigger(processingTime="8 seconds")
.start() # complete 表示输出模式
query.awaitTermination()# 启动HDFS
cd /usr/local/hadoop
sbin/start-dfs.sh
# 新建数据源终端
nc -lk 9999 # 启动服务端;需要输入语句
# 新建流计算终端:客户端
cd /usr/local/spark/mycode/structuredstreaming/
/usr/local/spark/bin/spark-submit StructuredNetWordCount.pyappendcompleteupdate