前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >最最简单的~WordCount¬

最最简单的~WordCount¬

作者头像
用户3003813
发布2018-09-06 13:12:39
3390
发布2018-09-06 13:12:39
举报
文章被收录于专栏:个人分享个人分享
代码语言:javascript
复制
sc.textFile("hdfs://....").flatMap(line =>line.split(" ")).map(w =>(w,1)).reduceByKey(_+_).foreach(println)

不使用reduceByKey

代码语言:javascript
复制
sc.textFile("hdfs://....").flatMap(l=>l.split(" ")).map(w=>(w,1)).groupByKey().map((p:(String,Iterable[Int]))=>(p._1,p._2.sum)).collect

步骤1:textFile先生成HadoopRDD,然后再通过map操作生成MappedRDD.

结果:res0:org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at :13

步骤2:val split = line =>line.split(" ")).flatMap(line => line.split(" ")) flatMap将原来的MappedRDD转换为FlatMappedRDD

步骤3:val wordCount = split.map(w =>(w,1)) 利用w生成相应的键值对,上一步的FlatMappedRDD被转换为MappedRDD

步骤4:val reduce = wordCount.reduceByKey(_+_)

步骤5:reduce.foreach(println) 触发执行  

 在执行foreach时,调用了runJob函数,实现了重载。 Final RDD和作用于RDD上的Function。 然后读取Finall RDD的分区数,通过allowLocal来表示是否在Standalone模式下执行。

从spark-shell到sparkContext的创建的调用路径:

spark-shell -> spark-submit ->spark-class->sparkSubmit.main ->SparkILoop -> createSparkContext

SpackContext初始化过程中 传入的入参是SparkConf

一、根据初始化生成SparkConf,再根据SparkConf来创建SparkEnv.

二、创建TaskScheduler,根据Spark的运行模式选择相应的SchedulerBackend,同时启动TaskScheduler

代码语言:javascript
复制
private[spark] var taskScheduler = SparkContext.createTaskScheduler(this,master,appName)
taskScheduler.start()

 createTaskScheduler最为关键,根据master环境变量来判断Spark当前的部署方式,从而生成相应的SchedulerBackend的不同子类。taskScheduler.start的目的是启动相应的SchedulerBackend.

三、从上一步创建的taskScheduler实例为入参创建DAGScheduler并启动运行。

代码语言:javascript
复制
private[spark] var dagScheduler = new DAGScheduler(taskScheduler)
dagScheduler.start()

四、启动WebUI.

代码语言:javascript
复制
ui.start()
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2015-10-21 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档