前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【Spark】 Spark的基础环境 Day03

【Spark】 Spark的基础环境 Day03

作者头像
Maynor
发布2022-05-08 13:59:34
4380
发布2022-05-08 13:59:34
举报

Spark Day03:Spark 基础环境

02-[了解]-今日课程内容提纲

主要讲解2个方面内容:Spark on YARN集群和RDD 是什么

代码语言:javascript
复制
1、Spark on YARN
	将Spark应用程序,提交运行到YARN集群上,企业中绝大多数运行模式,必须掌握
	- 如何配置
	- 提交应用运行
	- Spark应用运行在集群上2种Deploy-Mode
	- yarn-client模式
	- yarn-cluster模式

2、RDD是什么
	RDD,弹性分布式数据集,抽象概念,相当于集合,比如列表List,分布式集合,存储海量数据
	引入RDD数据结构
	RDD 官方定义,从文档和源码
	RDD 5大特性(面试必问)
	词频统计WordCount查看RDD有哪些
	
	RDD创建方式,如何将数据封装到RDD集合中,2种方式
	创建RDD时,如何处理小文件(面试)

03-[掌握]-Spark on YARN之属性配置和服务启动

将Spark Application提交运行到YARN集群上,至关重要,企业中大多数都是运行在YANR上 文档:http://spark.apache.org/docs/2.4.5/running-on-yarn.html

​ 当Spark Application运行到YARN上时,在提交应用时指定master为yarn即可,同时需要告知YARN集群配置信息(比如ResourceManager地址信息),此外需要监控Spark Application,配置历史服务器相关属性

image-20210421083743595
image-20210421083743595

在实际项目中,只需要配置:6.1.1 至 6.1.4即可,由于在虚拟机上测试,所以配置6.1.5解除资源检查限制。

04-[掌握]-Spark on YARN之提交应用

先将圆周率PI程序提交运行在YARN上,命令如下:

代码语言:javascript
复制
SPARK_HOME=/export/server/spark
${SPARK_HOME}/bin/spark-submit \
--master yarn \
--class org.apache.spark.examples.SparkPi \
${SPARK_HOME}/examples/jars/spark-examples_2.11-2.4.5.jar \
10

运行完成在YARN 监控页面截图如下

image-20210421090750926
image-20210421090750926

设置资源信息,提交运行WordCount程序至YARN上,命令如下:

代码语言:javascript
复制
SPARK_HOME=/export/server/spark
${SPARK_HOME}/bin/spark-submit \
--master yarn \
--driver-memory 512m \
--executor-memory 512m \
--executor-cores 1 \
--num-executors 2 \
--queue default \
--class cn.itcast.spark.submit.SparkSubmit \
hdfs://node1.itcast.cn:8020/spark/apps/spark-day02_2.11-1.0.0.jar \
/datas/wordcount.data /datas/swcy-output

当WordCount应用运行YARN上完成以后,从8080 WEB UI页面点击应用历史服务连接,查看应用运行状态信息。

image-20210421091219880
image-20210421091219880

05-[掌握]-DeployMode两种模式区别

Spark Application提交运行时部署模式Deploy Mode,表示的是Driver Program运行的地方,要么是提交应用的Client:client,要么是集群中从节点(Standalone:Worker,YARN:NodeManager):cluster

image-20210421091454276
image-20210421091454276
  • client 模式

​ 默认DeployMode为Client,表示应用Driver Program运行在提交应用Client主机上(启动JVM Process进程),示意图如下:

image-20210421091612992
image-20210421091612992

假设运行圆周率PI程序,采用client模式,命令如下:

代码语言:javascript
复制
SPARK_HOME=/export/server/spark
${SPARK_HOME}/bin/spark-submit \
--master spark://node1.itcast.cn:7077,node2.itcast.cn:7077 \
--deploy-mode client \
--driver-memory 512m \
--executor-memory 512m \
--executor-cores 1 \
--total-executor-cores 2 \
--class org.apache.spark.examples.SparkPi \
${SPARK_HOME}/examples/jars/spark-examples_2.11-2.4.5.jar \
10
  • cluster 模式

如果采用cluster模式运行应用,应用Driver Program运行在集群从节点Worker某台机器上。

image-20210421092349596
image-20210421092349596

假设运行圆周率PI程序,采用cluster模式,命令如下:

代码语言:javascript
复制
SPARK_HOME=/export/server/spark
${SPARK_HOME}/bin/spark-submit \
--master spark://node1.itcast.cn:7077,node2.itcast.cn:7077 \
--deploy-mode cluster \
--supervise \
--driver-memory 512m \
--executor-memory 512m \
--executor-cores 1 \
--total-executor-cores 2 \
--class org.apache.spark.examples.SparkPi \
${SPARK_HOME}/examples/jars/spark-examples_2.11-2.4.5.jar \
10

