如何使用CDSW在CDH中分布式运行所有R代码

温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。

本文将是Fayson最近写的R系列的最后一篇文章了,大家且看且珍惜吧。

无需额外花费过多的学习成本,sparklyr(https://spark.rstudio.com)可以让R用户很方便的利用Apache Spark的分布式计算能力。之前Fayson介绍了什么是sparklyr,大家知道R用户可以编写几乎相同的代码运行在Spark之上实现本地或者分布式计算。

spark_apply的架构 (来自 https://github.com/rstudio/sparklyr/pull/728)

从sparklyr0.6(https://blog.rstudio.com/2017/07/31/sparklyr-0-6/)开始,你就可以通过spark_apply()运行R代码在Spark集群之上。换句话说,你可以用R写UDF。这样可以让你用你最喜欢的R包来访问Spark里的数据,比如仅在R中实现的特定的统计分析方法,或者像NLP的高级分析,等等。

因为目前spark_apply()的实现需要在工作节点上也安装R环境,在这篇文章里,我们将介绍如何在CDH集群中运行spark_apply()。我们会介绍两种方法:1.使用Parcel。2)使用conda环境。

选项1:使用Parcel安装R环境


创建和分发R的Parcel


Parcel(https://www.cloudera.com/documentation/enterprise/latest/topics/cm_ig_parcels.html)是一种二进制的分发格式,Cloudera Manager可以使用Parcel来分发CDH,Spark2,Kafka和需要运行在集群上的服务。Parcel很像.deb或者.rpm包。它可以让你通过Cloudera Manager的界面很容易的在CDH集群上安装特定的服务。使用这种方式的前提是CDH集群是使用Parcel方式安装的。

你可以到https://dl.bintray.com/chezou/Parcels/下载预先创建好的Parcel包,它包含了https://github.com/conda/conda-recipes/blob/master/r-packages/r-essentials/meta.yaml里的一些基础组件。注意你不能将Bintray直接设置为远程的Parcel仓库地址,所以你需要提前将你的Parcel上传到HTTP服务器。

如果你想使用其他的R包,可以使用R的Parcel构建工具。你可以用Docker创建你自己的Parcels,通过修改Dockerfile。

https://github.com/chezou/cloudera-parcel

将这些Parcels放到HTTP服务器或者指定的S3 bucket。然后你就可以在Cloudera Manager中添加Parcel的仓库地址。更多细节请参考:https://www.cloudera.com/documentation/enterprise/latest/topics/cm_ig_parcels.html#cmug_topic_7_11_5

在Spark的工作节点上运行R代码


当分发完R的Parcel包以后,就可以在工作节点上运行R代码。

注意:因为存在环境变量配置的问题:https://github.com/rstudio/sparklyr/issues/915,所以目前只能使用sparklyr的upstreamversion。最新的sparklyr 0.6.1没有这个功能。

以下是一个分布式执行R代码的例子

https://github.com/chezou/sparklyr-distribute

devtools::install_github("rstudio/sparklyr") library(sparklyr) config <- spark_config() config[["spark.r.command"]]<- "/opt/cloudera/parcels/CONDAR/lib/conda-R/bin/Rscript" config$sparklyr.apply.env.R_HOME <- "/opt/cloudera/parcels/CONDAR/lib/conda-R/lib/R" config$sparklyr.apply.env.RHOME <- "/opt/cloudera/parcels/CONDAR/lib/conda-R" config$sparklyr.apply.env.R_SHARE_DIR<- "/opt/cloudera/parcels/CONDAR/lib/conda-R/lib/R/share" config$sparklyr.apply.env.R_INCLUDE_DIR<- "/opt/cloudera/parcels/CONDAR/lib/conda-R/lib/R/include" sc <- spark_connect(master = "yarn-client",config = config) sdf_len(sc,5, repartition= 1) %>% spark_apply(function(e) I(e))r ## # Source: table<sparklyr_tmp_1cc757d61b8>[?? x 1] ## # Database: spark_connection ## id ## <dbl> ## 1 1 ## 2 2 ## 3 3 ## 4 4 ## 5 5

如果想要在分布式函数中使用R的包,sparklyr将这些包打包放在了本地的.libPaths(),然后使用SparkContext.addFile()函数将这些包分发到工作节点。如果是在spark_apply()中使用这些包则依赖于本地的代码,当然也可以按照下一个章节要介绍的使用Conda来分发他们。

更多官方文档资料: https://spark.rstudio.com/articles/guides-distributed-r.html#distributing-packages

选项2:使用conda环境


使用R创建conda环境


http://blog.cloudera.com/blog/2017/04/use-your-favorite-python-library-on-pyspark-cluster-with-cloudera-data-science-workbench/说明过conda虚拟环境可以打包R环境。

创建一个R的conda环境,使用zip压缩。

$ conda create -p ~/r_env --copy -y -q r-essentials -c r
# [Option] If you need additional package you can install as follows:
# $ source activate r_env
# $ Rscript -e 'install.packages(c("awesome-package"), lib = /home/cdsw/r_env/lib/R/library, dependencies = TRUE, repos="https://cran.r-project.org")'
# $ source deactivate

与使用Parcel的差异是环境变量的设置,需要将r_env.zip设置为环境变量。虽然这种方式很灵活,但是需要每次创建Spark连接时都分发zip文件。全部代码请参考: https://github.com/chezou/sparklyr-distribute/blob/master/dist_sparklyr_conda.r

config <- spark_config()
config[["spark.r.command"]] <- "./r_env.zip/r_env/bin/Rscript"
config[["spark.yarn.dist.archives"]] <- "r_env.zip"
config$sparklyr.apply.env.R_HOME <- "./r_env.zip/r_env/lib/R"
config$sparklyr.apply.env.RHOME <- "./r_env.zip/r_env"
config$sparklyr.apply.env.R_SHARE_DIR <- "./r_env.zip/r_env/lib/R/share"
config$sparklyr.apply.env.R_INCLUDE_DIR <- "./r_env.zip/r_env/lib/R/include"

然后你就可以在Spark的工作节点上运行R代码。

复杂的例子:使用spacyr做文本分析


注意:本版本目前不支持在spark_apply()中使用本地代码的R包。

在这个例子中,我们使用spacyr package(https://github.com/kbenoit/spacyr),这个包R绑定了spaCy(https://spacy.io),一个新的Python的NLP库。我们提取named-entity(https://en.wikipedia.org/wiki/Named-entity_recognition),比如person, place, organization等等,来自于Jane Austen’sbooks(https://www.rdocumentation.org/packages/janeaustenr/versions/0.1.5/topics/austen_books)。所有代码请参考:

https://github.com/chezou/spacyr-sparklyr

为spacyr准备conda环境


因为spacyr需要Python运行环境,在运行下面例子之前你需要安装Anaconda的Parcel。

使用https://github.com/chezou/spacyr-sparklyr/blob/master/install_spacy.sh这个脚本安装conda环境

用spark_apply()抽取named entities


Spark DataFrame有text的column,我们可以用下面的UDF抽取named entities

entities <- austen_tbl %>%
  select(text) %>%
  spark_apply(
    function(e) 
    {
      lapply(e, function(k) {
          spacyr::spacy_initialize(python_executable="/opt/cloudera/parcels/Anaconda/bin/python")
          parsedtxt <- spacyr::spacy_parse(as.character(k), lemma = FALSE)
          spacyr::entity_extract(parsedtxt)
        }
      )
    },
    names = c("doc_id", "sentence_id", "entity", "entity_type"),
    packages = FALSE)
 
entities %>% head(10) %>% collect()
## # A tibble: 10 x 4
##    doc_id sentence_id         entity entity_type
##     <chr>       <int>          <chr>       <chr>
##  1  text1           1    Jane Austen      PERSON
##  2  text1           1       Dashwood         ORG
##  3  text1           1         Sussex         GPE
##  4  text1           2   Norland Park         GPE
##  5  text1           4 Henry Dashwood      PERSON
##  6  text1           4        Norland         GPE
##  7  text1           5      Gentleman        NORP
##  8  text1           7 Henry Dashwood      PERSON
##  9  text1           8 Henry Dashwood      PERSON
## 10  text1          11        Norland         GPE

划分entity类型的数量

library(ggplot2)
p <- entities %>%
  collect() %>% 
  ggplot(aes(x=factor(entity_type)))
p <- p + scale_y_log10()
p + geom_bar()

前十名看每本书的人

persons <- entities %>% 
  filter(entity_type == "PERSON") %>%
  group_by(doc_id, entity) %>%
  select(doc_id, entity) %>%
  count() %>%
  arrange(doc_id, desc(n))
persons %>% 
  filter(doc_id == "text1") %>%
  head(10) %>%
  collect()
## # A tibble: 10 x 3
## # Groups:   doc_id, entity [10]
##    doc_id         entity     n
##     <chr>          <chr> <dbl>
##  1  text1         Elinor   662
##  2  text1       Marianne   538
##  3  text1         Edward   235
##  4  text1       Jennings   233
##  5  text1     Willoughby   200
##  6  text1           Lucy   172
##  7  text1       Dashwood   159
##  8  text1        Ferrars   104
##  9  text1 Lady Middleton    80
## 10  text1         Palmer    74

我应该使用哪个选项?


一般来说,建议选择选项1,因为你不需要每次分发R的环境,而且构建包含所有包的Parcel节约了很多时间,而不用纠结于某一个包。目前,RStudio有OS(https://spark.rstudio.com/articles/guides-distributed-r.html#requirements)的限制,但你可以设置packages= FALSE来绕过因为OS不同的问题。

另外,选项2可以灵活的后面再安装包,但用起来会麻烦一些。如果只是仅仅使用R包,这个方法也行,但当你还想使用rJava等源生的扩展包的时候,则比较难准备好环境。

总结


本文主要是介绍了如何使用sparklyr在Spark工作节点上运行和分发R代码。因为spark_apply()方法需要在工作节点上安装R,我们介绍了两种方法可以让你在CDH集群和CDSW上运行spark_apply()。你可以根据你想要的进行选择。如果需要稳定,可以选择选项1:Parcel的方法。如果需要灵活,则可以选择选项2:conda环境。

不仅只是执行dplyr,同时你可以分发你本地的R代码到Spark集群。这样可以让你将你的R技能充分应用到分布式计算框架上。

参考


https://blog.cloudera.com/blog/2017/09/how-to-distribute-your-r-code-with-sparklyr-and-cdsw/

醉酒鞭名马,少年多浮夸! 岭南浣溪沙,呕吐酒肆下!挚友不肯放,数据玩的花! 温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。


推荐关注Hadoop实操,第一时间,分享更多Hadoop干货,欢迎转发和分享。

原创文章,欢迎转载,转载请注明:转载自微信公众号Hadoop实操

原文发布于微信公众号 - Hadoop实操(gh_c4c535955d0f)

原文发表时间:2017-10-15

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

编辑于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏牛肉圆粉不加葱

YARN资源调度器

1666
来自专栏Hadoop实操

如何使用Oozie API接口向非Kerberos环境的CDH集群提交Shell工作流

前面Fayson介绍了《如何使用Oozie API接口向非Kerberos环境的CDH集群提交Spark作业》和《如何使用Oozie API接口向非Kerber...

3407
来自专栏闵开慧

hadoop源码解析1 - hadoop中各工程包依赖关系

1 hadoop中各工程包依赖简述     Google的核心竞争技术是它的计算平台。Google的大牛们用了下面5篇文章,介绍了它们的计算设施。     G...

3285
来自专栏Hadoop实操

如何使用Oozie API接口向Kerberos集群提交Java程序

在CDH集群外的节点向集群提交MapReduce作业的方式有多种,前面Fayson介绍了《如何跨平台在本地开发环境提交MapReduce作业到CDH集群》和《如...

1.2K7
来自专栏Hadoop实操

如何给Hadoop集群划分角色

Fayson在之前的文章中介绍过《CDH网络要求(Lenovo参考架构)》,《如何为Hadoop集群选择正确的硬件》和《CDH安装前置准备》,而我们在搭建Had...

5477
来自专栏星回的实验室

打造自己的MapReduce[一]:Hadoop集群搭建

那就是我在这里的每一篇文开头都必然是:最近工作好忙,又断更很久了……Anyway,这也不能成为偷懒的理由。我可能对记录技术有些固执的误解,总认为是要待到整理出一...

1111
来自专栏Ryan Miao

idea 自动提示生成 serialVersionUID

from: http://tonycody.blog.51cto.com/8421818/1401422 Intellij IDEA 默认没启用这个功能。 Se...

2748
来自专栏Hadoop实操

如何使用Oozie API接口向Kerberos环境的CDH集群提交Shell作业

前面Fayson介绍了使用Oozie API向Kerberos和非Kerberos集群提交Spark和Java作业,本篇文章主要介绍如何使用Oozie Clie...

3956
来自专栏闵开慧

spark出现GC overhead limit exceeded和java heap space

    spark执行任务时出现java.lang.OutOfMemoryError: GC overhead limit exceeded和java.lang...

5109
来自专栏美图数据技术团队

Spark Streaming VS Flink

本文从编程模型、任务调度、时间机制、Kafka 动态分区的感知、容错及处理语义、背压等几个方面对比 Spark Stream 与 Flink,希望对有实时处理...

1621

扫码关注云+社区

领取腾讯云代金券