首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >PySpark:使用RDD并行化的输出和原始的RDD执行其他操作

PySpark:使用RDD并行化的输出和原始的RDD执行其他操作
EN

Stack Overflow用户
提问于 2017-08-28 17:23:01
回答 1查看 93关注 0票数 0

假设我有一个RDD。在这个RDD上,我执行一些输出输出的操作。现在,我需要这个输出(原始的RDD )来执行其他一些操作。

怎么做呢?

这是我的代码:

代码语言:javascript
运行
复制
rdd = sc.parallelize(input)
rdd1 = rdd.map(...)
...
output1 =  rdd1.collect() # output I need

output2 = rdd.map(some operations using output1)
EN

回答 1

Stack Overflow用户

发布于 2017-08-28 22:18:52

具有窗口功能:

在开始之前,让我们将rdd转换为dataframe:

代码语言:javascript
运行
复制
df = spark.createDataFrame(
    sc.parallelize(
        [['a', 1, [1, 2]], ['a', 2, [1, 1]], ['a', 3, [2, 2]], ['b', 4, [2, 2]]]
    ), ['c1', 'c2', 'c3']
)

首先,我们计算发生的情况:

代码语言:javascript
运行
复制
from pyspark.sql import Window
import pyspark.sql.functions as psf
w1 = Window.partitionBy("c1", df.c3[0])
w2 = Window.partitionBy("c1", df.c3[1])
df1 = df.select(
    "c1", "c2", "c3", 
    psf.count("*").over(w1).alias("count1"), 
    psf.count("*").over(w2).alias("count2")
)

接下来,我们发现最常见的项目:

代码语言:javascript
运行
复制
w1 = Window.partitionBy("c1").orderBy(psf.desc("count1"))
w2 = Window.partitionBy("c1").orderBy(psf.desc("count2"))
df2 = df1.select(
    "c1", "c2", "c3",
    psf.first(df1.c3[0]).over(w1).alias("most_freq1"), 
    psf.first(df1.c3[1]).over(w2).alias("most_freq2") 
)

然后,我们计算出距离

代码语言:javascript
运行
复制
df3 = df2.withColumn(
    "dist", 
    psf.sqrt((df2.most_freq1 - df2.c3[0])**2 + (df2.most_freq2 - df2.c3[1])**2)
)
df3.show()
    +---+---+------+----------+----------+----+
    | c1| c2|    c3|most_freq1|most_freq2|dist|
    +---+---+------+----------+----------+----+
    |  b|  4|[2, 2]|         2|         2| 0.0|
    |  a|  1|[1, 2]|         1|         2| 0.0|
    |  a|  3|[2, 2]|         1|         2| 1.0|
    |  a|  2|[1, 1]|         1|         2| 1.0|
    +---+---+------+----------+----------+----+
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/45924349

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档