前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >专栏 >图解Spark Graphx实现顶点关联邻接顶点的collectNeighbors函数原理

图解Spark Graphx实现顶点关联邻接顶点的collectNeighbors函数原理

原创
作者头像
朱季谦
修改于 2023-09-02 04:05:19
修改于 2023-09-02 04:05:19
73610
代码可运行
举报
运行总次数:0
代码可运行

原创/朱季谦

一、场景案例

在一张社区网络里,可能需要查询出各个顶点邻接关联的顶点集合,类似查询某个人关系比较近的都有哪些人的场景。

在用Spark graphx中,通过函数collectNeighbors便可以获取到源顶点邻接顶点的数据。

下面以一个例子来说明,首先,先基于顶点集和边来创建一个Graph图。

该图的顶点集合为——

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
(1L, "Alice"),
(2L, "Bob"),
(3L, "Charlie"),
(4L, "David"),
(5L, "Eve"),
(6L, "Frank"),
(7L, "Grace"),
(8L, "Henry"),
(9L, "Ivy")

边的集合为——

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
Edge(1L, 2L, "friend"),
Edge(1L, 5L, "friend"),
Edge(2L, 3L, "friend"),
Edge(2L, 4L, "friend"),
Edge(3L, 4L, "friend"),
Edge(4L, 6L, "friend"),
Edge(5L, 7L, "friend"),
Edge(5L, 8L, "friend"),
Edge(6L, 9L, "friend"),
Edge(7L, 8L, "friend"),
Edge(8L, 9L, "friend")

基于以上顶点和边,分别建立一个顶点RDD 和边RDD,然后通过Graph(vertices, edges, defaultVertex)创建一个Graph图,代码如下——

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val conf = new SparkConf().setMaster("local[*]").setAppName("graphx")
val ss = SparkSession.builder().config(conf).getOrCreate()

// 创建顶点RDD
val vertices = ss.sparkContext.parallelize(Seq(
  (1L, "Alice"),
  (2L, "Bob"),
  (3L, "Charlie"),
  (4L, "David"),
  (5L, "Eve"),
  (6L, "Frank"),
  (7L, "Grace"),
  (8L, "Henry"),
  (9L, "Ivy")
))


// 创建边RDD
val edges = ss.sparkContext.parallelize(Seq(
  Edge(1L, 2L, "friend"),
  Edge(1L, 5L, "friend"),
  Edge(2L, 3L, "friend"),
  Edge(2L, 4L, "friend"),
  Edge(3L, 4L, "friend"),
  Edge(4L, 6L, "friend"),
  Edge(5L, 7L, "friend"),
  Edge(5L, 8L, "friend"),
  Edge(6L, 9L, "friend"),
  Edge(7L, 8L, "friend"),
  Edge(8L, 9L, "friend")
))

val graph = Graph(vertices, edges, null)

在成功创建图之后,就可以基于已有的图,通过collectNeighbors方法,分别得到每个顶点关联邻接顶点的数据——

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val neighborVertexs = graph.mapVertices{
  case (id,(label)) => (label)
}.collectNeighbors(EdgeDirection.Either)

