前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >[SPARK][CORE] 面试问题之什么是 external shuffle service?

[SPARK][CORE] 面试问题之什么是 external shuffle service?

作者头像
Tim在路上
发布2022-06-12 14:02:47
1.2K0
发布2022-06-12 14:02:47
举报

在讨论external shuffle service的具体实现之前,我们先来回顾下spark shuffle的大概过程。

spark shuffle分为两部分shuffle write和shuffle read。

在map write端,对每个task的数据,不管是按key hash还是在数据结构里先聚合再排序,最终都会将数据写到一个partitionFile里面,在partitionFile里面的数据是partitionId有序的,外加会生成一个索引文件,索引包含每个partition对应偏移量和长度。

而reduce read 端就是从这些partitionFile里面拉取相应partitionId的数据, 然后再进行聚合排序。

现在我们在来看下****external shuffle service(ESS)****,其乍从其名字上看,ESS是spark分布式集群为存储shuffle data而设计的分布式组件。但其实它只是Spark通过Executor获取Shuffle data块的代理。

我们可以理解为ESS负责管理shuffle write端生成的shuffle数据,ESS是和yarn一起使用的, 在yarn集群上的每一个nodemanager上面都运行一个ESS,是一个常驻进程。一个ESS管理每个nodemanager上所有的executor生成的shuffle数据。总而言之,ESS并不是分布式的组件,它的生命周期也不依赖于Executor。

为什么需要ESS ?

在Spark中,Executor进程除了运行task,还要负责写shuffle 数据,以及给其他Executor提供shuffle数据。当Executor进程任务过重,导致GC而不能为其他Executor提供shuffle数据时,会影响任务运行。同时,ESS的存在也使得,即使executor挂掉或者回收,都不影响其shuffle数据,因此只有在ESS开启情况下才能开启动态调整executor数目。

因此,spark提供了external shuffle service这个接口,常见的就是spark on yarn中的,YarnShuffleService。这样,在yarn的nodemanager中会常驻一个externalShuffleService服务进程来为所有的executor服务,默认为7337端口。

其实在spark中shuffleClient有两种,一种是blockTransferService,另一种是externalShuffleClient。如果在ESS开启,那么externalShuffleClient用来fetch shuffle数据,而blockTransferService用于获取broadCast等其他BlockManager保存的数据。

如果ESS没有开启,那么spark就只能使用自己的blockTransferService来拉取所有数据,包括shuffle数据以及broadcast数据。

ESS的架构与优势

在启用ESS后,ESS服务会在node节点上创建,并且每次存在时,新创建的Executor都会向其注册。

