前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【Spark】Spark Core Day04

【Spark】Spark Core Day04

作者头像
Maynor
发布2021-12-07 09:29:32
4240
发布2021-12-07 09:29:32
举报

Spark Day04:Spark Core

image-20210421210048055
image-20210421210048055

02-[了解]-今日课程内容提纲

主要讲解RDD函数,分为2类:Transformation转换函数和Action触发函数

代码语言:javascript
复制
RDD中函数:
	- 函数分类,不同类型函数功能
	- 常见函数概述
	- 5种类型RDD函数
		实际项目中使用最多的,必须要掌握
	- RDD 持久化函数
		可以将RDD分布式集合数据进行缓存,比如缓存到Executor内存中,再次处理数据时,直接从内存读取
	- RDD Checkpoint
		将RDD数据保存到可靠文件系统中,比如HDFS

首先创建Maven Module模块,编写好代码模块,讲解某个知识点时,在编写核心代码

image-20210422145413647
image-20210422145413647

03-[掌握]-RDD 函数分类

RDD 的操作主要可以分为 TransformationAction 两种。

  • Transformation 转换,将1个RDD转换为另一个RDD
  • Action 触发,当1个RDD调用函数以后,触发一个Job执行(调用Action函数以后,返回值不是RDD)
image-20210422145748863
image-20210422145748863

官方文档:http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-operations

image-20210422145952097
image-20210422145952097

RDD中2种类型操作函数:Transformation(lazy)和Action(eager)函数

image-20210422150024443
image-20210422150024443
  • Transformation转换函数
image-20210422150238781
image-20210422150238781
  • Action触发函数,触发一个Job执行

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-a1fQcH5e-1638793130130)(/img/image-20210422150349862.png)]

04-[了解]-RDD 中常见函数概述

RDD中包含很多函数,主要可以分为两类:Transformation转换函数和Action函数。

image-20210422150627210
image-20210422150627210

主要常见使用函数如下,每个函数通过演示范例讲解。

image-20210422150702004
image-20210422150702004
代码语言:javascript
复制
1、分区操作函数
	对RDD中每个分区数据进行操作
	
2、重分区函数
	调整RDD中分区数目,要么变大,要么变小

3、聚合函数
	对RDD中数据进行聚合统计,比如使用reduce、redueBykey等
	
4、关联函数
	对2个RDD进行JOIN操作,类似SQL中JOIN,分为:等值JOIN、左外连接和右外连接、全外连接fullOuterJoin

RDD函数练习:运行spark-shell命令行,在本地模式运行,执行函数使用

image-20210422151116860
image-20210422151116860

05-[掌握]-RDD 函数之基本函数使用

​ RDD中map、filter、flatMap及foreach等函数为最基本函数,都是对RDD中每个元素进行操作,将元素传递到函数中进行转换

image-20210422151732695
image-20210422151732695

编写词频统计WordCount程序,使用基本函数

代码语言:javascript
复制
package cn.itcast.spark.func.basic

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * 演示RDD中基本函数使用
 */
object _01SparkBasicTest {
	
	def main(args: Array[String]): Unit = {
		// 创建SparkContext实例对象,传递SparkConf对象,设置应用配置信息
		val sc: SparkContext = {
			// a. 创建SparkConf对象
			val sparkConf = new SparkConf()
				.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
				.setMaster("local[2]")
			// b. 传递sparkConf对象,构建SparkContext实例
			SparkContext.getOrCreate(sparkConf)
		}
		
		// step1. 读取数据
		val inputRDD: RDD[String] = sc.textFile("datas/wordcount/input.data", minPartitions = 2)
		
		// step2. 处理数据
		val resultRDD: RDD[(String, Int)] = inputRDD
			// 过滤数据
			.filter(line => null != line && line.trim.length > 0)
			// 分割单词
			.flatMap(line => line.trim.split("\\s+"))
			// 转换为二元组
			.map(word => word -> 1)
			// 按照单词分组,对组内数据进行聚合求和
			.reduceByKey((tmp, item) => tmp + item) // TODO: 隐式转换,将RDD对象抓好为PairRDDFunctions对象,调用方法
		
		// step3. 输出数据
		resultRDD.foreach(item => println(item))
		
		// 应用结束,关闭资源
		sc.stop()
	}
	
}

