首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >有没有办法在pyspark中逐个访问数组(Struct)中的多个JSON对象

有没有办法在pyspark中逐个访问数组(Struct)中的多个JSON对象
EN

Stack Overflow用户
提问于 2019-06-05 02:35:07
回答 2查看 951关注 0票数 0

我对pyspark和json解析还是个新手,我被困在了某些特定的场景中。让我先解释一下我要做什么,我有一个json文件,其中有一个数据元素,这个数据元素是一个包含另外两个json对象的数组。给定的json文件如下所示

代码语言:javascript
复制
 {
    "id": "da20d14c.92ba6",
    "type": "Data Transformation Node",
    "name": "",
    "topic": "",
    "x": 380,
    "y": 240,
    "typeofoperation":"join",
    "wires": [
        ["da20d14c.92ba6","da20d14c.93ba6"]
    ],
 "output":true, 
 "data":[
      {
         "metadata_id":"3434",
         "id":"1",
         "first_name":"Brose",
         "last_name":"Eayres",
         "email":"beayres0@archive.org",
         "gender":"Male",
         "postal_code":null
      },
      {
         "metadata_id":"3434",
         "id":"2",
         "first_name":"Brose",
         "last_name":"Eayres",
         "email":"beayres0@archive.org",
         "gender":"Male",
         "postal_code":null
      }
   ]

 }

现在我要做的是一个接一个地迭代那个数据数组:意思是迭代到json的第一个对象,将其存储到一个dataframe中,然后迭代到第二个对象,并将其存储到另一个dataframe中,然后对它们进行完全连接或任何类型的连接。(这是可能的吗)

如果是,如何在pyspark中做到这一点。到目前为止,我所做的是

试图分解它,但数据是一次性分解的,而不是一个一个地分解

代码语言:javascript
复制
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, col
from pyspark.sql.functions import *
from pyspark.sql import Row
from pyspark.sql import SQLContext
from pyspark import SparkConf, SparkContext

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .getOrCreate()

sc = SparkContext.getOrCreate()

dataFrame = spark.read.option("multiline", "true").json("nodeWithTwoRddJoin.json")

dataNode = dataFrame.select(explode("data").alias("Data_of_node"))

dataNode.show()

但是上面的代码给了我一个集合数据集。比我以前用的

代码语言:javascript
复制
firstDataSet = dataNode.collect()[0]
secondDataSet =  dataNode.collect()[1] 

这些行给了我一行,但我不能将其返回到dataframe。任何建议和解决方案

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2019-06-05 03:57:03

您需要在数据帧的每一行上应用一个映射,该映射将其中一列的内容拆分为两个新列。将结果分成两个数据帧之后就很简单了。为此,我使用了一个简单的函数,它从数组中返回所需的索引:

代码语言:javascript
复制
def splitArray(array, pos):
    return array[pos]

您可以像这样应用此函数:

代码语言:javascript
复制
import pyspark.sql.functions as f

mapped = dataFrame.select(
    splitArray(f.col('data'), 0).alias('first'),
    splitArray(f.col('data'), 1).alias('second'))

(我使用了内置的列功能来选择数据列。我不确定是否有更优雅的方法来实现这一点。)

结果是:

代码语言:javascript
复制
+-----------------------------------------------------+-----------------------------------------------------+
|first                                                |second                                               
|
+-----------------------------------------------------+-----------------------------------------------------+
|[beayres0@archive.org, Brose, Male, 1, Eayres, 3434,]|[beayres0@archive.org, Brose, Male, 2, Eayres, 3434,]|
+-----------------------------------------------------+-----------------------------------------------------+

要将列放在不同的dfs中,只需选择它们:

代码语言:javascript
复制
firstDataSet = mapped.select('first')
secondDataSet =  mapped.select('second)
票数 1
EN

Stack Overflow用户

发布于 2019-06-05 03:14:07

这至少将它们放在两个数据帧中

代码语言:javascript
复制
from pyspark.sql.functions import monotonically_increasing_id

df_with_id = dataNode.withColumn("id",monotonically_increasing_id())

max_id = df_with_id.agg({"id": "max"}).collect()[0]["max(id)"]


first_df = df_with_id.where("id = {maxid}".format(maxid=max_id))
second_df = df_with_id.where("id != {maxid}".format(maxid=max_id))
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56449520

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档