专栏首页Hadoop实操如何在CDH集群上部署Python3运行环境及运行Python作业

如何在CDH集群上部署Python3运行环境及运行Python作业

温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。 Fayson的github:https://github.com/fayson/cdhproject

1.文档编写目的


当前有很多工具辅助大数据分析,但最受欢迎的就是Python。Python简单易用,语言有着直观的语法并且提供强大的科学计算和集群学习库。借着最近人工智能,深度学习的兴起,Python成为时下最火的语言,已经超越了Java和C,并且纳入了国家计算机等级考试。本篇文章主要讲述如何在CDH集群基于Anaconda部署Python3的运行环境,并使用示例说明使用pyspark运行Python作业。

  • 测试环境

1.CM和CDH版本为5.11.2

2.采用sudo权限的ec2-user用户操作

3.集群已启用Kerberos

  • 前置条件

1.Spark On Yarn模式

2.基于Anaconda部署Python3


注意在每个worker节点都要部署python3,并且部署目录要相同,因为CDH自带了python2,所以如果需要将python替换成python3,需要在CM界面上修改PYSPARK_PYTHON的路径,下面会有说明。

下载anaconda包,这里选用的版本是Anaconda3-4.2.0-Linux-x86_64,下载地址:

https://repo.continuum.io/archive/Anaconda3-4.2.0-Linux-x86_64.sh

这个版本对应的Python版本是3.5.2,版本需要注意的是PySpark does not work with Python 3.6.0,SPARK-19019

https://issues.apache.org/jira/browse/SPARK-19019

所以我们这里装Python3.5.2版本。

1.进入到安装包目录,执行命令:bashAnaconda3-4.2.0-Linux-x86_64.sh

2.下一步输入回车键

3.在以下界面输入”yes”

4.下一步设置安装路径,我们这里安装在/usr/local/anaconda3目录下

如果提示“tar(child): bzip2: Cannot exec: No such file or directory”,需要先安装bzip2。

ec2-user@ip-172-31-21-45 ~$ sudo yum -y install bzip2

5.安装完后,提示设置anaconda的PATH路径,这里需要设置全局路径,因为要确保pyspark任务提交过来之后可以使用python3,所以输入“no”,重新设置PATH

6.设置全局的anaconda3的PATH

[root@ip-172-31-21-45 ec2-user]# echo "export PATH=/usr/local/anaconda3/bin:$PATH" >> /etc/profile
[root@ip-172-31-21-45 ec2-user]# source /etc/profile
[root@ip-172-31-21-45 ec2-user]# env |grep PATH
PATH=/usr/local/anaconda3/bin:/sbin:/bin:/usr/sbin:/usr/bin
[root@ip-172-31-21-45 ec2-user]# 

7.使用conda创建python3环境并激活

执行命令:

[root@ip-172-31-21-45 ec2-user]# conda create --unknown --offline -n py3 python=3.5

注意:这里创建python3环境时使用了离线模式,即--offline参数,以及—unknown,这种方式适合安装没有依赖的python包,如果有依赖使用conda install会报错,需要搭建一个内部仓库。

8.执行如下命令,激活python3运行环境:

[root@ip-172-31-21-45 ec2-user]# source activate py3

3.在CM配置Python环境变量


1.通过export设置python命令的安装路径:

export PYSPARK_PYTHON=/usr/local/anaconda3/bin/python

export PYSPARK_DRIVER_PYTHON=/usr/local/anaconda3/bin/python

修改完成后,回到CM主页根据提示重启相关服务。

4.pyspark命令测试


1.获取kerberos凭证

2.使用Pyspark命令测试

x = sc.parallelize(1,2,3)

y = x.flatMap(lambda x: (x, 100*x, x**2))

print(x.collect())

print(y.collect())

5.使用spark-submit提交一个Pyspark作业


这个demo主要使用spark-submit提交pyspark job,模拟从hdfs中读取数据,并转换成DateFrame,然后注册表并执行SQL条件查询,将查询结果输出到hdfs中。

1.将测试数据上传至hdfs目录/tmp/examples/

执行put命令上传文件,因为集群启用了Kerberos,所以也要使用kinit获取用户凭证信息

people.txt示例数据:

[ec2-user@ip-172-31-26-80 pysparktest]$ klist
Ticket cache: FILE:/tmp/krb5cc_1000
Default principal: fayson@CLOUDERA.COM

Valid starting       Expires              Service principal
12/17/2017 00:53:31  12/18/2017 00:53:31  krbtgt/CLOUDERA.COM@CLOUDERA.COM
        renew until 12/24/2017 00:53:31
[ec2-user@ip-172-31-26-80 pysparktest]$ hadoop fs -mkdir /tmp/examples/
[ec2-user@ip-172-31-26-80 pysparktest]$ hadoop fs -put people.txt /tmp/examples
[ec2-user@ip-172-31-26-80 pysparktest]$ hadoop fs -cat /tmp/examples/people.txt

2.将pyspark程序上传至CDH集群其中一个节点上,该节点部署了Spark的Gateway角色和Python3

PySparkTest2HDFS.py在pysparktest目录中,内容如下:

# 初始化sqlContext
from pyspark import SparkConf,SparkContext
from pyspark.sql import SQLContext, Row
conf=(SparkConf().setAppName('PySparkTest2HDFS'))
sc=SparkContext(conf=conf)
sqlContext = SQLContext(sc)

