前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark系列课程-0020Spark RDD图例讲解

Spark系列课程-0020Spark RDD图例讲解

作者头像
Albert陈凯
发布2018-04-08 10:25:25
6000
发布2018-04-08 10:25:25
举报
文章被收录于专栏:Albert陈凯

我们从这节课开始,讲Spark的内核,英文叫做Spark Core,在讲Spark Core之前我们先讲一个重要的概念,RDD,

image.png

我们Spark所有的计算,都是基于RDD来计算的,我们所有的计算都是通过RDD来计算的,那问题来了,RDD到底是什么?

resilient 美 rɪ'zɪlɪənt

adj. 弹回的,有弹力的

adj. 能复原的;有复原力的

distributed 美 dɪ'strɪbjʊtɪd

adj. 分布式的,分散式的

dataset 美 'detə美 sɛt

数据集合

这是我们Spark最核心的数据结构,我们所有的开发编程,都是围绕这这个RDD来展开的

根据这个名字,我们猜猜这个RDD是什么意思?

这个词是个什么词性,是个名词对吧?Dataset,数据集,我们学习Java的时候学过这个Set把,他是一个集合对吧,data名词,给这个Set这个当做定语对吧?说明这是一个数据的集合对吧?Resilient形容词,形容这个数据集,是有弹性的,弹性什么意思,可以伸缩对吧,你拉他一下,他自己可以回到原来的状态对吗?他把这个定语放在这什么意思,说明我们这个数据集是能够自我修复对吧?

能够自我修复是什么意思?

《Learning Spark:Lightning-fast Data Analysis》一书中解释“弹性”是指在任何时候都能进行重算。这样当集群中的一台机器挂掉而导致存储在其上的RDD丢失后,Spark还可以重新计算出这部分的分区的数据。但用户感觉不到这部分的内容丢失过。这样RDD数据集就像块带有弹性的海绵一样,不管怎样挤压(分区遭到破坏)都是完整的。

分布式是什么意思?

分布式对应的就是单机的系统

那我们说分布式的优点是什么啊?其实优点还是有很多的,但最显著的优点就是有3个,他可以负载均衡是吗?当我们的计算资源,紧缺时,可以使用其他的计算资源对吗?他可以容错对吗?当我们一台机器的数据坏掉了,还有另外一个机器做副本,所以还可以找回来对吗?扩展性强对吗?我们可以提供更多的机器,更多的计算资源,更多的存储资源,为整个分布式系统提供水平的线性扩展,对吗?

那我们现在明白这个RDD是什么了吗?

大家还记得我们之前写的WordCount是怎么写的吗?

sc.这个sc是Spark上下文这么一个对象,SparkContext

sc.textFile这是读一个文件,Spark会将这个文件,加载到RDD里面去啊?

sc.textFile返回的是一个RDD对象 是吧?

然后rdd.flatMap.map.reduceByKey 对吧然后foreach打印一下对吧?

这种...是链式编程结构的一种计算,啥叫链式编程结构呢,就是flatMap的返回对象其实还是一个rdd对吧?

所以说一切都是基于RDD的

现在我们知道了RDD是弹性分布式数据集,

那么他有5大特性,我们说RDD的5大特性,我们可以把他理解为RDD的5个属性

这5大特性,我们用画图的方式来讲一下

这5大特性,我讲完之后你们可能会有一些疑问

到时候你们可以来问我,但我不一定会回答,为什么?因为随着课程的深入,这5个特性都会给你们讲明白,这就叫循序渐进,知道吗?

如果你们现在问的所有问题我都去回答的话,太深入的东西你们肯定理解不了,能明白我的意思吗?

大家不用担心,对基础比较好的东西,我在PPT的最后,给大家提供个链接,这是我们这个系列课程最后一天要给大家讲的内容,有余力的同学可以深入研究一下

我们用画图的方式来讲这5大特性,

比如说我们Spark现在要计算的数据,是放到HDFS上面

image.png

比如我们现在要处理的这个文件由3个Block组成的

image.png

那么这个文件是不是可能在不同的节点上?

然后我要用Spark来计算一下这个文件,首先是不是要把这个文件加载到Spark里面来啊?

image.png

