有没有一种更有效/更惯用的方式来重写这个查询:
spark.table('registry_data')
.withColumn('age_days', datediff(lit(today), col('date')))
.withColumn('timeframe',
when(col('age_days')<7, "1w")
.when(col('age_days')<30, '1m')
.when(col('age_days')<92, '3m')
.when(col('age_days')<183, '6m')
.when(col('age_days')<365, '1y')
.otherwise('1y+')
)
.groupby('make', 'model')
.pivot('timeframe')
.agg(countDistinct('id').alias('count'))
.fillna(0)
.withColumn('1y+', col('1y+')+col('1y')+col('6m')+col('3m')+col('1m')+col('1w'))
.withColumn('1y', col('1y')+col('6m')+col('3m')+col('1m')+col('1w'))
.withColumn('6m', col('6m')+col('3m')+col('1m')+col('1w'))
.withColumn('3m', col('3m')+col('1m')+col('1w'))
.withColumn('1m', col('1m')+col('1w'))
查询的要点是,对于每个品牌/型号组合,返回从今天开始的一组时间段内看到的条目数量。期间计数是累积的,即最近7天内注册的条目将计入1周、1个月、3个月等。
发布于 2021-06-26 03:19:19
如果要对每列使用累积求和而不是求和,则可以替换.groupby
以后的代码并使用窗口函数
from pyspark.sql.window import Window
import pyspark.sql.functions as F
spark.table('registry_data')
.withColumn('age_days', datediff(lit(today), col('date')))
.withColumn('timeframe',
when(col('age_days')<7, "1w")
.when(col('age_days')<30, '1m')
.when(col('age_days')<92, '3m')
.when(col('age_days')<183, '6m')
.when(col('age_days')<365, '1y')
.otherwise('1y+')
)
.groupBy('make', 'model', 'timeframe')
.agg(F.countDistinct('id').alias('count'),
F.max('age_days').alias('max_days')) # for orderBy clause
.withColumn('cumsum',
F.sum('count').over(Window.partitionBy('make', 'model')
.orderBy('max_days')
.rowsBetween(Window.unboundedPreceding, 0)))
.groupBy('make', 'model').pivot('timeframe').agg(F.first('cumsum'))
.fillna(0)
https://stackoverflow.com/questions/68138089
复制相似问题