Spark源码系列之spark2.2的StructuredStreaming使用及源码介绍

一,概述

Structured Streaming是一个可扩展和容错的流处理引擎,并且是构建于sparksql引擎之上。你可以用处理静态数据的方式去处理你的流计算。随着流数据的不断流入,Sparksql引擎会增量的连续不断的处理并且更新结果。可以使用DataSet/DataFrame的API进行 streaming aggregations, event-time windows, stream-to-batch joins等等。计算的执行也是基于优化后的sparksql引擎。通过checkpointing and Write Ahead Logs该系统可以保证点对点,一次处理,容错担保。

可以把输入的数据流当成一张表。数据流新增的每一条数据就像添加到该表的新增行数据。

二,例子和概念

1,需要导入的依赖为

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
  <version>2.2.0</version>
</dependency>

2,以kafka为source数据源,console为sink输出的例子为

val spark = SparkSession
  .builder()
  .appName("Spark structured streaming Kafka example")
  .master("local")
  .getOrCreate()
val inputstream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "blog")
  .load()
val keyValueString = inputstream.selectExpr("CAST(key AS STRING)", "CAST( value AS STRING)").as[(String, String)]
val wordCounts = keyValueString.map(_._2.split(" ")).groupBy("value").count()
val query = wordCounts.writeStream.trigger(Trigger.ProcessingTime(1000000))
  .outputMode("complete")
  .format("console")
  .start()
query.awaitTermination()

3,重点介绍的两个概念:source和sink。

A),source

目前支持的source有三种:

File Sourcec:从给定的目录读取数据,目前支持的格式有text,csv,json,parquet.容错。

Kafka Source:从kafka拉取数据。仅兼容kafka 0.10.0或者更高版本。容错。

Socket Source(for testing):从一个连接中读取UTF8编码的文本数据。不容错。

B),output modes

1),Append mode(default):仅仅从上次触发计算到当前新增的行会被输出到sink。仅仅支持行数据添加入结果表后不进行梗概的query操作。因此,这种方式保证没个流操作仅仅输出一次。例如,带有Select,where,map,flatmap,filter,join等的query操作支持append模式。

2),Complete mode:每次trigger都会讲整个结果表输出到sink。这个是针对聚合操作。

3),Updata mode:仅仅是自上次trigger之后结果表有变更的行会输出到sink。在以后的版本中会有更详细的信息。

不同类型的Streaming query支持不同的输出模式。

Query Type

支持的输出模式

注释

Queries with aggregation

Aggregation on event-time with watermark

Append, Update, Complete

Append mode和Update mode采用高水位watermark去drop掉历史的聚合状态。Completemode不会删除历史聚合状态,由该模式的语义决定。

Other aggregations

Complete, Update

由于没有定义高水位watermark,旧的聚合状态不会drop。Append mode不支持因为聚合操作是违反该模式的语义的。

Queries with mapGroupsWithState

Update

Queries with flatMapGroupsWithState

Append operation mode

Append

Aggregations are allowed after flatMapGroupsWithState.

Update operation mode

Update

Aggregations not allowed after flatMapGroupsWithState.

Other queries

Append, Update

Complete mode不支持这种模式的原因是在结果表保留所有的非聚合的数据是不合适的。

C),sinks

1),FileSink:保存数据到指定的目录

noAggDF
  .writeStream
  .format("parquet")
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .option("path", "path/to/destination/dir")
  .start()

2),Foreach sink:在输出的数据上做任何操作。

writeStream
  .foreach(...)
.start()

3),Console sink(for debugging):每次trigger都会讲结果输出到console或stdout。

aggDF
  .writeStream
  .outputMode("complete")
  .format("console")
  .start()

4),memory sink

// Have all the aggregates in an in-memory table
aggDF
  .writeStream
  .queryName("aggregates")    // this query name will be the table name
 .outputMode("complete")
  .format("memory")
  .start()
spark.sql("select * from aggregates").show()

5),kafkasink

支持stream和batch数据写入kafka

val ds = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .start()

Sink支持的输出模式

Sink

Outputmode

Options

容错

注释

FileSink

