首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >具有复杂条件的Spark SQL窗口函数

具有复杂条件的Spark SQL窗口函数
EN

Stack Overflow用户
提问于 2017-02-25 05:25:10
回答 2查看 25K关注 0票数 24

这可能是最容易通过示例来解释的。假设我有一个用户登录网站的DataFrame,例如:

代码语言:javascript
复制
scala> df.show(5)
+----------------+----------+
|       user_name|login_date|
+----------------+----------+
|SirChillingtonIV|2012-01-04|
|Booooooo99900098|2012-01-04|
|Booooooo99900098|2012-01-06|
|  OprahWinfreyJr|2012-01-10|
|SirChillingtonIV|2012-01-11|
+----------------+----------+
only showing top 5 rows

我想在此添加一个栏目,指出他们何时成为网站上的活跃用户。但有一点需要注意:有一段时间,用户被认为是活动的,在这段时间之后,如果他们再次登录,他们的became_active日期将重置。假设这段时间是5 days。那么从上面的表导出的所需的表将类似于:

代码语言:javascript
复制
+----------------+----------+-------------+
|       user_name|login_date|became_active|
+----------------+----------+-------------+
|SirChillingtonIV|2012-01-04|   2012-01-04|
|Booooooo99900098|2012-01-04|   2012-01-04|
|Booooooo99900098|2012-01-06|   2012-01-04|
|  OprahWinfreyJr|2012-01-10|   2012-01-10|
|SirChillingtonIV|2012-01-11|   2012-01-11|
+----------------+----------+-------------+

因此,特别是SirChillingtonIV的became_active日期被重置,因为他们的第二次登录是在活动期到期之后,但是Booooo99900098的became_active日期没有在他/她第二次登录时被重置,因为它在活动期内。

我最初的想法是在lag中使用窗口函数,然后使用lag的值来填充became_active列;例如,大致如下所示:

代码语言:javascript
复制
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val window = Window.partitionBy("user_name").orderBy("login_date")
val df2 = df.withColumn("tmp", lag("login_date", 1).over(window))

然后,填写became_active日期的规则是,如果tmpnull (即,如果它是第一次登录),或者如果是login_date - tmp >= 5,则填写became_active = login_date;否则,转到tmp中的下一个最新值并应用相同的规则。这表明了一种递归方法,我很难想象有一种方法可以实现它。

我的问题是:这是一种可行的方法吗?如果可行,我如何“返回”并查看tmp的早期值,直到我找到一个停止的值?据我所知,我不能遍历Spark SQL Column的值。有没有其他方法可以达到这个效果呢?

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2017-02-25 06:51:10

Spark >= 3.2

最新的Spark版本在批处理和结构化流式查询中都提供了对会话窗口的本机支持(参见SPARK-10816及其子任务,特别是SPARK-34893)。

官方文档提供了很好的usage example

火花< 3.2

这就是诀窍。导入一组函数:

代码语言:javascript
复制
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, datediff, lag, lit, min, sum}

定义窗口:

代码语言:javascript
复制
val userWindow = Window.partitionBy("user_name").orderBy("login_date")
val userSessionWindow = Window.partitionBy("user_name", "session")

找到新会话的启动点:

代码语言:javascript
复制
val newSession =  (coalesce(
  datediff($"login_date", lag($"login_date", 1).over(userWindow)),
  lit(0)
) > 5).cast("bigint")

val sessionized = df.withColumn("session", sum(newSession).over(userWindow))

查找每个会话的最早日期:

代码语言:javascript
复制
val result = sessionized
  .withColumn("became_active", min($"login_date").over(userSessionWindow))
  .drop("session")

数据集定义为:

代码语言:javascript
复制
val df = Seq(
  ("SirChillingtonIV", "2012-01-04"), ("Booooooo99900098", "2012-01-04"),
  ("Booooooo99900098", "2012-01-06"), ("OprahWinfreyJr", "2012-01-10"), 
  ("SirChillingtonIV", "2012-01-11"), ("SirChillingtonIV", "2012-01-14"),
  ("SirChillingtonIV", "2012-08-11")
).toDF("user_name", "login_date")

