前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark整合Ray思路漫谈(2)

Spark整合Ray思路漫谈(2)

作者头像
用户2936994
发布2022-07-21 13:48:13
8330
发布2022-07-21 13:48:13
举报
文章被收录于专栏:祝威廉祝威廉

上一篇 关于spark 和ray整合的文章在这:

祝威廉:Spark整合Ray思路漫谈

另外还讲了讲Spark 和Ray 的对比:

祝威廉:从MR到Spark再到Ray,谈分布式编程的发展

现在我们来思考一个比较好的部署模式,架构图大概类似这样:

首先,大家可以理解为k8s已经解决一切了,我们spark,ray都跑在K8s上。但是,如果我们希望一个spark 是实例多进程跑的时候,我们并不希望是像传统的那种方式,所有的节点都跑在K8s上,而是将executor部分放到yarn cluster. 在我们的架构里,spark driver 是一个应用,我们可以启动多个pod从而获得多个spark driver实例,对外提供负载均衡,roll upgrade/restart 等功能。也就是k8s应该是面向应用的。但是复杂的计算,我们依然希望留给Yarn,尤其是还涉及到数据本地性,然计算和存储放到一起(yarn和HDFS通常是在一起的),避免k8s和HDFS有大量数据交换。

因为Yarn对Java/Scala友好,但是对Python并不友好,尤其是在yarn里涉及到Python环境问题会非常难搞(主要是Yarn对docker的支持还是不够优秀,对GPU支持也不好),而机器学习其实一定重度依赖Python以及非常复杂的本地库以及Python环境,并且对资源调度也有比较高的依赖,因为算法是很消耗机器资源的,必须也有资源池,所以我们希望机器学习部分能跑在K8s里。但是我们希望整个数据处理和训练过程是一体的,算法的同学应该无法感知到k8s/yarn的区别。为了达到这个目标,用户依然使用pyspark来完成计算,然后在pyspark里使用ray的API做模型训练和预测,数据处理部分自动在yarn中完成,而模型训练部分则自动被分发到k8s中完成。并且因为ray自身的优势,算法可以很好的控制自己需要的资源,比如这次训练需要多少GPU/CPU/内存,支持所有的算法库,在做到对算法最少干扰的情况下,然算法的同学们有最好的资源调度可以用。

下面展示一段MLSQL代码片段展示如何利用上面的架构:

代码语言:javascript
复制
-- python 训练模型的代码
set py_train='''
import ray
ray.init()

@ray.remote(num_cpus=2, num_gpus=1)
def f(x):
    return x * x

futures = [f.remote(i) for i in range(4)]
print(ray.get(futures))
''';
load script.`py_train` as py_train;

-- 设置需要的python环境描述
set py_env='''
''';
load script.`py_env` as py_env;

-- 加载hive的表
load hive.`db1.table1` as table1;

-- 对Hive做处理,比如做一些特征工程
select features,label from table1 as data;

-- 提交Python代码到Ray里,此时是运行在k8s里的
train data as PythonAlg.`/tmp/tf/model`
where scripts="py_train"
and entryPoint="py_train"
and condaFile="py_env"
and  keepVersion="true"
and fitParam.0.fileFormat="json" -- 还可以是parquet
and `fitParam.0.psNum`="1";

下面是PySpark的示例代码:

代码语言:javascript
复制
from pyspark.ml.linalg import Vectors, SparseVector
from pyspark.sql import SparkSession
import logging
import ray

from pyspark.sql.types import StructField, StructType, BinaryType, StringType, ArrayType, ByteType
from sklearn.naive_bayes import GaussianNB
import os
from sklearn.externals import joblib
import pickle
import scipy.sparse as sp
from sklearn.svm import SVC
import io
import codecs

os.environ["PYSPARK_PYTHON"] = "/Users/allwefantasy/deepavlovpy3/bin/python3"
logger = logging.getLogger(__name__)

base_dir = "/Users/allwefantasy/CSDNWorkSpace/spark-deep-learning_latest"
spark = SparkSession.builder.master("local[*]").appName("example").getOrCreate()

data = spark.read.format("libsvm").load(base_dir + "/data/mllib/sample_libsvm_data.txt")

## 广播数据
dataBr = spark.sparkContext.broadcast(data.collect())

## 训练模型 这部分代码会在spark executor里的python worker执行
def train(row):
    import ray
    ray.init()
    train_data_id = ray.put(dataBr.value)
    ## 这个函数的python代码会在K8s里的Ray里执行
    @ray.remote
    def ray_train(x):
        X = []
        y = []
        for i in ray.get(train_data_id):
            X.append(i["features"])
            y.append(i["label"])
        if row["model"] == "SVC":
            gnb = GaussianNB()
            model = gnb.fit(X, y)
            # 为什么还需要encode一下?
            pickled = codecs.encode(pickle.dumps(model), "base64").decode()
            return [row["model"], pickled]
        if row["model"] == "BAYES":
            svc = SVC()
            model = svc.fit(X, y)
            pickled = codecs.encode(pickle.dumps(model), "base64").decode()
            return [row["model"], pickled]

    result = ray_train.remote(row)
    ray.get(result)

    


##训练模型 将模型结果保存到HDFS上
rdd = spark.createDataFrame([["SVC"], ["BAYES"]], ["model"]).rdd.map(train)
spark.createDataFrame(rdd, schema=StructType([StructField(name="modelType", dataType=StringType()),
                                              StructField(name="modelBinary", dataType=StringType())])).write. \
    format("parquet"). \
    mode("overwrite").save("/tmp/wow")

这是一个标准的Python程序,只是使用了pyspark/ray的API,我们就完成了上面所有的工作,同时训练两个模型,并且数据处理的工作在spark中,模型训练的在ray中。

完美结合!最重要的是解决了资源管理的问题!

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019-12-19,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
负载均衡
负载均衡(Cloud Load Balancer,CLB)提供安全快捷的流量分发服务,访问流量经由 CLB 可以自动分配到云中的多台后端服务器上,扩展系统的服务能力并消除单点故障。负载均衡支持亿级连接和千万级并发,可轻松应对大流量访问,满足业务需求。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档