Spark2.x学习笔记:7、Spark应用程序设计

7、 Spark应用程序设计

7.1 基本流程

  • 1.创建SparkContext对象
    • 每个Spark应用程序有且仅有一个SparkContext对象,封装了Spark执行环境信息
  • 2.创建RDD
    • 可以冲Scala集合或者Hadoop数据集上创建
  • 3.在RDD之上进行转换和Action
    • MapReduce只提供了map和reduce两种操作,而Spark提供了多种转换和action函数
  • 4.返回结果
    • 保存到HDFS中,或者直接输出到终端

7.2 创建SparkContext对象

(1)创建SparkConf对象

val conf=new SparkConf()
conf.setAppName(appName)
conf.set(“spark.app.name”,”MySpark”)
conf.set(“spark.yarn.queue”,”infrastructure”)

不过,不建议这样设置参数。可以在提交Spark作业时,通过spark-submit –conf设置。 (2)创建SparkContext对象,封装了调度器等信息

val sc=new SparkContext(conf)

7.3 创建RDD

(1)Scala集合

sc.parallelize(List(1,2,3),2)

(2)本地文件/HDFS文件

  • 1) 文本文件
sc.textFile(“file:///data/a.txt”)  //将本地文件加载成RDD
sc.textFile(“hdfs:///data/inpt”)
sc.textFile(“hdfs://nn:9000/path”)//HDFS文件或目录

以hdfs://开头的文件表示HDFS上的文件,以hdfs://开头的文件表示本地文件; - 2) sequenceFile文件 处理图片、语音、视频等二进制文件

sc.sequenceFile(“file.mp3”)
sc.sequenceFile[String,Int](“hdfs://nn:9000/path”)

(3)使用任意自定义的Hadoop InputFormat

sc.hadoopFile(path,inputFmt,keyClass,valCLass)

7.4 在RDD之上进行转换和Action

  • Transformation:将一个RDD通过一种规则,映射成另一种RDD;
  • Action:返回结果或者保存结果,只有action才出发程序的执行。

(1)RDD transformation

//创建RDD
val listRdd =sc.parallelize(List(1,2,3),3)
//将RDD传入函数,生成新的RDD
val squares =listRdd.map(x=>x*x)//{1,4,9}
//对RDD中的元素进行过滤,生产新的RDD
val even=sequres.filter(_%2==0)//{4}
//将一个元素映射成多个,生成新的RDD
nums.flatMap(x=>1 to x)//{1,1,2,1,2,3}

注解:

  • map:一一映射,元素数量不变
  • filter:过滤,输出元素数量小于等于
  • flatMap:展开,放大,输出元素数大于原来

(2)RDD Action

//创建新的RDD
val nums=sc.parallelize(List(1,2,3),2)

//将RDD保存为本地集合(返回到driver端)
nums.collect()   //Array(1,2,3)

//返回前k个元素
nums.take(2)//Array(1,2)

//计算集合大小
nums.count()//3

合并集合元素
nums.reduce(_+_)//6

//将RDD写到HDFS中,注意该输出目录不能存在,Hadoop自动创建
//输出文件数和patition数相同
nums.saveAsTextFile(“hdfs://nn:8020/output”)
nums.saveAsSequenceFile(“hdfs://nn:8020/output”)

7.5 Key/Value类型RDD操作

(1)KV型的RDD

Spark提供了强大的算子来处理KV型的RDD

Val pets=sc.parallelize(List((“cat”,1),(“dog”,1,(“cat”,2)) ))
pets.reduceByKey(_+_) //{(“cat”,3),(“dog”,1)}
pets.groupByKey() //{(“cat”,seq(1,2)),(dog,seq(1))}
pets.sortByKey()  //{(“cat”,1),(“cat”,2),(“dog”,1)}

(2)级联操作 由于Transformation返回都是RDD,所以可以将Transformation进行级联操作, 比如

val resultRdd = rowRdd.flatMap(line => line.split("\\s+"))
                      .map(word => (word, 1))
                      .reduceByKey(_ + _)

7.6 join

  • def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
  • def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))]
  • def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
  • def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]

说明: 1)cogroup函数对两个RDD(如:(K,V)和(K,W))相同Key的元素先分别做聚合,最后返回(K,Iterator,Iterator)形式的RDD。numPartitions设置分区数,提高作业并行度。 2)join相当于SQL中的内关联join,只返回两个RDD根据K可以关联上的结果,join只能用于两个RDD之间的关联,如果要多个RDD关联,多关联几次即可。numPartitions设置分区数,提高作业并行度

