分布式的 XGBoost 可以用 Spark 来跑,当然也支持用其他分布式的方法去跑,比如用 XGBoost Operator,可以很轻松的实现 XGBoost 算法的分布式执行。
目前在 Kubeflow 的框架下去开发一个机器学习相关的 Operator 已经比较容易了,首先 kubebuilder 打造好 Operator 的框架,然后通过 Kubeflow 社区抽象的 common 包,在新的 Operator 下调整业务逻辑还是比较简单的。XGBoost Operator 也是在这样的背景下诞生的,所以可以看到其源码是相对 tf-operator 这些 Kubeflow 早起的项目,代码会更加简练清晰一点。
重点分析 XGBoost Operator 的 Reconcile
协调方法。
func (r *ReconcileXGBoostJob) Reconcile(request reconcile.Request) (reconcile.Result, error) {
// Fetch the XGBoostJob instance
xgboostjob := &v1alpha1.XGBoostJob{}
err := r.Get(context.Background(), request.NamespacedName, xgboostjob)
if err != nil {
if errors.IsNotFound(err) {
// Object not found, return. Created objects are automatically garbage collected.
// For additional cleanup logic use finalizers.
return reconcile.Result{}, nil
}
// Error reading the object - requeue the request.
return reconcile.Result{}, err
}
// Check reconcile is required.
needSync := r.satisfiedExpectations(xgboostjob)
if !needSync || xgboostjob.DeletionTimestamp != nil {
log.Info("reconcile cancelled, job does not need to do reconcile or has been deleted",
"sync", needSync, "deleted", xgboostjob.DeletionTimestamp != nil)
return reconcile.Result{}, nil
}
// Set default priorities for xgboost job
scheme.Scheme.Default(xgboostjob)
// Use common to reconcile the job related pod and service
err = r.xgbJobController.ReconcileJobs(xgboostjob, xgboostjob.Spec.XGBReplicaSpecs, xgboostjob.Status.JobStatus, &xgboostjob.Spec.RunPolicy)
if err != nil {
logrus.Warnf("Reconcile XGBoost Job error %v", err)
return reconcile.Result{}, err
}
return reconcile.Result{}, err
}
实际上,自定义资源对象 XGBoostJob 由 XGBoost Operator 的 Reconcile
方法来协调就可以了,因为这个方法的背后,是 Kubeflow 的 common 包,会统一再做 Pod/Service 的协调的,所以开发者只要专注自定义资源的协调就够了。
就这?对的,就是挺简单的。
下面运行一个 XGBoost Opearator 提供的 Demo。
按照官方文档,build
镜像。
docker build -f Dockerfile -t kubeflow/xgboost-dist-rabit-test:1.2 ./
镜像里主要运行的代码是 xgboost_smoke_test.py
。
Master 正常运行的日志。
Worker 正常运行的日志。
这个 smoke test 仅仅是建立一个 rabit 拓扑并进行通信的简单例子,运行成功说明 XGBoost Operator 的部署也是成功的,因为 worker 之间以及与 master 通过 pod ip 是可以建立 tcp 连接的。
目前在 Kubeflow Common 包的框架下开发一个分布式的机器学习 Operator 还是比较方便的。