Suppose I have a dataset like this:
| item | event | timestamp | user |
|:-----------|------------:|:---------:|:---------:|
| titanic | view | 1 | 1 |
| titanic | add to bag | 2 | 1 |
| titanic | close | 3 | 1 |
| avatar | view | 6 | 1 |
| avatar | close | 10 | 1 |
| titanic | view | 20 | 1 |
| titanic | purchase | 30 | 1 |
and so on. For each user, I need to compute a sessionId covering consecutive events on a particular item.
So for this particular data, the output should look like this:
| item | event | timestamp | user | sessionId |
|:-----------|------------:|:---------:|:---------:|:--------------:|
| titanic | view | 1 | 1 | session1 |
| titanic | add to bag | 2 | 1 | session1 |
| titanic | close | 3 | 1 | session1 |
| avatar | view | 6 | 1 | session2 |
| avatar | close | 10 | 1 | session2 |
| titanic | view | 20 | 1 | session3 |
| titanic | purchase | 30 | 1 | session3 |
I'm trying an approach similar to the one described in "Spark: How to create a sessionId based on userId and timestamp", with a window like:
Window.partitionBy("user", "item").orderBy("timestamp")
But this doesn't work, because the same user-item combination can occur in different sessions; see session1 and session3 above, for example.
With this window they would end up as the same session. I need help with a different way to achieve this.
Posted on 2018-07-29 08:38:51
Here's one approach: first use a conditional null to generate a column of session-start timestamp values, then backfill it with the last non-null timestamp value using last(ts, ignoreNulls) with rowsBetween, and finally construct the sessionId with dense_rank:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val df = Seq(
  ("titanic", "view", 1, 1),
  ("titanic", "add to bag", 2, 1),
  ("titanic", "close", 3, 1),
  ("avatar", "view", 6, 1),
  ("avatar", "close", 10, 1),
  ("titanic", "view", 20, 1),
  ("titanic", "purchase", 30, 1)
).toDF("item", "event", "timestamp", "user")

// Window over each user's events in timestamp order
val win1 = Window.partitionBy($"user").orderBy($"timestamp")
// Window ordered by the session-start timestamp computed below
val win2 = Window.partitionBy($"user").orderBy($"sessTS")

df.
  // firstTS: the timestamp on the first row of each session (i.e. the
  // user's very first row, or any row where the item changes); null elsewhere
  withColumn("firstTS",
    when(row_number.over(win1) === 1 || $"item" =!= lag($"item", 1).over(win1),
      $"timestamp")
  ).
  // sessTS: backfill by carrying the last non-null firstTS forward,
  // so every row holds its session's start timestamp
  withColumn("sessTS",
    last($"firstTS", ignoreNulls = true).
      over(win1.rowsBetween(Window.unboundedPreceding, 0))
  ).
  // rank the distinct session-start timestamps to number the sessions
  withColumn("sessionId", concat(lit("session"), dense_rank.over(win2))).
  show
// +-------+----------+---------+----+-------+------+---------+
// | item| event|timestamp|user|firstTS|sessTS|sessionId|
// +-------+----------+---------+----+-------+------+---------+
// |titanic| view| 1| 1| 1| 1| session1|
// |titanic|add to bag| 2| 1| null| 1| session1|
// |titanic| close| 3| 1| null| 1| session1|
// | avatar| view| 6| 1| 6| 6| session2|
// | avatar| close| 10| 1| null| 6| session2|
// |titanic| view| 20| 1| 20| 20| session3|
// |titanic| purchase| 30| 1| null| 20| session3|
// +-------+----------+---------+----+-------+------+---------+
Posted on 2018-07-29 12:46:17
It sounds like you want a cumulative count of the "view" records, since each session starts with one. If so:
select t.*,
sum(case when event = 'view' then 1 else 0 end) over (partition by user order by timestamp) as session
from t;
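Assuming the `df` DataFrame from the first answer is in scope (and a SparkSession with implicits imported), the same cumulative-count idea can be sketched in the DataFrame API. Note this relies on every session beginning with a "view" event, which holds for the sample data but may not in general:

```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

// Running count of "view" events per user in timestamp order;
// since each session opens with a "view", the running count is
// exactly the session number.
val byUser = Window.partitionBy($"user").orderBy($"timestamp")

df.withColumn("sessionId",
  concat(lit("session"),
    sum(when($"event" === "view", 1).otherwise(0)).over(byUser))
).show
```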
https://stackoverflow.com/questions/51575900