06-[掌握]-Spark on YARN之YARN Client 模式

当应用运行YARN上时,有2部分组成:

  • AppMaster,应用管理者,申请资源和调度Job执行
  • Process,运行在NodeManager上进程,运行Task任务

Spark 应用运行集群上时,也有2部分组成:

  • Driver Program,应用管理者,申请资源运行Executors和调度Job执行
  • Executors,运行JVM进程,其中执行Task任务和缓存数据

当Spark应用运行在YARN集群上时,运行架构是什么样子的呢????

  • YARN Client 模式

当Spark 运行在YARN集群时,采用client DeployMode时,有如下三个进程:

  • AppMaster,申请资源,运行Executors
  • Driver Program,调度Job执行和监控
  • Executors,运行JVM进程,其中执行Task任务和缓存数据
  • YARN Cluster 模式

当Spark 运行在YARN集群时,采用clusterDeployMode时,有如下2个进程:

  • Driver Program(AppMaster),既进行资源申请,又进行Job调度
  • Executors,运行JVM进程,其中执行Task任务和缓存数据

所以Spark Application运行在YARN上时,采用不同DeployMode时架构不一样,企业实际生产环境还是以cluster模式为主,client模式用于开发测试,两者的区别面试中常问。


在YARN Client模式下,Driver在任务提交的本地机器上运行,示意图如下:

image-20210421094103770
image-20210421094103770

采用yarn-client方式运行词频统计WordCount程序

代码语言:javascript
复制
/export/server/spark/bin/spark-submit \
--master yarn \
--deploy-mode client \
--driver-memory 512m \
--executor-memory 512m \
--executor-cores 1 \
--num-executors 2 \
--queue default \
--class cn.itcast.spark.submit.SparkSubmit \
hdfs://node1.itcast.cn:8020/spark/apps/spark-day02_2.11-1.0.0.jar \
/datas/wordcount.data /datas/swcy-client
image-20210421094944809
image-20210421094944809

07-[掌握]-Spark on YARN之YARN Cluster模式

​ 在YARN Cluster模式下,Driver运行在NodeManager Contanier中,此时Driver与AppMaster合为一体,示意图如下:

image-20210421102035198
image-20210421102035198

以运行词频统计WordCount程序为例,提交命令如下:

代码语言:javascript
复制
/export/server/spark/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--driver-memory 512m \
--executor-memory 512m \
--executor-cores 1 \
--num-executors 2 \
--queue default \
--class cn.itcast.spark.submit.SparkSubmit \
hdfs://node1.itcast.cn:8020/spark/apps/spark-day02_2.11-1.0.0.jar \
/datas/wordcount.data /datas/swcy-cluster

08-[理解]-Spark 应用MAIN函数代码执行

Spark Application应用程序运行时,无论client还是cluster部署模式DeployMode,当DriverProgram和Executors启动完成以后,就要开始执行应用程序中MAIN函数的代码,以词频统计WordCount程序为例剖析讲解。

image-20210421102717862
image-20210421102717862

上述图片中,A、B都是在Executor中执行,原因在于对RDD数据操作的,针对C来说,如果没有返回值时,在Executor中执行,有返回值,比如调用count、first等函数时,在Driver中执行的。

image-20210421103547417
image-20210421103547417

09-[了解]-RDD 概念之引入说明

​ 对于大量的数据,Spark 在内部保存计算的时候,都是用一种叫做弹性分布式数据集(ResilientDistributed Datasets,RDD)的数据结构来保存的,所有的运算以及操作都建立在 RDD 数据结构的基础之上 在Spark框架中,将数据封装到集合中:RDD,如果要处理数据,调用集合RDD中函数即可。

image-20210421104439744
image-20210421104439744

也就是说RDD设计的核心点为:

image-20210421104619864
image-20210421104619864

文档:http://spark.apache.org/docs/2.4.5/rdd-programming-guide.html

10-[掌握]-RDD 概念之官方定义

​ RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,代表一个不可变可分区、里面的元素可并行计算的集合。

image-20210421105136603
image-20210421105136603

拆分核心要点三个方面:

image-20210421105228417
image-20210421105228417

​ 可以认为RDD是分布式的列表List或数组Array,抽象的数据结构,RDD是一个抽象类AbstractClass和泛型Generic Type

image-20210421105450025
image-20210421105450025

RDD弹性分布式数据集核心点示意图如下:

image-20210421105515014
image-20210421105515014

11-[掌握]-RDD 概念之5大特性剖析

RDD 数据结构内部有五个特性(摘录RDD 源码):前3个特性,必须包含的;后2个特性,可选的。

image-20210421111147705
image-20210421111147705
  • 第一个:a list of partitions
    • 每个RDD由一系列分区Partitions组成,一个RDD包含多个分区