06-[掌握]-RDD 函数之分区操作函数

每个RDD由多分区组成的,实际开发建议对每个分区数据的进行操作,map函数使用mapPartitions代替foreach函数使用foreachPartition代替前面编写WordCount词频统计代码中,使用map函数和forearch函数,针对RDD中每个元素操作,并不是针对每个分区数据操作的,如果针对分区操作:mapPartitions和foreachPartition

image-20210422153554948
image-20210422153554948

针对分区数据进行操作时,函数的参数类型:迭代器Iterator,封装分区中所有数据

针对词频统计WordCount代码进行修改,针对分区数据操作,范例代码如下:

代码语言:javascript
复制
package cn.itcast.spark.func.iter

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * 分区操作函数:mapPartitions和foreachPartition
 */
object _02SparkIterTest {
	
	def main(args: Array[String]): Unit = {
		// 创建SparkContext实例对象,传递SparkConf对象,设置应用配置信息
		val sc: SparkContext = {
			// a. 创建SparkConf对象
			val sparkConf = new SparkConf()
				.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
				.setMaster("local[2]")
			// b. 传递sparkConf对象,构建SparkContext实例
			SparkContext.getOrCreate(sparkConf)
		}
		
		// step1. 读取数据
		val inputRDD: RDD[String] = sc.textFile("datas/wordcount.data", minPartitions = 2)
		
		// step2. 处理数据
		val resultRDD: RDD[(String, Int)] = inputRDD
			// 过滤数据
			.filter(line => line.trim.length != 0 )
			// 对每行数据进行单词分割
			.flatMap(line => line.trim.split("\\s+"))
			// 转换为二元组
    		//.map(word => word -> 1)
			/*
			  def mapPartitions[U: ClassTag](
			      f: Iterator[T] => Iterator[U],
			      preservesPartitioning: Boolean = false
			  ): RDD[U]
			 */
    		.mapPartitions(iter => iter.map(word => (word, 1)))
			// 分组聚合
			.reduceByKey((tmp, item) => tmp + item)
		
		// step3. 输出数据
		//resultRDD.foreach(item => println(item))
		/*
		  def foreachPartition(f: Iterator[T] => Unit): Unit
		 */
		resultRDD.foreachPartition(iter => iter.foreach(item => println(item)))
		
		// 应用结束,关闭资源
		sc.stop()
	}
	
}

为什么要对分区操作,而不是对每个数据操作,好处在哪里呢???

image-20210422154406031
image-20210422154406031

07-[掌握]-RDD 函数之重分区函数

如何对RDD中分区数目进行调整(增加分区或减少分区),在RDD函数中主要有如下三个函数。

image-20210422154745033
image-20210422154745033
代码语言:javascript
复制
上述2个函数最为关键:
	- 增加RDD分区数目:repartition
	- 减少RDD分区数目:coalesce,不产生Shuffle
代码语言:javascript
复制
package cn.itcast.spark.func.iter

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * 分区操作函数:mapPartitions和foreachPartition
 */
object _02SparkPartitionTest {
	
