首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >电火花approxQuantile函数

电火花approxQuantile函数
EN

Stack Overflow用户
提问于 2017-07-24 18:43:08
回答 4查看 49K关注 0票数 12

我有这些列为idpricetimestamp的数据。

我希望找到按id分组的中值。

我正在使用这个代码来找到它,但是它给了我这个错误。

代码语言:javascript
运行
复制
from pyspark.sql import DataFrameStatFunctions as statFunc
windowSpec = Window.partitionBy("id")
median = statFunc.approxQuantile("price",
                                 [0.5],
                                 0) \
                 .over(windowSpec)

return df.withColumn("Median", median)

难道不能使用DataFrameStatFunctions来填充新列中的值吗?

代码语言:javascript
运行
复制
TypeError: unbound method approxQuantile() must be called with DataFrameStatFunctions instance as first argument (got str instance instead)
EN

回答 4

Stack Overflow用户

回答已采纳

发布于 2017-08-04 11:58:56

实际上,不可能使用approxQuantile来填充新的dataframe列中的值,但这并不是获得此错误的原因。不幸的是,整个底层故事是一个相当令人沮丧的故事,就像我争论过一样,很多星火(特别是PySpark)特性以及它们缺乏足够的文档。

首先,没有一个方法,而是两个 approxQuantile方法;第一个是标准DataFrame类的一部分,即不需要导入DataFrameStatFunctions:

代码语言:javascript
运行
复制
spark.version
# u'2.1.1'

sampleData = [("bob","Developer",125000),("mark","Developer",108000),("carl","Tester",70000),("peter","Developer",185000),("jon","Tester",65000),("roman","Tester",82000),("simon","Developer",98000),("eric","Developer",144000),("carlos","Tester",75000),("henry","Developer",110000)]

df = spark.createDataFrame(sampleData, schema=["Name","Role","Salary"])
df.show()
# +------+---------+------+ 
# |  Name|     Role|Salary|
# +------+---------+------+
# |   bob|Developer|125000| 
# |  mark|Developer|108000|
# |  carl|   Tester| 70000|
# | peter|Developer|185000|
# |   jon|   Tester| 65000|
# | roman|   Tester| 82000|
# | simon|Developer| 98000|
# |  eric|Developer|144000|
# |carlos|   Tester| 75000|
# | henry|Developer|110000|
# +------+---------+------+

med = df.approxQuantile("Salary", [0.5], 0.25) # no need to import DataFrameStatFunctions
med
# [98000.0]

第二个DataFrameStatFunctions的一部分,但是如果您像使用它一样使用它,您将得到报告的错误:

代码语言:javascript
运行
复制
from pyspark.sql import DataFrameStatFunctions as statFunc
med2 = statFunc.approxQuantile( "Salary", [0.5], 0.25)
# TypeError: unbound method approxQuantile() must be called with DataFrameStatFunctions instance as first argument (got str instance instead)

因为正确的用法是

代码语言:javascript
运行
复制
med2 = statFunc(df).approxQuantile( "Salary", [0.5], 0.25)
med2
# [82000.0]

虽然您将无法在PySpark文档中找到有关此问题的简单示例(我花了一些时间自己解决这个问题).最棒的部分是什么?这两个值是,而不是。

代码语言:javascript
运行
复制
med == med2
# False

我怀疑这是由于所使用的非确定性算法(毕竟,它应该是一个近似的中值),即使您用相同的玩具数据重新运行命令,您也可能得到不同的值(与我在这里报告的值不同)--我建议进行一些实验以获得这种感觉.

但是,正如我已经说过的,这并不是为什么不能使用approxQuantile填充新的dataframe列中的值的原因--即使使用正确的语法,也会得到不同的错误:

代码语言:javascript
运行
复制
df2 = df.withColumn('median_salary', statFunc(df).approxQuantile( "Salary", [0.5], 0.25))
# AssertionError: col should be Column

在这里,col引用了withColumn操作的第二个参数,即approxQuantile操作,错误消息说它不是Column类型-实际上,它是一个列表:

代码语言:javascript
运行
复制
type(statFunc(df).approxQuantile( "Salary", [0.5], 0.25))
# list

因此,在填充列值时,Spark需要Column类型的参数,您不能使用lists;下面是创建一个新列的示例,该列的每个角色的平均值而不是中间值:

代码语言:javascript
运行
复制
import pyspark.sql.functions as func
from pyspark.sql import Window

windowSpec = Window.partitionBy(df['Role'])
df2 = df.withColumn('mean_salary', func.mean(df['Salary']).over(windowSpec))
df2.show()
# +------+---------+------+------------------+
# |  Name|     Role|Salary|       mean_salary| 
# +------+---------+------+------------------+
# |  carl|   Tester| 70000|           73000.0| 
# |   jon|   Tester| 65000|           73000.0|
# | roman|   Tester| 82000|           73000.0|
# |carlos|   Tester| 75000|           73000.0|
# |   bob|Developer|125000|128333.33333333333|
# |  mark|Developer|108000|128333.33333333333| 
# | peter|Developer|185000|128333.33333333333| 
# | simon|Developer| 98000|128333.33333333333| 
# |  eric|Developer|144000|128333.33333333333|
# | henry|Developer|110000|128333.33333333333| 
# +------+---------+------+------------------+

这是因为与approxQuantile相反,mean返回一个Column

代码语言:javascript
运行
复制
type(func.mean(df['Salary']).over(windowSpec))
# pyspark.sql.column.Column
票数 55
EN

Stack Overflow用户

发布于 2018-08-10 17:16:00

按组计算分位数(聚合)示例

由于组缺少聚合函数,因此我将添加一个按名称构造函数调用的示例(本例中为percentile_approx):

代码语言:javascript
运行
复制
from pyspark.sql.column import Column, _to_java_column, _to_seq

def from_name(sc, func_name, *params):
    """
       create call by function name 
    """
    callUDF = sc._jvm.org.apache.spark.sql.functions.callUDF
    func = callUDF(func_name, _to_seq(sc, *params, _to_java_column))
    return Column(func)

percentile_approx中应用groupBy函数:

代码语言:javascript
运行
复制
from pyspark.sql import SparkSession
from pyspark.sql import functions as f

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# build percentile_approx function call by name: 
target = from_name(sc, "percentile_approx", [f.col("salary"), f.lit(0.95)])


# load dataframe for persons data 
# with columns "person_id", "group_id" and "salary"
persons = spark.read.parquet( ... )

# apply function for each group
persons.groupBy("group_id").agg(
    target.alias("target")).show()
票数 9
EN

Stack Overflow用户

发布于 2020-05-20 10:25:15

如果您可以使用聚合而不是窗口函数,那么也可以选择使用pandas_udf。不过,它们的速度不如纯火花那么快。下面是来自文档的一个适合的示例

代码语言:javascript
运行
复制
from pyspark.sql.functions import pandas_udf, PandasUDFType

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "price")
)

@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def median_udf(v):
    return v.median()

df.groupby("id").agg(median_udf(df["price"])).show()
票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/45287832

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档