package cn.hadron

import org.apache.spark._

object JoinDemo {
  def main(args: Array[String]) {
    val masterUrl = "local[1]"

    val sparkConf = new SparkConf().setMaster(masterUrl).setAppName("JoinDemo")
    val sc = new SparkContext(sparkConf)

    val visits=sc.parallelize(List(
      ("index.jsp","192.168.1.100"),("about.jsp","192.168.1.101"),("index.jsp","192.168.1.102")
    ))
    val pageNames=sc.parallelize(List(
      ("index.jsp","Home"),("about.jsp","About")
    ))
    val cogRdd=visits.cogroup(pageNames)
    cogRdd.take(2).foreach(println)
    println("--------------")
    val joinRdd=visits.join(pageNames)
    joinRdd.take(3).foreach(println)
  }
}

输出结果

(index.jsp,(CompactBuffer(192.168.1.100, 192.168.1.102),CompactBuffer(Home)))
(about.jsp,(CompactBuffer(192.168.1.101),CompactBuffer(About)))
--------------
(index.jsp,(192.168.1.100,Home))
(index.jsp,(192.168.1.102,Home))
(about.jsp,(192.168.1.101,About))

7.7 cache

(1)Spark RDD Cache允许将RDD缓存到内存中,以便重用 (2)Spark提供了多种缓存级别,以便用户根据实际需求进行调整 rdd.chache()等价于rdd.persist(StorageLevel.DISK_ONLY_2) (3)实例分析

val rdd=sc.textFile("hdfs://master:8020/input")
rdd.chache()
rdd.fileter(_.startWith("error")).count
rdd.fileter(_.endWith("hadoop")).count
rdd.fileter(_.endWith("hbase")).count

上面代码使用cache后,从HDFS(磁盘)读取1次,之后从内存中读取3次 如果不使用chache,则上面代码从HDFS读取3次。

7.8 控制ReduceTask数目

所有Key/value型RDD操作符均包含一个整形可选参数,表示reduce task并发度。 比如: def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))] def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))] 用户也可以通过修改spark.default.parallelism设置默认并行度(默认并行度是最初RDD partition数目)

7.9 其他RDD操作符

  • samaple():从数据集中采样
  • union():合并多个RDD
  • cartesian():求笛卡尔积
  • pipe():传入一个外部程序

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Java开发者杂谈

RocketMQ专题2:三种常用生产消费方式(顺序、广播、定时)以及顺序消费源码探究

​ 在进行常用的三种消息类型例子展示的时候,我们先来说一说RocketMQ的几个重要概念:

971
来自专栏Android 研究

OKHttp源码解析(三)--中阶之线程池和消息队列

android的异步任务一般都是用Thread+Handler或者AsyncTask来实现,其中笔者当初经历过各种各样坑,特别是内存泄漏,当初笔者可是相当的欲死...

973
来自专栏Java Web

Java 面试知识点解析(二)——高并发编程篇

在遨游了一番 Java Web 的世界之后,发现了自己的一些缺失,所以就着一篇深度好文:知名互联网公司校招 Java 开发岗面试知识点解析 ,来好好的对 Jav...

4697
来自专栏精讲JAVA

java面试线程必备知识点,怼死面试官,从我做起

内存屏障:限制命令操作顺序,有LoadLoad、LoadStore、LoadStore、StroreStreo四种屏障

952
来自专栏Java Web

Java 面试知识点解析(二)——高并发编程篇

19510
来自专栏Java架构沉思录

Java中如何提升锁性能

比如100个人去银行办理业务,要填一百张表,但是只有一支笔,那么很显然,每个人用笔的时间越短,效率也就越高。看代码:

522
来自专栏yukong的小专栏

【java并发编程实战2】无锁编程CAS与atomic包1、无锁编程CAS2、 atomic族类

如果V值等于E值,则将V的值设为N。若V值和E值不同,则说明已经有其他线程做了更新,则当前线程什么都不做。通俗的理解就是CAS操作需要我们提供一个期望值,当期望...

1033
来自专栏Golang语言社区

多线程编程10个例子--2

// TODO: Add extra initialization here m_ctrlProgress.SetRange(0,99); m_nMilliSe...

4637
来自专栏xdecode

Java高并发之锁优化

923
来自专栏蓝天

Kafka C++客户端库librdkafka笔记

librdkafka提供的异步的生产接口,异步的消费接口和同步的消息接口,没有同步的生产接口。

1501

扫码关注云+社区