专栏首页Spark学习技巧百度面试题:Spark 实现PageRank

百度面试题:Spark 实现PageRank

PageRank算法简介 PageRank是执行多次连接的一个迭代算法,因此它是RDD分区操作的一个很好的用例。算法会维护两个数据集:一个由(pageID,linkList)的元素组成,包含每个页面的相邻页面的列表;另一个由(pageID,rank)元素组成,包含每个页面的当前排序值。它按如下步骤进行计算。

  1. 将每个页面的排序值初始化为1.0。
  2. 在每次迭代中,对页面p,向其每个相邻页面(有直接链接的页面)发送一个值为rank(p)/numNeighbors(p)的贡献值。
  3. 将每个页面的排序值设为0.15 + 0.85 * contributionsReceived。

最后两个步骤会重复几个循环,在此过程中,算法会逐渐收敛于每个页面的实际PageRank值。在实际操作中,收敛通常需要大约10轮迭代。 模拟数据 假设一个由4个页面组成的小团体:A,B,C和D。相邻页面如下所示: A:B C B:A C C:A B D D:C

object SparkPageRank {

  def showWarning() {
    System.err.println(
      """WARN: This is a naive implementation of PageRank and is given as an example!
        |Please use the PageRank implementation found in org.apache.spark.graphx.lib.PageRank
        |for more conventional use.
      """.stripMargin)
  }

  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: SparkPageRank <file> <iter>")
      System.exit(1)
    }

    showWarning()

    val spark = SparkSession
      .builder
      .appName("SparkPageRank")
      .getOrCreate()

    val iters = if (args.length > 1) args(1).toInt else 10
    val lines = spark.read.textFile(args(0)).rdd
    val links = lines.map{ s =>
      val parts = s.split("\\s+")
      (parts(0), parts(1))
    }.distinct().groupByKey().cache()
    var ranks = links.mapValues(v => 1.0)

    for (i <- 1 to iters) {
      val contribs = links.join(ranks).values.flatMap{ case (urls, rank) =>
        val size = urls.size
        urls.map(url => (url, rank / size))
      }
      ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)
    }

    val output = ranks.collect()
    output.foreach(tup => println(s"${tup._1} has rank:  ${tup._2} ."))

    spark.stop()
  }
}

本文分享自微信公众号 - Spark学习技巧(bigdatatip)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2018-09-04

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • spark过节监控告警系统实现

    马上要过年了,大部分公司这个时候都不会再去谋求开新业务,而大数据工匠们,想要过好年,就要保证过年期间自己对自己的应用了如执掌。一般公司都会有轮值人员,至少要有春...

    Spark学习技巧
  • 必知|Scala类型层次结构

    java的除了原始类型的所有类都有一个默认的父类Object,那么scala的统一父类是什么呢?这个是有人在群里问浪尖的一个问题,今天浪尖就给大家讲解一下Sca...

    Spark学习技巧
  • Kafka源码系列之实现自己的kafka监控

    一,基本思路介绍 Kafka作为一个好用的且应用很广泛的消息队列,在大数据处理系统中基本是必不可少的。当然,作为缓存消息的消息队列,我们对其进行流量监控及消费滞...

    Spark学习技巧
  • 在Spark上用LDA计算文本主题模型

    在新闻推荐中,由于新闻主要为文本的特性,基于内容的推荐(Content-based Recommendation)一直是主要的推荐策略。基于内容的策略主要思路是...

    星回
  • restapi(3)- MongoDBEngine : MongoDB Scala编程工具库

    最近刚好有同事在学习MongoDB,我们讨论过MongoDB应该置于服务器端然后通过web-service为客户端提供数据的上传下载服务。我们可以用上节讨论的r...

    用户1150956
  • 第六节盒子模型和盒子模型偏移量

    河湾欢儿
  • PICE(3):CassandraStreaming - gRPC-CQL Service

      在上一篇博文里我们介绍了通过gRPC实现JDBC数据库的streaming,这篇我们介绍关于cassandra的streaming实现方式。如果我们需要从一...

    用户1150956
  • Akka-CQRS(8)- CQRS Reader Actor 应用实例

    前面我们已经讨论了CQRS-Reader-Actor的基本工作原理,现在是时候在之前那个POS例子里进行实际的应用示范了。

    用户1150956
  • SDP(12): MongoDB-Engine - Streaming

       在akka-alpakka工具包里也提供了对MongoDB的stream-connector,能针对MongoDB数据库进行streaming操作。这个M...

    用户1150956
  • Flutter 使用Navigator进行局部跳转页面的方法

    Navigator组件使用的频率不是很高,但在一些场景下非常适用,比如局部表单多页填写、底部导航一直存在,每个tab各自导航场景。

    砸漏

扫码关注云+社区

领取腾讯云代金券