	def main(args: Array[String]): Unit = {
		// 创建SparkContext实例对象,传递SparkConf对象,设置应用配置信息
		val sc: SparkContext = {
			// a. 创建SparkConf对象
			val sparkConf = new SparkConf()
				.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
				.setMaster("local[2]")
			// b. 传递sparkConf对象,构建SparkContext实例
			SparkContext.getOrCreate(sparkConf)
		}
		
		// step1. 读取数据
		val inputRDD: RDD[String] = sc.textFile("datas/wordcount.data", minPartitions = 2)
		println(s"raw rdd partitions = ${inputRDD.getNumPartitions}")
		
		// TODO: 增加RDD分区数目
		val etlRDD: RDD[String] = inputRDD.repartition(3)
		println(s"etl rdd partitions = ${etlRDD.getNumPartitions}")
		
		// step2. 处理数据
		val resultRDD: RDD[(String, Int)] = inputRDD
			// 过滤数据
			.filter(line => line.trim.length != 0 )
			// 对每行数据进行单词分割
			.flatMap(line => line.trim.split("\\s+"))
			// 转换为二元组
    		//.map(word => word -> 1)
			/*
			  def mapPartitions[U: ClassTag](
			      f: Iterator[T] => Iterator[U],
			      preservesPartitioning: Boolean = false
			  ): RDD[U]
			 */
    		.mapPartitions(iter => iter.map(word => (word, 1)))
			// 分组聚合
			.reduceByKey((tmp, item) => tmp + item)
		
		// step3. 输出数据
		//resultRDD.foreach(item => println(item))
		/*
		  def foreachPartition(f: Iterator[T] => Unit): Unit
		 */
		// TODO: 降低结果RDD分区数目
		val outputRDD: RDD[(String, Int)] = resultRDD.coalesce(1)
		println(s"output rdd partitions = ${outputRDD.getNumPartitions}")
		outputRDD.foreachPartition(iter => iter.foreach(item => println(item)))
		
		// 应用结束,关闭资源
		sc.stop()
	}
	
}

在实际开发中,什么时候适当调整RDD的分区数目呢?让程序性能更好好呢????

image-20210422155319278
image-20210422155319278

08-[掌握]-RDD 函数之RDD 中聚合函数

​ 回顾列表List中reduce聚合函数核心概念:聚合的时候,往往需要聚合中间临时变量。查看列表List中聚合函数reduce和fold源码如下:

image-20210422160633985
image-20210422160633985

通过代码,看看列表List中聚合函数使用:

image-20210422160739389
image-20210422160739389

运行截图如下所示:

image-20210422160823574
image-20210422160823574

fold聚合函数,比reduce聚合函数,多提供一个可以初始化聚合中间临时变量的值参数:

image-20210422160851002
image-20210422160851002

聚合操作时,往往聚合过程中需要中间临时变量(到底时几个变量,具体业务而定),如下案例:

image-20210422160914522
image-20210422160914522

在RDD中提供类似列表List中聚合函数reduce和fold,查看如下:

image-20210422161035258
image-20210422161035258

案例演示:求列表List中元素之和,RDD中分区数目为2,核心业务代码如下:

image-20210422161117193
image-20210422161117193

运行结果解析如下:

image-20210422162324117
image-20210422162324117

查看RDD中高级聚合函数aggregate,函数声明如下:

image-20210422164405671
image-20210422164405671

业务需求:对RDD中数据进行求和sum。

代码语言:javascript
复制
		// TODO:aggregate函数,累计求和
		/*
		def aggregate[U: ClassTag]
		(zeroValue: U)
		(
		   seqOp: (U, T) => U,
		   combOp: (U, U) => U
		): U
		 */
		val aggSum: Int = datasRDD.aggregate(0)(
			// seqOp: (U, T) => U    分区内数据聚合
			(tmp: Int, item: Int) => {
				println(s"seq -> p-${TaskContext.getPartitionId()}: tmp = ${tmp}, item = ${item}, sum = ${tmp + item}")
				tmp + item
			},
			// combOp: (U, U) => U    分区间数据聚合
			(tmp, item) => {
				println(s"comb -> p-${TaskContext.getPartitionId()}: tmp = ${tmp}, item = ${item}, sum = ${tmp + item}")
				tmp + item
			}
		)
		println(s"aggSum = ${aggSum}")
image-20210422165334554
image-20210422165334554

