前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >使用CDSW和运营数据库构建ML应用1:设置和基础

使用CDSW和运营数据库构建ML应用1:设置和基础

作者头像
大数据杂货铺
发布2021-02-07 14:37:30
2.6K0
发布2021-02-07 14:37:30
举报
文章被收录于专栏:大数据杂货铺大数据杂货铺

介绍

Python在数据工程师和数据科学家中被广泛使用,以解决从ETL / ELT管道到构建机器学习模型的各种问题。Apache HBase是用于许多工作流程的有效数据存储系统,但是专门通过Python访问此数据可能会很困难。对于想要利用存储在HBase中的数据的数据专业人士而言,最新的上游项目“ hbase-connectors”可以与PySpark一起使用以进行基本操作。

在本博客系列中,我们将说明如何为基本的Spark使用以及CDSW中维护的作业一起配置PySpark和HBase 。对于不熟悉CDSW的人来说,这是一个安全的、自助式企业数据科学平台,数据科学家可以管理自己的分析管道,从而加快从勘探到生产的机器学习项目。有关CDSW的更多信息,请访问Cloudera Data Science Workbench产品页面。

在这篇文章中,将解释和演示几种操作以及示例输出。就上下文而言,此特定博客文章中的所有示例操作均与CDSW部署一起运行。

先决条件

  1. 具有带有HBase和Spark的CDP集群
  2. 如果要通过CDSW遵循示例,则需要安装它-安装Cloudera Data Science Workbench
  3. Python 3安装在每个节点的同一路径上

配置

首先,HBase和Spark需要配置到一起用于SparkSQL查询工作正常进行。为此,它包括两个部分:首先,通过Cloudera Manager配置HBase Region Server。其次,确保Spark运行时具有HBase绑定。不过要记住的一点是,Cloudera Manager已经设置了一些配置和环境变量,可以自动为您将Spark指向HBase。尽管如此,在所有CDP集群上的所有部署类型中,配置Spark SQL查询的第一步都是通用的,但第二步因部署类型而略有不同。

配置HBase Region Servers

  1. 转到Cloudera Manager,然后选择HBase服务。
  2. 搜索“regionserver environment”
  1. 使用RegionServer环境高级配置代码段(安全阀)添加新的环境变量:
    • Key:HBASE_CLASSPATH
    • Value:/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar:/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol-shaded.jar:/ opt /cloudera/parcels/CDH/jars/scala-library-2.11.12.jar确保使用适当的版本号。
  2. 重新启动Region Server。

完成上述步骤后,请按照以下步骤,根据需要是否依赖CDSW部署。

在非CDSW部署中将HBase绑定添加到Spark运行时

要部署Shell或正确使用spark-submit,请使用以下命令来确保spark具有正确的HBase绑定。

代码语言:javascript
复制
pyspark –jars /opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol-shaded.jar
spark-submit –jars /opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol-shaded.jar

在CDSW部署中将HBase绑定添加到Spark运行时

要使用HBase和PySpark配置CDSW,需要执行一些步骤。

1)确保在每个集群节点上都安装了Python 3,并记下了它的路径

2)在CDSW中创建一个新项目并使用PySpark模板

3)打开项目,转到设置->引擎->环境变量。

4)将PYSPARK3_DRIVER_PYTHONPYSPARK3_PYTHON设置为群集节点上安装Python的路径(步骤1中指出的路径)。

以下是其外观的示例。

5)在您的项目中,转到文件-> spark-defaults.conf并在工作台中将其打开

6)复制下面的行并将其粘贴到该文件中,并确保在开始新会话之前已将其保存。

代码语言:javascript
复制
spark.jars=/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol-shaded.jar

至此,CDSW现在已配置为在HBase上运行PySpark作业!本博客文章的其余部分涉及CDSW部署上的一些示例操作。

示例操作

put操作

