首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >在PySpark中对GroupedData应用UDF(带功能python示例)

在PySpark中对GroupedData应用UDF(带功能python示例)
EN

Stack Overflow用户
提问于 2016-10-13 03:01:10
回答 3查看 53.7K关注 0票数 43

我有一段python代码,它在本地运行在一个pandas数据帧中:

代码语言:javascript
复制
df_result = pd.DataFrame(df
                          .groupby('A')
                          .apply(lambda x: myFunction(zip(x.B, x.C), x.name))

我想在PySpark中运行它,但在处理pyspark.sql.group.GroupedData对象时遇到了问题。

我尝试过以下几种方法:

代码语言:javascript
复制
sparkDF
 .groupby('A')
 .agg(myFunction(zip('B', 'C'), 'A')) 

它会返回

代码语言:javascript
复制
KeyError: 'A'

我假设是因为'A‘不再是一个列,并且我找不到与x.name对应的列。

然后

代码语言:javascript
复制
sparkDF
 .groupby('A')
 .map(lambda row: Row(myFunction(zip('B', 'C'), 'A'))) 
 .toDF()

但会得到以下错误:

代码语言:javascript
复制
AttributeError: 'GroupedData' object has no attribute 'map'

如有任何建议,我们将不胜感激!

EN

回答 3

Stack Overflow用户

发布于 2017-11-26 23:35:57

从Spark2.3开始,你可以使用pandas_udfGROUPED_MAP接受Callable[[pandas.DataFrame], pandas.DataFrame],或者换句话说,是一个从与输入形状相同的Pandas DataFrame映射到输出DataFrame的函数。

例如,如果数据如下所示:

代码语言:javascript
复制
df = spark.createDataFrame(
    [("a", 1, 0), ("a", -1, 42), ("b", 3, -1), ("b", 10, -2)],
    ("key", "value1", "value2")
)

要计算value1 value2之间的成对最小值的平均值,您必须定义输出模式:

代码语言:javascript
复制
from pyspark.sql.types import *

schema = StructType([
    StructField("key", StringType()),
    StructField("avg_min", DoubleType())
])

pandas_udf

代码语言:javascript
复制
import pandas as pd

from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import PandasUDFType

@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def g(df):
    result = pd.DataFrame(df.groupby(df.key).apply(
        lambda x: x.loc[:, ["value1", "value2"]].min(axis=1).mean()
    ))
    result.reset_index(inplace=True, drop=False)
    return result

并应用它:

代码语言:javascript
复制
df.groupby("key").apply(g).show()
代码语言:javascript
复制
+---+-------+
|key|avg_min|
+---+-------+
|  b|   -1.5|
|  a|   -0.5|
+---+-------+

除了模式定义和装饰器之外,您当前的Pandas代码可以按原样应用。

从Spark 2.4.0开始,还有GROUPED_AGG变体,它采用Callable[[pandas.Series, ...], T],其中T是一个原始标量:

代码语言:javascript
复制
import numpy as np

@pandas_udf(DoubleType(), functionType=PandasUDFType.GROUPED_AGG)
def f(x, y):
    return np.minimum(x, y).mean()

它可以与标准group_by / agg结构一起使用:

代码语言:javascript
复制
df.groupBy("key").agg(f("value1", "value2").alias("avg_min")).show()
代码语言:javascript
复制
+---+-------+
|key|avg_min|
+---+-------+
|  b|   -1.5|
|  a|   -0.5|
+---+-------+

请注意,GROUPED_MAPGROUPPED_AGG pandas_udf的行为方式都不同于UserDefinedAggregateFunctionAggregator,它更接近于具有无界框架的groupByKey或窗口函数。首先对数据进行混洗,然后才应用UDF。

为了优化执行,您应该使用implement Scala UserDefinedAggregateFunctionadd Python wrapper

另请参阅User defined function to be applied to Window in PySpark?

票数 60
EN

Stack Overflow用户

发布于 2016-10-14 04:50:36

您尝试编写的是UDAF (用户定义的聚合函数),而不是UDF (用户定义的函数)。UDAF是处理按键分组的数据的函数。具体地说,他们需要定义如何在单个分区中合并组中的多个值,然后如何跨键的分区合并结果。目前还没有办法在python中实现UDAF,它们只能在Scala中实现。

但是,您可以在Python中解决这个问题。可以使用collect set来收集已分组的值,然后使用常规UDF对它们执行所需的操作。唯一需要注意的是,collect_set只适用于原始值,因此您需要将它们编码为一个字符串。

代码语言:javascript
复制
from pyspark.sql.types import StringType
from pyspark.sql.functions import col, collect_list, concat_ws, udf

def myFunc(data_list):
    for val in data_list:
        b, c = data.split(',')
        # do something

    return <whatever>

myUdf = udf(myFunc, StringType())

df.withColumn('data', concat_ws(',', col('B'), col('C'))) \
  .groupBy('A').agg(collect_list('data').alias('data'))
  .withColumn('data', myUdf('data'))

如果要执行重复数据消除,请使用collect_set。此外,如果您的一些键有很多值,这将会很慢,因为一个键的所有值都需要收集在集群上的某个单一分区中。如果最终结果是通过以某种方式组合每个键的值(例如求和)构建的值,那么使用RDD aggregateByKey方法实现它可能会更快,该方法允许您在打乱数据之前为分区中的每个键构建中间值。

编辑:2018年11月21日

由于这个答案已经写好了,pyspark增加了对使用Pandas的UDAF的支持。在使用Panda的UDF和UDAF而不是直接使用python函数和RDDs时,有一些很好的性能改进。在幕后,它对列进行矢量化(将多行中的值批处理在一起,以优化处理和压缩)。看看here可以获得更好的解释,或者看看下面user6910411的答案作为一个例子。

票数 52
EN

Stack Overflow用户

发布于 2018-07-11 04:47:25

我将扩展上面的答案。

因此,您可以使用@pandas_udf在pyspark中实现相同的逻辑,如pandas.groupby().apply,这是一种矢量化方法,比简单的udf更快。

代码语言:javascript
复制
from pyspark.sql.functions import pandas_udf,PandasUDFType

df3 = spark.createDataFrame(
[("a", 1, 0), ("a", -1, 42), ("b", 3, -1), ("b", 10, -2)],
("key", "value1", "value2")
)

from pyspark.sql.types import *

schema = StructType([
    StructField("key", StringType()),
    StructField("avg_value1", DoubleType()),
    StructField("avg_value2", DoubleType()),
    StructField("sum_avg", DoubleType()),
    StructField("sub_avg", DoubleType())
])

@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def g(df):
    gr = df['key'].iloc[0]
    x = df.value1.mean()
    y = df.value2.mean()
    w = df.value1.mean() + df.value2.mean()
    z = df.value1.mean() - df.value2.mean()
    return pd.DataFrame([[gr]+[x]+[y]+[w]+[z]])

df3.groupby("key").apply(g).show()

您将得到以下结果:

代码语言:javascript
复制
+---+----------+----------+-------+-------+
|key|avg_value1|avg_value2|sum_avg|sub_avg|
+---+----------+----------+-------+-------+
|  b|       6.5|      -1.5|    5.0|    8.0|
|  a|       0.0|      21.0|   21.0|  -21.0|
+---+----------+----------+-------+-------+

因此,您可以在分组data.and中的其他字段之间进行更多的计算,并以列表形式将它们添加到数据帧中。

票数 17
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/40006395

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档