前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >如何使用CDSW在CDH中分布式运行所有R代码

如何使用CDSW在CDH中分布式运行所有R代码

作者头像
Fayson
修改2018-04-01 19:24:59
1.7K0
修改2018-04-01 19:24:59
举报
文章被收录于专栏:Hadoop实操Hadoop实操

温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。

本文将是Fayson最近写的R系列的最后一篇文章了,大家且看且珍惜吧。

无需额外花费过多的学习成本,sparklyr(https://spark.rstudio.com)可以让R用户很方便的利用Apache Spark的分布式计算能力。之前Fayson介绍了什么是sparklyr,大家知道R用户可以编写几乎相同的代码运行在Spark之上实现本地或者分布式计算。

spark_apply的架构 (来自 https://github.com/rstudio/sparklyr/pull/728)

从sparklyr0.6(https://blog.rstudio.com/2017/07/31/sparklyr-0-6/)开始,你就可以通过spark_apply()运行R代码在Spark集群之上。换句话说,你可以用R写UDF。这样可以让你用你最喜欢的R包来访问Spark里的数据,比如仅在R中实现的特定的统计分析方法,或者像NLP的高级分析,等等。

因为目前spark_apply()的实现需要在工作节点上也安装R环境,在这篇文章里,我们将介绍如何在CDH集群中运行spark_apply()。我们会介绍两种方法:1.使用Parcel。2)使用conda环境。

选项1:使用Parcel安装R环境


创建和分发R的Parcel


Parcel(https://www.cloudera.com/documentation/enterprise/latest/topics/cm_ig_parcels.html)是一种二进制的分发格式,Cloudera Manager可以使用Parcel来分发CDH,Spark2,Kafka和需要运行在集群上的服务。Parcel很像.deb或者.rpm包。它可以让你通过Cloudera Manager的界面很容易的在CDH集群上安装特定的服务。使用这种方式的前提是CDH集群是使用Parcel方式安装的。

你可以到https://dl.bintray.com/chezou/Parcels/下载预先创建好的Parcel包,它包含了https://github.com/conda/conda-recipes/blob/master/r-packages/r-essentials/meta.yaml里的一些基础组件。注意你不能将Bintray直接设置为远程的Parcel仓库地址,所以你需要提前将你的Parcel上传到HTTP服务器。

如果你想使用其他的R包,可以使用R的Parcel构建工具。你可以用Docker创建你自己的Parcels,通过修改Dockerfile。

https://github.com/chezou/cloudera-parcel

将这些Parcels放到HTTP服务器或者指定的S3 bucket。然后你就可以在Cloudera Manager中添加Parcel的仓库地址。更多细节请参考:https://www.cloudera.com/documentation/enterprise/latest/topics/cm_ig_parcels.html#cmug_topic_7_11_5

在Spark的工作节点上运行R代码


当分发完R的Parcel包以后,就可以在工作节点上运行R代码。

注意:因为存在环境变量配置的问题:https://github.com/rstudio/sparklyr/issues/915,所以目前只能使用sparklyr的upstreamversion。最新的sparklyr 0.6.1没有这个功能。

以下是一个分布式执行R代码的例子

https://github.com/chezou/sparklyr-distribute

devtools::install_github("rstudio/sparklyr") library(sparklyr) config <- spark_config() config[["spark.r.command"]]<- "/opt/cloudera/parcels/CONDAR/lib/conda-R/bin/Rscript" config$sparklyr.apply.env.R_HOME <- "/opt/cloudera/parcels/CONDAR/lib/conda-R/lib/R" config$sparklyr.apply.env.RHOME <- "/opt/cloudera/parcels/CONDAR/lib/conda-R" config$sparklyr.apply.env.R_SHARE_DIR<- "/opt/cloudera/parcels/CONDAR/lib/conda-R/lib/R/share" config$sparklyr.apply.env.R_INCLUDE_DIR<- "/opt/cloudera/parcels/CONDAR/lib/conda-R/lib/R/include" sc <- spark_connect(master = "yarn-client",config = config) sdf_len(sc,5, repartition= 1) %>% spark_apply(function(e) I(e))r ## # Source: table<sparklyr_tmp_1cc757d61b8>[?? x 1] ## # Database: spark_connection ## id ## <dbl> ## 1 1 ## 2 2 ## 3 3 ## 4 4 ## 5 5

如果想要在分布式函数中使用R的包,sparklyr将这些包打包放在了本地的.libPaths(),然后使用SparkContext.addFile()函数将这些包分发到工作节点。如果是在spark_apply()中使用这些包则依赖于本地的代码,当然也可以按照下一个章节要介绍的使用Conda来分发他们。

更多官方文档资料: https://spark.rstudio.com/articles/guides-distributed-r.html#distributing-packages

选项2:使用conda环境


使用R创建conda环境


http://blog.cloudera.com/blog/2017/04/use-your-favorite-python-library-on-pyspark-cluster-with-cloudera-data-science-workbench/说明过conda虚拟环境可以打包R环境。

创建一个R的conda环境,使用zip压缩。

代码语言:javascript
复制
$ conda create -p ~/r_env --copy -y -q r-essentials -c r
# [Option] If you need additional package you can install as follows:
# $ source activate r_env
# $ Rscript -e 'install.packages(c("awesome-package"), lib = /home/cdsw/r_env/lib/R/library, dependencies = TRUE, repos="https://cran.r-project.org")'
# $ source deactivate

与使用Parcel的差异是环境变量的设置,需要将r_env.zip设置为环境变量。虽然这种方式很灵活,但是需要每次创建Spark连接时都分发zip文件。全部代码请参考: https://github.com/chezou/sparklyr-distribute/blob/master/dist_sparklyr_conda.r

代码语言:javascript
复制
config <- spark_config()
config[["spark.r.command"]] <- "./r_env.zip/r_env/bin/Rscript"
config[["spark.yarn.dist.archives"]] <- "r_env.zip"
config$sparklyr.apply.env.R_HOME <- "./r_env.zip/r_env/lib/R"
config$sparklyr.apply.env.RHOME <- "./r_env.zip/r_env"
config$sparklyr.apply.env.R_SHARE_DIR <- "./r_env.zip/r_env/lib/R/share"
config$sparklyr.apply.env.R_INCLUDE_DIR <- "./r_env.zip/r_env/lib/R/include"

然后你就可以在Spark的工作节点上运行R代码。

复杂的例子:使用spacyr做文本分析


注意:本版本目前不支持在spark_apply()中使用本地代码的R包。

在这个例子中,我们使用spacyr package(https://github.com/kbenoit/spacyr),这个包R绑定了spaCy(https://spacy.io),一个新的Python的NLP库。我们提取named-entity(https://en.wikipedia.org/wiki/Named-entity_recognition),比如person, place, organization等等,来自于Jane Austen’sbooks(https://www.rdocumentation.org/packages/janeaustenr/versions/0.1.5/topics/austen_books)。所有代码请参考:

https://github.com/chezou/spacyr-sparklyr

为spacyr准备conda环境


因为spacyr需要Python运行环境,在运行下面例子之前你需要安装Anaconda的Parcel。

使用https://github.com/chezou/spacyr-sparklyr/blob/master/install_spacy.sh这个脚本安装conda环境

用spark_apply()抽取named entities


Spark DataFrame有text的column,我们可以用下面的UDF抽取named entities

代码语言:javascript
复制
entities <- austen_tbl %>%
  select(text) %>%
  spark_apply(
    function(e) 
    {
      lapply(e, function(k) {
          spacyr::spacy_initialize(python_executable="/opt/cloudera/parcels/Anaconda/bin/python")
          parsedtxt <- spacyr::spacy_parse(as.character(k), lemma = FALSE)
          spacyr::entity_extract(parsedtxt)
        }
      )
    },
    names = c("doc_id", "sentence_id", "entity", "entity_type"),
    packages = FALSE)
 
entities %>% head(10) %>% collect()
## # A tibble: 10 x 4
##    doc_id sentence_id         entity entity_type
##     <chr>       <int>          <chr>       <chr>
##  1  text1           1    Jane Austen      PERSON
##  2  text1           1       Dashwood         ORG
##  3  text1           1         Sussex         GPE
##  4  text1           2   Norland Park         GPE
##  5  text1           4 Henry Dashwood      PERSON
##  6  text1           4        Norland         GPE
##  7  text1           5      Gentleman        NORP
##  8  text1           7 Henry Dashwood      PERSON
##  9  text1           8 Henry Dashwood      PERSON
## 10  text1          11        Norland         GPE

划分entity类型的数量

代码语言:javascript
复制
library(ggplot2)
p <- entities %>%
  collect() %>% 
  ggplot(aes(x=factor(entity_type)))
p <- p + scale_y_log10()
p + geom_bar()

前十名看每本书的人

代码语言:javascript
复制
persons <- entities %>% 
  filter(entity_type == "PERSON") %>%
  group_by(doc_id, entity) %>%
  select(doc_id, entity) %>%
  count() %>%
  arrange(doc_id, desc(n))
persons %>% 
  filter(doc_id == "text1") %>%
  head(10) %>%
  collect()
## # A tibble: 10 x 3
## # Groups:   doc_id, entity [10]
##    doc_id         entity     n
##     <chr>          <chr> <dbl>
##  1  text1         Elinor   662
##  2  text1       Marianne   538
##  3  text1         Edward   235
##  4  text1       Jennings   233
##  5  text1     Willoughby   200
##  6  text1           Lucy   172
##  7  text1       Dashwood   159
##  8  text1        Ferrars   104
##  9  text1 Lady Middleton    80
## 10  text1         Palmer    74

我应该使用哪个选项?


一般来说,建议选择选项1,因为你不需要每次分发R的环境,而且构建包含所有包的Parcel节约了很多时间,而不用纠结于某一个包。目前,RStudio有OS(https://spark.rstudio.com/articles/guides-distributed-r.html#requirements)的限制,但你可以设置packages= FALSE来绕过因为OS不同的问题。

另外,选项2可以灵活的后面再安装包,但用起来会麻烦一些。如果只是仅仅使用R包,这个方法也行,但当你还想使用rJava等源生的扩展包的时候,则比较难准备好环境。

总结


本文主要是介绍了如何使用sparklyr在Spark工作节点上运行和分发R代码。因为spark_apply()方法需要在工作节点上安装R,我们介绍了两种方法可以让你在CDH集群和CDSW上运行spark_apply()。你可以根据你想要的进行选择。如果需要稳定,可以选择选项1:Parcel的方法。如果需要灵活,则可以选择选项2:conda环境。

不仅只是执行dplyr,同时你可以分发你本地的R代码到Spark集群。这样可以让你将你的R技能充分应用到分布式计算框架上。

参考


https://blog.cloudera.com/blog/2017/09/how-to-distribute-your-r-code-with-sparklyr-and-cdsw/

醉酒鞭名马,少年多浮夸! 岭南浣溪沙,呕吐酒肆下!挚友不肯放,数据玩的花! 温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。


推荐关注Hadoop实操,第一时间,分享更多Hadoop干货,欢迎转发和分享。

原创文章,欢迎转载,转载请注明:转载自微信公众号Hadoop实操

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2017-10-15,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Hadoop实操 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
专用宿主机
专用宿主机(CVM Dedicated Host,CDH)提供用户独享的物理服务器资源,满足您资源独享、资源物理隔离、安全、合规需求。专用宿主机搭载了腾讯云虚拟化系统,购买之后,您可在其上灵活创建、管理多个自定义规格的云服务器实例,自主规划物理资源的使用。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档