摘 要
分布式系统通常在一个机器集群上运行,同时运行的几百台机器中某些出问题的概率大大增加,所以容错设计是分布式系统的一个重要能力。
Spark以前的集群容错处理模型,像MapReduce,将计算转换为一个有向无环图(DAG)的任务集合,这样可以通过重复执行DAG里的一部分任务来完成容错恢复。但是由于主要的数据存储在分布式文件系统中,没有提供其他存储的概念,容错过程需要在网络上进行数据复制,从而增加了大量的消耗。所以,分布式编程中经常需要做检查点,即将某个时机的中间数据写到存储(通常是分布式文件系统)中。
RDD也是一个DAG,每一个RDD都会记住创建该数据集需要哪些操作,跟踪记录RDD的继承关系,这个关系在Spark里面叫lineage。由于创建RDD的操作是相对粗粒度的变换(如map
、filter
、join
),即单一的操作应用于许多数据元素,而不需存储真正的数据,该技巧比通过网络复制数据更高效。当一个RDD的某个分区丢失时,RDD有足够的信息记录其如何通过其他RDD进行计算,且只需重新计算该分区,这是Spark的一个创新。
Spark的lineage也不是完美解决所有问题的,因为RDD之间的依赖分为两种,如下图所示:
根据父RDD分区是对应一个还是多个子RDD分区,依赖分为如下两种。
对于窄依赖,只需要通过重新计算丢失的那一块数据来恢复,容错成本较小。但如果是宽依赖,则当容错重算分区时,因为父分区数据只有一部分是需要重算子分区的,其余数据重算就造成了冗余计算。
所以,不同的应用有时候也需要在适当的时机设置数据检查点。由于RDD的只读特性使得它比常用的共享内存更容易做检查点,具体可以使用doCheckPoint
方法。
在有些场景的应用中,容错会更复杂,比如计费服务等,要求零丢失。还有在Spark支持的Streaming计算的应用场景中,系统的上游不断产生数据,容错过程可能造成数据丢失。为了解决这些问题,Spark也提供了预写日志(也称作journal),先将数据写入支持容错的文件系统中,然后才对数据施加这个操作。
另外,Kafka和Flume这样的数据源,接收到的数据只在数据被预写到日志以后,接收器才会收到确认消息,已经缓存但还没有保存的数据在Driver程序重新启动之后由数据源从上一次确认点之后重新再发送一次。
这样,所有的数据要不从日志中恢复,要不由数据源重发,实现了零丢失。
Spark Master的容错分为两种情况:Standalone集群模式和单点模式。
Standalone集群模式下的Master容错是通过ZooKeeper来完成的,即有多个Master,一个角色是Active,其他的角色是Standby。当处于Active的Master异常时,需要重新选择新的Master,通过ZooKeeper的ElectLeader功能实现。关于ZooKeeper的实现,这里就不展开了,感兴趣的朋友可以参考Paxos。
要使用ZooKeeper模式,你需要在conf/spark-env.sh中为SPARK_DAEMON_JAVA_OPTS
添加一些选项,详见下表。
系统属性 | 说明 |
---|---|
spark.deploy.recoveryMode | 默认值为NONE。设置为ZOOKEEPER后,可以在Active Master异常之后重新选择一个Active Master |
spark.deploy.zookeeper.url | ZooKeeper集群地址(比如192.168.1.100:2181,192.168.1.101:2181) |
spark.deploy.zookeeper.dir | 用于恢复的ZooKeeper目录,默认值为/spark |
设置SPARK_DAEMON_JAVA_OPTS
的实际例子如下:
SPARK_DAEMON_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS
-Dspark.deploy.recoveryMode =ZOOKEEPER"
应用程序启动运行时,指定多个Master地址,它们之间用逗号分开,如下所示:
MASTER=spark://192.168.100.101:7077,spark://192.168.100.102:7077 bin/spark-shell
在ZooKeeper模式下,恢复期间新任务无法提交,已经运行的任务不受影响。
此外,Spark Master还支持一种更简单的单点模式下的错误恢复,即当Master进程异常时,重启Master进程并从错误中恢复。具体方法是设置spark.deploy.recoveryMode
属性的值为FILESYSTEM
,并为spark.deploy.recoveryDirectory
属性设置一个本地目录,用于存储必要的信息来进行错误恢复。
Slave节点运行着Worker、执行器和Driver程序,所以我们分三种情况讨论下3个角色分别退出的容错过程。
StatusUpdate
,于是Driver会将注册的执行器移除,Worker收到LaunchExecutor
指令,再次启动执行器。