1,StructuredStreaming简介

一,概述

Structured Streaming是一个可扩展和容错的流处理引擎,并且是构建于sparksql引擎之上。你可以用处理静态数据的方式去处理你的流计算。随着流数据的不断流入,Sparksql引擎会增量的连续不断的处理并且更新结果。可以使用DataSet/DataFrame的API进行 streaming aggregations, event-time windows, stream-to-batch joins等等。计算的执行也是基于优化后的sparksql引擎。通过checkpointing and Write Ahead Logs该系统可以保证点对点,一次处理,容错担保。

可以把输入的数据流当成一张表。数据流新增的每一条数据就像添加到该表的新增行数据。

在输入数据流上执行的query操作会生成一个结果表。每个触发间隔,比如1s,新的行都会被追加到输入表,最终更新结果表。结果表无论何时得到更新,都将会将变化的结果行写入外部的sink。

二,例子和概念

1,SocketSource->console sink

下载安装nc,请不要用yum直接安装。

wget http://vault.centos.org/6.6/os/x86_64/Packages/nc-1.84-22.el6.x86_64.rpm

rpm -iUv nc-1.84-22.el6.x86_64.rpm

启动nc,然后准备写入数据到Structured Streaming

nc -lk 9999

启动nc之后,开始启动spark-shell

Spark-shell –master local[*]

执行如下代码:

vallines= spark.readStream.format("socket").option("host","localhost").option("port",9999).load()

valwords= lines.as[String].flatMap(_.split(" "))

valwordCounts = words.withWatermark("timestamp","30 seconds").groupBy("value").count()

valquery = wordCounts.writeStream.outputMode("Update").format("console").start()

query.awaitTermination()

2,编程模型讲解

输入的第一行是生成了一个linesDataFrame,然后作为输入表。最终wordCountsDataFrame是结果表。基于linesDataFrame的查询跟静态的Dataframe查询时一样的。然而,当查询一旦启动,Spark会不停的检查Socket链接是否有新的数据。如果有新的数据,Spark将会在新数据上运行一个增量的查询,并且组合之前的counts结果,计算得到更新后的统计。

3,重点介绍的两个概念:source和sink。

3.1source

目前支持的source有三种:

File Sourcec:从给定的目录读取数据,目前支持的格式有text,csv,json,parquet。容错。

Kafka Source:从kafka拉取数据。仅兼容kafka 0.10.0或者更高版本。容错。

Socket Source(for testing):从一个连接中读取UTF8编码的文本数据。不容错。

3.2output modes与查询类型

Append mode(default):仅仅从上次触发计算到当前新增的行会被输出到sink。仅仅支持行数据插入结果表后不进行更改的query操作。因此,这种方式能保证每行数据仅仅输出一次。例如,带有Select,where,map,flatmap,filter,join等的query操作支持append模式。

Complete mode:每次trigger都会将整个结果表输出到sink。这个是针对聚合操作的。

Updata mode:仅仅是自上次trigger之后结果表有变更的行会输出到sink。在以后的版本中会有更详细的信息。

不同类型的Streaming query支持不同的输出模式。

3.3sinks

FileSink:保存数据到指定的目录

noAggDF

.writeStream

.format("parquet")

.option("checkpointLocation", "path/to/checkpoint/dir")

.option("path", "path/to/destination/dir")

.start()

Foreach sink:在输出的数据上做任何操作。

writeStream

.foreach(...)

.start()

Console sink(for debugging):每次trigger都会讲结果输出到console或stdout。

aggDF

.writeStream

.outputMode("complete")

.format("console")

.start()

memory sink

// Have all the aggregates in an in-memory table

aggDF

.writeStream

.queryName("aggregates") // this query name will be the table name

.outputMode("complete")

.format("memory")

.start()

spark.sql("select * from aggregates").show()

kafkasink

支持stream和batch数据写入kafka

Sink支持的输出模式

以上是全部概念。

三 注意事项

StructuredStreaming不会管理整个输入表。它会从Streaming数据源中读取最近的可用数据,然后增量的处理它并更新结果,最后废弃源数据。它仅仅会保留很小更新结果必要的中间状态数据。

这种模型更很多其他的流处理引擎不一样。很多其他流处理系统需要用户自己保持聚合状态,所以还需要考虑容错和数据一致性(at-least-once, or at-most-once, or exactly-once)。在这种模型里面,在有新数据的时候spark负责更新结果表。

  • 发表于:
  • 原文链接http://kuaibao.qq.com/s/20180316G00DCV00?refer=cp_1026
  • 腾讯「云+社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 yunjia_community@tencent.com 删除。

同媒体快讯

扫码关注云+社区

领取腾讯云代金券