I realize I may need to add more detail. Suppose I have a DataFrame with 2 columns. Both are strings; one is an ID and the other is a JSON string.
This can be constructed as follows:
>>> a1 = [{"a": 1, "b": "[{\"h\": 3, \"i\": 5} ,{\"h\": 4, \"i\": 6}]" },
... {"a": 1, "b": "[{\"h\": 6, \"i\": 10},{\"h\": 8, \"i\": 12}]"}]
>>> df1 = sqlContext.read.json(sc.parallelize(a1))
>>> df1.show()
+---+--------------------+
| a| b|
+---+--------------------+
| 1|[{"h": 3, "i": 5}...|
| 1|[{"h": 6, "i": 10...|
+---+--------------------+
>>> df1.printSchema()
root
 |-- a: long (nullable = true)
 |-- b: string (nullable = true)
Note that the JSON is a StringType. I want to write a function that creates a new column storing the data as a nested table, like this:
root
 |-- a: long (nullable = true)
 |-- b: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- h: long (nullable = true)
 |    |    |-- i: long (nullable = true)
I am on 1.6, so I don't have the to_json cast function. This is what I tried:
>>> df1.withColumn('new', get_json_object(df1.b,'$')).show()
+---+--------------------+--------------------+
| a| b| new|
+---+--------------------+--------------------+
| 1|[{"h": 3, "i": 5}...|[{"h":3,"i":5},{"...|
| 1|[{"h": 6, "i": 10...|[{"h":6,"i":10},{...|
+---+--------------------+--------------------+
The problem is that the new column it creates is still a string. :(
Posted on 2019-01-07 08:29:51
I was able to work around this using the map function:
a1 = [{"a": 1, "b": "[{\"h\": 3, \"i\": 5} ,{\"h\": 4, \"i\": 6}]"},{"a": 1, "b": "[{\"h\": 6, \"i\": 10},{\"h\": 8, \"i\": 12}]"}]
df1 = sqlContext.read.json(sc.parallelize(a1))
rdd = df1.map(lambda x: x.b)
df2 = sqlContext.read.json(rdd)
>>> df2.printSchema()
root
 |-- h: long (nullable = true)
 |-- i: long (nullable = true)
The problem is that I lose the other columns:
+---+---+
| h| i|
+---+---+
| 3| 5|
| 4| 6|
| 6| 10|
| 8| 12|
+---+---+
So I tried the withColumn DataFrame function, creating a udf to explicitly convert it to JSON. That is where the problem lies: withColumn does not seem to work with JSON objects.
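For what it's worth, withColumn with a udf can produce a nested column if the udf is registered with an explicit return schema; without one, pyspark's udf defaults to returning StringType. A minimal sketch, untested on 1.6, with `parse_b` being a name of my choosing:

```python
import json

# Pure-Python parser: JSON string -> list of (h, i) tuples, matching the
# array<struct<h, i>> schema declared in the commented wiring below.
def parse_b(s):
    if s is None:
        return None
    return [(d["h"], d["i"]) for d in json.loads(s)]

# Spark wiring (sketch, assuming df1 from above):
# from pyspark.sql.functions import udf
# from pyspark.sql.types import ArrayType, LongType, StructField, StructType
# b_schema = ArrayType(StructType([StructField("h", LongType()),
#                                  StructField("i", LongType())]))
# df2 = df1.withColumn("new", udf(parse_b, b_schema)(df1.b))
```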
My other option was to write a function that combines the first two columns, like this:
# This is a 2.7 workaround: all strings read from the configuration file are, for some
# reason, converted to unicode. This issue does not appear to impact v3.6 and above.
def convert_dict(mydict):
    return {k.encode('ascii', 'ignore'): str(v).encode('ascii', 'ignore') for k, v in mydict.iteritems()}
rdd = df1.map(lambda x: {'a': x.a, 'b': [convert_dict(y) for y in json.loads(x.b)]})
df2 = sqlContext.read.json(rdd)
>>> df2.printSchema()
root
 |-- a: long (nullable = true)
 |-- b: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- h: string (nullable = true)
 |    |    |-- i: string (nullable = true)
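The h and i fields come back as strings here only because of the `str(v)` in `convert_dict`; json.loads already returns them as ints, so leaving the values untouched should let the inferred schema keep them as longs. A sketch of that variant (the `parse_row` name is mine; read.json is documented to take an RDD of JSON strings, hence the json.dumps in the wiring):

```python
import json

# Combine both columns into one record, leaving json.loads's ints alone so
# the inferred Spark schema keeps h and i as longs.
def parse_row(a, b):
    return {"a": a, "b": json.loads(b)}

# Spark wiring (sketch, assuming df1/sqlContext from above):
# rdd = df1.map(lambda x: json.dumps(parse_row(x.a, x.b)))
# df2 = sqlContext.read.json(rdd)
```

On 2.7 the unicode keys are no issue in this variant, since json.dumps re-serializes them directly instead of passing raw dicts to Spark.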
https://stackoverflow.com/questions/53848908