首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何用Scala在Spark GraphX中寻找特定节点与其相邻节点的交集

如何用Scala在Spark GraphX中寻找特定节点与其相邻节点的交集
EN

Stack Overflow用户
提问于 2019-08-20 20:40:03
回答 1查看 580关注 0票数 2

我是spark GraphX的新手,正在尝试分布式地计算特定节点(例如ID =1的节点)与spark graphx中的邻居之间的交集。

我已经用路径加载了边缘列表(sc,“GraphLoader.edgeListFile”)。然后,我使用collectNeighborIds找到节点ID =1的邻居id,并对其执行映射函数,以找到每个邻居并计算与选定节点( ID =1的节点)邻居的交集。下面是代码。

代码语言:javascript
运行
复制
val graph = GraphLoader.edgeListFile(sc,path to edgelist)
val node_collect_neighborsId1 = graph.collectNeighborIds(EdgeDirection.Either).filter(x=> x._1 == 1)

val node1_neighbors_ID=node_collect_neighborsId1.flatMap(x=> x._2)

def compute_intersection (vertex :VertexId) = {


  var node2_collect_neighborsId: RDD[(VertexId, Array[VertexId])] = graph.collectNeighborIds(EdgeDirection.Either).filter(x=> x._1 == vertex)

  var node2_neighbors_ID=node2_collect_neighborsId.flatMap(x=> x._2)

  var intersect_two_node = node1_neighbors_ID.intersection(node2_neighbors_ID)

  (vertex, intersect)

}

val result = node1_neighbors_ID.map(compute_intersection)

我期望在最后,结果变量应该包含包含顶点id的行,顶点id是邻居的id,以及两组节点邻居之间的公共节点,我们称之为它们的交集。但是我不能把它们打印出来,看看里面是什么。请帮我解决这个计算交集和打印结果的问题

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-08-21 19:18:12

不能生成RDD[RDD[T]]类型的结果。因此,您不应该为map中的每个邻居计算交集。

您可以使用aggregateMessages计算所有目标邻居的交叉点

代码语言:javascript
运行
复制
def computeIntersection[VD, ED](graph: Graph[VD, ED], targetVertexId: Long): VertexRDD[List[Long]] = {
  //mark the target's neighbors
  val verticesWithTargetNeighborFlag = graph.aggregateMessages[Boolean](
    triplet => {
      if(triplet.srcId == targetVertexId && triplet.dstId != targetVertexId) {
        triplet.sendToDst(true)
      } else if(triplet.dstId == targetVertexId && triplet.dstId != targetVertexId) {
        triplet.sendToSrc(true)
      }
    },
    (msg1, msg2) => msg1 || msg2,
    TripletFields.None
  )
  val graphWithTargetNeighborFlag = Graph(verticesWithTargetNeighborFlag, edges)
  //collect intersection vertices for each target's neighbor
  val verticesWithIntersection = graphWithTargetNeighborFlag.aggregateMessages[List[Long]](
    triplet => if (triplet.srcAttr && triplet.dstAttr) { //both are target's neighbors
      triplet.sendToDst(List(triplet.srcId))
      triplet.sendToSrc(List(triplet.dstId))
    },
    (msg1, msg2) => msg1 ::: msg2,
    TripletFields.All
  )
  verticesWithIntersection
}

并且可以使用collect打印RDD元素

代码语言:javascript
运行
复制
rdd.collect().foreach(println)
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/57573911

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档