前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >spark RDD

spark RDD

作者头像
小石头
发布2022-11-10 21:25:29
4530
发布2022-11-10 21:25:29
举报
文章被收录于专栏:小石头

RDD简介

RDD,全称为Resilient Distributed Datasets(弹性分布式数据集),是一个容错的并行的数据结构,可以让用户显式地将数据存储到磁盘和内存中,并能控制数据的分区。同时,RDD还提供了一组丰富的操作来操作这些数据。在这些操作中,诸如map、flatMap、filter等转换操作实现了函数式编程模式,很好地契合了Scala的集合操作。除此之外,RDD还提供了诸如join、groupBy、reduceByKey等更为方便的操作(注意,reduceByKey是action,而非transformation),以支持常见的数据运算。

通常来讲,针对数据处理有几种常见模型,包括:Iterative Algorithms,Relational Queries,MapReduce,Stream Processing。例如Hadoop MapReduce采用了MapReduces模型,Storm则采用了Stream Processing模型。RDD混合了这四种模型,使得Spark可以应用于各种大数据处理场景。

定义只读的,可分区的分布式数据集;数据集可全部或部分缓存在内存中,在一个App多次计算间重用, RDD是Spark的核心。

血统容错:根据血统(父子间依赖关系)重计算恢复丢失数据

RDD操作: Transformation算子和Action算子。

计算机生成了可选文字:
1自过嫩:掘空肩]
《[二I[二I[二I[二
计算机生成了可选文字: 1自过嫩:掘空肩] 《[二I[二I[二I[二

原生数据空间转RDD

原生的SCALA数据集合可以转换为RDD进行操作

包含一下两种方式

makeRDD

parallelize

计算机生成了可选文字:
ObjectScalaToRdd{
defmain(args:Array[String]):Unit=
valconf=newSparkConf()
conf.setMaster("local")
conf.setAppName("test")
valsc=newSparkContext()onf)
print/n(sc.makeRDD(1,to(100)).sum0)
sc.stop()
计算机生成了可选文字: ObjectScalaToRdd{ defmain(args:Array[String]):Unit= valconf=newSparkConf() conf.setMaster("local") conf.setAppName("test") valsc=newSparkContext()onf) print/n(sc.makeRDD(1,to(100)).sum0) sc.stop()

存储文件转RDD

计算机生成了可选文字:
objectFileToRdd{
defmain(args:Array[String]):Unit
valconf=newSparkConfO
conf.setMaster("IocaI)
conf.setAppName("test")
valsc=newSparkContext(conf)
yaLtext=sctextfiIe("hdfs://master:8020/tmp/scmlog")
text,flatMap(_.split("")).map(k=>(k,l)).reduceByKey(_+_);
sc.stop()
计算机生成了可选文字: objectFileToRdd{ defmain(args:Array[String]):Unit valconf=newSparkConfO conf.setMaster("IocaI) conf.setAppName("test") valsc=newSparkContext(conf) yaLtext=sctextfiIe("hdfs://master:8020/tmp/scmlog") text,flatMap(_.split("")).map(k=>(k,l)).reduceByKey(_+_); sc.stop()

Partition(分区)

一份待处理的原始数据会被按照相应的逻辑切分成n份,每份数据对应到RDD中的一个Partition,Partition的数量决定了task的数量,影响着程序的并行度,所以理解Partition是了解spark背后运行原理的第一步。

分区数的设置:

  1. 在local模式下通过设置local[*],设置分区数

local[2]:2个

local[*]:拿到当前CPU的内核数,比如是双核的就是2  4核就是4

  1. 通过自定义分区数
  2. 通过设置初始化分区数

sc.makeRDD(1 to 1000,5)

  1. 可通过算子来进行修改分区数.repartition(3)
  2. 如果使用的是scala集合的话,在特定的格式下,会根据数量量来创建分区makeRdd
  3. 读取HDFS上的数据时根据块的数量来划分分区数
计算机生成了可选文字:
valunit:sc.parallelize(1to100,20)
,夕存刭分区
/乡惫ni岁ar000s.size一一另一一庐尹2壅《区焱.
varnpO1=unit.getNumPartitIons
计算机生成了可选文字: valunit:sc.parallelize(1to100,20) ,夕存刭分区 /乡惫ni岁ar000s.size一一另一一庐尹2壅《区焱. varnpO1=unit.getNumPartitIons

Spark核心概念 – 宽依赖和窄依赖

RDD父子依赖关系:窄( Narrow)依赖和宽( Wide)依赖。

窄依赖:指父RDD的每一个分区最多被一个子RDD的分区所用。

宽依赖:指子RDD的分区依赖于父RDD的所有分区。

计算机生成了可选文字:
NarrowDependencies:
map,filter
join”i山input、
co-partitioned
WideDependencies:
groupByKey
joinwithinputsnot
co-partitioned
groupBy
map
,Stage2
.J01n
Stage3
计算机生成了可选文字: NarrowDependencies: map,filter join”i山input、 co-partitioned WideDependencies: groupByKey joinwithinputsnot co-partitioned groupBy map ,Stage2 .J01n Stage3

Stage:

一个Job会被拆分为多组Task,每组任务被称为一个Stage就像Map Stage, Reduce Stage。Stage的划分在RDD的论文中有详细的介绍,简单的说是以shuffle和result这两种类型来划分。在Spark中有两类task,一类是shuffleMapTask,一类是resultTask,第一类task的输出是shuffle所需数据,第二类task的输出是result,stage的划分也以此为依据,shuffle之前的所有变换是一个stage,shuffle之后的操作是另一个stage。比如 rdd.parallize(1 to 10).foreach(println) 这个操作没有shuffle,直接就输出了,那么只有它的task是resultTask,stage也只有一个;如果是rdd.map(x => (x, 1)).reduceByKey(_ + _).foreach(println), 这个job因为有reduce,所以有一个shuffle过程,那么reduceByKey之前的是一个stage,执行shuffleMapTask,输出shuffle所需的数据,reduceByKey到最后是一个stage,直接就输出结果了。如果job中有多次shuffle,那么每个shuffle之前都是一个stage.

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020-12-01 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
数据保险箱
数据保险箱(Cloud Data Coffer Service,CDCS)为您提供更高安全系数的企业核心数据存储服务。您可以通过自定义过期天数的方法删除数据,避免误删带来的损害,还可以将数据跨地域存储,防止一些不可抗因素导致的数据丢失。数据保险箱支持通过控制台、API 等多样化方式快速简单接入,实现海量数据的存储管理。您可以使用数据保险箱对文件数据进行上传、下载,最终实现数据的安全存储和提取。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档