前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark之基本流程(一)

Spark之基本流程(一)

原创
作者头像
千万别过来
修改2023-08-24 07:56:45
9270
修改2023-08-24 07:56:45
举报
文章被收录于专栏:推荐算法学习推荐算法学习

前言

最近在拜读许老师的《大数据处理框架Apache Spark设计与实现》,之前看豆瓣评分很高,阅读了一下果然通俗易懂,在这里记录一下相关的笔记,补充了一些个人理解,如有不对还请指正。参考链接:https://github.com/JerryLead/SparkInternals

1.1 Spark部署

Spark在集群上部署有多个版本:Standalone、Mesos、YARN、Kubernetes。其中Standalone是Spark官方的,其他都是第三方框架。YARN是目前主流之一,可以同时管理Spark、Hadoop Mapreduce任务。

1.2 Spark 系统架构

1.2.1 基本名词概念

Spark和MapReduce一样是Master-Worker结构。由于在介绍Spark原理的时候会涉及到很多名词,一不小心就容易搞混淆,因此先梳理一下几个名词:

  • Master节点:本质上是一台机器,常驻Master进程,负责分配任务以及监控Worker存活。
  • Worker节点:本质上是多台机器,常驻Worker进程,负责执行任务以及监控任务运行状态。
  • Spark Application:用户自己写的程序,比如 HelloWorld.scala。
  • Spark Driver:一个进程。负责运行main(),以及创建SparkContext。如果是 YARN 集群,那么 Driver 可能被调度到 Worker 节点上运行(比如上图中的 Worker Node 2)。
  • Executor:一个JVM进程。一个Worker可以管理一个或多个Executor,但一个Executor只有一个线程池,线程池里有多个线程,每个线程可以执行一个 task。Spark先以Executor为单位占用集群资源,然后Driver再分配任务执行。通常来说一个Executor可以分配多个CPU和内存。
  • Task:一个Executor内的线程,最小的计算单位。一个task一般使用一个CPU,且多个task共享同一个Executor的内存。
  • Job:Spark的作业。通常执行几次action(),就会有几个作业数。比如count()两次就有两个Job。
  • Stage:Spark Job的阶段。一个Job可以分为1~n个stage。(物理执行计划里面的概念)
  • Partition:数据的分区。分区个数可以决定该数据最多部署在几台机器上。
  • RDD:本质上是一个封装好的抽象类(abstract class)。并行数据集的抽象表示(Resilient Distributed Datasets, RDD)。另外提一下,Spark的Dataframe是在RDD基础上再封装的。

1.2.2优点缺点

以上介绍可以看出来Spark这么设计相比于Hadoop MapReduce的优点和缺点:

  • 优点:多个task以线程形式执行,互相可以共享内存,避免资源浪费;同时线程启动比进程启动更快。(MR里面的task是以java进程方式运行)
  • 缺点:多个task之间由于是线程的形式会导致资源竞争,另外多个task并行的日志会比较混乱。

1.3 Spark应用例子

1.3.1 GroupBy例子

下面举一个groupby的例子,来了解spark运行的流程。

代码语言:java
复制
```scala
package org.apache.spark.examples

import java.util.Random

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

/**
  * Usage: GroupByTest [numMappers] [numKVPairs] [valSize] [numReducers]
  */
object GroupByTest {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("GroupBy Test")
    var numMappers = 3 //该应用包含3个map task 
    var numKVPairs = 4 //每个task随机生成4个<K,V>record 
    var valSize = 1000 //每个Value大小1000byte 
    var numReducers = 2 //由于随机产生的key会有重复,groupby聚合过程使用2个reduce task 

    val sc = new SparkContext(sparkConf) //也可以通过SparkSession初始化

    // 生成一个k-v形式的array
    val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
      val ranGen = new Random
      var arr1 = new Array[(Int, Array[Byte])](numKVPairs)
      for (i <- 0 until numKVPairs) {
        val byteArr = new Array[Byte](valSize)
        ranGen.nextBytes(byteArr)
        arr1(i) = (ranGen.nextInt(Int.MaxValue), byteArr)
      }
      arr1
    }.cache()
    // Enforce that everything has been calculated and in cache
    pairs1.count() //=numMappers * numKVPairs = 12,到这里cache()才会生效

    println(pairs1.groupByKey(numReducers).count()) //=4

    sc.stop()
  }
}

```

代码大致意思是:先用RDD的形式生成一堆key-value形式的数组,key是随机给0~Int最大值,value是一个随机的byte。然后调用groupby和count,把相同的key聚合,计算个数。

