我想在Spark数据集上循环,并根据每行的特征将特定值保存在Map中。我是Spark和Scala的新手,所以我加入了一个简单的例子来说明我在python中要做的事情。
python中的最小工作示例:
mydict = dict()
for row in data:
if row['name'] == "Yan":
mydict[row['id']] = row['surname']
else:
mydict[row['id']] = "Random lad"
其中,data是org.apache.spark.sql.Datasetorg.apache.spark.sql.Row.类型的(大) spark数据集
你知道Spark或Scala的方式吗?
发布于 2019-10-22 10:59:23
您不能遍历Dataset
的内容,因为它们在运行此代码的机器上是不可访问的,而是分散在(可能有许多)不同的工作节点上。这是像spark这样的分布式执行引擎的基本概念。
相反,您必须在函数式( map、filter、reduce、... )中操作数据。操作传播到工作进程)或声明性(在工作进程上执行的sql查询)方式。
为了实现你的目标,你可以在你的数据上运行一个地图,检查名字是否等于"Yan“,然后从那里继续下去。在此转换之后,您可以collect
您的数据帧并将其转换为字典。
您还应该检查使用Spark和映射的方法:您似乎希望在mydict
中为data
的每个元素创建一个条目。这意味着你的数据要么足够小,以至于你实际上不需要使用Spark,要么它可能会失败,因为它不适合你的驱动程序内存。
发布于 2019-10-22 11:02:56
我想你就是在找这样的东西。如果你的最终df不是很大,你可以收集它并存储为map。
scala> df.show()
+---+----+--------+
| id|name|surrname|
+---+----+--------+
| 1| Yan| abc123|
| 2| Abc| def123|
+---+----+--------+
scala> df.select('id, when('name === "Yan", 'surrname).otherwise("Random lad")).toDF("K","V").show()
+---+----------+
| K| V|
+---+----------+
| 1| abc123|
| 2|Random lad|
+---+----------+
发布于 2019-10-22 11:03:40
这是一个简单的方法,但是要小心使用collect()
,因为它会收集driver中的数据。数据应该能够适应驱动程序。
我不建议你这样做。
var df: DataFrame = Seq(
("1", "Yan", "surname1"),
("2", "Yan1", "surname2"),
("3", "Yan", "surname3"),
("4", "Yan2", "surname4")
).toDF("id", "name", "surname")
val myDict = df.withColumn("newName", when($"name" === "Yan", $"surname").otherwise("RandomeName"))
.rdd.map(row => (row.getAs[String]("id"), row.getAs[String]("newName")))
.collectAsMap()
myDict.foreach(println)
输出:
(2,RandomeName)
(1,surname1)
(4,RandomeName)
(3,surname3)
https://stackoverflow.com/questions/58502313
复制相似问题