image-20210421111348686
image-20210421111348686
  • 第二个:A function for computing each split
    • 对RDD中数据处理时,每个分区(分片)数据应用函数进行处理
image-20210421111515954
image-20210421111515954
  • 第三个:A list of dependencies on other RDDs
    • 一个RDD依赖于一些列RDD
image-20210421111601003
image-20210421111601003

在RDD类中,对应一个方法:

image-20210421111702879
image-20210421111702879
  • 第四个:Optionally, a Partitioner for key-value RDDs
    • 当RDD中数据类型为Key/Value(二元组),可以设置分区器Partitioner
image-20210421111915023
image-20210421111915023
  • 第五个:Optionally, a list of preferred locations to compute each split on
    • 对RDD中每个分区数据进行计算时,找到最佳位置列表
    • 对数据计算时,考虑数据本地行,数据在哪里,尽量将Task放在哪里,快速读取数据进行处理
image-20210421112140594
image-20210421112140594

​ RDD 是一个数据集的表示,不仅表示了数据集,还表示了这个数据集从哪来、如何计算,主要属性包括五个方面(必须牢记,通过编码加深理解,面试常问):

image-20210421112212408
image-20210421112212408

12-[掌握]-RDD 概念之词频统计WordCount中RDD

以词频统计WordCount程序为例,查看整个Job中各个RDD类型及依赖关系,WordCount程序代码如下:

image-20210421112449649
image-20210421112449649

运行程序结束后,查看WEB UI监控页面,此Job(RDD调用foreach触发)执行DAG图:

image-20210421112505327
image-20210421112505327

13-[掌握]-RDD 创建的两种方式

​ 如何将数据封装到RDD集合中,主要有两种方式:并行化本地集合(Driver Program中)和引用加载外部存储系统(如HDFS、Hive、HBase、Kafka、Elasticsearch等)数据集。

image-20210421113605139
image-20210421113605139

实际使用最多的方法:textFile,读取HDFS或LocalFS上文本文件,指定文件路径和RDD分区数目。

image-20210421114453428
image-20210421114453428

实际项目中如果从HDFS读取海量数据,应用运行在YARN上,默认情况下,RDD分区数目等于HDFS上Block块数目。

14-[掌握]-创建RDD时小文件读取

​ 在实际项目中,有时往往处理的数据文件属于小文件(每个文件数据数据量很小,比如KB,几十MB等),文件数量又很大,如果一个个文件读取为RDD的一个个分区,计算数据时很耗时性能低下,使用SparkContext中提供:wholeTextFiles类,专门读取小文件数据。

image-20210421114928772
image-20210421114928772

范例演示:读取100个小文件数据,每个文件大小小于1MB,设置RDD分区数目为2。

代码语言:javascript
复制
package cn.itcast.spark.source

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * 采用SparkContext#wholeTextFiles()方法读取小文件
 */
object _02SparkWholeTextFileTest {
	
	def main(args: Array[String]): Unit = {
		val sc: SparkContext = {
			// sparkConf对象
			val sparkConf = new SparkConf()
				// _01SparkParallelizeTest$  ->(.stripSuffix("$"))   ->  _01SparkParallelizeTest
				.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
				.setMaster("local[2]")
			// sc 实例对象
			SparkContext.getOrCreate(sparkConf)
		}
		
		/*
		  def wholeTextFiles(
		      path: String,
		      minPartitions: Int = defaultMinPartitions
		  ): RDD[(String, String)]
		  Key: 每个小文件名称路径
		  Value:每个小文件的内容
		 */
		val inputRDD: RDD[(String, String)] = sc.wholeTextFiles("datas/ratings100", minPartitions = 2)
		
		println(s"RDD 分区数目 = ${inputRDD.getNumPartitions}")
		
		inputRDD.take(2).foreach(tuple => println(tuple))
		
		// 应用结束,关闭资源
		sc.stop()
		
	}
	
}
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022-02-14,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Spark Day03:Spark 基础环境
    • 02-[了解]-今日课程内容提纲
      • 03-[掌握]-Spark on YARN之属性配置和服务启动
        • 04-[掌握]-Spark on YARN之提交应用
          • 05-[掌握]-DeployMode两种模式区别
            • 06-[掌握]-Spark on YARN之YARN Client 模式
              • 07-[掌握]-Spark on YARN之YARN Cluster模式
                • 08-[理解]-Spark 应用MAIN函数代码执行
                  • 09-[了解]-RDD 概念之引入说明
                    • 10-[掌握]-RDD 概念之官方定义
                      • 11-[掌握]-RDD 概念之5大特性剖析
                        • 12-[掌握]-RDD 概念之词频统计WordCount中RDD
                          • 13-[掌握]-RDD 创建的两种方式
                            • 14-[掌握]-创建RDD时小文件读取
                            相关产品与服务
                            领券
                            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档