这里需要注意,真正在写应用的时候一般不用自己指定map task的个数,通常自动计算为:

MapTask个数=\frac{输入数据大小}{每个分片大小(HDFS默认是128MB)}

实际执行流程比自己的要复杂,需要先建立逻辑处理流程(Logical Plan),然后根据逻辑处理流程生成 物理逻辑流程(Physical Plan),然后生成具体 task 执行。

1.3.2逻辑处理流程(Logical Plan)

逻辑处理流程通俗的话来说就是:各种各样的RDD不停的变换。逻辑处理流程表示的是数据上的依赖关系,不是 task 的执行图。仔细观察上面代码可以发现,action()一共有两次:

  1. 一次是flatmap生成array之后进行了一次count()。
  2. 一次是groupby之后进行了一次count()。

由于第二次count()时候数据依赖于前面,因此以变量result为例。使用 result.toDebugString 输出日志,可以看到整个逻辑处理流程如下:

代码语言:scala
复制
(2) ShuffledRDD[2] at groupByKey at GroupByTest.scala:55 []
  +-(3) MapPartitionsRDD[1] at flatMap at GroupByTest.scala:41 []
     |      CachedPartitions: 3; MemorySize: 12.4 кв;
            ExternalBlockStoreSize: 0.0 в; DiskSize: 0.0 B
     |  ParallelCollectionRDD[0] at parallelize at GroupByTest. scala:41 []

不难看出生成了3个RDD,后面的中括号是生成的顺序,即:ParallelCollectionRDDMapPartitionsRDDShuffledRDD

另外,(2) ShuffledRDD[2] at groupByKey at GroupByTest.scala:55 []前面的小括号里面的(2)、(3)是分区(partition)个数的意思。

这个日志“从里往外“看:可以看出来由于生成了3个数组,因此一直到MapPartitionsRDD都是3个分区。另外,由于我们使用了cache(),因此其中的3个分区计算时候会被缓存为CachedPartitions。而到groupby的时候,由于我们指定了var numReducers = 2,因此变成了2个分区。并且转成了ShuffledRDD

1.3.3 物理执行计划(Physical Plan)

上一节说的逻辑处理流程(Logical Plan)基本上可以理解是RDD之间的变化的关系,但是并不能执行计算任务,因此需要再转换成物理执行计划(Physical Plan)对任务执行。其中包括执行阶段(Stage)和执行任务(Task)。简单来说可以分成三个步骤:

  1. 确定应用(Application)会产生哪些作业(Job)。 比如上面例子因为count()两次,就是两个Job。
  2. 将每个作业(Job)拆分成1~n个执行阶段(Stage)。 这里是根据逻辑处理流程的数据依赖关系来拆分。比如上面例子第一个Job就只拆了1个stage,而第二个Job拆成了2个Stage。为什么这么拆,后面再说。
  3. 确定执行任务(task)的个数和种类。 这里是依据RDD的分区(Partition)个数来确定,比如第二个Job,一开始是3个partition,因此在stage 0 里面是3个task来计算任务。到了stage 1,由于定义了var numReducers = 2,变成了2个分区,因此在这里是2个task来计算任务。stage 0→stage 1,这个过程称为shuffle机制,会将数据重新分配。

注:为什么要拆分执行阶段(Stage)?

  1. 便于并行执行。 先看同一个stage里面,多个task大小合适,且为同构的,并行起来方便。
  2. 提高数据处理效率。 再看同一个task里面,多个操作串行处理,效率高。
  3. 方便错误容忍。 如果一个stage挂了,直接重新运行这个stage就行了,不用把整个job都重新运行。

1.4 查看日志

如果想知道自己Spark Application的运行流程,可以根据Spark提供的执行界面查看。

1.4.1 查看Job日志

Job日志可以查看Stage的运行的情况

1.4.2 查看Stage日志

上图点开后,可以看到多个stage,点击stage的超链接(或者从Job那边点超链接也可以),可以查看该stage的运行情况。

打开之后可以查看DAG,查看RDD的生成顺序,同时也可以查看每个task的运行时间,方便排查问题。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 1.1 Spark部署
  • 1.2 Spark 系统架构
    • 1.2.1 基本名词概念
      • 1.2.2优点缺点
      • 1.3 Spark应用例子
        • 1.3.1 GroupBy例子
          • 1.3.2逻辑处理流程(Logical Plan)
            • 1.3.3 物理执行计划(Physical Plan)
            • 1.4 查看日志
              • 1.4.1 查看Job日志
                • 1.4.2 查看Stage日志
                相关产品与服务
                大数据
                全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档