Append

path:输出路径,必须指定

Yes

支持写入分区的tables。按照时间分区或许有用。

ForeachSink

Append,Update,Complete

None

取决于ForeachWriter的实现

细节请看官网

ConsoleSink

Append,Complete,Update

NumRows:每个trigger显示的行数。Truncate:假如太长是否删除,默认是true

No

MemorySink

Append,Complete

None

No.但是在Completemode 重新query就会导致重新创建整张表

Table name is the query name.

以上是全部概念。关于kafka的batch写入,会在后面详细介绍。

三,源码相关介绍

本次源码不会牵涉到具体的数据源

1,重要的类

A),DataSource

负责构造可插入数据源的数据源。除了充当描述数据源的规范参数集之外,这个类也用于解析一个可以在查询计划中使用的具体实现的描述(或批处理或流)或使用外部库写出数据。

B),StreamingQueryManager

管理所有的StreamingQuery行为的类。

C),StreamExecution

使用单独一个线程管理Streaming Spark Sql query的执行。跟标准的查询不一样之处,一个Streaming query,在query plan中存在的source每次有新的数据产生都会重复执行。每当新的输入到达,就会生成一个QueryExecution,然后结果会事务的方式提交到给定的sink。

D),ProcessingTimeExecutor

继承自TriggerExecutor,每个 ‘intervalMs’个毫秒运行一个batch的trigger Executor。

E),DataStreamWriter

将一个Streaming Dataset写入外部存储系统的接口,使用Dataset.writeStream。

F),DataStreamReader

从外部存储系统加载一个Streaming dataset。使用SparkSession.readStream。

2,重要的源码

采用上面的样例源码。

A),构建Streaming Dataset

Load方法中

val dataSource =
 DataSource(
    sparkSession,
 userSpecifiedSchema = userSpecifiedSchema,
 className = source,
 options = extraOptions.toMap)
Dataset.ofRows(sparkSession, StreamingRelation(dataSource))

然后StreamingRelation(dataSource)

object StreamingRelation {
 def apply(dataSource: DataSource): StreamingRelation = {
    StreamingRelation(
      dataSource, dataSource.sourceInfo.name, dataSource.sourceInfo.schema.toAttributes)
  }
}

构建Dataset这段源码其实写的还是很简单的,重点阅读的两个点是DataSource的两个lazy的变量。

lazy val providingClass: Class[_] = DataSource.lookupDataSource(className) lazy val sourceInfo: SourceInfo = sourceSchema()

ProvidingClass主要是负责构建我们具体的source,比如kafkaSource。

SourceInfo主要是表结构。

Provider class的实现

DataSource.lookupDataSource(className)

重点代码是加载所有DataSourceRegister实现,然后获取shortName,比如kafka相关的实现是KafkaSourceProvider,shortName是kafka,正好跟我们上文format指定的格式kafka匹配,此时就会得到providingClass的实现就是kafkaSourceProvider。