结果是:

代码语言:javascript
复制
+----------------+----------+-------------+
|       user_name|login_date|became_active|
+----------------+----------+-------------+
|  OprahWinfreyJr|2012-01-10|   2012-01-10|
|SirChillingtonIV|2012-01-04|   2012-01-04| <- The first session for user
|SirChillingtonIV|2012-01-11|   2012-01-11| <- The second session for user
|SirChillingtonIV|2012-01-14|   2012-01-11| 
|SirChillingtonIV|2012-08-11|   2012-08-11| <- The third session for user
|Booooooo99900098|2012-01-04|   2012-01-04|
|Booooooo99900098|2012-01-06|   2012-01-04|
+----------------+----------+-------------+
票数 43
EN

Stack Overflow用户

发布于 2018-12-21 09:06:51

重构the other answer以使用Pyspark

Pyspark中,你可以像下面这样做。

create data frame

代码语言:javascript
复制
df = sqlContext.createDataFrame(
[
("SirChillingtonIV", "2012-01-04"), 
("Booooooo99900098", "2012-01-04"), 
("Booooooo99900098", "2012-01-06"), 
("OprahWinfreyJr", "2012-01-10"), 
("SirChillingtonIV", "2012-01-11"), 
("SirChillingtonIV", "2012-01-14"), 
("SirChillingtonIV", "2012-08-11")
], 
("user_name", "login_date"))

上面的代码创建一个如下所示的数据框

代码语言:javascript
复制
+----------------+----------+
|       user_name|login_date|
+----------------+----------+
|SirChillingtonIV|2012-01-04|
|Booooooo99900098|2012-01-04|
|Booooooo99900098|2012-01-06|
|  OprahWinfreyJr|2012-01-10|
|SirChillingtonIV|2012-01-11|
|SirChillingtonIV|2012-01-14|
|SirChillingtonIV|2012-08-11|
+----------------+----------+

现在我们首先要找出login_date5 days之间的区别。

要做到这一点,请如下所示。

必要的导入

代码语言:javascript
复制
from pyspark.sql import functions as f
from pyspark.sql import Window


# defining window partitions  
login_window = Window.partitionBy("user_name").orderBy("login_date")
session_window = Window.partitionBy("user_name", "session")

session_df = df.withColumn("session", f.sum((f.coalesce(f.datediff("login_date", f.lag("login_date", 1).over(login_window)), f.lit(0)) > 5).cast("int")).over(login_window))

当我们运行上面的代码行时,如果date_diffNULL,那么coalesce函数将把NULL替换为0

代码语言:javascript
复制
+----------------+----------+-------+
|       user_name|login_date|session|
+----------------+----------+-------+
|  OprahWinfreyJr|2012-01-10|      0|
|SirChillingtonIV|2012-01-04|      0|
|SirChillingtonIV|2012-01-11|      1|
|SirChillingtonIV|2012-01-14|      1|
|SirChillingtonIV|2012-08-11|      2|
|Booooooo99900098|2012-01-04|      0|
|Booooooo99900098|2012-01-06|      0|
+----------------+----------+-------+


# add became_active column by finding the `min login_date` for each window partitionBy `user_name` and `session` created in above step
final_df = session_df.withColumn("became_active", f.min("login_date").over(session_window)).drop("session")

+----------------+----------+-------------+
|       user_name|login_date|became_active|
+----------------+----------+-------------+
|  OprahWinfreyJr|2012-01-10|   2012-01-10|
|SirChillingtonIV|2012-01-04|   2012-01-04|
|SirChillingtonIV|2012-01-11|   2012-01-11|
|SirChillingtonIV|2012-01-14|   2012-01-11|
|SirChillingtonIV|2012-08-11|   2012-08-11|
|Booooooo99900098|2012-01-04|   2012-01-04|
|Booooooo99900098|2012-01-06|   2012-01-04|
+----------------+----------+-------------+
票数 6
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/42448564

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档