前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark Structrued Streaming 及 DStreaming 调优笔记

Spark Structrued Streaming 及 DStreaming 调优笔记

作者头像
大鹅
发布2021-06-16 17:18:38
1.3K0
发布2021-06-16 17:18:38
举报

背景

项目中用的是Spark Structrued Streaming ,也就是Spark 2.0的新版Streaming,看官方文档也说过性能及实时性会比之前的Dstreaming好点,但是相关的资料相比Dstreaming实在是少很多,现在调优阶段很多都要参考Dstreaming的文章以及经验。

这里整理一个Structured Streaming和DStreaming通用的不同方向、思路的调优的笔记,如有理解不当欢迎指正。

我们的总体目标是:

  1. 减少每个batch interval的Processing Time
  2. 设置正确的batch size(每个batch interval的数据量大小)

为此,这里举出了对不同方向的优化措施:

1. 数据接收并行度

1.1 创建多个Stream(拆分topic)

Spark通过MQ接收数据时(比如Kafka、Flume),会将数据反序列化,并存储在Spark的内存中。如果数据接收称为系统的瓶颈,那么可以考虑并行化数据接收。每一个输入Stream都会在某个Worker的Executor上启动一个Receiver,该Receiver接收一个数据流。因此可以通过创建多个输入Stream,并且配置它们接收数据源不同的分区数据,达到接收多个数据流的效果。

比如说,一个接收两个Kafka Topic的输入Stream,可以被拆分为两个输入Stream,每个分别接收一个topic的数据。这样就会创建两个Receiver,从而并行地接收数据,进而提升吞吐量。

多个Stream可以使用union算子进行聚合,从而形成一个Stream。然后后续的transformation算子操作都针对该一个聚合后的Stream即可。

代码语言:javascript
复制
int numStreams = 5;
List> kafkaStreams = new ArrayList>(numStreams);
for (int i = 0; i < numStreams; i++) {
    kafkaStreams.add(KafkaUtils.createStream(...));
}
JavaPairDStream unifiedStream = streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
unifiedStream.print();
1.2 调节block interval / partition数量

block interval默认是200ms。对于大多数Receiver来说,在将接收到的数据保存到Spark的BlockManager之前,都会将数据切分为一个一个的block。而每个batch中的block数量,则决定了该batch对应的RDD的partition的数量,以及针对该RDD执行transformation操作时,创建的task的数量。每个batch对应的task数量是大约估计的,即batch interval / block interval。

例如说,batch interval为2s,block interval为200ms,会创建10个task。如果你认为每个batch的task数量太少,即低于每台机器的cpu core数量,那么就说明batch的task数量是不够的,因为所有的cpu资源无法完全被利用起来。要为batch增加block的数量,那么就减小block interval。然而,推荐的block interval最小值是50ms,如果低于这个数值,那么大量task的启动时间,可能会变成一个性能开销点。

  • Dstreaming 可以通过参数spark.streaming.blockInterval设置
  • Structured Streaming需要显式地对输入数据流进行重分区。使用inputStreamDataset.repartition()即可。这样就可以将接收到的batch,分布到指定数量的机器上,然后再进行进一步的操作。

先来看看一个 stage 里所有 task 运行的一些性能指标,其中的一些说明:

  • Scheduler Delay: spark 分配 task 所花费的时间
  • Executor Computing Time: executor 执行 task 所花费的时间
  • Getting Result Time: 获取 task 执行结果所花费的时间
  • Result Serialization Time: task 执行结果序列化时间
  • Task Deserialization Time: task 反序列化时间
  • Shuffle Write Time: shuffle 写数据时间
  • Shuffle Read Time: shuffle 读数据所花费时间

partition 数量的变化会影响上面几个指标的变动。我们调优的时候,很多时候都会看上面的指标变化情况。当 partition 变化的时候,上面几个指标变动情况如下:

  • partition 过小[容易引入 data skew 问题]
    • Scheduler Delay: 无明显变化
    • Executor Computing Time: 不稳定,有大有小,但平均下来比较大
    • Getting Result Time: 不稳定,有大有小,但平均下来比较大
    • Result Serialization Time: 不稳定,有大有小,但平均下来比较大
    • Task Deserialization Time: 不稳定,有大有小,但平均下来比较大
    • Shuffle Write Time: 不稳定,有大有小,但平均下来比较大
    • Shuffle Read Time: 不稳定,有大有小,但平均下来比较大
  • partition 过大
    • Scheduler Delay: 无明显变化
    • Executor Computing Time: 比较稳定,平均下来比较小
    • Getting Result Time: 比较稳定,平均下来比较小
    • Result Serialization Time: 比较稳定,平均下来比较小
    • Task Deserialization Time: 比较稳定,平均下来比较小
    • Shuffle Write Time: 比较稳定,平均下来比较小
    • Shuffle Read Time: 比较稳定,平均下来比较小

