我对Spark完全陌生,我正在编写一条管道,以执行一些转换为审计列表。
我的数据示例:
{
"id": 932522712299,
"ticket_id": 12,
"created_at": "2020-02-14T19:05:16Z",
"author_id": 392401450482,
"events": ["{\"id\": 932522713292, \"type\": \"VoiceComment\", \"public\": false, \"data\": {\"from\": \"11987654321\", \"to\": \"+1987644\"}"],
}我的模式基本上是:
root
|-- id: long (nullable = true)
|-- ticket_id: long (nullable = true)
|-- created_at: string (nullable = true)
|-- author_id: long (nullable = true)
|-- events: array (nullable = true)
| |-- element: string (containsNull = true)我的转换有几个步骤:
。
基本上,事件数组中的每个对象都是一个字符串JSON,因为每种类型都有不同的结构--它们之间唯一的公共属性是type。
我已经达到了我的目标,通过使用以下代码将我的数据转换为dict,做了一些糟糕的工作:
audits = list(map(lambda row: row.asDict(), df.collect()))`
comments = []
for audit in audits:
base_info = {'ticket_id': audit['ticket_id'], 'created_at': audit['created_at'], 'author_id': audit['author_id']}
audit['events'] = [json.loads(x) for x in audit['events']]
audit_comments = [
{**x, **base_info}
for x in audit['events']
if x['type'] == "Comment" or x['type'] == "VoiceComment"
]
comments.extend(audit_comments)也许这个问题听起来很蹩脚或者很懒,但是我真的困在了一些简单的事情上,比如:
任何帮助都是非常感谢的。
发布于 2020-02-17 14:36:10
由于events数组元素对所有行都没有相同的结构,所以可以将其转换为Map(String, String)。
使用from_json函数和模式MapType(StringType(), StringType())
df = df.withColumn("events", explode("events"))\
.withColumn("events", from_json(col("events"), MapType(StringType(), StringType())))然后,使用element_at (火花2.4+),您可以获得如下所示的type:
df = df.withColumn("event_type", element_at(col("events"), "type"))
df.printSchema()
#root
#|-- author_id: long (nullable = true)
#|-- created_at: string (nullable = true)
#|-- events: map (nullable = true)
#| |-- key: string
#| |-- value: string (valueContainsNull = true)
#|-- id: long (nullable = true)
#|-- ticket_id: long (nullable = true)
#|-- event_type: string (nullable = true)现在,您可以筛选并选择作为普通列:
df.filter(col("event_type") == lit("VoiceComment")).show(truncate=False)
#+------------+--------------------+-----------------------------------------------------------------------------------------------------------+------------+---------+------------+
#|author_id |created_at |events |id |ticket_id|event_type |
#+------------+--------------------+-----------------------------------------------------------------------------------------------------------+------------+---------+------------+
#|392401450482|2020-02-14T19:05:16Z|[id -> 932522713292, type -> VoiceComment, public -> false, data -> {"from":"11987654321","to":"+1987644"}]|932522712299|12 |VoiceComment|
#+------------+--------------------+-----------------------------------------------------------------------------------------------------------+------------+---------+------------+发布于 2020-02-17 12:08:15
您的代码将把完整的事件数据加载到主节点上,主节点已经提交了作业。处理数据的火花方式希望您创建一个地图减少作业。这方面有多个api --它们为作业创建一个DAG,并且该计划只有在调用特定函数(如head or show )时才会显示出来。这样的作业将分发给集群中的所有机器。
当使用dataframe时,使用pyspark.sql.functions可以做很多事情。
使用spark.sql dataframe进行相同的转换
import pyspark.sql.functions as F
df = df.withColumn('event', F.explode(df.events)).drop(df.events)
df = df.withColumn('event', F.from_json(df.event, 'STRUCT <id: INT, type: STRING, public: Boolean, data: STRUCT<from: STRING, to: STRING>>'))
events = df.where('event.type = "Comment" OR event.type == "VoiceComment"')
events.printSchema()
events.head(100)当数据不能用sql表达式处理时,您可以实现一个普通用户定义的函数- UDF或Pandas UDF。
https://stackoverflow.com/questions/60259682
复制相似问题