最终得到的neighborVertexs是一个VertexRDD[Array[(VertexId, VD)]]类型的RDD,可以通过neighborVertexs.foreach(println)打印观察一下,发现数据里,是每一个【顶点,元组】的结构,注意看,大概就能猜出来,通过neighborVertexs得到的RDD其实就是每个顶点关联了邻接顶点集合元组的数据——

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
(5,[Lscala.Tuple2;@bb793d7)
(8,[Lscala.Tuple2;@6d5786e6)
(1,[Lscala.Tuple2;@398cb9ea)
(9,[Lscala.Tuple2;@61c4eeb2)
(2,[Lscala.Tuple2;@d7d0256)
(6,[Lscala.Tuple2;@538f0156)
(7,[Lscala.Tuple2;@77a17e3d)
(3,[Lscala.Tuple2;@1be2a4fb)
(4,[Lscala.Tuple2;@1e0153f9)

可以进一步验证,将元组里的数据进行展开打印,通过以下代码进行验证——先通过coalesce(1)将分区设置为一个分区,多个分区打印难以确定打印顺序。然后再通过foreach遍历RDD里每一个元素,这里的元素结构如(5,[Lscala.Tuple2;@bb793d7),x._1表示是顶点5,x._2表示[Lscala.Tuple2;@bb793d7,既然是元组,那就可以进一步进行遍历打印,即 x._2.foreach(y => {...})——

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
neighborVertexs.coalesce(1).foreach(x => {
    print("顶点:" + x._1 + "关联的邻居顶点集合->{" )
    var str = "";
    x._2.foreach(y => {
      str += y + ","})
    print(str.substring(0, str.length - 1 ) +"}")
    println()
})

可以观察一下最后打印结果——

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
顶点:8关联的邻居顶点集合->{(5,Eve),(7,Grace),(9,Ivy)}
顶点:1关联的邻居顶点集合->{(2,Bob),(5,Eve)}
顶点:9关联的邻居顶点集合->{(6,Frank),(8,Henry)}
顶点:2关联的邻居顶点集合->{(1,Alice),(3,Charlie),(4,David)}
顶点:3关联的邻居顶点集合->{(2,Bob),(4,David)}
顶点:4关联的邻居顶点集合->{(2,Bob),(3,Charlie),(6,Frank)}
顶点:5关联的邻居顶点集合->{(1,Alice),(7,Grace),(8,Henry)}
顶点:6关联的邻居顶点集合->{(4,David),(9,Ivy)}
顶点:7关联的邻居顶点集合->{(5,Eve),(8,Henry)}

结合文章开始的那一个图验证一下,顶点1关联的邻接顶点是(2,Bob),(5,Eve),正确;顶点8关联的邻接顶点是(5,Eve),(7,Grace),(9,Ivy),正确。其他验证都与下图情况符合。可见,通过collectNeighbors(EdgeDirection.Either)确实可以获取网络里每个顶点关联邻接顶点的数据。

二、函数代码原理解析

以上就是顶点关联邻接顶点的用法案例,接下来,让我们分析一下collectNeighbors(EdgeDirection.Either)源码,该函数实现了收集顶点邻居顶点的信息——

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]] = {
  val nbrs = edgeDirection match {
    //聚合本顶点出度指向的邻居顶点和入度指向本顶点的邻居顶点
    case EdgeDirection.Either =>
      graph.aggregateMessages[Array[(VertexId, VD)]](
        ctx => {
          ctx.sendToSrc(Array((ctx.dstId, ctx.dstAttr)))
          ctx.sendToDst(Array((ctx.srcId, ctx.srcAttr)))
        },
        (a, b) => a ++ b, TripletFields.All)
    //聚合本顶点出度指向的邻居顶点
    case EdgeDirection.In =>
      graph.aggregateMessages[Array[(VertexId, VD)]](
        ctx => ctx.sendToDst(Array((ctx.srcId, ctx.srcAttr))),
        (a, b) => a ++ b, TripletFields.Src)
    //聚合入度指向本顶点的邻居顶点
    case EdgeDirection.Out =>
      graph.aggregateMessages[Array[(VertexId, VD)]](
        ctx => ctx.sendToSrc(Array((ctx.dstId, ctx.dstAttr))),
        (a, b) => a ++ b, TripletFields.Dst)
    case EdgeDirection.Both =>
      throw new SparkException("collectEdges does not support EdgeDirection.Both. Use" +
        "EdgeDirection.Either instead.")
  }
  graph.vertices.leftJoin(nbrs) { (vid, vdata, nbrsOpt) =>
    nbrsOpt.getOrElse(Array.empty[(VertexId, VD)])
  }
} // end of collectNeighbor

该函数用match做了一个类似Java的switch匹配,匹配有四种结果,其中,最后一种EdgeDirection.Both已经不支持,故而这里就不解读了,只讲仍然有用的三种。

用一个图来说明吧,假如有以下边指向的图——

Edge(2L, 1L), Edge(2L, 4L), Edge(3L, 2L), Edge(2L, 5L),

  • EdgeDirection.Either表示本顶点的出度邻居和入度邻居。若本顶点为2,那么它得到邻居顶点包括(1,4,3,5),该参数表示只要与顶点2一度边关联的,都会聚集成邻居顶点。
  • EdgeDirection.In表示指向本顶点的邻居,即本顶点的入度邻居。若本顶点为2,图里邻居顶点只有3是指向2的,那么顶点2得到邻居顶点包括(3)。
  • EdgeDirection.Out表示本顶点的出度指向的邻居顶点。若本顶点为2,图里从顶点2指向邻居顶点的,将得到(1,4,5)。

由此可知,顶点关联邻居顶点的函数collectNeighbors(EdgeDirection.Either)里面的参数,就是可以基于该参数得到不同情况的邻居顶点。

这里以collectNeighbors(EdgeDirection.Either)说明函数核心逻辑——

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 graph.aggregateMessages[Array[(VertexId, VD)]](
        ctx => {
          ctx.sendToSrc(Array((ctx.dstId, ctx.dstAttr)))
          ctx.sendToDst(Array((ctx.srcId, ctx.srcAttr)))
        },
        (a, b) => a ++ b, TripletFields.All)

该代码做了聚合,表示会对图里的所有边做处理。

图里有一种边结构,叫三元组(Triplet),这种结构由以下三个部分组成——

  1. 源顶点(Source Vertex):图中的一条边的起始点或源节点。
  2. 目标顶点(Destination Vertex):图中的一条边的结束点或目标节点。
  3. 边属性(Edge Attribute):连接源顶点和目标顶点之间的边上的属性值。

在graph.aggregateMessages[Array[(VertexId, VD)]]( ctx => {......})聚合函数里,就是基于三元组去做聚合统计的。

该聚合函数有两个参数,第一个参数是一个函数(ctx) => { ... },里面定义了每个顶点如何发送消息给邻居顶点。

注意看,这里的ctx正是一个三元组对象,基于该对象,可以获取一下信息——

  • ctx.srcId:获取源顶点的ID。
  • ctx.srcAttr:获取源顶点的属性。
  • ctx.dstId:获取目标顶点的ID。
  • ctx.dstAttr:获取目标顶点的属性。

ctx作为一个知道源顶点、目标顶点的三元组对象,就像一个邮差一样,负责给两边顶点发送消息。

1、ctx.sendToSrc(Array((ctx.dstId, ctx.dstAttr)))函数,这里顶点A是作为目标顶点,邻居节点B是源顶点,ctx对象就会将目标顶点B的顶点ID和属性组成的元组(ctx.dstId, ctx.dstAttr)当作消息传给源顶点A,A会将收到的消息保存下来,这样就知道EdgeDirection.Either无向边情况下,它有一个邻居B了。

2、 ctx.sendToDst(Array((ctx.srcId, ctx.srcAttr)))函数,这时A成为了源顶点,C成为了目标顶点,ctx对象就会将源顶点A的顶点ID和属性组成的元组(ctx.dstId, ctx.dstAttr)当作消息传给源顶点B。B会将收到的消息以数组格式Array((ctx.dstId, ctx.dstAttr))保存下来,这样B以后就知道EdgeDirection.Either无向边情况下,它有一个邻居A了。

这里ctx.sendToDst()用Array((ctx.dstId, ctx.dstAttr))数组形式发送,是方便后面的(a, b) => a ++ b 合并函数操作,最后每个顶点可以将它收到的邻居顶点数组合并到一个大的数组,即所有邻居顶点聚集到一个数组里返回。

还有一个TripletFields枚举需要了解下——

TripletFields.All表示本顶点将聚合包括源顶点以及目标顶点发送顶点消息。

TripletFields.Src表示本顶点只聚合源顶点发送过来的顶点消息。

TripletFields.Dst表示本顶点只聚合目标顶点发送过来的顶点消息。

EdgeDirection.Either参数对应的是TripletFields.All,表示需要将本顶点接收到的所有源顶点以及目标顶点发送的顶点消息进行聚合。

接下来,就是做聚合了——

整个图里会有许多类似邮差角色的ctx对象,只需要处理完这些对象,那么,每个顶点就会收到通过ctx对象传送过来的邻居顶点信息。

例如,A收到的ctx对象发过来的邻居消息如下——

Array((B,属性))

Array((C,属性))

Array((D,属性))

......

这时,就可以基于顶点A作为分组key,将组内的Array((B,属性))、Array((C,属性))、Array((D,属性))都合并到一个组里,即通过(a, b) => a ++ b将分组各个数据合并成一个大数组{(B,属性),(C,属性),(D,属性)},这个分组group的key是收到各个ctx对象发送邻居消息过来的顶点A。

各个顶点聚合完后,返回一个nbrs,该RDD的每一个元素,即(顶点,顶点属性,Array(邻居顶点))——

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val nbrs = edgeDirection match {
  case EdgeDirection.Either =>
    graph.aggregateMessages[Array[(VertexId, VD)]](
      ctx => {
        ctx.sendToSrc(Array((ctx.dstId, ctx.dstAttr)))
        ctx.sendToDst(Array((ctx.srcId, ctx.srcAttr)))
      },
      (a, b) => a ++ b, TripletFields.All)
		......  
}

接着将原图graph的顶点vertices的rdd与聚合结果nbrs做左连接,返回一个新的 VertexRDD 对象,其中每个顶点都附带了它的邻居信息。如果某个顶点没有邻居信息(在 nbrs 中不存在对应的条目),则使用空数组来表示它的邻居。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
graph.vertices.leftJoin(nbrs) { (vid, vdata, nbrsOpt) =>
  nbrsOpt.getOrElse(Array.empty[(VertexId, VD)])
}

最后,得到的顶点关联邻居顶点的RDD情况,就如前文打印的那样——

(5,[Lscala.Tuple2;@bb793d7) 顶点5展开邻居顶点=> 顶点:5关联的邻居顶点集合->{(1,Alice),(7,Grace),(8,Henry)} (8,[Lscala.Tuple2;@6d5786e6) 顶点8展开邻居顶点=> 顶点:8关联的邻居顶点集合->{(5,Eve),(7,Grace),(9,Ivy)} (1,[Lscala.Tuple2;@398cb9ea) 顶点1展开邻居顶点=> 顶点:1关联的邻居顶点集合->{(2,Bob),(5,Eve)} (9,[Lscala.Tuple2;@61c4eeb2) 顶点9展开邻居顶点=> 顶点:9关联的邻居顶点集合->{(6,Frank),(8,Henry)} (2,[Lscala.Tuple2;@d7d0256) 顶点2展开邻居顶点=> 顶点:2关联的邻居顶点集合->{(1,Alice),(3,Charlie),(4,David)} (6,[Lscala.Tuple2;@538f0156) 顶点6展开邻居顶点=> 顶点:6关联的邻居顶点集合->{(4,David),(9,Ivy)} (7,[Lscala.Tuple2;@77a17e3d) 顶点7展开邻居顶点=> 顶点:7关联的邻居顶点集合->{(5,Eve),(8,Henry)} (3,[Lscala.Tuple2;@1be2a4fb) 顶点3展开邻居顶点=> 顶点:3关联的邻居顶点集合->{(2,Bob),(4,David)} (4,[Lscala.Tuple2;@1e0153f9) 顶点4展开邻居顶点=> 顶点:4关联的邻居顶点集合->{(2,Bob),(3,Charlie),(6,Frank)}

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
1 条评论
热度
最新
,,,,,,,,
,,,,,,,,
回复回复点赞举报
推荐阅读
编辑精选文章
换一批
不要到处翻了 | Hive开窗函数总结与实践
平常我们使用 hive或者 mysql时,一般聚合函数用的比较多。但对于某些偏分析的需求,group by可能很费力,子查询很多,这个时候就需要使用窗口分析函数了~ 注:hive、oracle提供开窗函数,mysql8之前版本不提供,但Oracle发布的 MySQL 8.0版本支持窗口函数(over)和公用表表达式(with)这两个重要的功能!
大数据真好玩
2021/01/26
6.1K0
Hsql函数下_sql nvl函数
1、使用标准的聚合函数COUNT、SUM、MIN、MAX、AVG 2、使用PARTITION BY语句,使用一个或者多个原始数据类型的列 3、使用PARTITION BY与ORDER BY语句,使用一个或者多个数据类型的分区或者排序列 4、使用窗口规范,窗口规范支持以下格式:
全栈程序员站长
2022/09/29
1.3K0
Hsql函数下_sql nvl函数
Hive的利器:强大而实用的开窗函数
与聚合函数类似,开窗函数也是对行集组进行聚合计算。但是它不像普通聚合函数那样,每组通常只返回一个值,开窗函数可以为每组返回多个值,因为开窗函数所执行聚合计算的行集组是窗口。
大数据学习与分享
2021/04/22
3.5K0
Hive的利器:强大而实用的开窗函数
Hive窗口函数/分析函数详解
在sql中有一类函数叫做聚合函数,例如sum()、avg()、max()等等,这类函数可以将多行数据按照规则聚集为一行,一般来讲聚集后的行数是要少于聚集前的行数的。但是有时我们想要既显示聚集前的数据,又要显示聚集后的数据,这时我们便引入了窗口函数。窗口函数又叫OLAP函数/分析函数,窗口函数兼具分组和排序功能。
五分钟学大数据
2021/03/04
8850
Hive窗口函数/分析函数详解
hive窗口函数/分析函数详细剖析
在sql中有一类函数叫做聚合函数,例如sum()、avg()、max()等等,这类函数可以将多行数据按照规则聚集为一行,一般来讲聚集后的行数是要少于聚集前的行数的。但是有时我们想要既显示聚集前的数据,又要显示聚集后的数据,这时我们便引入了窗口函数。窗口函数又叫OLAP函数/分析函数,窗口函数兼具分组和排序功能。
五分钟学大数据
2021/01/21
9600
hive窗口函数/分析函数详细剖析
Hive学习-lateral view 、explode、reflect和窗口函数
然后挂了FAILED: SemanticException [Error 10081]: UDTF's are not supported outside the SELECT clause, nor nested in expressions
顾翔
2024/09/10
4280
Hive学习-lateral view 、explode、reflect和窗口函数
Spark SQL/Hive实用函数大全
本篇文章主要介绍Spark SQL/Hive中常用的函数,主要分为字符串函数、JSON函数、时间函数、开窗函数以及在编写Spark SQL代码应用时实用的函数算子五个模块。
大数据学习与分享
2020/12/31
5.1K0
Hive窗口函数保姆级教程
在SQL中有一类函数叫做聚合函数,例如sum()、avg()、max()等等,这类函数可以将多行数据按照规则聚集为一行,一般来讲聚集后的行数是要少于聚集前的行数的。但是有时我们想要既显示聚集前的数据,又要显示聚集后的数据,这时我们便引入了窗口函数。窗口函数又叫OLAP函数/分析函数,窗口函数兼具分组和排序功能。
五分钟学大数据
2021/07/07
2.6K0
Hive窗口函数保姆级教程
Hive常见的分析函数
窗口排序函数提供了数据的排序信息,比如行号和排名。在一个分组的内部将行号或者排名作为数据的一部分进行返回,最常用的排序函数主要包括:
大数据老哥
2021/07/05
7710
Hive常见的分析函数
hive sql 窗口函数
1) 窗口函数 Lag, Lead, First_value,Last_value Lag, Lead、这两个函数为常用的窗口函数,可以返回上下数据行的数据. LAG(col,n,DEFAULT) 用于统计窗口内往上第n行值 LEAD(col,n,DEFAULT) 用于统计窗口内往下第n行值, 与LAG相反 -- 组内排序后,向后或向前偏移 -- 如果省略掉第三个参数,默认为NULL,否则补上。
用户1217611
2020/06/19
1.2K0
hive sql 窗口函数
Hive 中的排序和开窗函数
排序操作是一个比较常见的操作,尤其是在数据分析的时候,我们往往需要对数据进行排序,hive 中和排序相关的有四个关键字,今天我们就看一下,它们都是什么作用。
大数据真好玩
2021/09/18
1.8K0
Hive 中的排序和开窗函数
大数据学习之数据仓库代码题总结上
请编写 SQL 查询,计算从注册当天开始的每个用户在注册后第1天、第3天、第7天的学习留存率。留存率的计算方式是在注册后的特定天数内继续学习的用户数除以当天注册的用户总数。结果应包含日期、留存天数和留存率。
bxia的厨房_公众号
2024/03/05
2181
大数据学习之数据仓库代码题总结上
最强最全面的Hive SQL开发指南,超四万字全面解析!
hive -S -e 'select table_cloum from table' -S,终端上的输出不会有mapreduce的进度,执行完毕,只会把查询结果输出到终端上。
五分钟学大数据
2021/12/02
7.7K0
最强最全面的Hive SQL开发指南,超四万字全面解析!
窗口函数到底有多「神奇」?
实习和秋招笔面试的时候,SQL的考察必不可少,除了题目中会涉及业务背景外,大同小异的,大都考察聚合、表连接、窗口函数,尤以各种各样的窗口函数为重。
数据森麟
2021/01/25
7830
窗口函数到底有多「神奇」?
HIVE窗口函数
窗口函数的应用场景http://yugouai.iteye.com/blog/1908121
努力在北京混出人样
2019/02/18
1.5K0
HIVE窗口函数
CSDN博客地址:https://mp.csdn.net/mdeditor/81067060
努力在北京混出人样
2018/07/16
1.2K0
Hive根据用户自定义函数、reflect函数和窗口分析函数
(2)UDAF(User-Defined Aggregation Function)
顾翔
2024/09/10
3820
Hive根据用户自定义函数、reflect函数和窗口分析函数
一文学完所有的Hive Sql(两万字最全详解)
lateral view用于和split、explode等UDTF一起使用的,能将一行数据拆分成多行数据,在此基础上可以对拆分的数据进行聚合,lateral view首先为原始表的每行调用UDTF,UDTF会把一行拆分成一行或者多行,lateral view在把结果组合,产生一个支持别名表的虚拟表。
五分钟学大数据
2021/04/02
3.1K0
Hive SQL 大厂必考常用窗口函数及相关面试题
二、窗口函数的基本用法 1.基本语法 2.设置窗口的方法 1)window_name 2)partition by 子句 3) order by子句 4)rows 指定窗口大小 3.开窗函数中加order by 和 不加 order by的区别
王知无-import_bigdata
2022/11/11
3.8K0
Hive SQL 大厂必考常用窗口函数及相关面试题
postgreSQL窗口函数总结
1、我们都知道在SQL中有一类函数叫做聚合函数,例如sum()、avg()、max()等等,这类函数可以将多行数据按照规则聚集为一行,一般来讲聚集后的行数是要少于聚集前的行数的,但是有时我们想要既显示聚集前的数据,又要显示聚集后的数据,这时我们便引入了窗口函数。
小徐
2020/02/16
2.7K0
postgreSQL窗口函数总结
相关推荐
不要到处翻了 | Hive开窗函数总结与实践
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
查看详情【社区公告】 技术创作特训营有奖征文