I have a pyspark dataframe with multiple columns (around 30) of nested structs that I want to write to csv.
To do that, I want to stringify all of the struct columns.
I checked a few answers here:
Pyspark converting an array of struct into string
PySpark: DataFrame - Convert Struct to Array
PySpark convert struct field inside array to string
This is the schema of my dataframe (there are around 30 complex keys):
root  
 |-- 1_simple_key: string (nullable = true)  
 |-- 2_simple_key: string (nullable = true)  
 |-- 3_complex_key: struct (nullable = true)  
 |    |-- n1: string (nullable = true)  
 |    |-- n2: struct (nullable = true)  
 |    |    |-- n3: boolean (nullable = true)  
 |    |    |-- n4: boolean (nullable = true)  
 |    |    |-- n5: boolean (nullable = true)  
 |    |-- n6: long (nullable = true)  
 |    |-- n7: long (nullable = true)  
 |-- 4_complex_key: struct (nullable = true)  
 |    |-- n1: string (nullable = true)  
 |    |-- n2: struct (nullable = true)  
 |    |    |-- n3: boolean (nullable = true)  
 |    |    |-- n4: boolean (nullable = true)  
 |    |    |-- n5: boolean (nullable = true)  
 |    |-- n6: long (nullable = true)  
 |    |-- n7: long (nullable = true)  
 |-- 5_complex_key: struct (nullable = true)  
 |    |-- n1: string (nullable = true)  
 |    |-- n2: struct (nullable = true)  
 |    |    |-- n3: boolean (nullable = true)  
 |    |    |-- n4: boolean (nullable = true)  
 |    |    |-- n5: boolean (nullable = true)  
 |    |-- n6: long (nullable = true)  
 |    |-- n7: long (nullable = true)

The suggested solutions are for a single column, and I can't apply them to multiple columns.
I want to do something like this (pseudocode):
for each struct_column: stringify it
I don't mind creating an additional dataframe for it. I just need it to be ready for writing to csv (see the sketch below).
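A minimal sketch of that loop (my addition, not part of the original question), assuming the dataframe is named df and using to_json as the stringifier:

from pyspark.sql import functions as F
from pyspark.sql.types import StructType

# collect the names of all struct-typed columns from the schema
struct_cols = [f.name for f in df.schema.fields if isinstance(f.dataType, StructType)]

# replace each struct column in place with its JSON string representation
for c in struct_cols:
    df = df.withColumn(c, F.to_json(F.col(c)))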
Minimal reproducible example:
import pandas as pd
from pyspark.sql import Row

d = {'1_complex_key': {0: Row(type='1_complex_key', s=Row(n1=False, n2=False, n3=True), x=954, y=238),
                       1: Row(type='1_complex_key', s=Row(n1=False, n2=False, n3=True), x=956, y=250),
                       2: Row(type='1_complex_key', s=Row(n1=True, n2=False, n3=False), x=886, y=269)},
     '2_complex_key': {0: Row(type='2_complex_key', s=Row(n1=False, n2=False, n3=True), x=901, y=235),
                       1: Row(type='2_complex_key', s=Row(n1=False, n2=False, n3=True), x=905, y=249),
                       2: Row(type='2_complex_key', s=Row(n1=False, n2=False, n3=True), x=868, y=270)},
     '3_complex_key': {0: Row(type='3_complex_key', s=Row(n1=True, n2=False, n3=False), x=925, y=197),
                       1: Row(type='3_complex_key', s=Row(n1=False, n2=False, n3=True), x=928, y=206),
                       2: Row(type='3_complex_key', s=Row(n1=False, n2=False, n3=True), x=883, y=236)}}
df = pd.DataFrame.from_dict(d)
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
s_df = spark.createDataFrame(df)
s_df.printSchema()
s_df.write.csv('it_doesnt_write.csv')  # fails: CSV data source does not support struct columns


So, to summarize: I have a spark dataframe that I want to write to CSV. I can't write it directly, because writing fails with:

'CSV data source does not support struct<s:struct<n1:boolean,n2:boolean,n3:boolean>,type:string,x:bigint,y:bigint> data type.;'

I therefore want to perform some reversible operation/transformation on this dataframe so that it can be written to CSV, and later read back from the CSV into a spark dataframe with the same schema.
How can I do that? Thanks.
Posted on 2019-11-03 12:30:11
As pault already mentioned in the comments, you need a list comprehension. Such a list comprehension takes a list of columns and a function that converts those columns to strings. I will use df.columns and to_json, but you could also provide your own python list of column names and a custom function to stringify your complex columns.
from pyspark.sql import functions as F

#this converts all columns to json strings
#and writes them to disk
s_df.select([F.to_json(x) for x in s_df.columns]).coalesce(1).write.csv('/tmp/testcsv')
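As a side note (my suggestion, not part of the original answer): if you alias each JSON column and write with header=True, the original column names survive the round trip instead of degrading to _c0, _c1, and so on. This assumes every selected column is a struct, as in the example dataframe above; the path is hypothetical.

named = s_df.select([F.to_json(c).alias(c) for c in s_df.columns])
named.coalesce(1).write.csv('/tmp/testcsv_named', header=True)

# reading it back restores the original column names
back = spark.read.csv('/tmp/testcsv_named', header=True)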
If you don't want to apply to_json to all columns, you can simply modify it like this:

list4tojson = ['2_complex_key', '3_complex_key']
s_df.select('1_complex_key', *[F.to_json(x) for x in list4tojson]).coalesce(1).write.csv('/tmp/testcsv')

You can restore the dataframe with from_json:
df = spark.read.csv('/tmp/testcsv')
df.printSchema()
#root
# |-- _c0: string (nullable = true)
# |-- _c1: string (nullable = true)
# |-- _c2: string (nullable = true)
#inferring the schema
json_schema = spark.read.json(df.rdd.map(lambda row: row._c0)).schema
df.select([F.from_json(x, json_schema) for x in df.columns]).printSchema()
#root
# |-- jsontostructs(_c0): struct (nullable = true)
# |    |-- s: struct (nullable = true)
# |    |    |-- n1: boolean (nullable = true)
# |    |    |-- n2: boolean (nullable = true)
# |    |    |-- n3: boolean (nullable = true)
# |    |-- type: string (nullable = true)
# |    |-- x: long (nullable = true)
# |    |-- y: long (nullable = true)
# |-- jsontostructs(_c1): struct (nullable = true)
# |    |-- s: struct (nullable = true)
# |    |    |-- n1: boolean (nullable = true)
# |    |    |-- n2: boolean (nullable = true)
# |    |    |-- n3: boolean (nullable = true)
# |    |-- type: string (nullable = true)
# |    |-- x: long (nullable = true)
# |    |-- y: long (nullable = true)
# |-- jsontostructs(_c2): struct (nullable = true)
# |    |-- s: struct (nullable = true)
# |    |    |-- n1: boolean (nullable = true)
# |    |    |-- n2: boolean (nullable = true)
# |    |    |-- n3: boolean (nullable = true)
# |    |-- type: string (nullable = true)
# |    |-- x: long (nullable = true)
# |    |-- y: long (nullable = true)
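To get the original column names back instead of jsontostructs(_c0) and friends, you can alias each parsed column. A small sketch (my addition), assuming the CSV column order matches this hypothetical list of original names:

orig_names = ['1_complex_key', '2_complex_key', '3_complex_key']  # hypothetical; use your real names
restored = df.select([F.from_json(c, json_schema).alias(n)
                      for c, n in zip(df.columns, orig_names)])
restored.printSchema()  # same nested schema as before, under the original names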
If you just want to store your data in a readable format, you can avoid all of the code above by writing it directly to json:

s_df.coalesce(1).write.json('/tmp/testjson')
df = spark.read.json('/tmp/testjson')

Source: https://stackoverflow.com/questions/58595189