有两种向HBase中插入和更新行的方法。第一个也是最推荐的方法是构建目录,该目录是一种Schema,它将在指定表名和名称空间的同时将HBase表的列映射到PySpark的dataframe。构建这种用户定义的JSON格式是最优选的方法,因为它也可以与其他操作一起使用。有关目录的更多信息,请参考此文档http://hbase.apache.org/book.html#_define_catalog。第二种方法是使用一个名为“ hbase.columns.mapping”的特定映射参数,该参数仅接收一串键值对。

  • 使用目录
代码语言:javascript
复制
from pyspark.sql import Row
from pyspark.sql import SparkSession

spark = SparkSession\
 .builder\
 .appName("SampleApplication")\
 .getOrCreate()

tableCatalog = ''.join("""{
              "table":{"namespace":"default", "name":"tblEmployee", "tableCoder":"PrimitiveType"},
              "rowkey":"key",
              "columns":{
                "key":{"cf":"rowkey", "col":"key", "type":"int"},
                "empId":{"cf":"personal","col":"empId","type":"string"},
                "empName":{"cf":"personal", "col":"empName", "type":"string"},
                "empState":{"cf":"personal", "col":"empWeight", "type":"string"}
              }
            }""".split())

employee = [(10, 'jonD', 'Jon Daniels', 'CA'), (6, 'billR', 'Bill Robert', 'FL')]
employeeRDD = spark.sparkContext.parallelize(employee)
employeeMap = employeeRDD.map(lambda x: Row(key=int(x[0]), empId=x[1], empName=x[2], empState=x[3]))
employeeDF = spark.createDataFrame(employeeMap)

employeeDF.write.format("org.apache.hadoop.hbase.spark") \
  .options(catalog=tableCatalog, newTable=5) \
  .option("hbase.spark.use.hbasecontext", False) \
 .save()
# newTable refers to the NumberOfRegions which has to be > 3

只需打开HBase shell并执行以下命令,即可验证是否在HBase中创建了一个名为“ tblEmployee”的新表:

代码语言:javascript
复制
scan ‘tblEmployee’, {‘LIMIT’ => 2}

使用目录还可以使您轻松加载HBase表。以后的部分将对此进行讨论。

  • 使用hbase.columns.mapping

在编写PySpark数据框时,可以添加一个名为“ hbase.columns.mapping”的选项,以包含正确映射列的字符串。此选项仅允许您将行插入现有表。

在HBase shell中,我们首先创建一个表,创建'tblEmployee2','personal'

现在在PySpark中,使用“ hbase.columns.mapping”插入2行

代码语言:javascript
复制
from pyspark.sql import Row
from pyspark.sql import SparkSession


spark = SparkSession\
 .builder\
  .appName("SampleApplication")\
  .getOrCreate()

employee = [(10, 'jonD', 'Jon Daniels', 170.7), (6, 'billR', 'Bill Robert', 200.1)]
employeeRDD = spark.sparkContext.parallelize(employee)
employeeMap = employeeRDD.map(lambda x: Row(key=int(x[0]), empId=x[1], empName=x[2], empWeight=float(x[3])))
employeeDF = spark.createDataFrame(employeeMap)

employeeDF.write.format("org.apache.hadoop.hbase.spark") \
       .option("hbase.columns.mapping", "key INTEGER :key, empId STRING personal:empId, empName STRING personal:empName, empWeight FLOAT personal:empWeight") \
       .option("hbase.table", "tblEmployee2") \
       .option("hbase.spark.use.hbasecontext", False) \
       .save()

同样,只需验证名为“ tblEmployee2”的新表具有这些新行。

代码语言:javascript
复制
scan ‘tblEmployee2’, {‘LIMIT’ => 2}

这就完成了我们有关如何通过PySpark将行插入到HBase表中的示例。在下一部分中,我将讨论“获取和扫描操作”,PySpark SQL和一些故障排除。在此之前,您应该获得一个CDP集群并按照这些示例进行操作。

原文作者:Manas Chakka

原文链接:https://blog.cloudera.com/building-a-machine-learning-application-with-cloudera-data-science-workbench-and-operational-database-part-1-the-set-up-basics/

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-01-27,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据杂货铺 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 介绍
  • 先决条件
  • 配置
  • 示例操作
相关产品与服务
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档