前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【Spark重点难点】你的数据存在哪了?

【Spark重点难点】你的数据存在哪了?

作者头像
王知无-import_bigdata
发布2021-12-07 14:42:21
1.3K0
发布2021-12-07 14:42:21
举报

前言

在之前的课中我们讲了Spark的RDD以及整个Spark系统中的一些关键角色:《【Spark重点难点】你从未深入理解的RDD和关键角色》

以及Spark中非常重要的一个概念Shuffle:《【Spark重点难点】你以为的Shuffle和真正的Shuffle》

无论是在提交任务还是执行任务的过程中,Spark存储体系永远是绕不过去的坎。

Spark为了避免类似Hadoop读写磁盘的IO操作成为性能瓶颈,优先将配置信息、计算结果等数据存入内存,当内存存储不下的时候,可选择性的将计算结果输出到磁盘,为了保证性能,默认都是存储到内存的,这样极大的提高了Spark的计算效率。

我们先用一张图来概括一下Spark的存储体系:

整体体系中重要的角色包括:

  • BlockManager是整体存储体系中核心模块
  • DiskBlockManager磁盘管理器
  • MemoryStore内存存储
  • DiskStore磁盘存储

接下来我们依次看看这些角色都是用来做什么的。

BlockManager

BlockManager运行在每个节点上(包括Driver和Executor)。

他提供对本地或远端节点上的内存、磁盘及堆外内存中Block的管理。存储体系从狭义上来说指的就是BlockManager,从广义上来说,则包括整个Spark集群中的各个BlockManagerBlockInfoManagerDiskBlockManagerDiskStoreMemoryManagerMemoryStore、对集群中的所有BlockManager进行管理的BlockManagerMaster及各个节点上对外提供Block上传与下载服务的BlockTransferService

BlockManager的结构是Maser-Slave架构,Master就是Driver上的BlockManagerMaster,Slave就是每个Executor上的BlockManagerBlockManagerMaster负责接受Executor上的BlockManager的注册以及管理BlockManager的元数据信息。

工作原理

在DAGShceduler中有一个BlockManagerMaster对象,该对象的工作就是负责管理全局所有BlockManager的元数据,当集群中有BlockManager注册完成的时候,其会向BlockManagerMaster发送自己元数据信息;BlockManagerMaster会为BlockManager创建一个属于这个BlockManagerBlockManagerInfo,用于存放BlockManager的信息。

在创建SparkContext的时候,会调用SparkEnv.blockManager.initialize方法实例化BlockManager对象,在创建Executor对象的时候也会创建BlockManager

当我们的Spark程序启动的时候,首先会创建SparkContext对象,在创建SparkContext对象的时候就会调用_env.blockManager.initialize(_applicationId)创建BlockManager对象,这个BlockManager就是Driver上的BlockManager,它负责管理集群中Executor上的BlockManager

创建BlockManager的关键方法如下,完整的源代码你可以在BlockManager这个类中看到。

代码语言:javascript
复制
def initialize(appId: String): Unit = {
    //初始化BlockTransferService,其实是它的子类NettyBlockTransferService是下了init方法,
    //该方法的作用就是初始化传输服务,通过传输服务可以从不同的节点上拉取Block数据
    blockTransferService.init(this)
    shuffleClient.init(appId)

    //设置block的复制分片策略,由spark.storage.replication.policy指定
    blockReplicationPolicy = {
      val priorityClass = conf.get(
        "spark.storage.replication.policy", classOf[RandomBlockReplicationPolicy].getName)
      val clazz = Utils.classForName(priorityClass)
      val ret = clazz.newInstance.asInstanceOf[BlockReplicationPolicy]
      logInfo(s"Using $priorityClass for block replication policy")
      ret
    }

    //根据给定参数为对对应的Executor封装一个BlockManagerId对象(块存储的唯一标识)
    //executorID:executor的Id,blockTransferService.hostName:传输Block数据的服务的主机名
    //blockTransferService.port:传输Block数据的服务的主机名
    val id = BlockManagerId(executorId, blockTransferService.hostName, blockTransferService.port, None)

    //调用BlockManagerMaster的registerBlockManager方法向Driver上的BlockManagerMaster注册
    val idFromMaster = master.registerBlockManager(
      id,
      maxMemory,
      slaveEndpoint)
    //更新BlockManagerId
    blockManagerId = if (idFromMaster != null) idFromMaster else id

    //判断是否开了外部shuffle服务
    shuffleServerId = if (externalShuffleServiceEnabled) {
      logInfo(s"external shuffle service port = $externalShuffleServicePort")
      BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
    } else {
      blockManagerId
    }

    // 如果开启了外部shuffle服务,并且该节点是Driver的话就调用registerWithExternalShuffleServer方法
    //将BlockManager注册在本地
    if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
      registerWithExternalShuffleServer()
    }

    logInfo(s"Initialized BlockManager: $blockManagerId")
  }

那么BlockManager又是如何存储数据的呢?Spark存储系统提供了两种存储抽象:MemoryStoreDiskStoreBlockManager正是利用它们来分别管理数据在内存和磁盘中的存取。

MemoryStore

MemoryStore负责将Block存储到内存。Spark通过将广播数据、RDD、Shuffle数据存储到内存,减少了对磁盘I/O的依赖,提高了程序的读写效率。

MemoryStore类实现了一个简单的基于块数据的内存数据库,用来管理需要写入到内存中的块数据。可以按序列化或非序列化的形式存放块数据,存放这两种块数据的数据结构是不同的,但都必须实现MemoryEntry这个接口。也就是说:MemoryStore管理的是以MemoryEntry为父接口的内存对象。

