首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Spark Streaming -计数状态中的不同元素

Spark Streaming -计数状态中的不同元素
EN

Stack Overflow用户
提问于 2017-03-07 18:03:36
回答 2查看 1.1K关注 0票数 2

我有一个包含VideoID-UserID的键值对的数据流,按VideoID计算不同的UserID组的最佳做法是什么?

代码语言:javascript
运行
复制
// VideoID,UserID
foo,1
foo,2
bar,1
bar,2
foo,1
bar,2

如上所述,我想随时通过删除冗余的foo,1bar,2来获得VideoID-CountUserID,所以结果应该是:

代码语言:javascript
运行
复制
foo: 2
bar: 2

换句话说,我想在内存中保存一个大的状态数据集。当新的一批数据流到达时,将其与数据集进行比较,以统计每个视频的不同用户。

该怎么做呢?

我正在开发Spark 1.6,但是后续版本的答案是可以接受的。Python代码如果可能的话。

EN

回答 2

Stack Overflow用户

发布于 2017-03-09 08:11:35

为了获得按视频ID分组的用户ID的不同计数,请考虑使用aggregateByKey.对不起,这是Scala,所以你必须翻译。

代码语言:javascript
运行
复制
val rdd = sc.textFile("your_file.txt")

val initialSet = Set.empty[Int]
val addToSet = (s: Set[Int], v:Int) => s + v
val mergeSets = (s1: Set[Int], s2: Set[Int]) => s1 ++ s2

val distinctValSets = rdd.aggregateByKey(initialSet)(addToSet, mergeSets)
val distinctValCountd = rdd.map({case(k,s) => (k,s.size)})

Initial set是聚合对象的初始值,addToSet和mergeSets指定如何将值添加到set中,并根据key合并不同的set。这应该会为你提供与每个视频相关联的不同数量的用户,而且(在空间上)比reduceByKey和groupByKey更便宜。

票数 1
EN

Stack Overflow用户

发布于 2017-03-23 05:14:47

代码语言:javascript
运行
复制
  val rdd1 = sc.parallelize(Seq(("foo", 1),("foo", 2),("foo", 1)))
  rdd1.groupByKey.mapValues(x=>x.toSet.toSeq).flatMapValues(x=>x).collect
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/42645253

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档