代码语言:javascript
复制
/** Registers a new Executor with all the configuration we need to find its shuffle files. */
public void registerExecutor(
    String appId,
    String execId,
    ExecutorShuffleInfo executorInfo) {

在注册过程中,使用appId, execId和ExecutorShuffleInfo(localDirs, shuffleManager类型)作为参数,从参数信息可以看出Executor会通知ESS服务它创建在磁盘上文件的存储位置。由于这些信息,ESS服务守护进程能够在检索过程中将shuffle中间的临时文件返回给其他执行程序。

ESS服务的存在也会影响文件删除。在正常情况下(没有外部 shuffle 服务),当Executor停止时,它会自动删除生成的文件。但是启用ESS服务后,Executor关闭后文件不会被清理。以下架构图说明了启用外部 shuffle 服务时工作程序节点上发生的情况:

ed.png

ESS服务的一大优势是提高了可靠性。即使其中一个 executor 出现故障,它的 shuffle 文件也不会丢失。另一个优点是可扩展性,因为在 Spark 中运行动态资源分配需要ESS服务,这块我们后续在进行介绍。

总之使用Spark ESS 为 Spark Shuffle 操作带来了以下好处:

  1. 即使 Spark Executor 正在经历 GC 停顿,Spark ESS 也可以为 Shuffle 块提供服务。
  2. 即使产生它们的 Spark Executor 挂了,Shuffle 块也能提供服务。
  3. 可以释放闲置的 Spark Executor 来节省集群的计算资源。

ESS源码初探

Executors 通过 RPC 协议与ESS服务通信,发送两种类型的消息:RegisterExecutorOpenBlocks。当Executor想要在其local external shuffle service中注册时,使用RegisterExecutor, OpenBlocks在获取shuffle data过程中使用。

在Executor创建的时候,会调用env.blockManager.initialize(conf.getAppId),在blockManager存储当前node的externalBlockStoreClient ,在其initialize方法中执行blockStoreClient.init(appId),这里的blockStoreClient称为shuffleClient(这里是ExternalShuffleClient)在shuffleClient 只是将一些实用对象设置为工厂来创建远程连接。

稍后 BlockManager 调用registerWithShuffleServer方法,这时ESS shuffle 服务会知道executor 存储 shuffle 文件的位置。

代码语言:javascript
复制
// blockManager类
// [1] executor 向ESS注册
// Register Executors' configuration with the local shuffle service, if one should exist.
if (externalShuffleServiceEnabled&& !blockManagerId.isDriver) {
  registerWithExternalShuffleServer()
}

   // [2] 封装localDirs, shuffle data的位置信息
    val shuffleConfig = new ExecutorShuffleInfo(
      diskBlockManager.localDirsString,
      diskBlockManager.subDirsPerLocalDir,
      shuffleManagerMeta)

 // [3] 向ESS发送RegisterExecutor消息
    try (TransportClient client = clientFactory.createClient(host, port)) {
      ByteBuffer registerMessage = new RegisterExecutor(appId, execId, executorInfo).toByteBuffer();
      // 
      client.sendRpcSync(registerMessage, registrationTimeoutMs);
    }

最终会将其存放在ESS维护的executors列表中,它是以下数据结构ConcurrentMap<AppExecId, ExecutorShuffleInfo> 。

接下来我们来分析下,reducer如何通过ESS来获取shuffle数据块。

获取shuffle block的请求在ExternalShuffleClient的fetchBlocks方法中生成。获取的过程使用RetryingBlockFetcher实例,它可以在失败时重试获取块。实际上,获取过程最终是由OneForOneBlockFetcher类实现的,它负责发送请求以检索所需的块。

代码语言:javascript
复制
RetryingBlockTransferor.BlockTransferStarter blockFetchStarter =
    (inputBlockId, inputListener) -> {
      // Unless this client is closed.
      if (clientFactory != null) {
        assert inputListener instanceof BlockFetchingListener :
          "Expecting a BlockFetchingListener, but got " + inputListener.getClass();
        TransportClient client = clientFactory.createClient(host, port, maxRetries > 0);
        new OneForOneBlockFetcher(client, appId, execId, inputBlockId,
          (BlockFetchingListener) inputListener, transportConf, downloadFileManager).start();
      } else {
        logger.info("This clientFactory was closed. Skipping further block fetch retries.");
      }
    };

可以看到这里的代码和我们在shuffle reader中讲解的是一致的。

  1. 首先,请求获取(block id, chunks 数)组成的键值对。
  2. 其次,请求获取chunks 块的具体内容。

下面我们再来总结下chunks块获取的详细流程:

U2led.png

chunks块的获取有两种模式,分别是流模式或批处理模式。

流模式操作是通过 TransportClient 的stream方法实现的。它包括向TransportRequestHandler的实例发送 StreamRequest 消息。处理程序通知客户端打开用于发送所需数据的 TCP 连接,然后传输发生在整个连接中,在单个 TCP 连接中向客户端发送所需的数据。

批处理模式操作是使用 TransportClient 的fetchChunk方法实现的该请求方法包含要获取的block的索引。处理程序只向客户端返回这个特定的数据块,所以它是每个请求响应一个块。

ESS的配置与使用

ESS shuffle 服务的配置以spark.shuffle.service前缀开头:

  • spark.shuffle.service.enabled - 定义ESS服务是否启用。
  • spark.shuffle.service.port - 定义运行ESS shuffle 服务的端口。由于该服务应该与执行程序在同一节点上运行,因此配置中不存在主机。
  • spark.shuffle.service.index.cache.size - 确定缓存的大小。在开启ESS shuffle 服务情况下,用于缓存存储索引文件信息。它避免了每次获取块时打开/关闭这些文件。主要用于基于排序的 shuffle 数据。

学完External Shuffle Service,下面是一些思考题:

  1. External Shuffle Service的优势是什么?shuffle data是否被存储在ESS中?
  2. 为什么在Spark动态资源分配时需要ESS服务?
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022-06-10,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 为什么需要ESS ?
  • ESS的架构与优势
  • ESS源码初探
  • ESS的配置与使用
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档