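I'm new to Spark GraphX. I'm trying to compute, in a distributed way, the intersection between the neighbors of a specific node (for example, the node with ID = 1) and the neighbors of each of its neighbors.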
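I loaded the edge list with GraphLoader.edgeListFile(sc, path). Then I used collectNeighborIds to find the neighbor IDs of node 1, and mapped a function over them that, for each neighbor, computes the intersection of that neighbor's neighbors with the neighbors of the selected node (ID = 1). Here is the code: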
import org.apache.spark.graphx.{EdgeDirection, GraphLoader, VertexId}
import org.apache.spark.rdd.RDD

val graph = GraphLoader.edgeListFile(sc, "path to edgelist")
val node_collect_neighborsId1 = graph.collectNeighborIds(EdgeDirection.Either).filter(x => x._1 == 1)
val node1_neighbors_ID = node_collect_neighborsId1.flatMap(x => x._2)
def compute_intersection(vertex: VertexId) = {
  var node2_collect_neighborsId: RDD[(VertexId, Array[VertexId])] = graph.collectNeighborIds(EdgeDirection.Either).filter(x => x._1 == vertex)
  var node2_neighbors_ID = node2_collect_neighborsId.flatMap(x => x._2)
  var intersect_two_node = node1_neighbors_ID.intersection(node2_neighbors_ID)
  (vertex, intersect_two_node)
}
val result = node1_neighbors_ID.map(compute_intersection)
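In the end I expect result to contain one row per neighbor: the neighbor's vertex ID together with the nodes the two vertices have in common, i.e. the intersection of their neighbor sets. But I can't print it to see what's inside. Please help me compute the intersection and print the result.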
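To make the expected shape of result concrete, here is a small in-memory sketch in plain Scala (no Spark). It assumes a hypothetical undirected toy graph given as an adjacency map adj, and for each neighbor v of the target it pairs v with the neighbors that v and the target share.

```scala
object ExpectedResult {
  // Hypothetical toy graph (undirected): each vertex maps to its neighbor set.
  val adj: Map[Long, Set[Long]] = Map(
    1L -> Set(2L, 3L, 4L),
    2L -> Set(1L, 3L),
    3L -> Set(1L, 2L, 4L),
    4L -> Set(1L, 3L)
  )

  // For every neighbor v of `target`, pair v with the neighbors it shares with `target`.
  def neighborIntersections(adj: Map[Long, Set[Long]], target: Long): Map[Long, Set[Long]] = {
    val targetNeighbors = adj.getOrElse(target, Set.empty[Long])
    targetNeighbors.map { v =>
      v -> (targetNeighbors intersect adj.getOrElse(v, Set.empty[Long]))
    }.toMap
  }
}
```

On this toy graph, neighborIntersections(adj, 1L) pairs 2 with {3}, 3 with {2, 4} and 4 with {3}, which is the kind of result the question is after.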
Posted on 2019-08-21 11:18:12
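You cannot produce a result of type RDD[RDD[T]]: RDD transformations and actions can only be invoked from the driver, not from inside another RDD's transformation. So you should not compute the intersection for each neighbor inside map.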
Instead, you can use aggregateMessages to compute the intersections for all of the target's neighbors at once:
import org.apache.spark.graphx._

def computeIntersection[VD, ED](graph: Graph[VD, ED], targetVertexId: Long): VertexRDD[List[Long]] = {
  // Pass 1: mark the target's neighbors (edges in either direction)
  val verticesWithTargetNeighborFlag = graph.aggregateMessages[Boolean](
    triplet => {
      if (triplet.srcId == targetVertexId && triplet.dstId != targetVertexId) {
        triplet.sendToDst(true)
      } else if (triplet.dstId == targetVertexId && triplet.srcId != targetVertexId) {
        triplet.sendToSrc(true)
      }
    },
    (msg1, msg2) => msg1 || msg2,
    TripletFields.None
  )
  // Vertices that received no message are not neighbors, so their flag defaults to false
  val graphWithTargetNeighborFlag = graph.outerJoinVertices(verticesWithTargetNeighborFlag) {
    (_, _, flag) => flag.getOrElse(false)
  }
  // Pass 2: collect the intersection vertices for each of the target's neighbors
  val verticesWithIntersection = graphWithTargetNeighborFlag.aggregateMessages[List[Long]](
    triplet => if (triplet.srcAttr && triplet.dstAttr) { // both are target's neighbors
      triplet.sendToDst(List(triplet.srcId))
      triplet.sendToSrc(List(triplet.dstId))
    },
    (msg1, msg2) => msg1 ::: msg2,
    TripletFields.All
  )
  verticesWithIntersection
}
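The logic of the two aggregateMessages passes can be checked without a cluster. The sketch below is a plain-Scala simulation, not GraphX code: it assumes the graph is a hypothetical in-memory Seq of directed edges (the way GraphLoader.edgeListFile reads them), replays pass 1 (flag the target's neighbors) and pass 2 (exchange IDs across edges whose endpoints are both flagged), and merges messages by list concatenation, like msg1 ::: msg2 above.

```scala
object AggregateMessagesSketch {
  // A directed edge srcId -> dstId (hypothetical stand-in for a GraphX triplet).
  final case class Edge(srcId: Long, dstId: Long)

  // Pass 1: the target's neighbors in either direction, excluding the target itself.
  def targetNeighbors(edges: Seq[Edge], target: Long): Set[Long] =
    edges.flatMap { e =>
      if (e.srcId == target && e.dstId != target) Some(e.dstId)
      else if (e.dstId == target && e.srcId != target) Some(e.srcId)
      else None
    }.toSet

  // Pass 2: for every edge whose endpoints are both flagged, each endpoint
  // receives the other endpoint's ID; messages per vertex are concatenated.
  def intersections(edges: Seq[Edge], target: Long): Map[Long, List[Long]] = {
    val flagged = targetNeighbors(edges, target)
    val messages: Seq[(Long, List[Long])] = edges.flatMap { e =>
      if (flagged(e.srcId) && flagged(e.dstId))
        Seq(e.dstId -> List(e.srcId), e.srcId -> List(e.dstId))
      else Seq.empty
    }
    messages.groupBy(_._1).map { case (v, msgs) => v -> msgs.map(_._2).reduce(_ ::: _) }
  }
}
```

With the edges (1, 2), (1, 3), (4, 1), (2, 3), (3, 4) and (5, 2) and target 1, vertex 2 receives [3], vertex 3 receives [2, 4] and vertex 4 receives [3]; vertex 5 is not a neighbor of 1 and gets nothing. Note that a neighbor with no common neighbors receives no message at all, so, just as with aggregateMessages, it is absent from the result rather than mapped to an empty list.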
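And you can print the elements of an RDD with collect (this brings every element to the driver, so only do it when the result is small):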
rdd.collect().foreach(println)
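Calling collect() on the VertexRDD[List[Long]] returned by computeIntersection gives an Array[(VertexId, List[Long])] on the driver (VertexId is an alias for Long). The helper below, formatRows, is a hypothetical plain-Scala function that sorts those rows by vertex ID and prints each intersection without duplicates (duplicate edges in the edge list can produce repeated IDs).

```scala
object PrintResult {
  // Turn collected (vertexId, commonNeighbors) rows into stable, readable lines.
  def formatRows(rows: Array[(Long, List[Long])]): Seq[String] =
    rows.sortBy(_._1).toSeq.map { case (id, common) =>
      s"$id -> ${common.distinct.sorted.mkString("[", ", ", "]")}"
    }
}
```

In the Spark job it would be used as PrintResult.formatRows(computeIntersection(graph, 1L).collect()).foreach(println).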
https://stackoverflow.com/questions/57573911