//加载所有DataSourceRegister的实现
val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)
try {
 serviceLoader.asScala.filter(_.shortName().equalsIgnoreCase(provider1)).toList

表结构的实现

主要是由上步构建的providingClass,得到相应的schema。Kafka的schema信息是很固定的。

providingClass.newInstance() match {
 case s: StreamSourceProvider =>
 val (name, schema) = s.sourceSchema(
      sparkSession.sqlContext, userSpecifiedSchema, className, caseInsensitiveOptions)
 SourceInfo(name, schema, Nil)

B),输出到sink

入口是start方法,此处我们指定的sink类型是console

val (useTempCheckpointLocation, recoverFromCheckpointLocation) =
 if (source == "console") {
    (true, false)
  } else {
    (false, true)
  }
val dataSource =
 DataSource(
 df.sparkSession,
 className = source,
 options = extraOptions.toMap,
 partitionColumns = normalizedParCols.getOrElse(Nil))
df.sparkSession.sessionState.streamingQueryManager.startQuery(
 extraOptions.get("queryName"),
 extraOptions.get("checkpointLocation"),
 df,
 dataSource.createSink(outputMode),
 outputMode,
 useTempCheckpointLocation = useTempCheckpointLocation,
 recoverFromCheckpointLocation = recoverFromCheckpointLocation,
 trigger = trigger)

其中,参数中有一个dataSource.createSink(outputMode),就会根据我们构建DataSource的时候构建的providingClass(此处ConsoleSinkProvider),实例之后得到的具体的sink(ConsoleSink),

providingClass.newInstance() match {
 case s: StreamSinkProvider =>
    s.createSink(sparkSession.sqlContext, caseInsensitiveOptions, partitionColumns, outputMode)

在startQuery里,首先构建了Query,然后调用其start方法进行了执行。

val query = createQuery(
  userSpecifiedName,
 userSpecifiedCheckpointLocation,
 df,
 sink,
 outputMode,
 useTempCheckpointLocation,
 recoverFromCheckpointLocation,
 trigger,
 triggerClock)

CreateQuery方法中构建StreamingQueryWrapper和StreamExecution

new StreamingQueryWrapper(new StreamExecution(
  sparkSession,
 userSpecifiedName.orNull,
 checkpointLocation,
 analyzedPlan,
 sink,
 trigger,
 triggerClock,
 outputMode,
 deleteCheckpointOnStop))

调用StreamExecution的start方法,启动执行。

query.streamingQuery.start()

该方法中主要是启动了一个线程进行microBatch的处理。

microBatchThread.setDaemon(true) microBatchThread.start()

val microBatchThread =
 new StreamExecutionThread(s"stream execution thread for $prettyIdString") {
 override def run(): Unit = {
 // To fix call site like "run at <unknown>:0", we bridge the call site from the caller
      // thread to this micro batch thread
 sparkSession.sparkContext.setCallSite(callSite)
      runBatches()
    }
  }

重点是runBatches()方法。

这里要提到两个一个重要的变量,triggerExecutor,这个会根据我们在样例中trigger(Trigger.ProcessingTime(1000000))设置的时间,类型觉得是构建处理一次的OneTimeExecutor,还是ProcessingTimeExecutor。

该对象在构建StreamExecution时构建和初始化

private val triggerExecutor = trigger match {
 case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock)
 case OneTimeTrigger => OneTimeExecutor()
 case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger")
}

微批处理的执行

runBatches方法中,在数据可用的情况下会调用runBatch

if (dataAvailable) {
 currentStatus = currentStatus.copy(isDataAvailable = true)
  updateStatusMessage("Processing new data")
  runBatch(sparkSessionToRunBatches)
}

runBatch,主要分三步走:获取数据,构建执行计划,输出。

A),请求未处理的数据

// Request unprocessed data from all sources.
newData = reportTimeTaken("getBatch") {
 availableOffsets.flatMap {
 case (source, available)
 if committedOffsets.get(source).map(_ != available).getOrElse(true) =>
 val current = committedOffsets.get(source)
 val batch = source.getBatch(current, available)
      logDebug(s"Retrieving data from $source: $current -> $available")
 Some(source -> batch)
 case _ => None
  }
}

B),处理构建具体的执行计划

// Replace sources in the logical plan with data that has arrived since the last batch.
val withNewSources = logicalPlan transform {
 case StreamingExecutionRelation(source, output) =>
 newData.get(source).map { data =>
 val newPlan = data.logicalPlan
 assert(output.size == newPlan.output.size,
 s"Invalid batch: ${Utils.truncatedString(output, ",")} != " +
 s"${Utils.truncatedString(newPlan.output, ",")}")
      replacements ++= output.zip(newPlan.output)
      newPlan
    }.getOrElse {
 LocalRelation(output)
    }
}

// Rewire the plan to use the new attributes that were returned by the source.
val replacementMap = AttributeMap(replacements)
val triggerLogicalPlan = withNewSources transformAllExpressions {
 case a: Attribute if replacementMap.contains(a) => replacementMap(a)
 case ct: CurrentTimestamp =>
 CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
 ct.dataType)
 case cd: CurrentDate =>
 CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
 cd.dataType, cd.timeZoneId)
}

