我有许多CSV文件,需要通过文件名的一部分将它们合并到RDD中。
例如,对于以下文件
$ ls
20140101_1.csv 20140101_3.csv 20140201_2.csv 20140301_1.csv
20140301_3.csv 20140101_2.csv 20140201_1.csv 20140201_3.csv 我需要将名为20140101*.csv的文件组合到一个RDD中来处理,等等。
我使用sc.wholeTextFiles读取整个目录,然后根据文件名的模式对文件名进行分组,以形成一个文件名字符串。然后,我将字符串传递给sc.textFile,将文件作为一个RDD打开。
这是我的密码-
val files = sc.wholeTextFiles("*.csv")
val indexed_files = files.map(a => (a._1.split("_")(0),a._1))
val data = indexed_files.groupByKey
data.map { a =>
var name = a._2.mkString(",")
(a._1, name)
}
data.foreach { a =>
var file = sc.textFile(a._2)
println(file.count)
}当我试图给SparkException - NullPointerException打电话时,我得到了textFile。错误堆栈引用RDD中的Iterator。我无法理解错误-
15/07/21 15:37:37 INFO TaskSchedulerImpl: Removed TaskSet 65.0, whose tasks have all completed, from pool
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 65.0 failed 4 times, most recent failure: Lost task 1.3 in stage 65.0 (TID 115, 10.132.8.10): java.lang.NullPointerException
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:33)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:32)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:870)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:870)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)但是,当我在星火外壳中执行sc.textFile(data.first._2).count时,我能够形成RDD并能够检索计数。
任何帮助都是非常感谢的。
发布于 2015-07-21 21:22:30
将评论转换为答复:
var file = sc.textFile(a._2)在另一个RDD的foreach中不起作用。你不能那样嵌套RDDs。
https://stackoverflow.com/questions/31549038
复制相似问题