注意:

  1. 推荐num_partition=(batch_interval / block_interval) * num_receiver
  2. 在数据落地的时候,partition数不宜过多,可以避免小文件较多影响加载。落地前调用rdd.coalesce(num_partition)以减少partition数

2. 任务启动调优

如果每秒钟启动的task过于多,比如每秒钟启动50个,那么发送这些task去Worker节点上的Executor的性能开销,会比较大,而且此时基本就很难达到毫秒级的延迟了。使用下述操作可以减少这方面的性能开销:

  1. Task序列化:使用Kryo序列化机制来序列化task,可以减小task的大小,从而减少发送这些task到各个Worker节点上的Executor的时间。
  2. 执行模式:在Standalone模式下运行Spark,可以达到更少的task启动时间。

上述方式,也许可以将每个batch的处理时间减少100毫秒。从而从秒级降到毫秒级。

3. 数据处理并行度调优

如果在计算的任何stage中使用的并行task的数量没有足够多,那么集群资源是无法被充分利用的。举例来说,对于分布式的reduce操作,比如reduceByKey和reduceByKeyAndWindow,默认的并行task的数量是由spark.default.parallelism参数决定的。你可以在reduceByKey等操作中,传入第二个参数,手动指定该操作的并行度,也可以调节全局的spark.default.parallelism参数。

如果parallel task不足,那么core利用率不高。通过提高默认并行度来加速spark.default.parallelism,task数量也不宜过多,太多了,task的序列化与反序列化耗时也更高,适得其反。建议是#executors * #core_per_executor * 4

4. 数据序列化调优

4.1

数据序列化造成的系统开销可以由序列化格式的优化来减小。在流式计算的场景下,有两种类型的数据需要序列化。

1、输入数据:默认情况下,接收到的输入数据,是存储在Executor的内存中的,使用的持久化级别是StorageLevel.MEMORY_AND_DISK_SER_2。这意味着,数据被序列化为字节从而减小GC开销,并且会复制以进行executor失败的容错。因此,数据首先会存储在内存中,然后在内存不足时会溢写到磁盘上,从而为流式计算来保存所有需要的数据。这里的序列化有明显的性能开销——Receiver必须反序列化从网络接收到的数据,然后再使用Spark的序列化格式序列化数据。

2、流式计算操作生成的持久化RDD:流式计算操作生成的持久化RDD,可能会持久化到内存中。例如,窗口操作默认就会将数据持久化在内存中,因为这些数据后面可能会在多个窗口中被使用,并被处理多次。然而,不像Spark Core的默认持久化级别,StorageLevel.MEMORY_ONLY,流式计算操作生成的RDD的默认持久化级别是StorageLevel.MEMORY_ONLY_SER ,默认就会减小GC开销。

4.2 数据序列化调优

在上述的场景中,使用Kryo序列化类库可以减小CPU和内存的性能开销。使用Kryo时,一定要考虑注册自定义的类,并且禁用对应引用的tracking(spark.kryo.referenceTracking)。

在一些特殊的场景中,比如需要为流式应用保持的数据总量并不是很多,也许可以将数据以非序列化的方式进行持久化,从而减少序列化和反序列化的CPU开销,而且又不会有太昂贵的GC开销。举例来说,如果你数秒的batch interval,并且没有使用window操作,那么你可以考虑通过显式地设置持久化级别,来禁止持久化时对数据进行序列化。这样就可以减少用于序列化和反序列化的CPU性能开销,并且不用承担太多的GC开销。

5. batch interval调优

如果想让一个运行在集群上的Spark Streaming应用程序可以稳定,它就必须尽可能快地处理接收到的数据。换句话说,batch应该在生成之后,就尽可能快地处理掉。对于一个应用来说,这个是不是一个问题,可以通过观察Spark UI上的batch处理时间来定。batch处理时间必须小于batch interval时间

基于流式计算的本质,batch interval对于,在固定集群资源条件下,应用能保持的数据接收速率,会有巨大的影响。例如,在WordCount例子中,对于一个特定的数据接收速率,应用业务可以保证每2秒打印一次单词计数,而不是每500ms。因此batch interval需要被设置得,让预期的数据接收速率可以在生产环境中保持住。

为你的应用计算正确的batch大小的比较好的方法,是在一个很保守的batch interval,比如5~10s,以很慢的数据接收速率进行测试。要检查应用是否跟得上这个数据速率,可以检查每个batch的处理时间的延迟,如果处理时间与batch interval基本吻合,那么应用就是稳定的。否则,如果batch调度的延迟持续增长,那么就意味应用无法跟得上这个速率,也就是不稳定的。因此你要想有一个稳定的配置,可以尝试提升数据处理的速度,或者增加batch interval。记住,由于临时性的数据增长导致的暂时的延迟增长,可以合理的,只要延迟情况可以在短时间内恢复即可。

