首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >Spark:如何在Dataframe API中转换计数(distinct(Value))

Spark:如何在Dataframe API中转换计数(distinct(Value))
EN

Stack Overflow用户
提问于 2015-05-13 22:40:09
回答 4查看 69.5K关注 0票数 33

我正在尝试比较聚合数据的不同方法。

这是我的输入数据,包含2个元素(页面,访问者):

代码语言:javascript
复制
(PAG1,V1)
(PAG1,V1)
(PAG2,V1)
(PAG2,V2)
(PAG2,V1)
(PAG1,V1)
(PAG1,V2)
(PAG1,V1)
(PAG1,V2)
(PAG1,V1)
(PAG2,V2)
(PAG1,V3)

使用以下代码将SQL命令转换为Spark SQL:

代码语言:javascript
复制
import sqlContext.implicits._
case class Log(page: String, visitor: String)
val logs = data.map(p => Log(p._1,p._2)).toDF()
logs.registerTempTable("logs")
val sqlResult= sqlContext.sql(
                              """select page
                                       ,count(distinct visitor) as visitor
                                   from logs
                               group by page
                              """)
val result = sqlResult.map(x=>(x(0).toString,x(1).toString))
result.foreach(println)

我得到以下输出:

代码语言:javascript
复制
(PAG1,3) // PAG1 has been visited by 3 different visitors
(PAG2,2) // PAG2 has been visited by 2 different visitors

现在,我希望使用Dataframes和thiers API获得相同的结果,但我无法获得相同的输出:

代码语言:javascript
复制
import sqlContext.implicits._
case class Log(page: String, visitor: String)
val logs = data.map(p => Coppia(p._1,p._2)).toDF()
val result = log.select("page","visitor").groupBy("page").count().distinct
result.foreach(println)

实际上,这就是我得到的输出:

代码语言:javascript
复制
[PAG1,8]  // just the simple page count for every page
[PAG2,4]

它可能是什么愚蠢的东西,但我现在看不出来。

提前感谢!

FF

EN

回答 4

Stack Overflow用户

回答已采纳

发布于 2015-05-13 22:57:23

您需要的是DataFrame聚合函数countDistinct

代码语言:javascript
复制
import sqlContext.implicits._
import org.apache.spark.sql.functions._

case class Log(page: String, visitor: String)

val logs = data.map(p => Log(p._1,p._2))
            .toDF()

val result = logs.select("page","visitor")
            .groupBy('page)
            .agg('page, countDistinct('visitor))

result.foreach(println)
票数 65
EN

Stack Overflow用户

发布于 2018-08-14 13:10:51

您可以使用dataframe的groupBy命令两次来完成此操作。这里,df1是您的原始输入。

代码语言:javascript
复制
val df2 = df1.groupBy($"page",$"visitor").agg(count($"visitor").as("count"))

此命令将产生以下结果:

代码语言:javascript
复制
page  visitor  count
----  ------   ----
PAG2    V2       2
PAG1    V3       1
PAG1    V1       5
PAG1    V2       2
PAG2    V1       2

然后再次使用groupBy命令获得最终结果。

代码语言:javascript
复制
 df2.groupBy($"page").agg(count($"visitor").as("count"))

最终输出:

代码语言:javascript
复制
page   count
----   ----
PAG1    3
PAG2    2
票数 4
EN

Stack Overflow用户

发布于 2019-10-07 10:34:57

我认为在Spark的新版本中,它更容易实现。下面是使用2.4.0进行测试的。1.首先创建sample数组。

代码语言:javascript
复制
val myArr = Array(
      ("PAG1","V1"),
      ("PAG1","V1"),
      ("PAG2","V1"),
      ("PAG2","V2"),
      ("PAG2","V1"),
      ("PAG1","V1"),
      ("PAG1","V2"),
      ("PAG1","V1"),
      ("PAG1","V2"),
      ("PAG1","V1"),
      ("PAG2","V2"),
      ("PAG1","V3")
    )

创建数据帧

代码语言:javascript
复制
val logs = spark.createDataFrame(myArr)
    .withColumnRenamed("_1","page")
    .withColumnRenamed("_2","visitor")

现在使用distinctCount spark sql函数进行聚合

代码语言:javascript
复制
import org.apache.spark.sql.{functions => F}
logs.groupBy("page").agg(
    F.countDistinct("visitor").as("visitor"))
    .show()

预期结果:

代码语言:javascript
复制
+----+-------+
|page|visitor|
+----+-------+
|PAG1|      3|
|PAG2|      2|
+----+-------+
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/30218140

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档