我必须在Pyspark中的滑动窗口内执行聚合。特别是,我必须做以下工作:
aggregation
这些任务必须在带有.rangeBetween(-100 days, 0)
的滑动窗口中计算。
我可以很容易地通过构造一个Pandas来实现这一结果,它将Pandas中的某些列作为输入,将它们转换为Pandas DataFrame,然后计算聚合并返回标量结果。然后将UDF应用于所需的滑动窗口。
尽管此解决方案工作良好,但完成任务需要很长时间(3-4小时),因为DFs包含数百万行。是否有办法改善这种运算的计算时间?我正在数据库中使用Pyspark。
我的熊猫UDF是:
@pandas_udf(FloatType(), PandasUDFType.GROUPED_AGG)
def method2(analyst: pd.Series, revisions: pd.Series) -> float:
df = pd.DataFrame({
'analyst': analyst,
'revisions': revisions
})
return df.groupby('analyst').last()['revisions'].sum() / df.groupby('analyst').last()['revisions'].abs().sum()
适用于:
days = lambda x: x*60*60*24
w = Window.partitionBy('csecid').orderBy(F.col('date').cast('timestamp').cast('long')).rangeBetween(-days(100), 0)
df = df.withColumn('new_col', method2(F.col('analystid'), F.col('revisions_improved')).over(w))
编辑:我知道这种聚合可以通过使用numpy数组来实现,而PySpark UDF处理numpy结构的速度要快得多。但是,我想避免这个解决方案,因为我需要在相同的框架函数中应用,这些函数比所显示的要复杂得多,而且很难与numpy复制。
发布于 2021-04-15 23:24:05
最近,我不得不实现一个类似的聚合,我的第一次尝试是在滑动窗口中使用Pandas。性能很差,我使用以下方法改进了它。
collect_list
尝试使用来组合滑动窗口向量,然后将它们映射到UDF中。注意,只有当滑动窗口能够容纳工作人员内存(通常是这样)时,这才能工作。
这是我的测试代码。第一部分只是您的代码,但作为一个完整的可复制的例子。
import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql import Window
from pyspark.sql.functions import pandas_udf, PandasUDFType, udf
from pyspark.sql.types import FloatType, StructType, StructField, IntegerType, StringType
df = spark.createDataFrame(
[(1, "2021-04-01", 10, -30),
(1, "2021-03-01", 10, 20),
(1, "2021-02-01", 10, -1),
(1, "2021-01-01", 10, 10),
(1, "2020-12-01", 10, 5),
(1, "2021-04-01", 20, -5),
(1, "2021-03-01", 20, -4),
(1, "2021-02-01", 20, -3),
(2, "2021-03-01", 10, 5),
(2, "2021-02-01", 10, 6),
],
StructType([
StructField("csecid", StringType(), True),
StructField("date", StringType(), True),
StructField("analystid", IntegerType(), True),
StructField("revisions_improved", IntegerType(), True)
]))
### Baseline
@pandas_udf(FloatType(), PandasUDFType.GROUPED_AGG)
def method2(analyst: pd.Series, revisions: pd.Series) -> float:
df = pd.DataFrame({
'analyst': analyst,
'revisions': revisions
})
return df.groupby('analyst').last()['revisions'].sum() / df.groupby('analyst').last()['revisions'].abs().sum()
days = lambda x: x*60*60*24
w = Window.partitionBy('csecid').orderBy(F.col('date').cast('timestamp').cast('long')).rangeBetween(-days(100), 0)
# df.withColumn('new_col', method2(F.col('analystid'), F.col('revisions_improved')).over(w))
拟议的备选方案:
### Method 3
from typing import List
@udf(FloatType())
def method3(analyst: List[int], revisions: List[int]) -> float:
df = pd.DataFrame({
'analyst': analyst,
'revisions': revisions
})
return float(df.groupby('analyst').last()['revisions'].sum() / df.groupby('analyst').last()['revisions'].abs().sum())
(df
.withColumn('new_col', method2(F.col('analystid'), F.col('revisions_improved')).over(w))
.withColumn('analyst_win', F.collect_list("analystid").over(w))
.withColumn('revisions_win', F.collect_list("revisions_improved").over(w))
.withColumn('method3', method3(F.collect_list("analystid").over(w),
F.collect_list("revisions_improved").over(w)))
.orderBy("csecid", "date", "analystid")
.show(truncate=False))
结果:
+------+----------+---------+------------------+---------+----------------------------+-----------------------------+---------+
|csecid|date |analystid|revisions_improved|new_col |analyst_win |revisions_win |method3 |
+------+----------+---------+------------------+---------+----------------------------+-----------------------------+---------+
|1 |2020-12-01|10 |5 |1.0 |[10] |[5] |1.0 |
|1 |2021-01-01|10 |10 |1.0 |[10, 10] |[5, 10] |1.0 |
|1 |2021-02-01|10 |-1 |-1.0 |[10, 10, 10, 20] |[5, 10, -1, -3] |-1.0 |
|1 |2021-02-01|20 |-3 |-1.0 |[10, 10, 10, 20] |[5, 10, -1, -3] |-1.0 |
|1 |2021-03-01|10 |20 |0.6666667|[10, 10, 10, 20, 10, 20] |[5, 10, -1, -3, 20, -4] |0.6666667|
|1 |2021-03-01|20 |-4 |0.6666667|[10, 10, 10, 20, 10, 20] |[5, 10, -1, -3, 20, -4] |0.6666667|
|1 |2021-04-01|10 |-30 |-1.0 |[10, 10, 20, 10, 20, 10, 20]|[10, -1, -3, 20, -4, -30, -5]|-1.0 |
|1 |2021-04-01|20 |-5 |-1.0 |[10, 10, 20, 10, 20, 10, 20]|[10, -1, -3, 20, -4, -30, -5]|-1.0 |
|2 |2021-02-01|10 |6 |1.0 |[10] |[6] |1.0 |
|2 |2021-03-01|10 |5 |1.0 |[10, 10] |[6, 5] |1.0 |
+------+----------+---------+------------------+---------+----------------------------+-----------------------------+---------+
analyst_win
和revisions_win
只是展示如何创建滑动窗口并将其传递给UDF。它们在生产中应该被移除。
将Pandas组移到UDF之外可能会提高性能。星星之火可以解决这一问题。但是,我并没有质疑这个部分,因为您提到的函数并不代表实际的任务。
查看SparkUI中的性能,特别是应用UDF的任务的时间统计信息。如果时间很长,请尝试使用repartition
来增加分区的数量,以便每个任务执行一个较小的数据子集。
https://stackoverflow.com/questions/67043906
复制相似问题