首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >改进Pandas在火花放电中的应用

改进Pandas在火花放电中的应用
EN

Stack Overflow用户
提问于 2021-04-11 10:30:56
回答 1查看 678关注 0票数 2

我必须在Pyspark中的滑动窗口内执行聚合。特别是,我必须做以下工作:

aggregation

  • Sum的最后一个值,
  1. 每次考虑100天值数据,并返回结果

这些任务必须在带有.rangeBetween(-100 days, 0)的滑动窗口中计算。

我可以很容易地通过构造一个Pandas来实现这一结果,它将Pandas中的某些列作为输入,将它们转换为Pandas DataFrame,然后计算聚合并返回标量结果。然后将UDF应用于所需的滑动窗口。

尽管此解决方案工作良好,但完成任务需要很长时间(3-4小时),因为DFs包含数百万行。是否有办法改善这种运算的计算时间?我正在数据库中使用Pyspark。

我的熊猫UDF是:

代码语言:javascript
运行
复制
@pandas_udf(FloatType(), PandasUDFType.GROUPED_AGG)
def method2(analyst: pd.Series, revisions: pd.Series) -> float:
  df = pd.DataFrame({
    'analyst': analyst,
    'revisions': revisions
  })
  return df.groupby('analyst').last()['revisions'].sum() / df.groupby('analyst').last()['revisions'].abs().sum()

适用于:

代码语言:javascript
运行
复制
days = lambda x: x*60*60*24
w = Window.partitionBy('csecid').orderBy(F.col('date').cast('timestamp').cast('long')).rangeBetween(-days(100), 0)
df = df.withColumn('new_col', method2(F.col('analystid'), F.col('revisions_improved')).over(w))

编辑:我知道这种聚合可以通过使用numpy数组来实现,而PySpark UDF处理numpy结构的速度要快得多。但是,我想避免这个解决方案,因为我需要在相同的框架函数中应用,这些函数比所显示的要复杂得多,而且很难与numpy复制。

EN

回答 1

Stack Overflow用户

发布于 2021-04-15 23:24:05

最近,我不得不实现一个类似的聚合,我的第一次尝试是在滑动窗口中使用Pandas。性能很差,我使用以下方法改进了它。

collect_list 尝试使用来组合滑动窗口向量,然后将它们映射到UDF中。注意,只有当滑动窗口能够容纳工作人员内存(通常是这样)时,这才能工作。

这是我的测试代码。第一部分只是您的代码,但作为一个完整的可复制的例子。

代码语言:javascript
运行
复制
import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql import Window
from pyspark.sql.functions import pandas_udf, PandasUDFType, udf
from pyspark.sql.types import FloatType, StructType, StructField, IntegerType, StringType

df = spark.createDataFrame(
  [(1, "2021-04-01", 10, -30),
   (1, "2021-03-01", 10, 20),
   (1, "2021-02-01", 10, -1),
   (1, "2021-01-01", 10, 10),
   (1, "2020-12-01", 10, 5),
   (1, "2021-04-01", 20, -5),
   (1, "2021-03-01", 20, -4),
   (1, "2021-02-01", 20, -3),
   (2, "2021-03-01", 10, 5),
   (2, "2021-02-01", 10, 6),
  ], 
  StructType([
    StructField("csecid", StringType(), True), 
    StructField("date", StringType(), True), 
    StructField("analystid", IntegerType(), True), 
    StructField("revisions_improved", IntegerType(), True)
  ]))

### Baseline
@pandas_udf(FloatType(), PandasUDFType.GROUPED_AGG)
def method2(analyst: pd.Series, revisions: pd.Series) -> float:
  df = pd.DataFrame({
    'analyst': analyst,
    'revisions': revisions
  })
  return df.groupby('analyst').last()['revisions'].sum() / df.groupby('analyst').last()['revisions'].abs().sum()

days = lambda x: x*60*60*24
w = Window.partitionBy('csecid').orderBy(F.col('date').cast('timestamp').cast('long')).rangeBetween(-days(100), 0)

# df.withColumn('new_col', method2(F.col('analystid'), F.col('revisions_improved')).over(w))

拟议的备选方案:

代码语言:javascript
运行
复制
### Method 3
from typing import List

@udf(FloatType())
def method3(analyst: List[int], revisions: List[int]) -> float:
  df = pd.DataFrame({
    'analyst': analyst,
    'revisions': revisions
  })
  return float(df.groupby('analyst').last()['revisions'].sum() / df.groupby('analyst').last()['revisions'].abs().sum())

(df
.withColumn('new_col', method2(F.col('analystid'), F.col('revisions_improved')).over(w))

.withColumn('analyst_win', F.collect_list("analystid").over(w))
.withColumn('revisions_win', F.collect_list("revisions_improved").over(w))

.withColumn('method3', method3(F.collect_list("analystid").over(w), 
                               F.collect_list("revisions_improved").over(w)))
.orderBy("csecid", "date", "analystid")
.show(truncate=False))

结果:

代码语言:javascript
运行
复制
+------+----------+---------+------------------+---------+----------------------------+-----------------------------+---------+
|csecid|date      |analystid|revisions_improved|new_col  |analyst_win                 |revisions_win                |method3  |
+------+----------+---------+------------------+---------+----------------------------+-----------------------------+---------+
|1     |2020-12-01|10       |5                 |1.0      |[10]                        |[5]                          |1.0      |
|1     |2021-01-01|10       |10                |1.0      |[10, 10]                    |[5, 10]                      |1.0      |
|1     |2021-02-01|10       |-1                |-1.0     |[10, 10, 10, 20]            |[5, 10, -1, -3]              |-1.0     |
|1     |2021-02-01|20       |-3                |-1.0     |[10, 10, 10, 20]            |[5, 10, -1, -3]              |-1.0     |
|1     |2021-03-01|10       |20                |0.6666667|[10, 10, 10, 20, 10, 20]    |[5, 10, -1, -3, 20, -4]      |0.6666667|
|1     |2021-03-01|20       |-4                |0.6666667|[10, 10, 10, 20, 10, 20]    |[5, 10, -1, -3, 20, -4]      |0.6666667|
|1     |2021-04-01|10       |-30               |-1.0     |[10, 10, 20, 10, 20, 10, 20]|[10, -1, -3, 20, -4, -30, -5]|-1.0     |
|1     |2021-04-01|20       |-5                |-1.0     |[10, 10, 20, 10, 20, 10, 20]|[10, -1, -3, 20, -4, -30, -5]|-1.0     |
|2     |2021-02-01|10       |6                 |1.0      |[10]                        |[6]                          |1.0      |
|2     |2021-03-01|10       |5                 |1.0      |[10, 10]                    |[6, 5]                       |1.0      |
+------+----------+---------+------------------+---------+----------------------------+-----------------------------+---------+

analyst_winrevisions_win只是展示如何创建滑动窗口并将其传递给UDF。它们在生产中应该被移除。

将Pandas组移到UDF之外可能会提高性能。星星之火可以解决这一问题。但是,我并没有质疑这个部分,因为您提到的函数并不代表实际的任务。

查看SparkUI中的性能,特别是应用UDF的任务的时间统计信息。如果时间很长,请尝试使用repartition来增加分区的数量,以便每个任务执行一个较小的数据子集。

票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/67043906

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档