How can we replace an element in the Spark shell?
For example: val t = sc.parallelize(Seq(("100",List("2","-4","NA","6","8","2"))))
I want to replace NA with 0.
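Posted on 2017-04-18 19:57:15
You can try the following to replace NA with 0. Note that it returns a new RDD rather than modifying the original. The element type of the result is List[Any], because the replacement 0 is an Int while the other entries remain strings.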
scala> val t= sc.parallelize(Seq(("100",List("2","-4","NA","6","8","2"))))
t: org.apache.spark.rdd.RDD[(String, List[String])] = ParallelCollectionRDD[0] at parallelize at <console>:21
scala> val newRDD = t.map( x => (x._1,x._2.map{case "NA" => 0; case x => x }))
newRDD: org.apache.spark.rdd.RDD[(String, List[Any])] = MapPartitionsRDD[3] at map at <console>:23
scala> newRDD.collect
res5: Array[(String, List[Any])] = Array((100,List(2, -4, 0, 6, 8, 2)))
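If a List[Int] is preferred over List[Any], one option is to parse the remaining entries as well. The following is only a minimal sketch: the helper name replaceNA is hypothetical (it is not part of the answer above), and it assumes every entry other than "NA" is a valid integer string.

```scala
// Hypothetical helper: replace "NA" with 0 and parse every other
// entry as an Int, so the result is typed as List[Int].
def replaceNA(xs: List[String]): List[Int] =
  xs.map {
    case "NA" => 0
    case s    => s.toInt // assumes all other entries are valid integers
  }

// Plain Scala check on the list from the question.
val cleaned = replaceNA(List("2", "-4", "NA", "6", "8", "2"))

// In spark-shell, with t defined as above, the same helper could be applied
// to the value of each pair while keeping the key:
//   val typedRDD = t.mapValues(replaceNA)
```

A string that is neither "NA" nor an integer would make toInt throw a NumberFormatException, so this only fits data where "NA" is the sole non-numeric marker.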
Posted on 2017-04-18 20:56:05
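When you parallelize a sequence, Spark creates an RDD from the values provided, and that RDD is distributed across the cluster. RDDs are immutable, so an element cannot be replaced in place. Another approach is to filter the "NA" values out of the RDD and map them to 0, then union the result with an RDD of the remaining, non-"NA" elements converted to Int. Note that this works on the individual elements, so the key and the original element order are not preserved.
Sample code: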
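val t = sc.parallelize(Seq(("100",List("2","-4","NA","6","8","2"))))
// Flatten the lists so that each RDD element is a single string
val elems = t.flatMap(i => i._2)
// "NA" entries become 0; all other entries are parsed as Int
val a = elems.filter(i => i == "NA").map(_ => 0)
val b = elems.filter(i => i != "NA").map(i => i.toInt)
val d = a.union(b)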
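To illustrate the immutability point, here is a small sketch for spark-shell. It assumes the predefined SparkContext sc; the name replaced is chosen here for illustration only. The transformation returns a new RDD, while t itself still holds the original "NA" entry. Using mapValues keeps the key and the element order, and replacing "NA" with the string "0" keeps the element type as String.

```scala
// Sketch for spark-shell, where `sc` (the SparkContext) is predefined.
val t = sc.parallelize(Seq(("100", List("2", "-4", "NA", "6", "8", "2"))))

// mapValues builds a new RDD; the key is kept and only the list is rewritten.
// "NA" becomes the string "0", so the value type stays List[String].
val replaced = t.mapValues(_.map(v => if (v == "NA") "0" else v))

replaced.collect().foreach(println) // transformed copy
t.collect().foreach(println)        // original RDD, still containing "NA"
```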
https://stackoverflow.com/questions/43485633