目录
Horovod 是Uber于2017年发布的一个易于使用的高性能的分布式训练框架,在业界得到了广泛应用。
本系列将通过源码分析来带领大家了解 Horovod。接下来几篇介绍 horovod 如何运行在 spark 之上。本文是第八篇,介绍 horovod on spark 的总体架构。
Horovod on spark 的目的就是让 horovod 能跑到 spark 集群上,从而把数据处理,模型训练,模型评估这一个机器学习的循环都放在Spark技术栈之中。
本系列其他文章如下:
[源码解析] 深度学习分布式训练框架 Horovod (1) --- 基础知识
[源码解析] 深度学习分布式训练框架 horovod (2) --- 从使用者角度切入
[源码解析] 深度学习分布式训练框架 horovod (3) --- Horovodrun背后做了什么
[源码解析] 深度学习分布式训练框架 horovod (4) --- 网络基础 & Driver
[源码解析] 深度学习分布式训练框架 horovod (5) --- 融合框架
[源码解析] 深度学习分布式训练框架 horovod (6) --- 后台线程架构
[源码解析] 深度学习分布式训练框架 horovod (7) --- DistributedOptimizer
Spark是一个分布式通用计算框架,而以 tensorflow 为代表的深度学习框架是分布式模型训练框架,这些框架更多专注用迭代来计算梯度。很多业内公司都是用spark来获取/处理数据,然后把spark处理好的数据结果发给Tensorflow进行训练。
目前我们已经知道,Horovod 可以把 Tensorflow等深度学习框架和MPI紧密结合起来,那么为什么要再把 spark 整合进来呢?整合的意义在哪里?具体如下:
Horovod 需要解决的核心问题是:如何将spark作为分布式tensorflow的底层调动机制,从而通过spark executor就可以把 tensorflow 的进程调动起来,这样进行tensorflow训练时就不需要手动地去组建网络。
因此能想到的其他问题是:
我们在随后一一分析。
简要来说,Spark分成几个角色:
当我们用python编写程序时,其实使用的是 Pyspark 接口。所以我们介绍一下 pyspark,可以和 Horovod 做比对。
如果我们使用Java或者Scala开发Spark相关程序,Driver 和 Executor 运行任务的载体是Java虚拟机(JVM)。但是 Python 使用的是 Python自己的虚拟机,这就产生了一个问题,核心架构是基于JVM还是PVM。
为了保持核心架构一致性,Spark依然使用JVM作为核心,核心功能依然基于JVM,其中包括:申请计算资源,管理/分配task,driver与executor之间的通信等等。在此核心架构外围则封装了一层python。
因此,PySpark 采用了 Python进程和JVM 进程分离的多进程架构,在 Driver和Executor 端都同时有 Python和JVM 两个进程。
如果用户提交一个Python 脚本,Spark Driver 会:
a.map(lambda x:(x,1))
,则这个rdd的操作会映射到JVM之中被执行。在Executor则正好相反,因为Executor端运行的Task逻辑(序列化后的字节码)是由Driver发过来的,所以 Executor 本来是可以直接运行Task,并不需要借助任何Py4j。但是因为Python脚本中会存在用户定义的python函数(或者Lambda表达式),所以Executor必须再启动Python进程进行相关处理:
交互流程如下图,实线是方法调用,虚线是返回结果。
架构图如下:
机器学习算法和计算机领域的其他算法相比,有自己的一些独特特点。例如:
以上这些特点决定了机器学习系统的设计和其他计算系统的设计有很大区别。和传统分布式系统比较,机器学习系统在通信,同步和容错等方面都活动空间极大。因为大量资源都会浪费在通讯,等待,协调这些非计算任务上,所以导致分布式机器学习任务往往并不能随着机器数量随之的增加而能力也线性提升。
因此,在设计大规模机器学习系统(比如深度学习/逻辑回归/主题模型/矩阵分解等依赖于SGD或者L-BFGS最优化的算法)时,需要解决一系列挑战,比如提高并行度,减少同步等待延迟,容错以及巨大带宽(频繁访问修改模型参数时所需)等。
MPI 的主要缺点是:
Spark在一定层度上解决了MPI的问题。
Spark训练的一个最最简陋的整体流程图如下:
+----------------+
| |
| Spark Driver |
| |
+----------------+
+
Map Stage | Reduce Stage
|
|
+--------------------+ |
| Spark Executor | |
| +----------+
+-------> | User function | | |
| | | | |
| +--------------------+ | |
| | |
| +--------------------+ | | +------------------+
| | Spark Executor | | +-> | Spark Executor |
+---+--+ | | | | | +-----+
| Data +----> | User function +------------> | +----> |model|
+---+--+ | | | | User function| +-----+
| +--------------------+ | +-> | |
| | | +------------------+
| +--------------------+ | |
| | Spark Executor | | |
| | | | |
+-------> | User function +----------+
| | |
+--------------------+ +
但是我们发现,这个工作流程只能迭代一次,完全不匹配机器学习需要循环迭代多次的特点,于是还需要修改这个架构。
于是我们修改角色如下:
迭代过程也拓展如下:
最后 reduce 阶段导出模型。
Map Stage +----------------+
1 | 2 | 1
+----------------> | Spark Driver | <-------------------+
| | | |
| +--+------+---+--+ |
| | 3| ^ | |
| | | | | |
| 3 | | | | 3 |
| +----------------+ | | +-------------------+ |
| | | | | |
| v v |1 v |
+-----+----+---------+ +--------+-----------+ +----------+----+----+
| Spark Executor | | Spark Executor | | Spark Executor |
| | | | | |
| User function | | User function | | User function |
| | | | | |
+-------------+------+ +--------+-----------+ +--------+-----------+
| | |
+----------------------------------------------------------------------+
| | |
+-----------+ | +----------------+
| | |
Reduce Stage v v v
+-+------+------+--+
| Spark Executor |
| |
| |
| User function|
| |
+--------+---------+
|4
|
v
+--+--+
|model|
+-----+
我们突然发现,这居然是一个参数服务器的架构了,即 Spark Driver 充当了参数服务器的角色。这和 Horovod 的 ring-allreduce 的架构显然不符合。另外,Spark采用的完全是BSP协议,即第二轮迭代必须等到第一轮迭代所有的机器完成,这也会拖慢我们的训练过程。
所以,我们在深入之前,需要先说说Spark 如果用于机器学习,会有哪些缺陷:
虽然 Spark 对于机器学习来说有各种缺陷,但是对于中等规模的学习确实非常有用,所以就有了 Horovod on spark。我们接下来就要看看 Horovod 是如何处理(缓解)这些问题的。大规模机器学习的目的就是解决"数据和偏差"规模非常大的时候所带来的理论/工程问题。
Tensorflow是C++开发的,而python是机器学习世界的主宰。所以,如果Spark要和TensorFlow 进行整合,一般来说有以下三种方式:
但是 Horovod 的思路又比较别致,可以认为是按照 Spark 的思路,在 Spark 之上又实现了一套自己的。即:
horovod.spark._make_spark_thread
创建了 Spark 集群;num_proc
个 tasks(Horovod TaskService),这些 tasks 都注册到 driver 之上,因此 driver 知道已经启动的所有 task信息(ip,port,路由,...),这些task 也把自己的 host hash(一个被 MPI 当作 host 的字符串)发送给Horovod DriverService ;这样就充分利用了已有的大数据体系的数据和计算特性。其实,绝大多数大规模机器学习的平台/系统都可以看做这由这两个角色构成 :Model node(driver node)和 Data node(worker node)。每个角色都有自己一套计算逻辑。从 Horovod来说,Horovod DriverService 就是 driver node,Horovod TaskService就是 data node:
大致如下,其中 SparkDriverService 对应了Horovod DriverService,SparkTaskService对应了Horovod TaskService:
+------------------------------+
| Horovod Main thread |
| |
| |
| SparkDriverService |
| |
| +----------------+ |
| | Spark Driver | |
| +----------------+ |
+------------------------------+
|
|
+--------------------------------------------------------+
| | |
| | |
v v v
+------------------------+ +----------------------+ +------------------------+
| Spark Executor | | Spark Executor | | Spark Executor |
| | | | | |
| +-------------------+ | | +------------------+ | | +-------------------+ |
| | SparkTaskService | | | | SparkTaskService | | | | SparkTaskService | |
| | | | | | | | | | | |
| | TensorFlow | | | | TensorFlow | | | | TensorFlow | |
| | | | | | | | | | | |
| | | | | | | | | | | |
| | MPI | | | | MPI | | | | MPI | |
| | + | | | | + | | | | + | |
| | | | | | | | | | | | | | |
| +-------------------+ | | +------------------+ | | +-------------------+ |
| | | | | | | | |
| | | | | | | | |
+------------------------+ +----------------------+ +------------------------+
| | |
| | |
| | |
+----------------------------+---------------------------+
手机如下:
具体分析如下。
mpi_run
来在这些 tasks 之中启动 python function(通过 RPC)。 horovod.spark.driver.mpirun_rsh
来连接每个 Executor,然后 "remote shell" 到这些 spark executors 之中。horovod.spark.driver.mpirun_rsh
是与每个 host hash 之中 最小 index 的 task进行通信,这个 task 就执行 MPI 的 orted
命令。因此,每个 Executor 之中只会运行一个 mpi orted
进程,即使这个 executor 有多个 tasks。其他的 非orted
进程 task会等待 orted
进程 task 结束。_run_command
在 spark 之中启动训练job。具体如下: 备注:
Hovorod 期望所有的 task 都同时运行,因此 cluster 应该至少提供同样个数的 core,每个 executor 可以有多个 core,因此一个 executor 可以处理多个 tasks,host 可以有多个 executor。
具体如下图:
+--------------------------+ +---------------------------------+ +-------------------------+
| Horovod Main thread | | Spark Executor | | Spark Executor |
| | | | | |
| | | | | |
| +--------------------+ | 1 register | +----------------------+ | | +--------------------+ |
| | SparkDriverService +<---------------------------------+ SparkTaskService | | | | SparkTaskService | |
| | | | | | | | | | | |
| | | | 2 notify start | | | | | | | |
| | +--------------------------------> | | | | | | |
| | | | | | | | | | | |
| | | | | | | | | | | |
| | | | 3 RunCommandRequest | | | | | | | |
| | +---------------------------------------> orted mpirun_rsh| | | | | |
| | | | | | + | | | | | |
| | | | | | | 4 | | | | | |
| | | | | | | | | | | | |
| | | | | | v | | | | | |
| | | | | | task_exec | | | | | |
| | | | | | + | | | | | |
| | | | | | | 5 | | | | | |
| | | | + | | | | | | | |
| | | |6 set_local_rank_to_rank | v | | | | | |
| | +-------------------------+---------> SparkTaskClient | | | | | |
| | | | | | | | | | | |
| | | | | | +------------------+ | | | | +----------------+ | |
| | | | 7 code() | | | | | | | | | | | |
| | +----------------------------------------> 8 train() | | | | | | train() | | |
| | | | | | | | | | | | | | | |
| | | | | | | MPI <----------------------> MPI | | |
| | | | | | | | | | | | | | | |
| | | | | | +------------------+ | | | | +----------------+ | |
| +--------------------+ | | +----------------------+ | | +--------------------+ |
+--------------------------+ +---------------------------------+ +-------------------------+
手机如下:
在 Horovod 源码中,有一个架构图。我们可以大致了解其架构。
但是因为这部分实在复杂,所以单凭这一幅图很难了解其实现,所以我们需要做深入研究。
首先我们看看 Driver 的特点。
我们首先用普通Horovod驱动做个对比。
在没有 spark 的情况下,假设有多个 hosts,需要获取到这些 host 之间的路由信息。因为 host 之间是一个环形,构成了 ring allreduce。
Tasks ping each other in a circular fashion to determine interfaces reachable within the cluster.
Driver 服务由 HorovodRunDriverService 提供,Task 服务由 HorovodRunTaskService 等提供。
其功能主要是维护各种 task 地址以及相应关系。具体各种 task 地址就是 Task 服务 来注册的。
需要注意的是:HorovodRunDriverService 和 HorovodRunTaskService 都最终继承了 network.BasicService,他们之间可以是异地运行交互。
在 Hovorod on spark 状态下,我们的训练函数实际上是在 Spark Executor 中运行,因为面对的情况不同,所以我们对于 Driver 需求是不同的。之前记录的是 host 之间的路由以及 driver & tasks 对应关系。现在需要知道 spark Executor 之间的路由,以及 driver & tasks 对应关系。
从源码中找到示例代码如下,可以看到,horovod.spark.run 是入口。
# Horovod: run training.
history, best_model_bytes = \
horovod.spark.run(train_fn, args=(model_bytes,), num_proc=args.num_proc,
stdout=sys.stdout, stderr=sys.stderr, verbose=2,
prefix_output_with_timestamp=True)[0]
fn 就是训练函数,被用户代码传进来的,具体被赋值之后,在 SparkDriverService 之中保存(具体是在其成员变量 _fn 之中),以后会使用。这样就解决了代码发布问题。
driver = driver_service.SparkDriverService(settings.num_proc, settings.num_proc,
fn, args, kwargs,
settings.key, settings.nics)
Horovod.spark.run
的逻辑是:
具体代码如下:
def run(fn, args=(), kwargs={}, num_proc=None, start_timeout=None,
use_mpi=None, use_gloo=None, extra_mpi_args=None,
env=None, stdout=None, stderr=None, verbose=1, nics=None,
prefix_output_with_timestamp=False):
# 处理各种配置,比如timeout,nice...
if start_timeout is None:
# Lookup default timeout from the environment variable.
start_timeout = int(os.getenv('HOROVOD_SPARK_START_TIMEOUT', '600'))
# nics needs to be a set
if nics and not isinstance(nics, set):
nics = set(nics)
tmout = timeout.Timeout(start_timeout, message)
settings = hvd_settings.Settings(verbose=verbose,
extra_mpi_args=extra_mpi_args,
key=secret.make_secret_key(),
start_timeout=tmout,
nics=nics,
run_func_mode=True,.....)
# 获取 spark 信息,比如从 pyspark 之中获取SparkContext
spark_context = pyspark.SparkContext._active_spark_context
settings.num_proc = num_proc
result_queue = queue.Queue(1)
# 利用 _make_spark_thread 来启动 spark executor(以及在每一个 spark executor 之中启动一个SparkTaskService)
# start Spark driver service and launch settings.num_proc Spark tasks
spark_job_group = 'horovod.spark.run.%d' % job_id.next_job_id()
driver = driver_service.SparkDriverService(settings.num_proc, settings.num_proc,
fn, args, kwargs,
settings.key, settings.nics)
gloo_is_used = is_gloo_used(use_gloo=use_gloo, use_mpi=use_mpi, use_jsrun=False)
spark_thread = _make_spark_thread(spark_context, spark_job_group, driver,
result_queue, settings,
use_gloo=gloo_is_used, is_elastic=False)
try:
# 等待第一阶段结束,即 等待所有 spark task 都结束
# wait for all tasks to register, notify them and initiate task-to-task address registration
_notify_and_register_task_addresses(driver, settings)
# Determine the index grouping based on host hashes.
# Barrel shift until index 0 is in the first host.
host_hashes = list(driver.task_host_hash_indices().keys())
host_hashes.sort()
while 0 not in driver.task_host_hash_indices()[host_hashes[0]]:
host_hashes = host_hashes[1:] + host_hashes[:1]
settings.hosts = ','.join('%s:%d' % (host_hash, len(driver.task_host_hash_indices()[host_hash]))
for host_hash in host_hashes)
# Run the job,启动训练
_launch_job(use_mpi, use_gloo, settings, driver, env, stdout, stderr)
except:
# Terminate Spark job.
spark_context.cancelJobGroup(spark_job_group)
# Re-raise exception.
raise
finally:
spark_thread.join()
driver.shutdown()
# Make sure Spark Job did not fail.
driver.check_for_spark_job_failure()
# get ranks from driver
indices_in_rank_order = _get_indices_in_rank_order(driver)
# If there's no exception, execution results are in this queue.
results = result_queue.get_nowait()
return [results[index] for index in indices_in_rank_order]
既然知道了总体代码,下一篇我们就介绍 Horovod on spark 如何启动,敬请期待。
至此,我们分析了 Horovod on spark 的总体架构,几个相关问题回答如下:
num_proc
个 tasks,这些 tasks 都注册到 driver 之上;horovod.spark.driver.mpirun_rsh
来连接每个 Executor,然后 "remote shell" 到这些 executors 之中。我们在一篇文章中会继续深入 Horovd on Spark。
★★★★★★关于生活和技术的思考★★★★★★
微信公众账号:罗西的思考
如果您想及时得到个人撰写文章的消息推送,或者想看看个人推荐的技术资料,敬请关注。