Structured Streaming实现超低延迟

浪院长,最近忙死了,写文章的时间都没了。但是,都说时间就像海绵里的水,挤挤就有了。所以,今晚十点半开始整理这篇Structured streaming 相关的文章。

最近,忙于开发完善flink平台,并且使用我们的平台去支持一些复杂的业务,比如用户画像处理等。遇见了很多bug和性能点,后面陆续出文章给大家解析。

书归正传,大家都知道spark streaming是微批批处理,而Structured streaming在2.3以前也是批处理,在2.3引入了连续处理的概念,延迟大幅度降低值~1ms,但是还有诸多限制,这点比flink差了许多。

至于低延迟的测试,建议本文使用本文代码去测试,kafka source->kafka sink,这样便于观察延迟。

连续处理是Spark 2.3中引入的一种新的实验版本流执行模式,可实现极低(~1 ms)端到端延迟,并且具有至少一次处理容错保证。 structured streaming的连续处理模式与微批处理模式进行比较,微批处理引擎可以实现一次性保证,但微批处理最好仅可实现约100ms的延迟。 对于某些类型的查询(在下面讨论),可以选择在不修改应用代码的情况下运行该模式(即,不更改DataFrame / Dataset操作)。

要在连续处理模式下运行支持的查询,您只需指定一个连续触发器,并将所需的checkpoint间隔作为参数。 例如浪尖的demo如下:

object ContinuousProcessing {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("yarn-client")
      .set("yarn.resourcemanager.hostname", "mt-mdh.local")
      .set("spark.executor.instances","2")
      .set("spark.default.parallelism","4")
      .set("spark.sql.shuffle.partitions","4")
      .setJars(List("/Users/meitu/Desktop/sparkjar/bigdata.jar"
        ,"/opt/jars/spark-streaming-kafka-0-10_2.11-2.3.1.jar"
        ,"/opt/jars/kafka-clients-0.10.2.2.jar"
        ,"/opt/jars/kafka_2.11-0.10.2.2.jar"
        ,"/opt/jars/spark-sql-kafka-0-10_2.11-2.0.2.jar"))


    val spark = SparkSession
      .builder
      .appName("StructuredKafkaWordCount")
      .config(sparkConf)
      .getOrCreate()

    spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "mt-mdh.local:9093")
      .option("subscribe", "StructuredSource")
      .load()
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "mt-mdh.local:9093")
      .option("topic", "StructuredSink")
      .option("checkpointLocation","/sql/checkpoint")
      .trigger(Trigger.Continuous("1 second"))  // only change in query
      .start()
      .awaitTermination()
  }

}

checkpoint 间隔为1秒意味着连续处理引擎将每秒记录查询的进度。 生成的checkpoint采用与微批处理引擎兼容的格式,因此可以使用任何触发器重新启动任何查询。 例如,假如查询支持微批处理和连续处理,那么实际上也可以用连续处理触发器去启动微批处理触发器,反之亦然。

请注意,无论何时切换到连续模式,都将获得至少一次的容错保证。

支持的查询

从Spark 2.3开始,连续处理模式仅支持以下类型的查询。

  • Operations:在连续模式下仅支持dataset/dataframe的类似于map的操作,即支持projection(select,map,flatMap,mapPartitions等)和selection(where,filter等)。
  • 除了聚合函数(因为尚不支持聚合),current_timestamp()和current_date()(使用时间的确定性计算具有挑战性)之外,支持所有SQL函数。

Sources

  • Kafka Source:支持所有操作。
  • Rate source:适合测试。只有连续模式支持的选项是numPartitions和rowsPerSecond。

Sinks

  • Kafka sink:支持所有选项。
  • Memory sink:适合调试。
  • Console sink:适合调试。支持所有操作。请注意,控制台将打印你在连续触发器中指定的每个checkpoint间隔。

更详细的关于sink和source信息,请参阅输入源和输出接收器部分的官网。虽然控制台接收器非常适合测试,但是使用Kafka作为源和接收器可以最好地观察到端到端的低延迟处理。

注意事项

  • 连续处理引擎启动多个长时间运行的任务,这些任务不断从源中读取数据,处理数据并连续写入接收器。 查询所需的任务数取决于查询可以并行从源读取的分区数。 因此,在开始连续处理查询之前,必须确保群集中有足够的核心并行执行所有任务。 例如,如果您正在读取具有10个分区的Kafka主题,则群集必须至少具有10个核心才能使查询正常执行。
  • 停止连续处理流可能会产生虚假的任务终止警告。 这些可以安全地忽略。
  • 目前没有自动重试失败的任务。 任何失败都将导致查询停止,并且需要从检查点手动重新启动。(深受其害,kafka topic没数据流入也会挂掉的)

原文发布于微信公众号 - Spark学习技巧(bigdatatip)

原文发表时间:2018-10-19

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏瓜大三哥

System DMA 和Bus Master DMA

System DMA是将DMA作为中心位置挂在总线上,能够被总线上的任何设备所使用。BMD是目前为止发现的基于PCIe总线使用最多的DMA类型(使用Endpoi...

22210
来自专栏信安之路

一道 CTF 题 get 到的新姿势

本文是从一个 CTF 题目中学到的一个新姿势,下面对我的学习做一个记录总结,给大家分享一下,希望大家多多参与一起分享学习。

1090
来自专栏java架构师

Hadoop学习5--配置本地开发环境(Windows+Eclipse)

一、导入hadoop插件到eclipse 插件名称:hadoop-eclipse-plugin-2.7.0.jar 我是从网上下载的,还可以自己编译。 放到ec...

3118
来自专栏挖掘大数据

如何高效地合并Spark社区PR到自己维护的分支

最近刚刚忙完Spark 2.2.0的性能测试及Bug修复,社区又要发布2.1.2了,国庆期间刚好有空,过了一遍2.1.2的相关JIRA,发现有不少重要修复2.2...

3868
来自专栏数据之美

通过hiveserver远程服务构建hive web查询分析工具

(1)hive 三种启动方式及用途,本文主要关注通过hiveserver(可jdbc连接)的方式启动  1, hive  命令行模式,直接输入/hive/...

2265
来自专栏Maroon1105

Linode Cloud中的大数据:使用Apache Storm进行流数据处理

Apache Storm是一项大数据技术,使软件,数据和基础架构工程师能够实时处理高速,大容量数据并提取有用信息。任何涉及实时处理高速数据流的项目都可以从中受益...

1762
来自专栏Petrichor的专栏

git:git commit 书写格式

  正如 git add 的作用是将文件放入暂存区, git commit 的作用是将修改提交到分支上。

4502
来自专栏about云

MapReduce执行过程分析【问题】

这个是个问题贴,由about云会员提问。会员答疑。提问和回答都比较有水平,分享出来。

1194
来自专栏CSDN技术头条

【问底】许鹏:使用Spark+Cassandra打造高性能数据分析平台(二)

【导读】笔者(许鹏)看Spark源码的时间不长,记笔记的初衷只是为了不至于日后遗忘。在源码阅读的过程中秉持着一种非常简单的思维模式,就是努力去寻找一条贯穿全局的...

23310
来自专栏Spark学习技巧

Kafka源码系列之以kafka为例讲解分布式存储系统

Kafka源码系列,浪尖是以kafka 0.8.2.2为例给大家讲解。由于公众号阅读不适大量文字,所以浪尖会尽量精简文字。目标是大家读完kafka源码系列能彻底...

3415

扫码关注云+社区

领取腾讯云代金券