RDD from another RDD and a DataFrame

Stack Overflow user
Asked 2018-02-16 04:57:20
Answers: 1 · Views: 277 · Followers: 0 · Votes: 0

I have an rdd similar to the following:

rdd1 = sc.parallelize([('C3', ['P8', 'P3', 'P2']), ('C1', ['P1', 'P5', 'P5', 'P2']), ('C4', ['P3', 'P4']), ('C2', ['P3']), ('C5', ['P3', 'P9'])])

I also have a dataframe similar to the following:

new_df = spark.createDataFrame([
("P1", "Shirt", "Green", 25, 2000),
("P2", "Jeans", "yello", 30, 1500),
("P3", "Sweater", "Red", 35, 1000),
("P4", "Kurta", "Black", 28, 950),
("P5", "Saree", "Green", 25, 1500),
("P8", "Shirt", "Black", 32, 2500),
("P9", "Sweater", "Red", 30, 1000)
], ["Product", "Item", "Color", "Size", "Price"])

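I need to create an rdd from rdd1 in which each list of values is replaced with the matching details from the dataframe. For example, P8 should be replaced with the P8 row of new_df. I am expecting an output rdd similar to the following: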

[('C3', [{'Price': '2500', 'Color ': 'Black', 'Size': '32', 'Item': 'Shirt'}, {'Price': '1000', 'Color ': 'Red', 'Size': '35', 'Item': 'Sweater'}, {'Price': '1500', 'Color ': 'Yellow', 'Size': '30', 'Item': 'Jeans'}]), ('C1', [{'Price': '2000', 'Color ': 'Green', 'Size': '25', 'Item': 'Shirt'}, {'Price': '1500', 'Color ': 'Green', 'Size': '25', 'Item': 'Saree'}, {'Price': '1500', 'Color ': 'Green', 'Size': '25', 'Item': 'Saree'}, {'Price': '1500', 'Color ': 'Yellow', 'Size': '30', 'Item': 'Jeans'}]), ('C4', [{'Price': '1000', 'Color ': 'Red', 'Size': '35', 'Item': 'Sweater'}, {'Price': '950', 'Color ': 'Black', 'Size': '28', 'Item': 'Kurta'}]), ('C2', [{'Price': '1000', 'Color ': 'Red', 'Size': '35', 'Item': 'Sweater'}]), ('C5', [{'Price': '1000', 'Color ': 'Red', 'Size': '35', 'Item': 'Sweater'}, {'Price': '1000', 'Color ': 'Red', 'Size': '30', 'Item': 'Sweater'}])]

1 Answer

Stack Overflow user

Accepted answer

Answered 2018-02-16 08:35:17

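You should convert your rdd1 to a dataframe as well. Then explode the Product array in that dataframe so that the two dataframes can be joined on the common Product column. After the join, convert the columns that came from new_df to json and select only the columns you need. The final step is to group by id, as in the original rdd1, and collect the json strings.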

from pyspark.sql import functions as F
dataframe = sqlContext.createDataFrame(rdd1, ['id', 'Product'])\
    .withColumn('Product', F.explode(F.col('Product')))\
    .join(new_df, ['Product'], 'left')\
    .select('id', F.to_json(F.struct(F.col('Price'), F.col('Color'), F.col('Size'), F.col('Item'))).alias('json'))\
    .groupBy('id')\
    .agg(F.collect_list('json'))

which should give you the following output dataframe:

+---+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|id |collect_list(json)                                                                                                                                                                                                                  |
+---+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|C3 |[{"Price":1500,"Color":"yello","Size":30,"Item":"Jeans"}, {"Price":2500,"Color":"Black","Size":32,"Item":"Shirt"}, {"Price":1000,"Color":"Red","Size":35,"Item":"Sweater"}]                                                         |
|C4 |[{"Price":1000,"Color":"Red","Size":35,"Item":"Sweater"}, {"Price":950,"Color":"Black","Size":28,"Item":"Kurta"}]                                                                                                                   |
|C5 |[{"Price":1000,"Color":"Red","Size":35,"Item":"Sweater"}, {"Price":1000,"Color":"Red","Size":30,"Item":"Sweater"}]                                                                                                                  |
|C1 |[{"Price":1500,"Color":"yello","Size":30,"Item":"Jeans"}, {"Price":2000,"Color":"Green","Size":25,"Item":"Shirt"}, {"Price":1500,"Color":"Green","Size":25,"Item":"Saree"}, {"Price":1500,"Color":"Green","Size":25,"Item":"Saree"}]|
|C2 |[{"Price":1000,"Color":"Red","Size":35,"Item":"Sweater"}]                                                                                                                                                                           |
+---+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
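To make the explode, join, to_json and collect_list steps above more concrete, here is a minimal plain-Python sketch of the same data flow on a two-customer subset of the question's data. It does not use Spark at all. The names `rdd1_data`, `products`, `exploded`, `joined`, `as_json` and `grouped` are made up for illustration, and the sample only contains product ids that have a match, so the behaviour of the left join for unmatched ids is not shown.

```python
import json
from collections import defaultdict

# Hypothetical subset of rdd1 and new_df, held in ordinary Python structures.
rdd1_data = [('C4', ['P3', 'P4']), ('C2', ['P3'])]
products = {
    'P3': {'Price': 1000, 'Color': 'Red', 'Size': 35, 'Item': 'Sweater'},
    'P4': {'Price': 950, 'Color': 'Black', 'Size': 28, 'Item': 'Kurta'},
}

# "explode": one (id, Product) pair per element of each product list.
exploded = [(cid, pid) for cid, pids in rdd1_data for pid in pids]

# "join" on Product: attach the product details to every pair.
joined = [(cid, products[pid]) for cid, pid in exploded]

# "to_json(struct(...))": serialise the selected fields to a compact string.
as_json = [(cid, json.dumps(details, separators=(',', ':')))
           for cid, details in joined]

# "groupBy('id').agg(collect_list('json'))": gather the strings per id.
# Note: in Spark the order inside collect_list is not guaranteed after a
# shuffle, which is why the answer's output lists products in another order.
grouped = defaultdict(list)
for cid, json_string in as_json:
    grouped[cid].append(json_string)

for cid, json_strings in grouped.items():
    print(cid, json_strings)
```

Each value in `grouped` is a list of json strings, which corresponds to the `collect_list(json)` column shown in the table.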

Changing the above dataframe to an rdd is just a matter of calling the .rdd api.
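Since the collected values are json strings, the rows of that rdd would still need parsing to get dictionaries like the ones in the question. A minimal sketch of that step follows, assuming each row holds the id first and the list of json strings second. The helper name `parse_row` is hypothetical, and the Spark call is shown only as a comment so the snippet runs without a cluster.

```python
import json

def parse_row(row_id, json_strings):
    """Turn one (id, [json string, ...]) pair into (id, [dict, ...])."""
    return (row_id, [json.loads(s) for s in json_strings])

# With the dataframe from the answer, this could be applied as (assumption):
#   result_rdd = dataframe.rdd.map(lambda row: parse_row(row[0], row[1]))

# Stand-alone check on the C2 row from the output table.
example = parse_row(
    'C2', ['{"Price":1000,"Color":"Red","Size":35,"Item":"Sweater"}'])
print(example)
```

The parsed values keep their json types, so Price and Size come back as integers rather than the strings shown in the question's expected output.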

Updated

From the comments below:

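Expected data should be like: |C3 |[Map(Item -> Shirt, Price -> 2500, Size -> 32, Color -> Black), Map(Item -> Sweater, Price -> 1000, Size -> 35, Color -> Red), Map(Item -> Jeans, Price -> 1500, Size -> 30, Color -> Yellow)]. Only then can I convert it to an rdd properly.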

It seems you are looking for a MapType rather than a StringType in the collected list. For that you have to write a udf function:

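from pyspark.sql import functions as F
from pyspark.sql import types as T

# Assumption: `columns` was not defined in the original answer. It is taken
# here to be the new_df columns that should appear in each map.
columns = ['Item', 'Price', 'Size', 'Color']

def mapFunction(y):
    # y holds the column values of one row, in the same order as `columns`
    newMap = {}
    for key, value in zip(columns, y):
        newMap.update({key: value})
    return newMap

# Both keys and values are declared as strings in the returned map
udfFunction = F.udf(mapFunction, T.MapType(T.StringType(), T.StringType()))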

and call it in the code in place of the to_json and struct functions:

dataframe = sqlContext.createDataFrame(rdd1, ['id', 'Product']) \
    .withColumn('Product', F.explode(F.col('Product'))) \
    .join(new_df, ['Product'], 'left') \
    .select('id', udfFunction(F.array([F.col(x) for x in columns])).alias('json')) \
    .groupBy('id') \
    .agg(F.collect_list('json'))
dataframe.show(truncate=False)

You should get the following output:

+---+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|id |collect_list(json)                                                                                                                                                                                                                                          |
+---+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|C3 |[Map(Item -> Jeans, Price -> 1500, Size -> 30, Color -> yello), Map(Item -> Shirt, Price -> 2500, Size -> 32, Color -> Black), Map(Item -> Sweater, Price -> 1000, Size -> 35, Color -> Red)]                                                               |
|C4 |[Map(Item -> Sweater, Price -> 1000, Size -> 35, Color -> Red), Map(Item -> Kurta, Price -> 950, Size -> 28, Color -> Black)]                                                                                                                               |
|C5 |[Map(Item -> Sweater, Price -> 1000, Size -> 35, Color -> Red), Map(Item -> Sweater, Price -> 1000, Size -> 30, Color -> Red)]                                                                                                                              |
|C1 |[Map(Item -> Jeans, Price -> 1500, Size -> 30, Color -> yello), Map(Item -> Shirt, Price -> 2000, Size -> 25, Color -> Green), Map(Item -> Saree, Price -> 1500, Size -> 25, Color -> Green), Map(Item -> Saree, Price -> 1500, Size -> 25, Color -> Green)]|
|C2 |[Map(Item -> Sweater, Price -> 1000, Size -> 35, Color -> Red)]                                                                                                                                                                                             |
+---+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
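For readers who want to see what the udf body does to a single row, here is a small plain-Python sketch of the same zip-based mapping, run outside Spark. The `columns` list and the sample values are assumptions chosen to match the P8 row of new_df, with the values written as strings because the udf declares a string-to-string MapType.

```python
# Assumed column order; any order works as long as the values match it.
columns = ['Item', 'Price', 'Size', 'Color']

def map_function(values):
    """Pair each column name with the value at the same position."""
    return dict(zip(columns, values))

# Values for product P8, as strings to mirror MapType(StringType, StringType).
row_values = ['Shirt', '2500', '32', 'Black']
result = map_function(row_values)
print(result)
```

When the dataframe is converted with .rdd, a MapType column is returned to Python as a dict, so each collected element should look like `result` above.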
Votes: 1
Original page content provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/48820304