哪一行代码?我们用sc.textFile(“hdfs://hostname:port/path”)

image.png

我们调用了sc.textFite方法,他给我们返回回来一个rdd是吧?

image.png

这个RDD里面是有一系列的分区的,我们刚才说了,RDD是不是一个弹性分布式数据集啊?

他是一个分布式的数据集,我们可以和hdfs里面的概念和这个类比,Hadoop里面是不是有block啊?hdfs之所以是分布式,是不是因为里面有hdfs里面叫做block的存储单元啊?如果hdfs里面没有存储单元,那他能完成分布式吗?如果没有这个存储单元这么一说,一个大文件,要么存在A机器,要么存在B机器。如果一台机器存不下,那我们只能去扩展这台机器的硬盘对吗?我们想实现分布式存储,我们首先需要把一个大文件拆成很多小文件,通过索引的方式知道文件的先后顺序和存储位置,就可以实现分布式存储系统了,对吗?

那我们RDD他也是分布式的,是不是也需要类似Block这样的分布式的存储单元啊?那我们Spark里面就叫他是Partition

image.png

那到底有多少个partition是有谁来决定的?那么到底有多少个Partition是由我要读取的这个file的Block数量决定的?能理解吗?

那再跟大家深入的说一下,这个textFile是一个读文件的方法

它的底层封装的是MR读文件的方法,Spark本身是没有读文件的方法的,他调用的是MapReduce读文件的方法

那我们mapreduce读文件就有特点了,首先他要干嘛?

MR读文件之前是不是要先划分split

那准确的来说,每一个Partition是和我们Split对应的对吗?

又因为Split和Block一般情况下是一样的,所以Partition的数量和Block的数量是一样的

那一般情况下就把partition的个数记成和block数量是一样的

hadoop里面的Split数量是如何决定的?

【Hadoop】三句话告诉你 mapreduce 中MAP进程的数量怎么控制?

然后我们继续画图

image.png

我们读完文件,通过flatmap.map.reduceByKey一系列算子的转换,完成了我们WordCount这个业务

新的RDD都是依赖上级的RDD的,

我们的resultRDD是依赖mapRDD的

mapRDD是依赖flatMapRDD的

flatMapRDD是依赖readFileRDD

我们继续画图,我们之前编程的时候是针对RDD的方法,其实通过我们之后的讲解,大家就会知道,我们的textFile算子,flatMap算子,map算子,reduceByKey算子都是作用在partition上的,所以我们这个图要修改一下

image.png

我们继续来讲第四个特性

image.png

在什么阶段有分区器啊?在Suffle阶段才会有分区器,

分区器的作用是什么?决定我这一条记录是写在哪个磁盘小文件上

那什么是kv格式的RDD我们画图演示一下

image.png

我们这个flatMepRDD是基于readFileRDD的,readFileRDD里面的partition可能是在不同的Block节点上面进行存储的,那要是基于RDD进行计算,是要启动task任务,那我这个Task任务分发到哪个节点上是最好的呢?是不是分发到数据所在的节点上是最好的啊?这就符合我们大数据的计算原则,移动计算而不移动数据

第五个这个特性就是说,这个readFileRDD会对外提供一个接口,我调用这个接口我就知道每个Partition所在的节点,和具体的位置

知道他的位置以后,可以参考这个位置,分发Task去执行,执行完之后就得到flatMap这个RDD了

答疑时间,哪不理解

有同学说第五个特性不理解,

rdd可以提供最佳的计算位置,task计算的数据本地化

第五个特性,我们在讲Spark调优的时候,会专门拿出一节课的时间来讲这第五个特性

第四个特性,我们会在讲shuffle的时候,详细的讲为什么会有这第四个特性

我们在这里看一下源码,这五个特性是RDD这个类里面的注释

那我们该如何去找RDD这个类呢?

用搜索,快捷键是按两下shift,

或者我们从代码里面找,core->src->main->scala->rdd->RDD

rdd这个包里面有很多的各种各样的RDD,其中有一个叫RDD的这样一个抽象类

包里面的其他RDD都是这个RDD的子类

image.png

最后总结一点,RDD实际上是不存储数据的

对于初学者的你们来说,今天一天,你们把RDD以为里面是存储数据的,这样便于你们与JAVA开发里面的集合类做类比,这样比较方便你们记住这个概念,但这样的理解就仅限于今天一天,因为随着你们学习的深入,你们更多的概念你们会记住,就不需要用这个存数据来关联这个记忆了

RDD不存数据这个事,我明天会给你们讲明白,这样更方便你们入门

最终这张图片放在这里

SparkRDD的五大特性

好我们RDD就先讲到这里

下面我们看这张PPT

image.png

这是一张进化的图,非常像我们的RDD

这里有一个名词,叫做Lineage,翻译成中文,是血统的意思

人的这一个方向是我们的resultRDD 猴子的这个方向是我们的数据源

当我们在中间的RDD出现问题的过程中,我们可以通过他的父RDD来做重新计算,这是Spark保证数据容错的一个根本。

Spark任务执行的流程

image.png

这是一个最简单的Spark执行的流程,之后我们会逐步深入的讲这个执行流程,这个执行流程也是我们在面试中经常会被问到的。

看这张图,一共有四台服务器规模的一个集群

这台服务器上启动了一个进程叫做Driver进程,还有其他三台服务器,启动一个Worker进程

这些都是真实的服务器,所以服务器上面会有一些计算用的RAM内存,这些服务器还有磁盘,磁盘上存储着我们要计算的数据

通过这张图来看,Driver这个进程主要负责人什么?

调度,什么调度啊?任务调度是吗?

是不是task的下发和result的收集啊?

大家想Driver是负责结果的回收,那如果计算的结果特别大,会有什么一个后果啊?

刚才我有讲说Driver他是一个进程,进程的内存是不是有限啊?

如果计算的结果result非常的大,是不是会导致Driver进程OOM啊?

我们看到的Driver,Worker他都是一个JVM进程,

JVM是什么啊?是不是java虚拟机啊?来大家跟上我的思路啊,我问的问题,一定在脑袋里面过一下啊

从这张图来看,Driver是不是跟我们的集群,会有频繁的通信啊?

都有哪些通信啊,分发Task是不是通信啊?收集结果是不是通信啊?

那我们说Driver做调度,Driver是不是要知道Worker执行的情况啊?

举个例子,我们在公司里面工作,领导分配给你一个任务,你说这任务特别的难,我自己研究一个礼拜,一个礼拜以后领导来问你,你告诉他你搞不定,领导会喜欢你这样的员工吗?

那你要是员工你应该怎么做啊?

讲这张图的目的,是让大家对Spark任务执行流程有个初步的了解,

Driver与Worker之间会有频繁的通信,就是你跟你领导要有频繁的沟通。

大家能记住这一点吗?要怎么?要沟通,对吗?谁跟谁来沟通?Driver和Worker来沟通

然后我们继续往下,如果我们要写一个Spark程序,那流程是什么样子的呢?

我们来感受一下写应用程序怎么写?

image.png

首先加载数据集对吗?数据集可以是Hadoop分布式文件系统上的数据

可以是Hbase的数据,可以是Hive的数据,可以是NoSQL的数据,可以是本机的数据,

也可以是内存对象的数据,加载完的数据对象,就是我们的RDD

我们现在有了RDD以后是不是要对这个RDD进行一系列的操作啊?

我们可以对RDD进行transformation类的操作

什么叫transformation类的操作啊?

我们之前所讲的flatMap、map、reduceByKey都属于transformation类的算子

transformation他是一个类别的名称,在这个类里面有很多具体的算子

那这些就是具体的算子

所有的transformation类算子他有一个特点,就是他是延迟执行的。

对刚加载过来的RDD,执行一把flatMap,实际上他并不会真正的去执行,他等待一个时机来触发执行,

还有一类的算子叫做action类算子,Action类算子他是立即执行,或者说叫触发执行

一个Spark 应用程序 你写的一系列转换,他是由transformation类算子进行转换的

遇到Action类算子才会执行

我们在写WordCount的时候,最后有一个foreach算子,老师在这告诉大家,foreach算子是一个Action算子,所以我们的程序可以执行出结果

大家可以式一下,如果不写foreach算子,程序并不会执行

image.png

我们可以看一下这段代码,这是一段伪代码

sc.textFile他是读一个文件对吧?

filter是过滤的这样一个transformation类算子

他会将lines这个RDD的内容进行过滤,那过滤的条件是什么?startWith(“ERROR”)

我们之前讲过Boolean类型的匿名函数,我们称他为谓词,大家还记得吗?

那大家回忆一下,Boolean是true的时候是保留还是false的时候是保留啊?

那保留的结果是不是放到errors这个RDD里面去了

那下面一行将errors又进行了一次过滤,包含MySQL的内容过滤出来了

在.count之前也是一个延迟执行的transformation类算子,我们把这个结果进行一个count计数由于count是Action类算子,所以他立即执行

最下面一行也是一样,过滤了包含Http这样一个字符串的所有记录

整个这一段代码我们叫他为Spark Application,Spark应用程序

在这段代码里面有几个Action类算子,那么这个应用程序就有多少个Job

Job的个数与我们的Action类算子是一一对应的。

那么一个Application里面可以有很多个job,那有多少个Action类算子就有多少个Job

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2017.07.13 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
对象存储
对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档