首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

Spark Streaming 误用.transform(func)函数导致的问题解析

Spark/Spark Streaming transform 是一个很强的方法,不过使用过程中可能也有一些值得注意的问题。...在分析的问题,我们还会顺带讨论下Spark Streaming 生成job的逻辑,从而让大家知道问题的根源。 问题描述 今天有朋友贴了一段 gist,大家可以先看看这段代码有什么问题。...特定情况你会发现UI 的Storage标签上有很多新的Cache RDD,然后你以为是Cache RDD 不被释放,但是通过Spark Streaming 数据清理机制分析我们可以排除这个问题。...Spark Streaming generateJob 逻辑解析 在JobGenerator中,会定时产生一个GenerateJobs的事件: private val timer = new RecurringTimer...然而transform 又特别灵活,可以执行各种RDD操作,这个时候Spark Streaming 是拦不住你的,一旦你使用了count之类的Action,产生Job的时候就会被立刻执行,而不是等到Job

39530
您找到你想要的搜索结果了吗?
是的
没有找到

spark 2.3 导致driver OOM的一个SparkPlanGraphWrapper源码的bug

背景 长话短说,我们部门一个同事找到我,说他的spark 2.3 structured streaming程序频繁报OOM,从来没有坚持过超过三四天的,叫帮看一下。...这种事情一般我是不愿意看的,因为大部分情况下spark oom就那么几种可能: 数据量拉太大,executor内存爆了; shuffle过程中数据量太大,shuffle数太少,内存又爆了; 闲着蛋疼调用...所以问题应该比较清晰了,spark应该是每次执行batch时在什么地方往这个map里加了很多数据,但是又忘记了移除掉已经过期的部分,所以导致gc无效了。...* the calling thread by setting the `ASYNC_TRACKING_ENABLED` configuration to `false`. */ private[spark...结果 按理说到这里就差不多了,这个OOM的锅还真不能让同事背,的确是spark的一个bug。但是我很好奇,这么大一个问题,spark社区难道就没有动静吗?

78620

SparkSQL并行执行多个Job的探索

但是,这样做就会导致有部分cpu-vcore在写入过程中处于闲置状态,造成了资源浪费。 显然,在这件事情上,“充分利用资源”和“产生少量文件”两个方向发生了冲突。那么,有没有一个两全之策呢?...上述思路可以总结为:通过一个SparkContex并行提交多个Job,由Spark自己来调度资源,实现并行执行。针对这个思路,首先要搞清楚Spark是否支持这么玩,如果支持的话又是怎么支持的。...Spark是以TaskSetManager为单元来调度任务的。...目前,Spark支持FIFO和FAIR两种调度策略。...可以用多线程方式并行提交Job,示例如下: var df = spark.read.json("person.json").repartition(55) // df.cache() // val c

1.4K20

SparkSQL并行执行多个Job的探索

但是,这样做就会导致有部分cpu-vcore在写入过程中处于闲置状态,造成了资源浪费。 显然,在这件事情上,“充分利用资源”和“产生少量文件”两个方向发生了冲突。那么,有没有一个两全之策呢?...上述思路可以总结为:通过一个SparkContex并行提交多个Job,由Spark自己来调度资源,实现并行执行。针对这个思路,首先要搞清楚Spark是否支持这么玩,如果支持的话又是怎么支持的。...Spark是以TaskSetManager为单元来调度任务的。...目前,Spark支持FIFO和FAIR两种调度策略。...可以用多线程方式并行提交Job,示例如下: var df = spark.read.json("person.json").repartition(55) // df.cache() // val c

75410

SparkSQL并行执行多个Job的探索

但是,这样做就会导致有部分cpu-vcore在写入过程中处于闲置状态,造成了资源浪费。 显然,在这件事情上,“充分利用资源”和“产生少量文件”两个方向发生了冲突。那么,有没有一个两全之策呢?...上述思路可以总结为:通过一个SparkContex并行提交多个Job,由Spark自己来调度资源,实现并行执行。针对这个思路,首先要搞清楚Spark是否支持这么玩,如果支持的话又是怎么支持的。...Spark是以TaskSetManager为单元来调度任务的。...目前,Spark支持FIFO和FAIR两种调度策略。...可以用多线程方式并行提交Job,示例如下: var df = spark.read.json("person.json").repartition(55) // df.cache() // val c

1.6K40

3万字长文,PySpark入门级学习教程,框架思维

关于PySpark,我们知道它是Python调用Spark的接口,我们可以通过调用Python API的方式来编写Spark程序,它支持了大多数的Spark功能,比如SparkDataFrame、Spark...-+---+ # DataFrame.cache\DataFrame.persist # 可以把一些数据放入缓存中,default storage level (MEMORY_AND_DISK). df.cache...这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁GC。 MEMORY_AND_DISK_SER 基本含义同MEMORY_AND_DISK。唯一的区别是会先序列化,节约内存。...而为什么使用了这些操作就容易导致数据倾斜呢?大多数情况就是进行操作的key分布不均,然后使得大量的数据集中在同一个处理节点上,从而发生了数据倾斜。...() Plan A: 过滤掉导致倾斜的key 这个方案并不是所有场景都可以使用的,需要结合业务逻辑来分析这个key到底还需要不需要,大多数情况可能就是一些异常值或者空串,这种就直接进行过滤就好了。

8K20

Spark性能调优篇七之JVM相关参数调整

好,说回Spark,运行Spark作业的时候,JVM对会对Spark作业产生什么影响呢?答案很简单,如果数据量过大,一定会导致JVM内存不足。...这是因为可能是说executor的堆外内存不太够用,导致executor在运行的过程中,可能会内存溢出;然后可能导致后续的stage的task在运行的时候,可能要从一些executor中去拉取shuffle...默认情况下,这个堆外内存上限大概是300多M;我们通常项目中真正处理大数据的时候,这里都会出现问题导致spark作业反复崩溃无法运行;此时就会去调节这个参数,到至少1G或者更大的内存。...2.连接等待时长的调整 a) 问题提出:         由于JVM内存过小,导致频繁的Minor gc,有时候更会触犯full gc,一旦出发full gc;此时所有程序暂停,导致无法建立网络连接;spark...几次都拉取不到数据的话,可能会导致spark作业的崩溃。也可能会导致DAGScheduler,反复提交几次stage。TaskScheduler,反复提交几次task。

