在基于Hadoop集群的大规模分布式深度学习一文中,雅虎介绍了其集Caffe和Spark之长开发CaffeOnSpark用于大规模分布式深度学习,并向github.com/BVLC/caffe贡献了部分代码。现在,雅虎机器学习团队又在这篇tumblr文章上宣布将整个CaffeOnSpark开源作为Spark的深度学习包。
Github:yahoo/CaffeOnSpark(Apache 2.0 license)
许多现有的DL框架需要一个分离的集群进行深度学习,而一个典型的机器学习管道需要创建一个复杂的程序(如图1)。分离的集群需要大型的数据集在它们之间进行传输,从而系统的复杂性和端到端学习的延迟不请自来。
图1 分离集群上复杂程序的ML Pipeline
雅虎认为,深度学习应该与现有的支持特征工程和传统(非深度)机器学习的数据处理管道在同一个集群中,创建CaffeOnSpark意在使得深度学习训练和测试能被嵌入到Spark应用程序(如图2)中。
图2 单一集群上单程序的ML Pipeline
CaffeOnSpark被设计成为一个Spark深度学习包。Spark MLlib支持各种非深度学习算法用于分类、回归、聚类、推荐等,但目前缺乏深度学习这一关键能力,而CaffeOnSpark旨在填补这一空白。CaffeOnSpark API支持dataframes,以便易于连接准备使用Spark应用程序的训练数据集,以及提取模型的预测或中间层的特征,用于MLLib或SQL数据分析。
图3 CaffeOnSpark成为一个Spark深度学习package
使用CaffeOnSpark和MLlib的Scala应用如下:
1: def main(args: Array[String]): Unit = {
2: val ctx = new SparkContext(new SparkConf())
3: val cos = new CaffeOnSpark(ctx)
4: val conf = new Config(ctx, args).init()
5: val dl_train_source = DataSource.getSource(conf, true)
6: cos.train(dl_train_source)
7: val lr_raw_source = DataSource.getSource(conf, false)
8: val extracted_df = cos.features(lr_raw_source)
9: val lr_input_df = extracted_df.withColumn(“Label”, cos.floatarray2doubleUDF(extracted_df(conf.label)))10: .withColumn(“Feature”, cos.floatarray2doublevectorUDF(extracted_df(conf.features(0))))11: val lr = new LogisticRegression().setLabelCol(“Label”).setFeaturesCol(“Feature”)12: val lr_model = lr.fit(lr_input_df)13: lr_model.write.overwrite().save(conf.outputPath)14: }
这段代码演示了CaffeOnSpark和MLlib如何协同:
CaffeOnSpark使得深度学习步骤能够无缝嵌入Spark应用。它消除了在传统的解决方案不得不做的数据移动(如图1所示),并支持直接在大数据集群上进行深度学习。直接访问大数据和大规模计算能力对深度学习至关重要。
如同标准的CaffeCa,CaffeOnSpark用配置文件于求解器和神经网络。正如例子中的神经网络有一个MemoryData层有2个额外的参数:
最初发布的CaffeOnSpark有几个内置的数据源类(包括com.yahoo.ml.caffe.LMDB的LMDB数据库和com.yahoo.ml.caffe.SeqImageDataSource的Hadoop的序列文件)。用户可以很容易地引入自定义的数据源类与现有的数据格式进行交互。
CaffeOnSpark应用程序将通过标准的Spark命令(如spark-submit)launch。这里有两个spark-submit命令的例子。第一个命令使用CaffeOnSpark训练一个DNN模型保存到HDFS上。第二个命令是一个定制的嵌入CaffeOnSpark及MLlib的应用。
第一个命令:
spark-submit \
–files caffenet_train_solver.prototxt,caffenet_train_net.prototxt \
–num-executors 2 \
–class com.yahoo.ml.caffe.CaffeOnSpark \
caffe-grid-0.1-SNAPSHOT-jar-with-dependencies.jar \
-train -persistent \
-conf caffenet_train_solver.prototxt \
-model hdfs:///sample_images.model \
-devices 2
第二个命令:
spark-submit \
–files caffenet_train_solver.prototxt,caffenet_train_net.prototxt \
–num-executors 2 \
–class com.yahoo.ml.caffe.examples.MyMLPipeline \
caffe-grid-0.1-SNAPSHOT-jar-with-dependencies.jar \
-features fc8 \
-label label \
-conf caffenet_train_solver.prototxt \
-model hdfs:///sample_images.model \
-output hdfs:///image_classifier_model \
-devices 2
图4 CaffeOnSpark系统架构
CaffeOnSpark系统架构如图4所示(和之前相比没有变化)。Spark executor中,Caffe引擎在GPU设备或CPU设备上,通过调用一个细颗粒内存管理的JNI层。不同于传统的Spark应用,CaffeOnSpark executors之间通过MPI allreduce style接口通信,通过TCP/以太网或者RDMA/Infiniband。这个Spark+MPI架构使得CaffeOnSpark能够实现和专用深度学习集群相似的性能。
许多深度学习工作是长期运行的,处理潜在的系统故障很重要。CaffeOnSpark支持定期快照训练状态,因此job出现故障后能够恢复到之前的状态。
雅虎已经在多个项目中应用CaffeOnSpark,如Flickr小组通过在Hadoop集群上用CaffeOnSpark训练数百万张照片,显著地改进图像识别精度。现在深度学习研究者可以在一个AWS EC2云或自建的Spark集群上进行测试CaffeOnSpark。