我有这样的JSON数据:
{
"data": [
{
"id": "4619623",
"team": "452144",
"created_on": "2018-10-09 02:55:51",
"links": {
"edit": "https://some_page",
"publish": "https://some_publish",
"default": "https://some_default"
}
},
{
"id": "4619600",
"team": "452144",
"created_on": "2018-10-09 02:42:25",
"links": {
"edit": "https://some_page",
"publish": "https://some_publish",
"default": "https://some_default"
}
}
}
我使用Apache spark读取此数据,并希望按id列对其进行分区写入。当我使用这个的时候:df.write.partitionBy("data.id").json(<path_to_folder>)
我将得到错误:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Partition column data.id not found in schema
我也试着像这样使用分解函数:
import org.apache.spark.sql.functions.{col, explode}
val renamedDf= df.withColumn("id", explode(col("data.id")))
renamedDf.write.partitionBy("id").json(<path_to_folder>)
这实际上很有帮助,但是每个id分区文件夹都包含相同的原始JSON文件。
编辑: df DataFrame的模式:
|-- data: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- created_on: string (nullable = true)
| | |-- id: string (nullable = true)
| | |-- links: struct (nullable = true)
| | | |-- default: string (nullable = true)
| | | |-- edit: string (nullable = true)
| | | |-- publish: string (nullable = true)
renamedDf DataFrame的架构:
|-- data: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- created_on: string (nullable = true)
| | |-- id: string (nullable = true)
| | |-- links: struct (nullable = true)
| | | |-- default: string (nullable = true)
| | | |-- edit: string (nullable = true)
| | | |-- publish: string (nullable = true)
|-- id: string (nullable = true)
我使用的是spark 2.1.0
我找到了这个解决方案:DataFrame partitionBy on nested columns
下面是这个例子:http://bigdatums.net/2016/02/12/how-to-extract-nested-json-data-in-spark/
但是这些都没有帮助我解决我的问题。
感谢andvance的帮助。
发布于 2018-10-12 22:34:41
尝试以下代码:
val renamedDf = df
.select(explode(col("data")) as "x" )
.select($"x.*")
renamedDf.write.partitionBy("id").json(<path_to_folder>)
发布于 2018-10-15 05:43:19
您只是在初始分解之后缺少一条select语句
val df = spark.read.option("multiLine", true).option("mode", "PERMISSIVE").json("/FileStore/tables/test.json")
df.printSchema
root
|-- data: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- created_on: string (nullable = true)
| | |-- id: string (nullable = true)
| | |-- links: struct (nullable = true)
| | | |-- default: string (nullable = true)
| | | |-- edit: string (nullable = true)
| | | |-- publish: string (nullable = true)
| | |-- team: string (nullable = true)
import org.apache.spark.sql.functions.{col, explode}
val df1= df.withColumn("data", explode(col("data")))
df1.printSchema
root
|-- data: struct (nullable = true)
| |-- created_on: string (nullable = true)
| |-- id: string (nullable = true)
| |-- links: struct (nullable = true)
| | |-- default: string (nullable = true)
| | |-- edit: string (nullable = true)
| | |-- publish: string (nullable = true)
| |-- team: string (nullable = true)
val df2 = df1.select("data.created_on","data.id","data.team","data.links")
df2.show
+-------------------+-------+------+--------------------+
| created_on| id| team| links|
+-------------------+-------+------+--------------------+
|2018-10-09 02:55:51|4619623|452144|[https://some_def...|
|2018-10-09 02:42:25|4619600|452144|[https://some_def...|
+-------------------+-------+------+--------------------+
df2.write.partitionBy("id").json("FileStore/tables/test_part.json")
val f = spark.read.json("/FileStore/tables/test_part.json/id=4619600")
f.show
+-------------------+--------------------+------+
| created_on| links| team|
+-------------------+--------------------+------+
|2018-10-09 02:42:25|[https://some_def...|452144|
+-------------------+--------------------+------+
val full = spark.read.json("/FileStore/tables/test_part.json")
full.show
+-------------------+--------------------+------+-------+
| created_on| links| team| id|
+-------------------+--------------------+------+-------+
|2018-10-09 02:55:51|[https://some_def...|452144|4619623|
|2018-10-09 02:42:25|[https://some_def...|452144|4619600|
+-------------------+--------------------+------+-------+
https://stackoverflow.com/questions/52780898
复制相似问题