Spark 2.x Study Notes: 9. Spark Programming Examples

9. Spark Programming Examples

9.1 SparkPi

package cn.hadron
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import scala.math.random

object SparkPi {
  def main(args: Array[String]): Unit = {
    val masterUrl = "local[1]"
    val conf=new SparkConf().setMaster(masterUrl).setAppName("SparkPi")
    val sc=new SparkContext(conf)
    //number of tasks to launch; defaults to 2
    val slices=if(args.length>0)args(0).toInt else 2
    //n is the number of sample points (200,000 by default); Int.MaxValue guards against overflow
    val n = math.min(100000L * slices, Int.MaxValue).toInt
    //with the default 2 partitions the range splits into [1,100000] and [100001,199999]
    val count = sc.parallelize(1 until n, slices).map { i =>
      //generate a point in [-1,1]x[-1,1]; the circle is centered at (0,0)
      val x = random * 2 - 1
      val y = random * 2 - 1
      //count 1 if the point falls inside the unit circle, otherwise 0
      if (x*x + y*y <= 1) 1 else 0
    }.reduce(_ + _)
    println("Pi is roughly " + 4.0 * count / (n - 1))
    sc.stop()
  }
}
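The Monte Carlo estimator above can be sanity-checked without a SparkContext. Below is a plain-Scala sketch of the same sampling logic; the `estimatePi` helper and its fixed seed are illustrative, not part of the program above:

```scala
import scala.util.Random

// Estimate pi by sampling points in [-1,1]x[-1,1] and counting how many
// fall inside the unit circle, mirroring SparkPi's map/reduce logic.
def estimatePi(samples: Int, seed: Long = 42L): Double = {
  val rnd = new Random(seed)
  var hits = 0
  var i = 0
  while (i < samples) {
    val x = rnd.nextDouble() * 2 - 1
    val y = rnd.nextDouble() * 2 - 1
    if (x * x + y * y <= 1) hits += 1
    i += 1
  }
  // area ratio: circle / square = pi/4, so pi ≈ 4 * hits / samples
  4.0 * hits / samples
}

println(f"Pi is roughly ${estimatePi(200000)}%.4f")
```

With 200,000 samples the estimate typically lands within a few thousandths of pi; Spark's version simply distributes this same loop across partitions.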

9.2 Average

(1) Generate the data

[root@node1 data]# vi genAge.sh 
[root@node1 data]# cat genAge.sh 
#!/bin/sh

for i in {1..1000000};do
        echo -e $i'\t'$(($RANDOM%100))
done;
[root@node1 data]# sh genAge.sh > age.txt
[root@node1 data]# tail -10 age.txt 
999991  53
999992  63
999993  62
999994  14
999995  62
999996  27
999997  15
999998  99
999999  62
1000000 79
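Since `$RANDOM % 100` yields values in 0..99 (nearly uniformly; `$RANDOM` itself spans 0..32767), the sample mean should land near (0 + 99) / 2 = 49.5. A quick plain-Scala simulation of the same distribution (`sampleMean` is a hypothetical helper for illustration):

```scala
import scala.util.Random

// Draw n ages uniformly from 0..99, matching the shell generator's
// distribution, and return their mean.
def sampleMean(n: Int, seed: Long = 1L): Double = {
  val rnd = new Random(seed)
  val ages = Seq.fill(n)(rnd.nextInt(100))
  ages.sum.toDouble / n
}

println(sampleMean(100000))  // close to 49.5
```

This predicts the result we will compute with Spark below: an average age of roughly 49.5.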

(2) Upload to HDFS

[root@node1 data]# hdfs dfs -put age.txt input

(3) Write the code: AvgAge.scala

package cn.hadron
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object AvgAge {
  def main(args:Array[String]) {
    if (args.length < 1){
      println("Usage:AvgAge datafile")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("Spark Exercise:Average Age Calculator")
    val sc = new SparkContext(conf)
    val rdd = sc.textFile(args(0), 5)
    val count = rdd.count()
    //sum the ages on the executors with reduce, instead of collecting
    //all one million values to the driver first
    val totalAge = rdd.map(line => line.split("\t")(1).toInt)
                      .reduce(_+_)
    println("Total Age:" + totalAge + ";Number of People:" + count)
    val avgAge : Double = totalAge.toDouble / count.toDouble
    println("Average Age is " + avgAge)
    sc.stop()
  }
}
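The tab-separated parsing and the averaging step can be verified locally on a few hand-written lines, without a cluster. This sketch assumes the same `id\tage` line format as age.txt; the `averageAge` helper is illustrative:

```scala
// Mirror AvgAge's pipeline on a plain Scala collection: split each line
// on the tab, take the second field as the age, then average.
def averageAge(lines: Seq[String]): Double = {
  val ages = lines.map(_.split("\t")(1).toInt)
  ages.sum.toDouble / ages.length
}

val sample = Seq("1\t20", "2\t30", "3\t40")
println(averageAge(sample))  // 30.0
```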

(4) Build the package with mvn package.

(5) Submit the job:

[root@node1 ~]# spark-submit --master yarn --deploy-mode client --class cn.hadron.AvgAge /root/simpleSpark-1.0-SNAPSHOT.jar input/age.txt
17/09/22 10:30:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/09/22 10:30:56 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
Total Age:49536760;Number of People:1000000                                     
Average Age is 49.53676
[root@node1 ~]# 

9.3 TopK

(1) Problem statement

Find the K most frequent words in a text file. For example, given the text of Hamlet in a plain-text file Hamlet.txt, report the 10 words with the highest frequency.

(2) Upload the data

[root@node1 data]# hdfs dfs -put Hamlet.txt input
[root@node1 data]# hdfs dfs -ls input
Found 3 items
-rw-r--r--   3 root supergroup     281498 2017-09-20 10:11 input/Hamlet.txt
-rw-r--r--   3 root supergroup         71 2017-08-27 09:18 input/books.txt
drwxr-xr-x   - root supergroup          0 2017-08-13 09:33 input/emp.bak
[root@node1 data]#

(3) Interactive debugging in spark-shell

[root@node1 data]# spark-shell
17/09/20 10:12:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/09/20 10:13:01 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
17/09/20 10:13:02 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
17/09/20 10:13:04 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Spark context Web UI available at http://192.168.80.131:4040
Spark context available as 'sc' (master = local[*], app id = local-1505916766832).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.0
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_112)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val rdd1=sc.textFile("input/Hamlet.txt")
rdd1: org.apache.spark.rdd.RDD[String] = input/Hamlet.txt MapPartitionsRDD[1] at textFile at <console>:24

scala> rdd1.count
res0: Long = 6878

scala> val rdd2=rdd1.flatMap(x=>x.split(" ")).filter(_.size>1)
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at filter at <console>:26

scala> rdd2.take(2)
res1: Array[String] = Array(Hamlet, by)

scala> val rdd3=rdd2.map(x=>(x,1))
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[4] at map at <console>:28

scala> rdd3.take(2)
res2: Array[(String, Int)] = Array((Hamlet,1), (by,1))

scala> val rdd4=rdd3.reduceByKey(_+_)
rdd4: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[5] at reduceByKey at <console>:30

scala> rdd4.take(3)
res3: Array[(String, Int)] = Array((rises.,1), (Let,35), (lug,1))

scala> val rdd5=rdd4.map{case(x,y)=>(y,x)}
rdd5: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[6] at map at <console>:32

scala> rdd5.take(2)
res4: Array[(Int, String)] = Array((1,rises.), (35,Let))

scala> val rdd6=rdd5.sortByKey(false)
rdd6: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[7] at sortByKey at <console>:34

scala> rdd6.take(2)
res5: Array[(Int, String)] = Array((988,the), (693,and))

scala> val rdd7=rdd6.map{case(a,b)=>(b,a)}
rdd7: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[8] at map at <console>:36

scala> rdd7.take(10)
res6: Array[(String, Int)] = Array((the,988), (and,693), (of,621), (to,604), (my,441), (in,387), (HAMLET,378), (you,356), (is,291), (his,277))

scala> rdd7.take(10).foreach(println)
(the,988)
(and,693)
(of,621)
(to,604)
(my,441)
(in,387)
(HAMLET,378)
(you,356)
(is,291)
(his,277)

scala> 

(4) The complete program

package cn.hadron
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object TopK {
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      println("Usage:TopK KeyWordsFile K")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("TopK Key Words")
    val sc = new SparkContext(conf)
    //read K from the second argument instead of hard-coding 10
    val k = args(1).toInt
    val rdd1 = sc.textFile(args(0))
    val result= rdd1.flatMap(x=>x.split(" "))
                    .filter(_.size>1)
                    .map(x=>(x,1))
                    .reduceByKey(_+_)
                    .map{case(x,y)=>(y,x)}
                    .sortByKey(false)
                    .map{case(a,b)=>(b,a)}
    result.take(k).foreach(println)
    sc.stop()
  }
}
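The count-then-sort pipeline can also be expressed with plain Scala collections, which is handy for testing the logic on a short string before submitting to the cluster. This is a local sketch, not the Spark program itself; the `topK` helper is illustrative:

```scala
// Split on spaces, drop one-character tokens (as the Spark job does),
// count occurrences with groupBy, and keep the k most frequent words.
def topK(text: String, k: Int): Seq[(String, Int)] = {
  text.split(" ")
    .filter(_.length > 1)
    .groupBy(identity)
    .map { case (w, ws) => (w, ws.length) }
    .toSeq
    .sortBy(-_._2)
    .take(k)
}

println(topK("to be or not to be that is the question", 2))
```

On the sample sentence this returns "to" and "be", each with a count of 2; ties may come back in either order since `groupBy` does not guarantee one.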

(5) Package and upload

mvn package

(6) Submit and run:

[root@node1 ~]# spark-submit --master yarn --deploy-mode client --class cn.hadron.TopK /root/simpleSpark-1.0-SNAPSHOT.jar input/Hamlet.txt 10
17/09/21 09:48:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
(the,988)
(and,693)
(of,621)
(to,604)
(my,441)
(in,387)
(HAMLET,378)
(you,356)
(is,291)
(his,277)
[root@node1 ~]# 
