
SparkCore: RDD


The Five Key Properties of an RDD

  • A list of partitions: an RDD is made up of multiple partitions.
  • A function for computing each split: the computation logic applied to each split/partition.
  • A list of dependencies on other RDDs: an RDD can depend on other RDDs, e.g. the result of one RDD is processed further by the next RDD.
  • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned): key-value RDDs are partitioned by hash, similar to the Partitioner interface in MapReduce that controls which reducer a key goes to.
  • Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file): the preferred location of each partition, assigned automatically by Spark.

https://blog.csdn.net/zym1117/article/details/79532458
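In the Spark source these five properties map onto members of the abstract RDD class. The sketch below is a paraphrase (a hypothetical SketchRDD, not the real class; see RDD.scala for the exact signatures):

import org.apache.spark.{Dependency, Partition, Partitioner, TaskContext}

// Paraphrased shape of the RDD API; each member corresponds to one of the five properties above.
abstract class SketchRDD[T] {
  protected def getPartitions: Array[Partition]                            // 1. a list of partitions
  def compute(split: Partition, context: TaskContext): Iterator[T]         // 2. a function for computing each split
  protected def getDependencies: Seq[Dependency[_]]                        // 3. dependencies on other RDDs
  val partitioner: Option[Partitioner] = None                              // 4. optional partitioner for key-value RDDs
  protected def getPreferredLocations(split: Partition): Seq[String] = Nil // 5. optional preferred locations per split
}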

Ways to Create an RDD

  1. Creating an RDD from a local collection

makeRDD

def makeRDD[T: ClassTag](seq: Seq[T],numSlices: Int = defaultParallelism)

Under the hood, makeRDD simply delegates to parallelize:
def makeRDD[T: ClassTag](seq: Seq[T],numSlices: Int = defaultParallelism): RDD[T] = withScope {
    parallelize(seq, numSlices)
}

seq: the input collection. numSlices: the number of partitions; if omitted, a default is used. From the source we can see that the default value of numSlices comes from the spark.default.parallelism configuration:
 override def defaultParallelism(): Int =scheduler.conf.getInt("spark.default.parallelism", totalCores)

totalCores is passed in through the LocalSchedulerBackend class. Why LocalSchedulerBackend? Because I am running in local mode; in Standalone (cluster) mode the backend would be StandaloneSchedulerBackend instead.
private[spark] class LocalSchedulerBackend(conf: SparkConf,scheduler: TaskSchedulerImpl,val totalCores: Int)

The LocalSchedulerBackend is created from the SparkContext:
val conf=new SparkConf().setMaster("local[4]").setAppName("custom-app")
val sc=new SparkContext(conf)

master is the value specified in .setMaster("local[4]"); in other words, it is fixed when the SparkContext is initialized.
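As a quick sanity check (a minimal sketch, reusing the sc created above with local[4]), the effective default parallelism can be read directly from the SparkContext:

println(sc.defaultParallelism) // expected: 4, unless spark.default.parallelism is set

The scheduler backend itself is chosen in SparkContext.createTaskScheduler by matching on the master string: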
master match {
      case "local" =>
        checkResourcesPerTask(clusterMode = false, Some(1))
        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
        val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
        scheduler.initialize(backend)
        (backend, scheduler)

      case LOCAL_N_REGEX(threads) =>
        def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
        // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
        val threadCount = if (threads == "*") localCpuCount else threads.toInt
        if (threadCount <= 0) {
          throw new SparkException(s"Asked to run locally with $threadCount threads")
        }
        checkResourcesPerTask(clusterMode = false, Some(threadCount))
        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
        val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
        scheduler.initialize(backend)
        (backend, scheduler)

      case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
        def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
        // local[*, M] means the number of cores on the computer with M failures
        // local[N, M] means exactly N threads with M failures
        val threadCount = if (threads == "*") localCpuCount else threads.toInt
        checkResourcesPerTask(clusterMode = false, Some(threadCount))
        val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
        val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
        scheduler.initialize(backend)
        (backend, scheduler)

      case SPARK_REGEX(sparkUrl) =>
        checkResourcesPerTask(clusterMode = true, None)
        val scheduler = new TaskSchedulerImpl(sc)
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        (backend, scheduler)

      case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
        checkResourcesPerTask(clusterMode = true, Some(coresPerSlave.toInt))
        // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
        val memoryPerSlaveInt = memoryPerSlave.toInt
        if (sc.executorMemory > memoryPerSlaveInt) {
          throw new SparkException(
            "Asked to launch cluster with %d MiB RAM / worker but requested %d MiB/worker".format(
              memoryPerSlaveInt, sc.executorMemory))
        }

        // For host local mode setting the default of SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED
        // to false because this mode is intended to be used for testing and in this case all the
        // executors are running on the same host. So if host local reading was enabled here then
        // testing of the remote fetching would be secondary as setting this config explicitly to
        // false would be required in most of the unit test (despite the fact that remote fetching
        // is much more frequent in production).
        sc.conf.setIfMissing(SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED, false)

        val scheduler = new TaskSchedulerImpl(sc)
        val localCluster = new LocalSparkCluster(
          numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
        val masterUrls = localCluster.start()
        val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        backend.shutdownCallback = (backend: StandaloneSchedulerBackend) => {
          localCluster.stop()
        }
        (backend, scheduler)

      case masterUrl =>
        checkResourcesPerTask(clusterMode = true, None)
        val cm = getClusterManager(masterUrl) match {
          case Some(clusterMgr) => clusterMgr
          case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
        }
        try {
          val scheduler = cm.createTaskScheduler(sc, masterUrl)
          val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
          cm.initialize(scheduler, backend)
          (backend, scheduler)
        } catch {
          case se: SparkException => throw se
          case NonFatal(e) =>
            throw new SparkException("External scheduler cannot be instantiated", e)
        }
    }

The local master has three forms:

  • setMaster("local"): a single worker thread by default.
case "local" =>
        checkResourcesPerTask(clusterMode = false, Some(1))
        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
        val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
        scheduler.initialize(backend)
        (backend, scheduler)

From this branch we can see that totalCores defaults to 1 (a quick partition-count check follows the snippet below):
val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
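As a quick check (a minimal sketch, assuming setMaster("local")), an RDD created without an explicit numSlices then ends up with a single partition:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setMaster("local").setAppName("custom-app")
val sc = new SparkContext(conf)

val value = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8))
println(s"Partitions=${value.getNumPartitions}") // expected: Partitions=1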
  • setMaster("local[*]") or setMaster("local[N]"): * means use all CPU cores of the machine, while N means use exactly N cores.

A regular expression determines whether the value passed to .setMaster("local[4]") is local[*] or local[N]:
 val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
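To see the regex in action (a minimal sketch in plain Scala, independent of Spark):

val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r

"local[4]" match {
  case LOCAL_N_REGEX(threads) => println(s"threads = $threads") // prints: threads = 4
  case _                      => println("no match")
}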
// master is currently local[4], so the regex binds threads = "4"
case LOCAL_N_REGEX(threads) =>
        // number of CPU cores on the current machine
        def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
        // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
        val threadCount = if (threads == "*") localCpuCount else threads.toInt
        // validate that the thread count is greater than 0
        if (threadCount <= 0) {
          throw new SparkException(s"Asked to run locally with $threadCount threads")
        }
        checkResourcesPerTask(clusterMode = false, Some(threadCount))
        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
        // From the code above:
        // if * was given, threadCount = number of CPU cores on the machine
        // if a number was given, threadCount is exactly that number
        val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
        scheduler.initialize(backend)
        (backend, scheduler)

Now let's verify that the partition count is indeed 4, as expected:
def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setMaster("local[4]").setAppName("custom-app")
    val sc=new SparkContext(conf)

    val list=List(1,2,3,4,5,6,7,8)
    val value: RDD[Int] = sc.makeRDD(list)
    println(s"Partitions=${value.getNumPartitions}")
    println(value.collect.toList)
}
Output:
Partitions=4
List(1, 2, 3, 4, 5, 6, 7, 8)

With local[*] (my machine has 16 cores):
Partitions=16
List(1, 2, 3, 4, 5, 6, 7, 8)

Besides the default assigned by Spark, we can also set the partition count ourselves, for example 3:
  def main(args: Array[String]): Unit = {
    val list=List(1,2,3,4,5,6,7,8)
    val value: RDD[Int] = sc.makeRDD(list,3)
    println(s"Partitions=${value.getNumPartitions}")
    println(value.collect.toList)
  }
Output:
Partitions=3
List(1, 2, 3, 4, 5, 6, 7, 8)

As mentioned earlier, makeRDD is just a wrapper around parallelize, so calling parallelize directly works as well:
def main(args: Array[String]): Unit = {
    //val lines: RDD[String] = sc.textFile("file:///C:/Users/123456/Desktop/worldCount.txt",3)
    val list=List(1,2,3,4,5,6,7,8)
    val value: RDD[Int] = sc.parallelize(list,3)
    println(s"Partitions=${value.getNumPartitions}")
    println(value.collect.toList)
  }
Output:
Partitions=3
List(1, 2, 3, 4, 5, 6, 7, 8)

The underlying implementation of parallelize:
def parallelize[T: ClassTag](seq: Seq[T],numSlices: Int = defaultParallelism): RDD[T] = withScope {
    assertNotStopped()
    new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
  2. Creating an RDD by reading a file

Creating an RDD by reading a file. If HADOOP_CONF_DIR is set in the cluster configuration, Spark reads from HDFS by default:
  • Reading an HDFS file: sc.textFile("/.../..."), sc.textFile("hdfs:///.../...") or sc.textFile("hdfs://hadoop102:8020/.../...")
  • Reading a local file: sc.textFile("file:///.../...")
If HADOOP_CONF_DIR is not set, Spark reads from the local file system by default:
  • Reading an HDFS file: sc.textFile("hdfs://hadoop102:8020/.../...")
  • Reading a local file: sc.textFile("/.../...") or sc.textFile("file:///.../...")

Reading an HDFS file (hdfs://):
  @Test
  def readHdfs():Unit={
    val conf=new SparkConf().setMaster("local[4]").setAppName("custom-app")
    val sc=new SparkContext(conf)

    val lines=sc.textFile("hdfs://hadoop102:9820/input/wordcount")
    println(s"Partitions=${lines.getNumPartitions}")
    println(lines.collect.toList)

  }

To make testing easier, JUnit is used here.

Output:
Partitions=2
List(你好 google , python 你好 count word hello, count 你好 google)

Reading a local file (file://):
  @Test
  def readLocalFile():Unit={
    val conf=new SparkConf().setMaster("local[4]").setAppName("custom-app")
    val sc=new SparkContext(conf)

    val lines= sc.textFile("file:///C:/Users/123456/Desktop/worldCount.txt")
    println(s"Partitions=${lines.getNumPartitions}")
    println(lines.collect.toList)
  }
Output:
Partitions=2
List(hello java shell, python java java, wahaha java shell)

We specified local[4], so why is Partitions=2? Let's look at the textFile source. From its parameter list we can see that, besides the file path, it also takes a minPartitions argument; if it is not provided, a default value is used:
def textFile(path: String,minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
    assertNotStopped()
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
      minPartitions).map(pair => pair._2.toString).setName(path)
  }
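In other words, textFile is a thin wrapper around hadoopFile. A minimal sketch of a roughly equivalent direct call (assuming the same local test file as above) would be:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

// hadoopFile yields (byte offset, line text) pairs; textFile keeps only the line text
val pairs = sc.hadoopFile("file:///C:/Users/123456/Desktop/worldCount.txt",
  classOf[TextInputFormat], classOf[LongWritable], classOf[Text], 2)
val lines = pairs.map(_._2.toString)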
The default, defaultMinPartitions, is defined as:
  /**
   * Default min number of partitions for Hadoop RDDs when not given by user
   * Notice that we use math.min so the "defaultMinPartitions" cannot be higher than 2.
   * The reasons for this are discussed in https://github.com/mesos/spark/pull/718
   */
  def defaultMinPartitions: Int = math.min(defaultParallelism, 2)

Tracing through the source, defaultParallelism is exactly the value we derived above (shown again below), so here defaultParallelism = 4:
override def defaultParallelism(): Int = scheduler.conf.getInt("spark.default.parallelism", totalCores)

It follows that when minPartitions is not given, its default is at most 2; here defaultMinPartitions = min(4, 2) = 2, which is why we saw Partitions=2.
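A quick check (a minimal sketch, reusing the local[4] SparkContext from the tests above):

println(sc.defaultMinPartitions) // math.min(defaultParallelism, 2) = math.min(4, 2) = 2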

Let's specify a minimum partition count explicitly, say 5:
@Test
def readLocalFile():Unit={
    val lines= sc.textFile("file:///C:/Users/123456/Desktop/worldCount.txt",5)
    println(s"Partitions=${lines.getNumPartitions}")
    println(lines.collect.toList)
}
Output:
Partitions=5

For an RDD created by reading a file:
  1. If minPartitions is set, the number of partitions >= minPartitions.
  2. If minPartitions is not set, the number of partitions >= Math.min(defaultParallelism, 2).
Either way, the final number of partitions is determined by the number of file splits.

  3. Deriving from other RDDs: improving the word-count example
 @Test
  def readLocalFile():Unit={
    val lines= sc.textFile("file:///C:/Users/123456/Desktop/worldCount.txt",4)
    println(s"Partitions=${lines.getNumPartitions}")

    val groupList: RDD[(String, Iterable[String])] = lines.flatMap(_.split(" ")).groupBy(x=>x)
    val value: RDD[(String, Int)] = groupList.map({ case (k, v) => (k, v.size) })

    println(value.collect.toList)
  }

An RDD produced from another RDD through transformations such as flatMap or map is said to be derived from it. Output of the test above:
Partitions=4
List((python,2), (wahaha,2), (shell,3), (hello,2), (java,7))
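As a side note (a sketch of an alternative, not part of the original walkthrough), the same counts can be produced with reduceByKey, which avoids materializing the grouped values:

@Test
def wordCountWithReduceByKey(): Unit = {
  // same input file and sc (local[4]) as the tests above
  val lines = sc.textFile("file:///C:/Users/123456/Desktop/worldCount.txt", 4)
  val counts: RDD[(String, Int)] = lines
    .flatMap(_.split(" "))
    .map(word => (word, 1))
    .reduceByKey(_ + _)
  println(counts.collect.toList)
}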
To summarize, for an RDD created from a collection:
  • If numSlices is specified when calling parallelize, the number of partitions = numSlices.
  • If numSlices is not specified, the number of partitions = defaultParallelism, where:
      • if the spark.default.parallelism parameter is set, the number of partitions = its value;
      • if spark.default.parallelism is not set:
          1. master = local[N]: number of partitions = N
          2. master = local[*]: number of partitions = number of CPU cores on the machine
          3. master = local: number of partitions = 1
          4. master = spark://...: number of partitions = total number of executor CPU cores for this job
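The spark.default.parallelism rule above can be checked with a minimal sketch (assumed configuration; the value 6 is arbitrary):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("local[4]")
  .setAppName("custom-app")
  .set("spark.default.parallelism", "6") // overrides the totalCores fallback
val sc = new SparkContext(conf)

val value = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8))
println(s"Partitions=${value.getNumPartitions}") // expected: Partitions=6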
  • textFile file splitting

Run the program:
 @Test
  def readLocalFile():Unit={
    val lines= sc.textFile("file:///C:/Users/123456/Desktop/worldCount.txt",4)
    println(s"Partitions=${lines.getNumPartitions}")

    val groupList: RDD[(String, Iterable[String])] = lines.flatMap(_.split(" ")).groupBy(x=>x)
    val value: RDD[(String, Int)] = groupList.map({ case (k, v) => (k, v.size) })
    println(value.collect.toList)
  }

worldCount.txt file size: 267 bytes.

The split logic lives in Hadoop's FileInputFormat.getSplits:
public InputSplit[] getSplits(JobConf job, int numSplits)
    throws IOException {
    StopWatch sw = new StopWatch().start();
    FileStatus[] files = listStatus(job);
    
    // total size of all input files; here totalSize = 267
    job.setLong(NUM_INPUT_FILES, files.length);
    long totalSize = 0;                           // compute total size
    for (FileStatus file: files) {                // check we have valid files
      if (file.isDirectory()) {
        throw new IOException("Not a file: "+ file.getPath());
      }
      totalSize += file.getLen();
    }
    // goal size per split: 267 / 4 = 66 (integer division)
    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
    // minimum split size, default 1
    long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

    // splits holds the resulting file splits
    ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
    NetworkTopology clusterMap = new NetworkTopology();
    for (FileStatus file: files) {
      Path path = file.getPath();
      long length = file.getLen();
     // skip empty files (length != 0)
      if (length != 0) {
        FileSystem fs = path.getFileSystem(job);
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        // check whether the file is splittable
        if (isSplitable(fs, path)) {
          // block size: 32 MB for the local file system, 128 MB on HDFS
          long blockSize = file.getBlockSize();
          // compute the split size (as shown below, splitSize = 66 here)
          long splitSize = computeSplitSize(goalSize, minSize, blockSize);
          // loop: carve out full-sized splits
          // SPLIT_SLOP = 1.1 allows a small remainder to be absorbed
          // pass 1: 267/66 = 4.045... ; bytesRemaining = 267-66 = 201
          // pass 2: 201/66 = 3.045... ; bytesRemaining = 201-66 = 135
          // pass 3: 135/66 = 2.045... ; bytesRemaining = 135-66 = 69
          // pass 4: 69/66 = 1.045... < SPLIT_SLOP, so the loop exits
          long bytesRemaining = length; // bytesRemaining =267
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
                length-bytesRemaining, splitSize, clusterMap);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                splitHosts[0], splitHosts[1]));
            bytesRemaining -= splitSize;
          }
          // the leftover bytes (69 here) form one final split
          if (bytesRemaining != 0) {
            String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length
                - bytesRemaining, bytesRemaining, clusterMap);
            splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                splitHosts[0], splitHosts[1]));
          }
        } else {
          String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap);
          splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));
        }
      } else { 
        //Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
          + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
    }
    // splits.size = 4; the RDD ends up with splits.size partitions
    return splits.toArray(new FileSplit[splits.size()]);
  }

computeSplitSize is Hadoop's split-size formula:
// goalSize = 66
// minSize = 1
// blockSize = 32 << 20 bytes (32 MB)
protected long computeSplitSize(long goalSize, long minSize,long blockSize) {
    // Math.min(goalSize, blockSize) = min(66, 32 MB) = 66
    // Math.max(minSize, 66) = max(1, 66) = 66
    return Math.max(minSize, Math.min(goalSize, blockSize));
}
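To make the arithmetic concrete, here is a small Scala sketch (not Hadoop code, just the same formula) reproducing the 267-byte case:

// totalSize = 267 bytes, numSplits = 4 (the minPartitions passed to textFile)
val totalSize = 267L
val numSplits = 4
val goalSize  = totalSize / numSplits                            // 66
val minSize   = 1L
val blockSize = 32L << 20                                        // 32 MB, local file system
val splitSize = math.max(minSize, math.min(goalSize, blockSize)) // 66
println(splitSize)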

Now adjust worldCount.txt so that its size is 79 bytes. This time the program reports Partitions=5:
Partitions=5

Walking through the split computation again explains why.

Total file size:
long totalSize = 79; 

Goal size per split: goalSize = 79 / 4 = 19 (integer division):
long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);

Minimum split size: minSize = 1:
long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

Split size: splitSize = 19:
long splitSize = computeSplitSize(goalSize, minSize, blockSize);

Splitting:
          // pass 1: 79/19 = 4.157... ; bytesRemaining = 79-19 = 60
          // pass 2: 60/19 = 3.157... ; bytesRemaining = 60-19 = 41
          // pass 3: 41/19 = 2.157... ; bytesRemaining = 41-19 = 22
          // pass 4: 22/19 = 1.157... ; bytesRemaining = 22-19 = 3
          // pass 5: 3/19 = 0.157... < SPLIT_SLOP, so the loop exits
          long bytesRemaining = length; // bytesRemaining =79
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
                length-bytesRemaining, splitSize, clusterMap);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                splitHosts[0], splitHosts[1]));
            bytesRemaining -= splitSize;
          }
          // pass 5: the remaining 3 bytes become the fifth and final split
          if (bytesRemaining != 0) {
            String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length
                - bytesRemaining, bytesRemaining, clusterMap);
            splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                splitHosts[0], splitHosts[1]));
          }

After this splitting, the file has been cut into 5 splits, so the final partition count is 5.

Although the call below requests 4, that number is only a lower bound on the split count; how many partitions you actually get depends on how many splits the file is finally cut into.
val lines = sc.textFile("file:///C:/Users/123456/Desktop/worldCount.txt", 4)
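For completeness, a small Scala sketch (same SPLIT_SLOP = 1.1 logic as the Hadoop code above, simplified) reproduces the split counts for both file sizes:

// count how many splits FileInputFormat would produce for a single file of the given size
def countSplits(totalSize: Long, requestedSplits: Int,
                minSize: Long = 1L, blockSize: Long = 32L << 20): Int = {
  val SPLIT_SLOP = 1.1
  val goalSize  = totalSize / math.max(requestedSplits, 1)
  val splitSize = math.max(minSize, math.min(goalSize, blockSize))
  var bytesRemaining = totalSize
  var splits = 0
  while (bytesRemaining.toDouble / splitSize > SPLIT_SLOP) {
    splits += 1
    bytesRemaining -= splitSize
  }
  if (bytesRemaining != 0) splits += 1 // leftover bytes form one last split
  splits
}

println(countSplits(267, 4)) // 4
println(countSplits(79, 4))  // 5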