09-[掌握]-RDD 函数之PairRDDFunctions 聚合函数

​ 在Spark中有一个object对象PairRDDFunctions,主要针对RDD的数据类型是Key/Value对的数据提供函数,方便数据分析处理。比如使用过的函数:reduceByKey、groupByKey等。

*ByKey函数将相同Key的Value进行聚合操作的,省去先分组再聚合。

image-20210422165601819
image-20210422165601819
  • 第一类:分组函数groupByKey
image-20210422165638229
image-20210422165638229
  • 第二类:分组聚合函数reduceByKey和foldByKey
image-20210422165827493
image-20210422165827493
  • 第三类:分组聚合函数aggregateByKey
image-20210422165937654
image-20210422165937654

​ 在企业中如果对数据聚合使用,不能使用reduceByKey完成时,考虑使用aggregateByKey函数,基本上都能完成任意聚合功能。

10-[掌握]-RDD 函数之关联JOIN函数

当两个RDD的数据类型为二元组Key/Value对时,可以依据Key进行关联Join。

image-20210422170653905
image-20210422170653905

RDD中关联JOIN函数都在PairRDDFunctions中,具体截图如下:

image-20210422170821018
image-20210422170821018

具体看一下join(等值连接)函数说明:

image-20210422170848690
image-20210422170848690

范例演示代码:

代码语言:javascript
复制
package cn.itcast.spark.func.join

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * RDD中关联函数Join,针对RDD中数据类型为Key/Value对
 */
object _04SparkJoinTest {
	
	def main(args: Array[String]): Unit = {
		// 创建SparkContext实例对象,传递SparkConf对象,设置应用配置信息
		val sc: SparkContext = {
			// a. 创建SparkConf对象
			val sparkConf = new SparkConf()
				.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
				.setMaster("local[2]")
			// b. 传递sparkConf对象,构建SparkContext实例
			SparkContext.getOrCreate(sparkConf)
		}
		
		// 模拟数据集
		val empRDD: RDD[(Int, String)] = sc.parallelize(
			Seq((1001, "zhangsan"), (1002, "lisi"), (1003, "wangwu"), (1004, "zhaoliu"))
		)
		val deptRDD: RDD[(Int, String)] = sc.parallelize(
			Seq((1001, "sales"), (1002, "tech"))
		)
		
		// TODO: 等值连接
		//                deptno  empname  deptname
		val joinRDD: RDD[(Int, (String, String))] = empRDD.join(deptRDD)
		joinRDD.foreach{case (deptno, (empname, deptname)) =>
			println(s"deptno = ${deptno}, empname = ${empname}, deptname = ${deptname}")
		}
		
		println("======================================================")
		// TODO:左外连接
		val leftRDD: RDD[(Int, (String, Option[String]))] = empRDD.leftOuterJoin(deptRDD)
		leftRDD.foreach{case (deptno, (empname, option)) =>
			val deptname: String = option match {
				case Some(name) => name
				case None => "未知"
			}
			println(s"deptno = ${deptno}, empname = ${empname}, deptname = ${deptname}")
		}
		
		// 应用结束,关闭资源
		sc.stop()
	}
	
}

11-[掌握]-RDD 持久化

​ 在实际开发中某些RDD的计算或转换可能会比较耗费时间,如果这些RDD后续还会频繁的被使用到,那么可以将这些RDD进行持久化/缓存,这样下次再使用到的时候就不用再重新计算了,提高了程序运行的效率。 将RDD数据进行缓存时,本质上就是将RDD各个分区数据进行缓存

image-20210422172010838
image-20210422172010838
  • 缓存函数

可以将RDD数据直接缓存到内存中,函数声明如下:

image-20210422172128993
image-20210422172128993

​ 但是实际项目中,不会直接使用上述的缓存函数,RDD数据量往往很多,内存放不下的。在实际的项目中缓存RDD数据时,往往使用如下函数,依据具体的业务和数据量,指定缓存的级别:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-bxNtlFD7-1638793130145)(/img/image-20210422172215367.png)]

  • 缓存级别

