前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >0835-5.16.2-如何按需加载Python依赖包到Spark集群

0835-5.16.2-如何按需加载Python依赖包到Spark集群

作者头像
Fayson
发布2021-04-30 12:12:22
2.9K0
发布2021-04-30 12:12:22
举报
文章被收录于专栏:Hadoop实操Hadoop实操Hadoop实操

1.文档编写目的

在开发Pyspark代码时,经常会用到Python的依赖包。在PySpark的分布式运行的环境下,要确保所有节点均存在我们用到的Packages,本篇文章主要介绍如何将我们需要的Package依赖包加载到我们的运行环境中,而非将全量的Package包加载到Pyspark运行环境中,本篇文章以xgboost1.0.2包为例来介绍。

  • 测试环境:

1.Redhat7.6

2.CDH5.16.2

3.使用root用户操作

2.环境检查

1.确保集群所有节点已安装了相同的Python版本,测试环境使用了Anaconda来部署统一的Python环境。

2.找一个任意OS节点装上Python3.6.4+版本,用来准备提取依赖包

配置pip使用国内的Python源

[root@cdh02 ~]# cat /etc/pip.conf 
[global]
index-url = https://mirrors.aliyun.com/pypi/simple/
[install]
user = true
trusted-host=mirrors.aliyun.com

3.在上一步的节点上安装xgboost1.0.2依赖包

/opt/cloudera/parcels/Anaconda-5.1.0.1/bin/pip install xgboost==1.0.2

xgboost包安装成功后默认在/root/.local/lib/python3.7/site-packages目录下

验证xgboost包是否安装安装成功

4.将安装成功的xgboost包打包成zip并上传到hdfs目录

cd /root/.local/lib/python3.6/site-packages/
zip -r xgb.zip xgboost

将准备好的xgb.zip包上传到hdfs的/python/dependency/目录下

hadoop fs -mkdir -p /python/dependency
hadoop fs -put xgb.zip /python/dependency/
hadoop fs -ls /python/dependency

3.Pyspark中加载依赖包

1.在初始化SparkSession对象时指定spark.yarn.dist.archives参数

spark = SparkSession\
    .builder\
    .appName("PythonPi")\
    .config('spark.yarn.dist.archives', 'hdfs:///python/dependency/xgb.zip#xgb')\
    .getOrCreate()

注意:指定的路径是HDFS上的路径,路径后的#xgb是必须指定的,xgb可以任意命令,需要和后面代码使用一致即可。

2.自定义一个函数,主要用来加载Python的环境变量(在执行分布式代码时需要调用该函数,否则Executor的运行环境不会加载Python依赖)

def fun(x):
  import sys
  import os
  sys.path.append(os.getcwd() + "/" + "xgb")
  import xgboost
  return xgboost.__version__

3.接下来就是在代码中使用定义的function

sc = spark.sparkContext
rdd = sc.parallelize([1,2,3,4,5,6,7], 3)
rdd.map(lambda x: fun(x)).distinct().collect()

4.通过上述的方式在执行Executor时加载Python的依赖包到运行环境中解决Pyspark对Packages依赖问题,完整示例代码如下:

from __future__ import print_function
import sys
from random import random
from operator import add
from pyspark.sql import SparkSession

import os
py_environ=os.environ['CONDA_DEFAULT_ENV']
if py_environ=='python2.7':
  os.environ['PYSPARK_PYTHON'] = '/usr/bin/python'
else:
  os.environ['PYSPARK_PYTHON'] = '/opt/cloudera/parcels/Anaconda-5.1.0.1/bin/python'

spark = SparkSession\
    .builder\
    .appName("PythonPi")\
    .config('spark.yarn.dist.archives', 'hdfs:///python/dependency/xgb.zip#xgb')\
    .getOrCreate()

def fun(x):
  import sys
  import os
  sys.path.append(os.getcwd() + "/" + "xgb")
  import xgboost
  return xgboost.__version__

sc = spark.sparkContext

rdd = sc.parallelize([1,2,3,4,5,6,7], 3)
rdd.map(lambda x: fun(x)).distinct().collect()

4.运行结果验证

执行Pyspark代码验证所有的Executor是否有加载到xgboost依赖包

5.总结

1.存放在HDFS上的第三方依赖包可以存在多个,也可以将多个package包打包到一个zip包里。

2.注意zip中的依赖包一定是通过pip命令成功安装后的packages,而不是直接下在下来的安装包。

3.在指定spark.yarn.dist.archives路径时,必须指定在路径最后加上#号和一个别名,该别名会在运行Executor和driver时作为zip包解压的目录存在。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-04-16,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Hadoop实操 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档