MemoryEntryMemoryStore中的管理的成员结构。它是一个接口,有两种实现:一种是DeserializedMemoryEntry用来保存非序列化块数据;一种是SerializedMemoryEntry用来保存序列化块数据

MemoryStore如何管理这些MemoryEntry对象呢?在当前版本,MemoryStore通过一个LinkedHashMap结构来管理内存对象。也就是说,MemoryStore是一个MemoryEntry类型的LinkedHashMap。Spark选择LinkedHashMap作为内存管理的数据结构与内存块的淘汰机制有很大的关系。

MemoryStore的数据结构

MemoryStore通过以MemoryEntry对象为元素的LinkedHashMap来管理块数据。LinkedHashMap是一个有序的HashMap,这样可以按插入顺序来对元素进行管理,此时各个节点构成了一个双向链表。

MemoryStore使用LinkedHashMap按访问元素的先后顺序把访问过的元素放到双向链表的末尾。这其实就形成了一个LRU队列(Least Recently Used队列)。这正是官方文档中提到的:缓存数据是不可靠的,当内存不够时,会按LRU算法来淘汰内存块。

需要注意的是,LinkedHashMap是非并发结构,所以在进行其元素的读写操作时,必须加锁。

MemoryEntry的数据结构

MemoryEntry的成员变量有三个:块数据的大小,内存模式(堆内还是堆外),块数据的类标识。MemoryEntry的代码实现如下:

代码语言:javascript
复制
// 代码位置:org.apache.spark.storage.memory

private sealed trait MemoryEntry[T] {
  // 块数据大小
  def size: Long
  // 内存模式:ON_HEAP(堆内),OFF_HEAP(堆外)
  def memoryMode: MemoryMode
  // 数据的类标识
  def classTag: ClassTag[T]
}

每个MemoryEntry对象的大小由size来确定。并且可以被保存在ON_HEAP(堆内)或者OFF_HEAP(堆外)

淘汰内存数据

当执行任务或缓存数据空闲内存不足时,可能会释放一部分存储内存,如果对应的RDD的存储级别设置了useDisk,则会把内存中的数据持久化到磁盘上。可以参考:MemoryStore#evictBlocksToFreeSpace:

代码语言:javascript
复制
private[spark] def evictBlocksToFreeSpace(
                    blockId: Option[BlockId],
                    space: Long,
                    memoryMode: MemoryMode
): Long = {...}

其中的blockId是数据块的id,每个id都对应一个内存块。需要淘汰内存块时,只需要从LinkedHashMap的头部选择一个进行删除即可。这就是上面我们提到的LRU内存数据淘汰机制。

DiskStore

DiskStore是BlockStore的另一个实现类,负责管理磁盘数据。简单的说,DiskStore就是通过DiskBlockManager来实现Block和相应磁盘文件的映射关系,从而将Block存储到磁盘的文件中。

下面是整体DiskStore的类实现:

代码语言:javascript
复制
private[spark] class DiskStore(
    conf: SparkConf,
    diskManager: DiskBlockManager,
    securityManager: SecurityManager) extends Logging { // SecurityManager用于提供对数据加密的支持

  // 读取磁盘中的Block时,是直接读取还是使用FileChannel的内存镜像映射方法读取的阈值。由spark.storage.memoryMapThreshold配置,默认为2M
  private val minMemoryMapBytes = conf.get(config.STORAGE_MEMORY_MAP_THRESHOLD)
  // 使用内存映射读取文件的最大阈值,由配置项spark.storage.memoryMapLimitForTests指定。它是个测试参数,默认值为不限制。
  private val maxMemoryMapBytes = conf.get(config.MEMORY_MAP_LIMIT_FOR_TESTS)
  // 维护块ID与其对应大小之间的映射关系的ConcurrentHashMap。
  private val blockSizes = new ConcurrentHashMap[BlockId, Long]()
  ...
}

我们可以看到DiskStore的属性有以下几项:

  • conf:即SparkConf
  • diskManager:即磁盘Block管理器DiskBlockManager
  • minMemoryMapBytes:读到磁盘中的Block时,是直接读取还是使用FileChannel的内存镜像映射方法读取的阈值

此外,DiskStore提供了下面的方法进行操作:

  • getSize:获取给定的BlockId所对应Block的大小。
  • contains:判断本地磁盘存储路径下是否包含给定BlockId所对应的Block文件。
  • remove:删除给定BlockId所对应的Block文件。
  • putBytes:用于将BlockId所对应的Block写入磁盘,Block的内容已经封装为ChunkedByteBuffer。
  • getBytes:读取给定BlockId所对应的Block,并封装为ChunkedByteBuffer返回。

借用吴磊老师的一句话:DiskStore中数据的存取本质上就是字节序列与磁盘文件之间的转换,它通过putBytes方法把字节序列存入磁盘文件,再通过getBytes方法将文件内容转换为数据块。

关于BlockStore的实现还有一种叫做TachyonStore,是基于Tachyon内存分布式文件系统级别的持久化,我们在这里就不做介绍了。感兴趣的读者可以网上搜索一些资料来看。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-12-04,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据技术与架构 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • BlockManager
    • 工作原理
    • MemoryStore
      • MemoryStore的数据结构
        • MemoryEntry的数据结构
          • 淘汰内存数据
          • DiskStore
          相关产品与服务
          文件存储
          文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档