I'm new to Spark and working with JSON, and I'm having trouble doing something fairly simple (I think). I've tried solutions from several similar questions but can't quite get it right. I currently have a Spark DataFrame with several columns representing variables. Each row is a unique combination of variable values. A UDF is then applied to every row; it takes each column as input, does some analysis, and outputs a summary table as a JSON string for each row, and these results are saved in a new column of the table. Some small sample data is below:
+------+-----+------+----------------------------------------------------------------------+
|Var 1 |Var 2|Var 3 |JSON Table                                                            |
+------+-----+------+----------------------------------------------------------------------+
|True  |10%  |200   |[{"Out_1": "Mean", "Out_2": "25"}, {"Out_1": "Median", "Out_2": "21"}]|
|False |15%  |150   |[{"Out_1": "Mean", "Out_2": "19"}, {"Out_1": "Median", "Out_2": "18"}]|
|True  |12%  |100   |[{"Out_1": "Mean", "Out_2": "22"}, {"Out_1": "Median", "Out_2": "20"}]|
+------+-----+------+----------------------------------------------------------------------+
I want to convert it into the following format:
+------+-----+------+------+-----+
|Var 1 |Var 2|Var 3 |Out_1 |Out_2|
+------+-----+------+------+-----+
|True  |10%  |200   |Mean  |25   |
|True  |10%  |200   |Median|21   |
|False |15%  |150   |Mean  |19   |
|False |15%  |150   |Median|18   |
|True  |12%  |100   |Mean  |22   |
|True  |12%  |100   |Median|20   |
+------+-----+------+------+-----+
In reality there are more variables, millions of rows, and much larger JSON strings with more outputs, but the core problem is the same. Basically, I've tried inferring the JSON schema and using from_json, like this:
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Infer the schema from the JSON strings themselves
schema = spark.read.json(df.rdd.map(lambda row: row["JSON Table"])).schema
df = df\
    .withColumn("JSON Table", from_json("JSON Table", schema))\
    .select(col('*'), col('JSON Table.*'))
df.show()
This seems to capture the JSON structure correctly (although every value is read as a string, even though most are integers), but the resulting dataframe is empty despite having the correct column headers. Any suggestions on how to handle this?
Posted on 2020-04-15 00:12:34
Assuming your JSON Table column is a JSON string, you can explicitly set your schema, explode the result of from_json, and then select your columns.
df.show() #sample dataframe
+-----+-----+-----+----------------------------------------------------------------------+
|Var 1|Var 2|Var 3|JSON Table |
+-----+-----+-----+----------------------------------------------------------------------+
|true |10% |200 |[{"Out_1": "Mean", "Out_2": "25"}, {"Out_1": "Median", "Out_2": "21"}]|
|false|15% |150 |[{"Out_1": "Mean", "Out_2": "19"}, {"Out_1": "Median", "Out_2": "18"}]|
|true |12% |100 |[{"Out_1": "Mean", "Out_2": "22"}, {"Out_1": "Median", "Out_2": "20"}]|
+-----+-----+-----+----------------------------------------------------------------------+
#sample schema
#root
#|-- Var 1: boolean (nullable = true)
#|-- Var 2: string (nullable = true)
#|-- Var 3: long (nullable = true)
#|-- JSON Table: string (nullable = true)
from pyspark.sql import functions as F
from pyspark.sql.types import *
schema = ArrayType(MapType(StringType(),StringType()))
df.withColumn("JSON Table", F.explode(F.from_json("JSON Table", schema)))\
.select("Var 1","Var 2","Var 3","JSON Table.Out_1","JSON Table.Out_2").show()
+-----+-----+-----+------+-----+
|Var 1|Var 2|Var 3| Out_1|Out_2|
+-----+-----+-----+------+-----+
| true| 10%| 200| Mean| 25|
| true| 10%| 200|Median| 21|
|false| 15%| 150| Mean| 19|
|false| 15%| 150|Median| 18|
| true| 12%| 100| Mean| 22|
| true| 12%| 100|Median| 20|
+-----+-----+-----+------+-----+
https://stackoverflow.com/questions/61218462