在Spark框架中对数据缓存可以指定不同的级别,对于开发来说至关重要,如下所示:

image-20210422172241510
image-20210422172241510

实际项目中缓存数据时,往往选择如下两种级别:

image-20210422172519961
image-20210422172519961

缓存函数与Transformation函数一样,都是Lazy操作,需要Action函数触发,通常使用count函数触发

image-20210422172557086
image-20210422172557086
  • 释放缓存

缓存的RDD数据,不再被使用时,考虑释资源,使用如下函数:

image-20210422172704697
image-20210422172704697

此函数属于eager,立即执行。

  • 何时缓存数据

在实际项目开发中,什么时候缓存RDD数据,最好呢???

image-20210422172816627
image-20210422172816627

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-TKk5WJgJ-1638793130147)(img/image-20210422172821282.png)]

image-20210422172916683
image-20210422172916683

12-[了解]-RDD Checkpoint

​ RDD 数据可以持久化,但是持久化/缓存可以把数据放在内存中,虽然是快速的,但是也是最不可靠的;也可以把数据放在磁盘上,也不是完全可靠的!例如磁盘会损坏等。 Checkpoint的产生就是为了更加可靠的数据持久化,在Checkpoint的时候一般把数据放在在HDFS上,这就天然的借助了HDFS天生的高容错、高可靠来实现数据最大程度上的安全,实现了RDD的容错和高可用。

​ 在Spark Core中对RDD做checkpoint,可以切断做checkpoint RDD的依赖关系,将RDD数据保存到可靠存储(如HDFS)以便数据恢复;

image-20210422174859969
image-20210422174859969

案例演示代码如下:

代码语言:javascript
复制
package cn.itcast.spark.ckpt

import org.apache.spark.{SparkConf, SparkContext}

/**
 * RDD数据Checkpoint设置,案例演示
 */
object _06SparkCkptTest {
	
	def main(args: Array[String]): Unit = {
		// 创建应用程序入口SparkContext实例对象
		val sc: SparkContext = {
			// 1.a 创建SparkConf对象,设置应用的配置信息
			val sparkConf: SparkConf = new SparkConf()
				.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
				.setMaster("local[2]")
			// 1.b 传递SparkConf对象,构建Context实例
			new SparkContext(sparkConf)
		}
		
		// TODO: 设置检查点目录,将RDD数据保存到那个目录
		sc.setCheckpointDir("datas/ckpt/")
		
		// 读取文件数据
		val datasRDD = sc.textFile("datas/wordcount.data")
		
		// TODO: 调用checkpoint函数,将RDD进行备份,需要RDD中Action函数触发
		datasRDD.checkpoint()
		datasRDD.count()
		
		
		// TODO: 再次执行count函数, 此时从checkpoint读取数据
		println(datasRDD.count())
		
		
		// 应用程序运行结束,关闭资源
		Thread.sleep(1000000000)
		sc.stop()
	}
	
}

面试题:持久化和Checkpoint的区别:

image-20210422175315464
image-20210422175315464
image-20210422175450596
image-20210422175450596
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2021-12-06 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Spark Day04:Spark Core
    • 02-[了解]-今日课程内容提纲
      • 03-[掌握]-RDD 函数分类
        • 04-[了解]-RDD 中常见函数概述
          • 05-[掌握]-RDD 函数之基本函数使用
            • 06-[掌握]-RDD 函数之分区操作函数
              • 07-[掌握]-RDD 函数之重分区函数
                • 08-[掌握]-RDD 函数之RDD 中聚合函数
                  • 09-[掌握]-RDD 函数之PairRDDFunctions 聚合函数
                    • 10-[掌握]-RDD 函数之关联JOIN函数
                      • 11-[掌握]-RDD 持久化
                        • 12-[了解]-RDD Checkpoint
                        领券
                        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档