1.8K10

0820-CDSW在Session中运行代码超过一次就报错问题分析

问题分析过程 由于这个报错出现在CDSW服务中,因此首先需要确认是CDSW侧导致的问题还是CDH侧导致的问题。...credentials provider对于这种已有凭据的请求不会进行任何处理,因此导致了Delegation Token的报错,详情可以查阅上面的jira链接,同时该jira影响的版本是Spark2.2.0...,与行内使用的Spark版本相符,在与Support沟通后,确认了问题是该jira导致。...问题处理结论 基于该问题是Spark版本的bug导致,因此从根本上解决该问题的方式是升级行内的Spark版本,目前行内所使用的Spark2.2.0是一个比较老的版本,该版本在CDH5.16.2上其实已经不支持了...5.13升级上来的,因此还在继续使用该版本的Spark,建议将行内的Spark版本升级到Spark2.4,一方面来说Spark2.4是Spark2的最高版本,相比Spark2.2多了新特性以及一些bug

66920

Spark面对OOM问题的解决方法及优化总结

,这就导致了Executor的内存利用率不高,而且需要根据Application的具体情况,使用者自己来调节这两个参数才能优化Spark的内存使用。...内存溢出解决方法: 1. map过程产生大量对象导致内存溢出: 这种溢出的原因是在单个map中产生了大量的对象导致的,例如:rdd.map(x=>for(i 针对这种问题,在不增加内存的情况下,可以通过减少每个...2.数据不平衡导致内存溢出: 数据不平衡除了有可能导致内存溢出外,也有可能导致性能的问题,解决方法和上面说的类似,就是调用repartition重新分区。这里就不再累赘了。...3.coalesce调用导致内存溢出: 这是我最近才遇到的一个问题,因为hdfs中不适合存小问题,所以Spark计算后如果产生的文件太小,我们会调用coalesce合并文件再存入hdfs中。...就容易导致内存溢出的情况。

85410

Spark性能调优06-JVM调优

Spark 调优和 JVM 调优的关系 再JVM虚拟机中,当创建的对象的数量很多时,Eden 和 Survior1 区域会很快的满溢,就需要进行频繁地 Minor GC,这样会导致有一些生命周期较短的对象迅速长到...,而且不管 Minor GC 还是 Full GC 都会导致 JVM 的工作线程停止,因为 Scala 也是基于 JVM 的编程语言,所以运行 Spark 程序和运行 Java 程序在 JVM 中的内存分配情况是相同的...避免内存不够缓存所有的数据,导致数据只能写入磁盘中,降低了性能。但是如果Spark作业中的shuffle类操作比较多,而持久化操作比较少,那么这个参数的值适当降低一些比较合适。...此外,如果发现作业由于频繁的gc导致运行缓慢(通过spark web ui可以观察到作业的gc耗时),意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值 spark.shuffle.memoryFraction...对象被回收 解决办法: 增加 Executor 的内存,调整--executor-memory(spark.executor.memory)的值 (2) 由于堆外内存不足导致的Executor挂掉的话

1.3K10

Spark性能优化 (4) | JVM 调优

gc,甚至于频繁的full gc,进而导致Spark频繁的停止工作,性能影响会很大。...默认情况下,Executor 堆外内存上限大概为300多MB,在实际的生产环境下,对海量数据进行处理的时候,这里都会出现问题,导致Spark作业反复崩溃,无法运行,此时就会去调节这个参数,到至少1G,甚至于...如果 task 在运行过程中创建大量对象或者创建的对象较大,会占用大量的内存,这会导致频繁的垃圾回收,但是垃圾回收会导致工作现场全部停止,也就是说,垃圾回收一旦执行,Spark 的 Executor 进程就会停止工作...,无法提供相应,此时,由于没有响应,无法建立网络连接,会导致网络连接超时。...这种情况也可能会导致 DAGScheduler 反复提交几次 stage,TaskScheduler 返回提交几次 task,大大延长了我们的 Spark 作业的运行时间。

89230

SparkSQL执行时参数优化

设置超过40个executor,但未指定分区数,导致多数executor空闲....这是导致executor并行度上不去的罪魁祸首,之所以这样计算是为了尽量避免计算最慢的task决定整个stage的时间,将其设置为总核心的2-3倍,让运行快的task可以继续领取任务计算直至全部任务计算完毕...) 开启spark.sql.auto.repartition=true 自动重新分区 (每个stage[阶段]运行时分区并不尽相同,使用此配置可优化计算后分区数,避免分区数过大导致单个分区数据量过少,每个...task运算分区数据时时间过短,从而导致task频繁调度消耗过多时间) 设置spark.sql.shuffle.partitions=400 提高shuffle并行度 (shuffle read task...并未测试 (Executor 进程除了运行task 也要进行写shuffle 数据,当Executor进程任务过重时,导致GC不能为其他Executor提供shuffle数据时将会影响效率.此服务开启时代替

1.3K10

Spark面对OOM问题的解决方法及优化总结

,这就导致了Executor的内存利用率不高,而且需要根据Application的具体情况,使用者自己来调节这两个参数才能优化Spark的内存使用。...内存溢出解决方法: 1. map过程产生大量对象导致内存溢出: 这种溢出的原因是在单个map中产生了大量的对象导致的,例如:rdd.map(x=>for(i <- 1 to 10000) yield i.toString...2.数据不平衡导致内存溢出: 数据不平衡除了有可能导致内存溢出外,也有可能导致性能的问题,解决方法和上面说的类似,就是调用repartition重新分区。这里就不再累赘了。...3.coalesce调用导致内存溢出: 这是我最近才遇到的一个问题,因为hdfs中不适合存小问题,所以Spark计算后如果产生的文件太小,我们会调用coalesce合并文件再存入hdfs中。...,就容易导致内存溢出的情况。

2.9K20

Apache Spark:来自Facebook的60 TB +生产用例

最重要的是,我们在Spark driver中实现了一项功能,以便能够暂停任务的调度,以便由于群集重新启动导致过多的任务失败不会导致job失败。...其他可靠性修复 无响应的driver (SPARK-13279):在添加任务时,由于O(N ^ 2)操作,Spark driver卡住了,导致作业最终被卡住并终止。...由于大缓冲区的整数溢出导致的TimSort问题 (SPARK-13850):测试发现Spark的unsafe内存操作有一个导致TimSort内存损坏的错误。...Spark executor内存不足,因为sorter中存在导致指针数组无限增长的错误。我们通过在没有更多可用于指针数组增长的内存时强制将数据溢出到磁盘来解决该问题。...修复由于fetch失败导致的重复任务运行问题 (SPARK-14649):Spark driver在发生fetch失败时重新提交已在运行的任务,从而导致性能不佳。

1.2K20

关于Spark的面试题,你应该知道这些!

本篇博客,博主打算再出个番外篇,也就是再为大家分享一些Spark面试题,敢问各位准备好了么~ 1、Spark Application在没有获得足够的资源,job就开始执行了,可能会导致什么问题发生?...driver端的内存溢出 : 可以增大driver的内存参数:spark.driver.memory (default 1g) map过程产生大量对象导致内存溢出: 具体做法可以在会产生大量对象的...数据不平衡导致内存溢出: 数据不平衡除了有可能导致内存溢出外,也有可能导致性能的问题,解决方法和上面说的类似,就是调用repartition重新分区。...standalone模式下资源分配不均匀导致内存溢出: 这种情况的解决方法就是同时配置–executor-cores或者spark.executor.cores参数,确保Executor...DataFrame引入了off-heap,构建对象直接使用操作系统的内存,不会导致频繁GC。

1.7K21

Spark性能调优指北:性能优化和故障处理

注意,过犹不及,不要将本地化等待时长延长地过长,导致因为大量的等待时长,使得 Spark 作业的运行时间反而增加了。...默认情况下,Executor 堆外内存上限大概为 300MB,在实际的生产环境下,对海量数据进行处理的时候,这里都会出现问题,导致 Spark 作业反复崩溃,无法运行,此时就会去调节这个参数,到至少1G...过滤导致倾斜的 key 在 Spark 作业过程中出现的异常数据,比如 null 值,将可能导致数据倾斜,此时滤除可能导致数据倾斜的 key 对应的数据,这样就不会发生数据倾斜了。...所以, 当由单个 key 导致数据倾斜时,可有将发生数据倾斜的 key 单独提取出来,组成一个 RDD,然后用这个原本会导致倾斜的 key 组成的 RDD 跟其他 RDD 单独 join,此时,根据 Spark...这就导致有可能在Spark任务运行过程中,由于频繁大量的网络通讯,本地机器的网卡流量会激增。

89160
领券