reportTimeTaken("queryPlanning") {
 lastExecution = new IncrementalExecution(
    sparkSessionToRunBatch,
 triggerLogicalPlan,
 outputMode,
 checkpointFile("state"),
 currentBatchId,
 offsetSeqMetadata)
 lastExecution.executedPlan // Force the lazy generation of execution plan
}

3),调用sink的addBatch输出

val nextBatch =
 new Dataset(sparkSessionToRunBatch, lastExecution, RowEncoder(lastExecution.analyzed.schema))

reportTimeTaken("addBatch") {
  sink.addBatch(currentBatchId, nextBatch)
}

对于本文的consoleSink具体实现如下

data.sparkSession.createDataFrame(
 data.sparkSession.sparkContext.parallelize(data.collect()), data.schema)
  .show(numRowsToShow, isTruncated)

四,总结

Structured Streaming 提供高效,可升级,容错,点对点 仅一次处理的流处理,用户根本不用关心流。

分三个,概念大家会理解的更清晰。

1,DataSource

2,Sink

3,DataSet/DataFrame的执行计划。

把流当成一张表,新增数据就是新增表的行。这么理解是不是更彻底呢?

StructuredStreaming与kafka的结合是看点。后面我会出一些文章详细道来。也会对kafka 0.10.0的Consumer和KafkaProducer进行源码解密,因为你会发现,关于kafka 0.10.0与spark的结合已经变了天了。

原文发布于微信公众号 - Spark学习技巧(bigdatatip)

原文发表时间:2017-07-14

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Java开发者杂谈

Netty(1):第一个netty程序

为什么选择Netty   netty是业界最流行的NIO框架之一,它的健壮型,功能,性能,可定制性和可扩展性都是首屈一指的,Hadoop的RPC框架Avro就使...

4067
来自专栏Spark学习技巧

Spark与mongodb整合完整版本

一,准备阶段 MongoDB Connector for spark是的spark操作mongodb数据很简单,这样方便使用spark去分析mongodb数据,...

2.5K10
来自专栏Pythonista

牛掰的python与unix

  加载subprocess模块仅仅是将可以使用的代码文件加载进来。也可以创建自己的模块或文件,拱以后重复使用,这与加载subprocess模块的方法相同。IP...

1122
来自专栏Hadoop实操

如何通过Livy的RESTful API接口向Kerberos环境的CDH集群提交作业

在前面的文章Fayson介绍了《Livy,基于Apache Spark的开源REST服务,加入Cloudera Labs》、《如何编译Livy并在非Kerber...

1.6K11
来自专栏Android 研究

Android系统启动——5 zyogte进程(Java篇)

上一篇文章,我们知道在AndroidRuntime.cpp的start()函数里面是调用的Zygoteinit类的main()函数,那我们就继续研究

2392
来自专栏潇涧技术专栏

Android Heroes Reading Notes 4

《Android群英传》读书笔记 (4) 第八章 Activity和Activity调用栈分析 + 第九章 系统信息与安全机制 + 第十章 性能优化

741
来自专栏JadePeng的技术博客

Docker+Jenkins持续集成环境(5): android构建与apk发布

项目组除了常规的java项目,还有不少android项目,如何使用jenkins来实现自动构建呢?本文会介绍安卓项目通过jenkins构建的方法,并设计开发一个...

5048
来自专栏浪淘沙

Spark 实现两表查询(SparkCore和SparkSql)

1.将两张表的数据提取出来,转换成DataFrame,创建两个view。实现join查询

5323
来自专栏haifeiWu与他朋友们的专栏

I-team 博客全文检索 Elasticsearch 实战

一直觉得博客缺点东西,最近还是发现了,当博客慢慢多起来的时候想要找一篇之前写的博客很是麻烦,于是作为后端开发的楼主觉得自己动手丰衣足食,也就有了这次博客全文检索...

3001
来自专栏JackieZheng

探秘Tomcat——启动篇

tomcat作为一款web服务器本身很复杂,代码量也很大,但是模块化很强,最核心的模块还是连接器Connector和容器Container。具体请看下图: ? ...

4857

扫码关注云+社区

领取腾讯云代金券