Spark Streaming does not persist information

Stack Overflow user
Asked on 2017-10-26 16:15:48
1 answer · 230 views · 0 followers · 0 votes

I have created a Spark Streaming script similar to a word count. The thing is, I want to keep all of the information in a single collection (addedRDD), but after a while an exception is thrown because the underlying blocks have disappeared. Is there a way to keep this accumulated RDD in memory?

Code language: scala
import org.apache.spark._
import org.apache.spark.streaming._
import scala.collection.mutable
import org.apache.spark.rdd.RDD

val ssc = new StreamingContext(sc, Seconds(5))
val lines = ssc.socketTextStream("localhost",9999)
var addedRDD : RDD[String] = sc.emptyRDD[String]

lines.foreachRDD( 
    rdd => {
        addedRDD = addedRDD.union(rdd).cache()
        addedRDD.collect().foreach(println)
    }
)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

wordCounts.print
ssc.start() 
ssc.awaitTermination()

The exception that is thrown is shown below (it indicates that the block has already been removed):

Code language: text
    ERROR scheduler.JobScheduler: Error running job streaming job 1509014590000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task creation failed: org.apache.spark.SparkException: Attempted to use BlockRDD[86] at socketTextStream at <console>:38 after its blocks have been removed!
org.apache.spark.rdd.BlockRDD.assertValid(BlockRDD.scala:83)
org.apache.spark.rdd.BlockRDD.getPreferredLocations(BlockRDD.scala:56)
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257)
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257)
scala.Option.getOrElse(Option.scala:120)
org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:256)
org.apache.spark.rdd.UnionPartition.preferredLocations(UnionRDD.scala:47)
org.apache.spark.rdd.UnionRDD.getPreferredLocations(UnionRDD.scala:91)
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257)
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257)
scala.Option.getOrElse(Option.scala:120)
org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:256)
org.apache.spark.rdd.UnionPartition.preferredLocations(UnionRDD.scala:47)
org.apache.spark.rdd.UnionRDD.getPreferredLocations(UnionRDD.scala:91)
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257)
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257)
scala.Option.getOrElse(Option.scala:120)
org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:256)
org.apache.spark.rdd.UnionPartition.preferredLocations(UnionRDD.scala:47)
org.apache.spark.rdd.UnionRDD.getPreferredLocations(UnionRDD.scala:91)
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257)
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257)
scala.Option.getOrElse(Option.scala:120)
org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:256)
org.apache.spark.rdd.UnionPartition.preferredLocations(UnionRDD.scala:47)
org.apache.spark.rdd.UnionRDD.getPreferredLocations(UnionRDD.scala:91)
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257)
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257)
scala.Option.getOrElse(Option.scala:120)
org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:256)
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1547)
org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1521)
org.apache.spark.scheduler.DAGScheduler$$anonfun$15.apply(DAGScheduler.scala:974)
org.apache.spark.scheduler.DAGScheduler$$anonfun$15.apply(DAGScheduler.scala:972)
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
scala.collection.Iterator$class.foreach(Iterator.scala:727)
scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
scala.collection.AbstractIterable.foreach(Iterable.scala:54)
scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
scala.collection.AbstractTraversable.map(Traversable.scala:105)
org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:972)
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:921)
org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:861)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1609)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1601)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1590)
org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1433)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1421)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1420)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1420)
        at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:981)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:921)
        at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:861)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1609)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1601)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1590)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1843)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1856)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1869)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1940)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:926)
        at $line38.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:47)
        at $line38.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:44)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Attempted to use BlockRDD[86] at socketTextStream at <console>:38 after its blocks have been removed!
        at org.apache.spark.rdd.BlockRDD.assertValid(BlockRDD.scala:83)
        at org.apache.spark.rdd.BlockRDD.getPreferredLocations(BlockRDD.scala:56)
        at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257)
        at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:256)
        at org.apache.spark.rdd.UnionPartition.preferredLocations(UnionRDD.scala:47)
        at org.apache.spark.rdd.UnionRDD.getPreferredLocations(UnionRDD.scala:91)
        at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257)
        at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:256)
        at org.apache.spark.rdd.UnionPartition.preferredLocations(UnionRDD.scala:47)
        at org.apache.spark.rdd.UnionRDD.getPreferredLocations(UnionRDD.scala:91)
        at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257)
        at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:256)
        at org.apache.spark.rdd.UnionPartition.preferredLocations(UnionRDD.scala:47)
        at org.apache.spark.rdd.UnionRDD.getPreferredLocations(UnionRDD.scala:91)
        at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257)
        at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:256)
        at org.apache.spark.rdd.UnionPartition.preferredLocations(UnionRDD.scala:47)
        at org.apache.spark.rdd.UnionRDD.getPreferredLocations(UnionRDD.scala:91)
        at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257)
        at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:256)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1547)
        at org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1521)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$15.apply(DAGScheduler.scala:974)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$15.apply(DAGScheduler.scala:972)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:972)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:921)
        at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:861)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1609)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1601)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1590)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

1 Answer

Stack Overflow user

Answered on 2017-10-26 17:49:07

The error above is effectively an OOM (OutOfMemoryError): you keep unioning new RDDs into a result that is stored in memory, so the executors eventually run out of memory, because by default cache() stores the RDD in memory only.
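
As a point of reference (this detail comes from the Spark RDD API rather than from the original answer): for an RDD, cache() is simply shorthand for persist() with the default MEMORY_ONLY storage level, which is why the accumulated union keeps growing in executor memory. A minimal illustration, assuming a spark-shell session where sc is already defined:

Code language: scala
import org.apache.spark.storage.StorageLevel

val nums = sc.parallelize(1 to 10)
// cache() is equivalent to persist(StorageLevel.MEMORY_ONLY) for an RDD
nums.cache()
println(nums.getStorageLevel == StorageLevel.MEMORY_ONLY) // prints: true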

Using the option below will store the data with DISK_ONLY instead. There are other storage levels as well, such as MEMORY_AND_DISK, MEMORY_AND_DISK_SER, MEMORY_ONLY and MEMORY_ONLY_SER. Each level also has a variant with a _2 suffix, which indicates the number of replicas kept on disk or in memory, and the SER suffix means the data is stored in serialized form to reduce the space it occupies on disk or in memory.

Code language: scala
import org.apache.spark.storage.StorageLevel
addedRDD = addedRDD.union(rdd).persist(StorageLevel.DISK_ONLY)
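
Putting this together with the question's script, a minimal sketch of the whole loop with the change applied might look as follows (this is an assumed application of the fix, not code from the original answer; it targets spark-shell, where sc already exists, and DISK_ONLY could be swapped for any of the levels listed above, e.g. StorageLevel.MEMORY_AND_DISK_2):

Code language: scala
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Batch interval and socket source copied from the question.
val ssc = new StreamingContext(sc, Seconds(5))
val lines = ssc.socketTextStream("localhost", 9999)

var addedRDD: RDD[String] = sc.emptyRDD[String]

lines.foreachRDD { rdd =>
  // Accumulate each batch, letting the result spill to disk instead of
  // relying on cache(), which keeps it in memory only.
  addedRDD = addedRDD.union(rdd).persist(StorageLevel.DISK_ONLY)
  // collect() forces the union to be materialised for this batch.
  addedRDD.collect().foreach(println)
}

ssc.start()
ssc.awaitTermination()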
Votes: 0
The original content of this page is provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/46949070
