前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >如何理解大数据框架中的分区概念

如何理解大数据框架中的分区概念

作者头像
kk大数据
发布2023-03-03 09:02:35
6890
发布2023-03-03 09:02:35
举报
文章被收录于专栏:kk大数据kk大数据

一、分布式问题背景

随着科技进步互联网的发展,各行各业产生的数据越来越多,由此催生了大量的数据处理需求。

任何事物的发展都不是一蹴而就的,一家公司成立之初10多人规模的时候,使用单台机器 shell 处理日志或者单个数据库就可以满足计算要求,并不需要分布式,并且效率很高

随着业务发展,需求的复杂度越来越高,单机处理的上限与性能日益凸显,为了突破瓶颈,就需要引入一些大数据的计算与存储框架,使用分布式计算和存储的方式,化整为零,分而治之。

二、分区在 Spark 中的实现

1、一段 WordCount 程序

Spark 中独创性的使用 RDD 来表示数据集,使用算子来表示任意的数据处理过程。

RDD 并不存储数据,RDD 只是表示对数据集的引用、计算方式、以及 RDD 之间的依赖关系。

下面贴出了一段程序,来计算经典的单词计数问题:

代码语言:javascript
复制
sc.textFile("hdfs://hadoop1:8020/input/app_24.log")
  .flatMap(_.split("\\s+"))
  .map((_,1))
  .reduceByKey(_+_)
  .foreach(println)

大道至简,虽然这段程序并不复杂,但是当它跑在 Spark 集群上时,Spark 默默在背后做了以下的事情:

(1)生成两类任务,一类任务的逻辑是:从原始文件中领取一段属于自己的文件,计算单词数量;另一类任务的逻辑是:汇总前面任务的结果得到最终结果返回。

(2)调度器需要计算集群资源的使用情况,先把第一类任务按需发送到不同的服务器上执行;

(2)等到第一批任务全部执行完后,再提交第二批任务执行,它们会从第一批任务处读取它们的计算结果,做最终处理。

2、深入 textfile 的细节

下面深入到一些细节中,首先需要介绍一下背景:

笔者本地部署了一个单机版的 hadoop 程序,并且设置了 blockSize 为 10m

然后上传了一个 99M 的文件(就是代码中的文件)到 hdfs 上,此时文件 block 数量如下所示:

文件总大小为 99MB,在 hdfs 上一共有 10 个 block。

最终提交执行时,Spark 一共会产生 10 个 Task,每个 Task 读取一个 block 块文件

这个结论是如何得出来的?

此时需要引入一个概念:RDD 的分区。

在源码中,分区是 RDD 的一个非常重要的属性

可以想象,既然是分布式计算,那么每个 Task 肯定只需要计算自己的这一份数据。

而 Task 的数量是和分区数量一致的,每个分区对应一个 Task。

而 RDD 的分区数量是如何计算得到的?

答案是:每个 RDD 中都有一个 getPartitions 方法来计算分区。

让我们回到代码中。

sc.textFile 这一行代码,它返回的是一个 HadoopRDD,这个类里面定义了 getPartitions 方法。

抽去不需要的代码,简化核心代码如下:

代码语言:javascript
复制
override def getPartitions: Array[Partition] = {
    val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
    val inputSplits = if (ignoreEmptySplits) {
        allInputSplits.filter(_.getLength > 0)
      } else {
        allInputSplits
      }
      val array = new Array[Partition](inputSplits.size)
      for (i <- 0 until inputSplits.size) {
        array(i) = new HadoopPartition(id, i, inputSplits(i))
      }
      array
}

首先使用 Hadoop 提供的 FileInputFormat 方法,遍历所有文件,再对每个文件遍历它的每个 block 文件,每个 block 文件得到一个 InputSplit。

在我的环境中,inputSplits 就是10个元素:

最终封装成 HadoopPartition 返回。

在生成 Task 的环节,就是利用这些 Partition 来生成对应的 Task。

3、其他算子的分区定义

窄依赖的算子的分区数,会传承了前面的 RDD。比如此案例中的 flatMap 和 Map ,分区数都是 10 个,每个分区上下游算子都是 1 对 1 关系。

宽依赖的算子,比如 reduceByKey、groupByKey、join 等,都是根据参数传入的分区数决定;

如果参数没传分区数,会有一个算法来计算默认分区数(并不是坊间传闻的由上游的最大分区数决定)。

三、分区在 Kafka 中的实现

Kafka 是一个大数据的消息中间件。

严格意义上来说,它并不是一个消息队列,因为它并不能做到全局的消息有序,所以这里称之为消息中间件。

Kafka 的作用使用四个字总结就是削峰填谷。

展开来说,Kafka 是在数据源头与数据计算之间,充当了消息缓冲的作用。

因为计算资源受限于机器的数量和每台机器的计算能力,而数据发送端(比如日志生成)则没有此限制。

一旦数据发送端生成数据超出了数据计算端的计算能力,系统就会发生不可预期的问题。

如果中间加入了 Kafka 这层缓冲,即使有海量数据过来,顶多是计算端来不及消费,数据有所积压,不会影响到整个系统的稳定。

为了充当好这个角色,对 Kafka 至少提出了以下的要求:

(1)必须是高性能的:每秒的吞吐量要跟上;

(2)必须是可扩展的:可扩展才能进一步提升吞吐;

(3)必须是高可靠的:增加数据的容错。

为此,Kafka 也设计了分区的概念,只有对数据分区了,才能把数据存储在不同的服务器上。

Kafka 的 Topic 可以在创建的时候,指定多个分区。每个分区可以指定多个副本。多个副本之间保持同步。

如下命令可以创建出包含 3 个分区,每个分区 3 备份的 topic

代码语言:javascript
复制
bin/kafka-topics.sh --zookeeper hadoop102:2181/kafka  --create --replication-factor 3 --partitions 3 --topic topic_log

在存储引擎中,分区一般和复制结合使用,使的每个分区的副本存储在多个节点上,提升数据的容错性。

四、分区带来的问题

物极必反,天之道,损有余而补不足,分区在大数据领域可以带来化整为零、分而治之的正向效果,却也可能带来严重的问题。

总所周知,在做 Join 操作或 ReduceByKey 的操作时,上游任务需要把自己的数据,按照下游的分区数,分别发送给所有下游任务处理,相同的数据必须要发送给同一个任务处理,否则没法达到汇总的效果。

画个简单的示意图

如果此时,发送到下游某个 Task 的数据非常多,这将导致严重的性能问题:某个下游 Task 处理了 海量数据,其他 Task 却只有零星的数据需要处理。

这个问题也被称之为:数据倾斜。

关于数据倾斜的解决,最终思路都大同小异:使用一定的方法,避免热点数据进入同一个 Task 中。

它的解决方式,可以在 Hive框架、Spark框架相关的数据倾斜问题中找到,这里不详述。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2023-02-04,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 KK架构 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 二、分区在 Spark 中的实现
    • 1、一段 WordCount 程序
      • 2、深入 textfile 的细节
        • 3、其他算子的分区定义
        • 三、分区在 Kafka 中的实现
        • 四、分区带来的问题
        相关产品与服务
        对象存储
        对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档