1,StructuredStreaming简介

一,概述

Structured Streaming是一个可扩展和容错的流处理引擎,并且是构建于sparksql引擎之上。你可以用处理静态数据的方式去处理你的流计算。随着流数据的不断流入,Sparksql引擎会增量的连续不断的处理并且更新结果。可以使用DataSet/DataFrame的API进行 streaming aggregations, event-time windows, stream-to-batch joins等等。计算的执行也是基于优化后的sparksql引擎。通过checkpointing and Write Ahead Logs该系统可以保证点对点,一次处理,容错担保。

可以把输入的数据流当成一张表。数据流新增的每一条数据就像添加到该表的新增行数据。

在输入数据流上执行的query操作会生成一个结果表。每个触发间隔,比如1s,新的行都会被追加到输入表,最终更新结果表。结果表无论何时得到更新,都将会将变化的结果行写入外部的sink。

二,例子和概念

1, Socket Source-> console sink

下载安装nc,请不要用yum直接安装。

wget http://vault.centos.org/6.6/os/x86_64/Packages/nc-1.84-22.el6.x86_64.rpm

rpm -iUv nc-1.84-22.el6.x86_64.rpm

启动nc,然后准备写入数据到Structured Streaming

nc -lk 9999

启动nc之后,开始启动spark-shell

Spark-shell –master local[*]

执行如下代码:

val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()

val words = lines.as[String].flatMap(_.split(" "))

val wordCounts = words.withWatermark("timestamp", "30 seconds").groupBy("value").count()

val query = wordCounts.writeStream.outputMode("Update").format("console").start()

query.awaitTermination()

2,编程模型讲解

输入的第一行是生成了一个lines DataFrame,然后作为输入表。最终wordCounts DataFrame是结果表。基于lines DataFrame的查询跟静态的Dataframe查询时一样的。然而,当查询一旦启动,Spark 会不停的检查Socket链接是否有新的数据。如果有新的数据,Spark 将会在新数据上运行一个增量的查询,并且组合之前的counts结果,计算得到更新后的统计。

3, 重点介绍的两个概念:source和sink。

3.1 source

目前支持的source有三种:

File Sourcec:从给定的目录读取数据,目前支持的格式有text,csv,json,parquet。容错。

Kafka Source:从kafka拉取数据。仅兼容kafka 0.10.0或者更高版本。容错。

Socket Source(for testing):从一个连接中读取UTF8编码的文本数据。不容错。

3.2 output modes与查询类型

Append mode(default):仅仅从上次触发计算到当前新增的行会被输出到sink。仅仅支持行数据插入结果表后不进行更改的query操作。因此,这种方式能保证每行数据仅仅输出一次。例如,带有Select,where,map,flatmap,filter,join等的query操作支持append模式。

Complete mode:每次trigger都会将整个结果表输出到sink。这个是针对聚合操作的。

Updata mode:仅仅是自上次trigger之后结果表有变更的行会输出到sink。在以后的版本中会有更详细的信息。

不同类型的Streaming query支持不同的输出模式。

Query Type

支持的输出模式

注释

Queries with aggregation

Aggregation on event-time with watermark

Append, Update, Complete

Append mode和Update mode采用watermark来drop掉历史的聚合状态。Completemode不会删除历史聚合状态。

Other aggregations

Complete, Update

由于没有定义watermark,旧的聚合状态不会drop。Append mode不支持因为聚合操作是违反该模式的含义的。

Queries with mapGroupsWithState

Update

Queries with flatMapGroupsWithState

Append operation mode

Append

flatMapGroupsWithState之后运行Aggregations

Update operation mode

Update

flatMapGroupsWithState之后不允许Aggregations

Other queries

Append, Update

Complete mode不支持这种模式的原因是在结果表保留所有的非聚合的数据是不合适的。

3.3 sinks

FileSink:保存数据到指定的目录

noAggDF
  .writeStream
  .format("parquet")
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .option("path", "path/to/destination/dir")
  .start() 

Foreach sink:在输出的数据上做任何操作。