6. 内存调优

6.1 评估内存用量

Spark Streaming应用需要的集群内存资源,是由使用的transformation操作类型决定的。举例来说,如果想要使用一个窗口长度为10分钟的window操作,那么集群就必须有足够的内存来保存10分钟内的数据。如果想要使用updateStateByKey来维护许多key的state,那么你的内存资源就必须足够大。反过来说,如果想要做一个简单的map-filter-store操作,那么需要使用的内存就很少。

通常来说,通过Receiver接收到的数据,会使用StorageLevel.MEMORY_AND_DISK_SER_2持久化级别来进行存储,因此无法保存在内存中的数据会溢写到磁盘上。而溢写到磁盘上,是会降低应用的性能的。因此,通常是建议为应用提供它需要的足够的内存资源。建议在一个小规模的场景下测试内存的使用量,并进行评估。

6.2 GC

内存调优的另外一个方面是垃圾回收。对于流式应用来说,如果要获得低延迟,肯定不想要有因为JVM垃圾回收导致的长时间延迟。有很多参数可以帮助降低内存使用和GC开销:

  1. Stream的持久化:正如在数据序列化调优一节中提到的,输入数据和某些操作生产的中间RDD,默认持久化时都会序列化为字节。与非序列化的方式相比,这会降低内存和GC开销。使用Kryo序列化机制可以进一步减少内存使用和GC开销。 进一步降低内存使用率,可以对数据进行压缩,由spark.rdd.compress参数控制(默认false),但是CPU耗时会升高。
  2. 清理旧数据:默认情况下,所有输入数据和通过DStream transformation操作生成的持久化RDD,会自动被清理。Spark Streaming会决定何时清理这些数据,取决于transformation操作类型。例如,你在使用窗口长度为10分钟内的window操作,Spark会保持10分钟以内的数据,时间过了以后就会清理旧数据。但是在某些特殊场景下,比如Spark SQL和Spark Streaming整合使用时,在异步开启的线程中,使用Spark SQL针对batch RDD进行执行查询。那么就需要让Spark保存更长时间的数据,直到Spark SQL查询结束。可以使用streamingContext.remember()方法来实现。
  3. CMS垃圾回收器:使用并行的mark-sweep垃圾回收机制,被推荐使用,用来保持GC低开销。虽然并行的GC会降低吞吐量,但是还是建议使用它,来减少batch的处理时间(降低处理过程中的gc开销)。如果要使用,那么要在driver端和executor端都开启。在spark-submit中使用--driver-java-options设置;使用spark.executor.extraJavaOptions参数设置 -XX:+UseConcMarkSweepGC。
  4. 用堆外内存来持久化RDDs,堆外没有GC

7. 长时运行保障

7.1 Fault tolerance 错误容忍
  • 增加AM & Spark Driver 重试次数 spark.yarn.maxAppAttempts=4 spark.yarn.am.attemptFailuresValidityInterval=1h
  • 增加Executor失败最大容忍次数 spark.yarn.max.executor.failures={8*num_executors} spark.yarn.executor.failuresValidityInterval=1h
  • 增加Task失败最大容忍次数 spark.task.maxFailures=8
  • 增加网络等待时长 spark.rpc.askTimeout=600s spark.network.timeout=600s
7.2 优雅停止Streaming程序 及Checkpoint设置

保障异常退出或者主动kill不丢数据

spark.streaming.stopGracefullyOnShutdown=true spark.sql.streaming.checkpointLocation=hdfs://hdfsCluster/spark_checkpoint/" + appName

7.3 Performance
  • 开启推测执行,淘汰掉执行慢的task(action操作有幂等性) spark.speculation=true
  • 控制消息中间件的速率 spark.streaming.backpressure.enable=true spark.streaming.receiver.maxRate=XXX

Ref

  1. https://blog.csdn.net/kwu_ganymede/article/details/50577920
  2. https://www.jianshu.com/p/6d576e8186f8
  3. Spark最佳实践
  4. http://litaotao.github.io/boost-spark-application-performance?s=inner
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019-05-19 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 背景
  • 1. 数据接收并行度
    • 1.1 创建多个Stream(拆分topic)
      • 1.2 调节block interval / partition数量
      • 2. 任务启动调优
      • 3. 数据处理并行度调优
      • 4. 数据序列化调优
        • 4.1
          • 4.2 数据序列化调优
          • 5. batch interval调优
          • 6. 内存调优
            • 6.1 评估内存用量
              • 6.2 GC
              • 7. 长时运行保障
                • 7.1 Fault tolerance 错误容忍
                  • 7.2 优雅停止Streaming程序 及Checkpoint设置
                    • 7.3 Performance
                    • Ref
                    相关产品与服务
                    文件存储
                    文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
                    领券
                    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档