前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark resampling

Spark resampling

原创
作者头像
flavorfan
发布2021-09-18 16:08:15
8530
发布2021-09-18 16:08:15
举报
文章被收录于专栏:范传康的专栏范传康的专栏

对时间序列的index进行resample是很常见的操作。比如,按日、周、月、季度统计用户新增、活跃、累计等,就需要对用户表进行resample操作。 pandas 的resample函数可以轻松地对时间序列数据进行重采样,并按照一定的频率聚合数据。但是因为spark中没有index的概念,所以做起来并不容易。

以下介绍是如何在 spark 中进行重采样的示例。

1. 笨拙的方法

def resample(column, agg_interval=900, time_format='yyyy-MM-dd HH:mm:ss'):
    if type(column)==str:
        column = F.col(column)

    # Convert the timestamp to unix timestamp format.
    # Unix timestamp = number of seconds since 00:00:00 UTC, 1 January 1970.
    col_ut =  F.unix_timestamp(column, format=time_format)

    # Divide the time into dicrete intervals, by rounding. 
    col_ut_agg =  F.floor(col_ut / agg_interval) * agg_interval  

    # Convert to and return a human readable timestamp
    return F.from_unixtime(col_ut_agg)`

测试如下

导入数据:

sdf = spark.read.csv('production.csv', header=True, inferSchema=True) 
sdf = (
    sdf
    .withColumn('_c0',f.to_timestamp(f.col('_c0')))
)
sdf.show(2)

运行

sdf = sdf.withColumn('dt_resampled', resample(sdf._c0, agg_interval=3600)) # 1 hour
sdf.show(5)
  1. groupby + window
group = sdf.groupBy('Cantons', f.window("_c0", "1 day")).agg(f.sum("Production").alias('Sum Production'))
group.show(5,truncate=False)
sdf_resampled = group.select(group.window.start.alias("Start"), group.window.end.alias("End"), "Cantons", "Sum Production").orderBy('Start', ascending=True)
sdf_resampled.show()

)

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 笨拙的方法
    • 测试如下
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档