# 加载文本文件并转换成Row.
lines = sc.textFile("/tmp/examples/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

# 将DataFrame注册为table.
schemaPeople = sqlContext.createDataFrame(people)
schemaPeople.registerTempTable("people")

# 执行sql查询,查下条件年龄在13岁到19岁之间
teenagers = sqlContext.sql("SELECT name,age FROM people WHERE age >= 13 AND age <= 19")

# 将查询结果保存至hdfs中
teenagers.write.save("/tmp/examples/teenagers")

3.使用spark-submit命令向集群提交PySpark作业

root@ip-172-31-26-80 pysparktest# spark-submit PySparkTest2HDFS.py

4.作业执行成功

查看Yarn界面

通过以上信息,可以看到作业执行成功。

5.查看生成的文件,如下图:

因为生成的是parquet文件,它是二进制文件,无法直接使用命令查看,所以我们可以在pyspark上验证文件内容是否正确.

我们上面使用spark-submit提交的任务使用sql查询条件是13到19岁,可以看到在pyspark上查询的数据是在这个区间的数据

parquetFile = sqlContext.read.parquet("/tmp/examples/teenagers")

parquetFile.registerTempTable("parquetTable")

teenagers = sqlContext.sql("select* from parquetTable")

teenagers.show()

6.PySpark写数据到MySQL


1.将上面的作业增加如下代码

# 初始化sqlContext
from pyspark import SparkConf,SparkContext
from pyspark.sql import SQLContext, Row
conf=(SparkConf().setAppName('PySparkTest2MySQL'))
sc=SparkContext(conf=conf)
sqlContext = SQLContext(sc)

# 加载文本文件并转换成Row.
lines = sc.textFile("/tmp/examples/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

# 将DataFrame注册为table.
schemaPeople = sqlContext.createDataFrame(people)
schemaPeople.registerTempTable("people")

# 执行sql查询,查下条件年龄在13岁到19岁之间
teenagers = sqlContext.sql("SELECT name,age FROM people WHERE age >= 13 AND age <= 19")

url = "jdbc:mysql://ip-172-31-22-86.ap-southeast-1.compute.internal:3306/test"
table = "teenagers"
prop = {"user":"root","password":"123456"}

teenagers.write.jdbc(url, table, "append", prop)

2.在命令行加载MySQL的驱动包到Spark环境变量,然后执行命令

[ec2-user@ip-172-31-26-80 pysparktest]$ export SPARK_CLASSPATH=$SPARK_CLASSPATH:/opt/cloudera/parcels/CDH/lib/spark/lib/mysql-connector-java-5.1.30.jar
[ec2-user@ip-172-31-26-80 pysparktest]$ spark-submit PySparkTest2Mysql.py 

执行成功

3.使用Yarn查看作业是否运行成功

4.验证MySQL表中是否有数据

注意:这里将数据写入MySQL时需要在环境变量中加载MySQL的JDBC驱动包,MySQL表可以不存在,pyspark在写数据时会自动创建该表。

为天地立心,为生民立命,为往圣继绝学,为万世开太平。

温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看

本文分享自微信公众号 - Hadoop实操(gh_c4c535955d0f),作者:Fayson

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2017-12-17

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • CENTOS7.2安装CDH5.10和Kudu1.2(一)

    本文档描述CENTOS7.2操作系统部署CDH企业版的过程。Cloudera企业级数据中心的安装主要分为4个步骤:

    Fayson
  • 如何在Redhat7.3的CDH5.14中启用Kerberos

    在前面的文章中,Fayson介绍了《如何在Redhat7.3安装CDH5.14》,这里我们基于这个环境开始安装MIT Kerberos。前面Fayson也介绍过...

    Fayson
  • CentOS6.5安装CDH5.13

    Cloudera前一段时间发布了CDH5.13版本,5.13的新功能可以参考前一篇文章CDH5.13和CM5.13的新功能,本文章主要讲述如何在CentOS6....

    Fayson
  • CENTOS7.2安装CDH5.10和Kudu1.2(一)

    本文档描述CENTOS7.2操作系统部署CDH企业版的过程。Cloudera企业级数据中心的安装主要分为4个步骤:

    Fayson
  • 结合使用 Draft 与 Tencent Kubernetes Engine (TKE)

    Draft 是一种开源工具,有助于在 Kubernetes 群集中打包和部署应用程序容器,让你专注于开发周期 - 专注开发的“内部循环”。 在开发代码期间,但尚...

    张善友
  • 『Python』python 弹窗、提示和警告框MessageBox部件

    风骨散人Chiam
  • 里程碑:RISC-V 基金会批准基础指令集架构与特权架构规范

    近日,RISC-V 基金会宣布批准 RISC-V 基础指令集架构 与 特权架构 规范,为 RISC-V 的可扩展性进一步奠定了基础。

    Debian中国
  • 面试题16(以下java程序输出什么?)

    以下java程序输出什么? public class HelloSoGou { public static synchronized void main(St...

    Java学习
  • 上海公开科技扶持政策,RISC-V架构备受重视

    为了扶持国内芯片的研发工作,各地政府均在政策和资金上不断补助。继去年8月首次发布RISC-V相关支持政策后,上海市经济信息化委员会最近发布了《上海市经济信息化委...

    镁客网
  • 【使用Postman测试WEB接口】设置测试环境与全局变量

    可直接通过切换环境来实现多个环境中的参数切换。常用功能:环境地址切换、全局变量使用

    botkenni

扫码关注云+社区

领取腾讯云代金券