首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >火花-将JSON字符串的数组转换为Struct数组、过滤器并与root连接

火花-将JSON字符串的数组转换为Struct数组、过滤器并与root连接
EN

Stack Overflow用户
提问于 2020-02-17 09:46:00
回答 2查看 2K关注 0票数 1

我对Spark完全陌生,我正在编写一条管道,以执行一些转换为审计列表。

我的数据示例:

代码语言:javascript
复制
{
    "id": 932522712299,
    "ticket_id": 12,
    "created_at": "2020-02-14T19:05:16Z",
    "author_id": 392401450482,
    "events": ["{\"id\": 932522713292, \"type\": \"VoiceComment\", \"public\": false, \"data\": {\"from\": \"11987654321\", \"to\": \"+1987644\"}"],
}

我的模式基本上是:

代码语言:javascript
复制
root
 |-- id: long (nullable = true)
 |-- ticket_id: long (nullable = true)
 |-- created_at: string (nullable = true)
 |-- author_id: long (nullable = true)
 |-- events: array (nullable = true)
 |    |-- element: string (containsNull = true)

我的转换有几个步骤:

  • 按类型拆分事件:注释、标记、更改或更新;对于发现的每个事件,必须从根添加ticket_id、author_id和created_at;必须为每个事件类型提供一个输出。

基本上,事件数组中的每个对象都是一个字符串JSON,因为每种类型都有不同的结构--它们之间唯一的公共属性是type

我已经达到了我的目标,通过使用以下代码将我的数据转换为dict,做了一些糟糕的工作:

代码语言:javascript
复制
audits = list(map(lambda row: row.asDict(), df.collect()))`
comments = []
for audit in audits:
    base_info = {'ticket_id': audit['ticket_id'], 'created_at': audit['created_at'], 'author_id': audit['author_id']}
    audit['events'] = [json.loads(x) for x in audit['events']]

    audit_comments = [
        {**x, **base_info}
        for x in audit['events']
        if x['type'] == "Comment" or x['type'] == "VoiceComment"
    ]
    comments.extend(audit_comments)

也许这个问题听起来很蹩脚或者很懒,但是我真的困在了一些简单的事情上,比如:

  • 如何将“events”项解析为struct?
  • 如何按类型选择事件并从根中添加信息?可能使用select语法?

任何帮助都是非常感谢的。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2020-02-17 14:36:10

由于events数组元素对所有行都没有相同的结构,所以可以将其转换为Map(String, String)

使用from_json函数和模式MapType(StringType(), StringType())

代码语言:javascript
复制
df = df.withColumn("events", explode("events"))\
       .withColumn("events", from_json(col("events"), MapType(StringType(), StringType())))

然后,使用element_at (火花2.4+),您可以获得如下所示的type

代码语言:javascript
复制
df = df.withColumn("event_type", element_at(col("events"), "type"))

df.printSchema()

#root
 #|-- author_id: long (nullable = true)
 #|-- created_at: string (nullable = true)
 #|-- events: map (nullable = true)
 #|    |-- key: string
 #|    |-- value: string (valueContainsNull = true)
 #|-- id: long (nullable = true)
 #|-- ticket_id: long (nullable = true)
 #|-- event_type: string (nullable = true)

现在,您可以筛选并选择作为普通列:

代码语言:javascript
复制
df.filter(col("event_type") == lit("VoiceComment")).show(truncate=False)

#+------------+--------------------+-----------------------------------------------------------------------------------------------------------+------------+---------+------------+
#|author_id   |created_at          |events                                                                                                     |id          |ticket_id|event_type  |
#+------------+--------------------+-----------------------------------------------------------------------------------------------------------+------------+---------+------------+
#|392401450482|2020-02-14T19:05:16Z|[id -> 932522713292, type -> VoiceComment, public -> false, data -> {"from":"11987654321","to":"+1987644"}]|932522712299|12       |VoiceComment|
#+------------+--------------------+-----------------------------------------------------------------------------------------------------------+------------+---------+------------+
票数 2
EN

Stack Overflow用户

发布于 2020-02-17 12:08:15

您的代码将把完整的事件数据加载到主节点上,主节点已经提交了作业。处理数据的火花方式希望您创建一个地图减少作业。这方面有多个api --它们为作业创建一个DAG,并且该计划只有在调用特定函数(如head or show )时才会显示出来。这样的作业将分发给集群中的所有机器。

当使用dataframe时,使用pyspark.sql.functions可以做很多事情。

使用spark.sql dataframe进行相同的转换

代码语言:javascript
复制
import pyspark.sql.functions as F

df = df.withColumn('event', F.explode(df.events)).drop(df.events)
df = df.withColumn('event', F.from_json(df.event, 'STRUCT <id: INT, type: STRING, public: Boolean, data: STRUCT<from: STRING, to: STRING>>'))
events = df.where('event.type = "Comment" OR event.type == "VoiceComment"')

events.printSchema()
events.head(100)

当数据不能用sql表达式处理时,您可以实现一个普通用户定义的函数- UDF或Pandas UDF。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/60259682

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档