writeStream
  .foreach(...)
.start()

Console sink(for debugging):每次trigger都会讲结果输出到console或stdout。

aggDF
  .writeStream
  .outputMode("complete")
  .format("console")
  .start()

memory sink

// Have all the aggregates in an in-memory table
aggDF
  .writeStream
  .queryName("aggregates")    // this query name will be the table name
  .outputMode("complete")
  .format("memory")
  .start()
spark.sql("select * from aggregates").show()

kafkasink

支持stream和batch数据写入kafka

val ds = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .start()

Sink支持的输出模式

Sink

Outputmode

Options

容错

注释

FileSink

Append

path:输出路径,必须指定

Yes

支持写入分区的tables。按照时间分区或许是有用的。

ForeachSink

Append,Update,Complete

None

依赖于于ForeachWriter的实现

细节请看官网

ConsoleSink

Append,Complete,Update

NumRows:每个trigger显示的行数。Truncate:假如太长是否删除,默认是true

No

MemorySink

Append,Complete

None

No.但是在Completemode 重新query就会导致重新创建整张表

后续sql使用的表明就是queryName

以上是全部概念。

三 注意事项

Structured Streaming不会管理整个输入表。它会从Streaming数据源中读取最近的可用数据,然后增量的处理它并更新结果,最后废弃源数据。它仅仅会保留很小更新结果必要的中间状态数据。

这种模型更很多其他的流处理引擎不一样。很多其他流处理系统需要用户自己保持聚合状态,所以还需要考虑容错和数据一致性(at-least-once, or at-most-once, or exactly-once)。在这种模型里面,在有新数据的时候spark 负责更新结果表。

原文发布于微信公众号 - Spark学习技巧(bigdatatip)

原文发表时间:2018-03-16

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏技术博文

Memcache

Memcached概念:     Memcached是一个免费开源的,高性能的,具有分布式对象的缓存系统,它可以用来保存一些经常存取的对象或数据,保存的数据像一...

44140
来自专栏XAI

SpringMVC+MongoDB+Maven整合(微信回调Oauth授权)

个人小程序。里面是基于百度大脑 腾讯优图做的人脸检测。是关于人工智能的哦。 2017年第一篇自己在工作中的总结文档。土豪可以打赏哦。 https://git.o...

92370
来自专栏老码农专栏

原 荐 RESTFul 服务测试自动化的艺术

16530
来自专栏猿天地

Spring Cloud中如何保证各个微服务之间调用的安全性(下篇)

今天我们继续接着上篇文章来聊一聊如何能够在调用方实现token的自动设置以及刷新。

16020
来自专栏JAVA高级架构

2017 年你不能错过的 Java 类库

各位读者好, 这篇文章是在我看过 Andres Almiray 的一篇介绍文后,整理出来的。 因为内容非常好,我便将它整理成参考列表分享给大家, 同时附上各个库...

29180
来自专栏LEo的网络日志

python i18n实现

40270
来自专栏java、Spring、技术分享

记一次unable to create new native thread错误处理过程

unable to create new native thread,看到这里,首先想到的是让运维搞一份线上的线程堆栈(可能通过jstack命令搞定的)。...

1.2K10
来自专栏携程技术中心

干货 | JAVA反序列化安全实例解析

作者简介 迟长峰,携程技术中心信息安全部应用安全工程师。 什么是序列化 序列化 (Serialization)是指将对象的状态信息转换为可以存储或传输的形式的过...

362100
来自专栏chenssy

【追光者系列】HikariCP源码分析之故障检测那些思考 fail fast & allowPoolSuspension

由于时间原因,本文主要内容参考了 https://segmentfault.com/a/1190000013136251,并结合一些思考做了增注。

18420
来自专栏xingoo, 一个梦想做发明家的程序员

如何在Java应用中提交Spark任务?

最近看到有几个Github友关注了Streaming的监控工程——Teddy,所以思来想去还是优化下代码,不能让别人看笑话啊。于是就想改一下之前觉得最丑陋的一...

76360

扫码关注云+社区

领取腾讯云代金券