首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >spark sql:如何为用户项创建sessionId

spark sql:如何为用户项创建sessionId
EN

Stack Overflow用户
提问于 2018-07-29 07:01:55
回答 2查看 833关注 0票数 2

假设我有这样的数据集:

|    item    | event       | timestamp |   user    |
|:-----------|------------:|:---------:|:---------:|
| titanic    | view        |     1     |    1      |
| titanic    | add to bag  |     2     |    1      | 
| titanic    | close       |     3     |    1      | 
| avatar     | view        |     6     |    1      |
| avatar     | close       |     10    |    1      |
| titanic    | view        |     20    |    1      |
| titanic    | purchase    |     30    |    1      |

诸若此类。我需要为每个用户计算对应于特定项目的连续进行事件的sessionId。

因此,对于该特定数据,输出应如下所示:

|    item    | event       | timestamp |   user    |   sessionId    |
|:-----------|------------:|:---------:|:---------:|:--------------:|
| titanic    | view        |     1     |    1      |   session1     |
| titanic    | add to bag  |     2     |    1      |   session1     |
| titanic    | close       |     3     |    1      |   session1     |
| avatar     | view        |     6     |    1      |   session2     |
| avatar     | close       |     10    |    1      |   session2     |
| titanic    | view        |     20    |    1      |   session3     |
| titanic    | purchase    |     30    |    1      |   session3     |

我正在尝试使用类似的方法,就像在window中描述的Spark: How to create a sessionId based on userId and timestamp

Window.partitionBy("user", "item").orderBy("timestamp")

但这是行不通的,因为相同的用户-项目组合可能在不同的会话中。例如,请参阅session1和session3。

有了这个窗口,它们就变成了相同的会话。需要帮助的另一种方法如何实现。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2018-07-29 08:38:51

这里有一种方法,首先使用条件null生成一列时间戳值,然后使用last(ts, ignoreNulls)rowsBetween用最后一个非空的时间戳值进行回填,最后使用dense_rank构造sessionId

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val df = Seq(
  ("titanic", "view", 1, 1),
  ("titanic", "add to bag", 2, 1),
  ("titanic", "close", 3, 1),
  ("avatar", "view", 6, 1),
  ("avatar", "close", 10, 1),
  ("titanic", "view", 20, 1),
  ("titanic", "purchase", 30, 1)
).toDF("item", "event", "timestamp", "user")

val win1 = Window.partitionBy($"user").orderBy($"timestamp")
val win2 = Window.partitionBy($"user").orderBy($"sessTS")

df.
  withColumn( "firstTS",
    when( row_number.over(win1) === 1 || $"item" =!= lag($"item", 1).over(win1),
      $"timestamp" )
  ).
  withColumn( "sessTS",
    last($"firstTS", ignoreNulls = true).
      over(win1.rowsBetween(Window.unboundedPreceding, 0))
  ).
  withColumn("sessionId", concat(lit("session"), dense_rank.over(win2))).
  show

// +-------+----------+---------+----+-------+------+---------+
// |   item|     event|timestamp|user|firstTS|sessTS|sessionId|
// +-------+----------+---------+----+-------+------+---------+
// |titanic|      view|        1|   1|      1|     1| session1|
// |titanic|add to bag|        2|   1|   null|     1| session1|
// |titanic|     close|        3|   1|   null|     1| session1|
// | avatar|      view|        6|   1|      6|     6| session2|
// | avatar|     close|       10|   1|   null|     6| session2|
// |titanic|      view|       20|   1|     20|    20| session3|
// |titanic|  purchase|       30|   1|   null|    20| session3|
// +-------+----------+---------+----+-------+------+---------+
票数 6
EN

Stack Overflow用户

发布于 2018-07-29 12:46:17

您似乎需要累计计算"view“记录的数量。如果是这样的话:

select t.*,
       sum(case when event = 'view' then 1 else 0 end) over (partition by user order by timestamp) as session
from t;
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/51575900

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档