我有一个这样的spark DataFrame:
+---------+---------+---------------------------------------------------+
|which_one| matchID| information|
+---------+---------+---------------------------------------------------+
| First| 123|[[1.2, 4.5, 837], [1.4, 4.8, 123], [4.1, 4.7, 143]]|
| First| 234|[[4.8, 8.9, 234], [1.1, 4.2, 321], [3.9, 5.7, 521]]|
| Second| 345|[[7.7, 8.1, 457], [4.5, 4.9, 345], [1.9, 2.8, 776]]|
+---------+---------+---------------------------------------------------+
对于每个matchID,我需要找到相应的信息列表。然后,使用"which_one“来提取我需要的信息。
例如,对于matchID = 123,它对应于1.4,4.8,123。然后,which_one = " first“意味着我需要第一个val 1.4。
对于matchID = 234,它对应于4.8,8.9,234。然后,which_one = " first“意味着我需要第一个val 4.8。
对于matchID = 345,它对应于4.5,4.9,345。然后,which_one = " second“意味着我需要第二个val 4.9。
我想要的结果表是在末尾添加一个新列:
+---------+---------+---------------------------------------------------+---+
|which_one| matchID| information|res|
+---------+---------+---------------------------------------------------+---+
| First| 123|[[1.2, 4.5, 837], [1.4, 4.8, 123], [4.1, 4.7, 143]]|1.4|
| First| 234|[[4.8, 8.9, 234], [1.1, 4.2, 321], [3.9, 5.7, 521]]|4.8|
| Second| 345|[[7.7, 8.1, 457], [4.5, 4.9, 345], [1.9, 2.8, 776]]|4.9|
+---------+---------+---------------------------------------------------+---+
我尝试使用foreach和rdd遍历每一行。然而,它似乎是无能为力的,因为我找不到在信息栏中搜索matchID的方法。我在网上搜索了几个小时,还没有找到有用的资源。如有任何建议或提示,欢迎光临。
发布于 2019-05-29 03:31:15
explode函数可以将information列转换为数组类型,然后通过matchID过滤新列:
val df = Seq(
("First", 123, Array(Array(1.2, 4.5, 837), Array(1.4, 4.8, 123), Array(4.1, 4.7, 143))),
("First", 234, Array(Array(4.8, 8.9, 234), Array(1.1, 4.2, 321), Array(3.9, 5.7, 521))),
("Second", 345, Array(Array(7.7, 8.1, 457), Array(4.5, 4.9, 345), Array(1.9, 2.8, 776))))
.toDF("which_one", "matchID", "information")
val indexColumn = when($"which_one" === "First", 1).otherwise(
when($"which_one" === "Second", 2).otherwise(3))
val exploded = df
.withColumn("exploded", explode($"information"))
.withColumn("indexColumn", indexColumn)
exploded.show(false)
exploded
.where(expr("array_contains(exploded, matchID )"))
.withColumn("res", expr("element_at(exploded,indexColumn)"))
.drop("exploded", "indexColumn")
输出:
+---------+-------+---------------------------------------------------------+-----------------+-----------+
|which_one|matchID|information |exploded |indexColumn|
+---------+-------+---------------------------------------------------------+-----------------+-----------+
|First |123 |[[1.2, 4.5, 837.0], [1.4, 4.8, 123.0], [4.1, 4.7, 143.0]]|[1.2, 4.5, 837.0]|1 |
|First |123 |[[1.2, 4.5, 837.0], [1.4, 4.8, 123.0], [4.1, 4.7, 143.0]]|[1.4, 4.8, 123.0]|1 |
|First |123 |[[1.2, 4.5, 837.0], [1.4, 4.8, 123.0], [4.1, 4.7, 143.0]]|[4.1, 4.7, 143.0]|1 |
|First |234 |[[4.8, 8.9, 234.0], [1.1, 4.2, 321.0], [3.9, 5.7, 521.0]]|[4.8, 8.9, 234.0]|1 |
|First |234 |[[4.8, 8.9, 234.0], [1.1, 4.2, 321.0], [3.9, 5.7, 521.0]]|[1.1, 4.2, 321.0]|1 |
|First |234 |[[4.8, 8.9, 234.0], [1.1, 4.2, 321.0], [3.9, 5.7, 521.0]]|[3.9, 5.7, 521.0]|1 |
|Second |345 |[[7.7, 8.1, 457.0], [4.5, 4.9, 345.0], [1.9, 2.8, 776.0]]|[7.7, 8.1, 457.0]|2 |
|Second |345 |[[7.7, 8.1, 457.0], [4.5, 4.9, 345.0], [1.9, 2.8, 776.0]]|[4.5, 4.9, 345.0]|2 |
|Second |345 |[[7.7, 8.1, 457.0], [4.5, 4.9, 345.0], [1.9, 2.8, 776.0]]|[1.9, 2.8, 776.0]|2 |
+---------+-------+---------------------------------------------------------+-----------------+-----------+
+---------+-------+---------------------------------------------------------+---+
|which_one|matchID|information |res|
+---------+-------+---------------------------------------------------------+---+
|First |123 |[[1.2, 4.5, 837.0], [1.4, 4.8, 123.0], [4.1, 4.7, 143.0]]|1.4|
|First |234 |[[4.8, 8.9, 234.0], [1.1, 4.2, 321.0], [3.9, 5.7, 521.0]]|4.8|
|Second |345 |[[7.7, 8.1, 457.0], [4.5, 4.9, 345.0], [1.9, 2.8, 776.0]]|4.9|
+---------+-------+---------------------------------------------------------+---+
https://stackoverflow.com/questions/56348380
复制相似问题