我对pyspark和json解析还是个新手,我被困在了某些特定的场景中。让我先解释一下我要做什么,我有一个json文件,其中有一个数据元素,这个数据元素是一个包含另外两个json对象的数组。给定的json文件如下所示
{
"id": "da20d14c.92ba6",
"type": "Data Transformation Node",
"name": "",
"topic": "",
"x": 380,
"y": 240,
"typeofoperation":"join",
"wires": [
["da20d14c.92ba6","da20d14c.93ba6"]
],
"output":true,
"data":[
{
"metadata_id":"3434",
"id":"1",
"first_name":"Brose",
"last_name":"Eayres",
"email":"beayres0@archive.org",
"gender":"Male",
"postal_code":null
},
{
"metadata_id":"3434",
"id":"2",
"first_name":"Brose",
"last_name":"Eayres",
"email":"beayres0@archive.org",
"gender":"Male",
"postal_code":null
}
]
}
现在我要做的是一个接一个地迭代那个数据数组:意思是迭代到json的第一个对象,将其存储到一个dataframe中,然后迭代到第二个对象,并将其存储到另一个dataframe中,然后对它们进行完全连接或任何类型的连接。(这是可能的吗)
如果是,如何在pyspark中做到这一点。到目前为止,我所做的是
试图分解它,但数据是一次性分解的,而不是一个一个地分解
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, col
from pyspark.sql.functions import *
from pyspark.sql import Row
from pyspark.sql import SQLContext
from pyspark import SparkConf, SparkContext
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.getOrCreate()
sc = SparkContext.getOrCreate()
dataFrame = spark.read.option("multiline", "true").json("nodeWithTwoRddJoin.json")
dataNode = dataFrame.select(explode("data").alias("Data_of_node"))
dataNode.show()
但是上面的代码给了我一个集合数据集。比我以前用的
firstDataSet = dataNode.collect()[0]
secondDataSet = dataNode.collect()[1]
这些行给了我一行,但我不能将其返回到dataframe。任何建议和解决方案
https://stackoverflow.com/questions/56449520
复制相似问题