前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Hadoop/R 集成 I:流处理

Hadoop/R 集成 I:流处理

作者头像
大数据弄潮儿
发布2018-05-28 10:51:37
6700
发布2018-05-28 10:51:37
举报
文章被收录于专栏:大数据大数据

Hadoop/R Integration I: Streaming

原文作者:Wayne Adams

原文地址:https://dzone.com/articles/hadoopr-integration-i

译者微博:@从流域到海域

译者博客:blog.csd.net/solo95

Hadoop/R 集成 I:流处理

如果您平常一直使用MapReduce框架,那么您可能知道"单词计数示例"是MapReduce的相当于“Hello World!”的一个例子。在之前的帖子中,我试图稍作改动,但现在也有一个同样简单的问题 - 按州来计算,计算房利美(Fannie Mae)地产公司所募集的按揭证券的美元总价的新问题。

到目前为止,我已经使用了“直接的”Java和Pig,现在我将注意力转向R(语言).在这篇文章的例子完成之后,我们将讨论在该情况下R语言的独特之处,以及为什么字数统计类型的例子不会“真的做正义。在此之前,我会提及我在这里使用的主要参考资料是Joseph Adler的R in a Nutshell(请参阅第26章)和Tom White的Hadoop:权威指南(第2章)。

有很多方法可以将R语言与Hadoop结合使用,其中包括:

  • Hadoop流媒体,这篇文章的主角
  • RHadoop,R/Hadoop的集成(请参阅RHadoop Wiki),这是将在未来发布的文章的主角。
  • RHIPE(发音为hree- pay),另一个R/Hadoop的集成。

由于我在本博客中试图涵盖的主题十分广泛,因此我将限制自己使用流式传输和RHadoop。

概览

在Hadoop流中,您的mapper,reducer和可选的组合器进程(combiner processes)被写入从标准输入读取并写入标准输出。一旦流程脚本和数据准备就绪,您只需使用带有一些命令行属性的流式二进制文件就能调用Hadoop。

正如在之前的文章中一样,我将从房利美的新问题统计库(NIPS,即New Issue Pool Statistics)文件中获取数据。想要了解更多信息,请参阅上 一篇文章。我将使用与该文章中相同的数据,因此我们可以期待结果能够与前面精确匹配。

The Mapper

NIPS文件有一点复杂,因为它们包含许多不同格式的记录(在 这里查看所有格式)。我们将查看记录类型9的数据来进行我们的分析,即“GEOGRAPHIC DISTRIBUTION(地理分布)”。我们感兴趣的是第3栏(州名)和第6栏(总的未付余额)。由于在单个文件中混合了多种记录格式,因此我们首先在管道定界符上将文件分割并丢弃非9类记录。我们需要做的就是输出状态名称和累加未付余额,每个类型9行包含了1个实例。

我使用RStudio来编写R脚本,这是一个我通过Coursera上的Roger PengComputing for Data Analysis课程了解到的IDE 。在RStudio中进行过交互式脚本构建会话后,我制作了以下测试脚本:

代码语言:txt
复制
#! /usr/bin/env Rscript
conn <- file("/home/hduser/fannie-mae-nips/nips_12262012.txt", open="r")
while (length(next.line <- readLines(conn, n=1)) > 0) {
split.line <- strsplit(next.line, "\\|")
if (as.numeric(split.line[[1]][2]) == 9) {
write(paste(split.line[[1]][3],
gsub("[$,]", "", split.line[[1]][6]), sep="\t"), stdout())
}
}
close(conn)

然后我从shell中调用并得到以下输出,截取了一小段:

代码语言:txt
复制
CALIFORNIA 167300.00
FLORIDA 395950.00
GEORGIA 69500.00
ILLINOIS 235200.00
MICHIGAN 781950.00
NEW JERSEY 284550.00
OHIO 334175.00

由于这看起来很干净,我稍微修改了mapper以生成最终版本:

代码语言:txt
复制
#! /usr/bin/env Rscript
conn <- file("stdin", open="r")
while (length(next.line <- readLines(conn, n=1)) > 0) {
split.line <- strsplit(next.line, "\\|")
if (as.numeric(split.line[[1]][2]) == 9) {
write(paste(split.line[[1]][3],
gsub("[$,]", "", split.line[[1]][6]), sep="\t"), stdout())
}
}
close(conn)

请注意获取strsplit的结果的下标:strsplit返回一个列表,因此文件记录的第2个字段实际上是列表中第一个元素的元素2,它是解析字段的向量。如果您需要这个结果的详细说明,请参阅Phil Spector的Data Manipulation with R(使用R的数据操作)的“Subscripting(下标)”一章 _。另外请注意,gsubto的紧凑型使用可从汇总未付余额中删除美元符号和逗号。

The Reducer

我们的reducer也将从stdin中读取数据,其中Hadoop运行环境保证了以下内容:

  • 如果reducer遇到一个关键字,那么就reducer知道带有该关键字的所有记录都被发送到了该reducer,所以它可以产生一个输出,并知道它已经被赋予了该关键字代表的所有记录;
  • 陆续传入的记录会按键排序,因此reducer知道,当某个键发生更改时,未更改之前键的所有记录都已在流中遇到过。

在我们的reducer中,有两个变量:一个用于追踪哪个键正在被处理,另一个用于保存来自给定状态的抵押贷款的总的未支付余额。一旦某个键发生变化,我们将(使用其键)输出当前的running total出并重置running balance(解释看代码,running total和runing balance都是作者自己起的名字,并没有特殊含义):

代码语言:txt
复制
#! /usr/bin/env Rscript
current.key <- NA
current.upb <- 0.0
conn <- file("stdin", open="r")
while (length(next.line <- readLines(conn, n=1)) > 0) {
split.line <- strsplit(next.line, "\t")
key <- split.line[[1]][1]
upb <- as.numeric(split.line[[1]][2])
if (is.na(current.key)) {
current.key <- key
current.upb <- upb
}
else {
if (current.key == key) {
current.upb <- current.upb + upb
}
else {
write(paste(current.key, current.upb, sep="\t"), stdout())
current.key <- key
current.upb <- upb
}
}
}
write(paste(current.key, current.upb, sep="\t"), stdout())
close(conn)

现在,如果我想在单个文件上测试这个reducer,但是我遇到了一个小问题 - 我的mapper没有对输出进行排序(因为按常理来说不需要),但是我的reducer希望数据是按键排序的。我可以等着看最后的数字是怎么出来的,但由于流式传输只涉及stdin输出到标准输入,我有点好奇这个任务在Hadoop之外运行的速度可以有多快(我没有真正去比较,针对简单的单节点集群; 我只是好奇)。而且我还在学习R,所以接下来我编写了一个脚本来按记录键对行进行排序:

代码语言:txt
复制
#! /usr/bin/env Rscript
conn <- file("stdin", open="r")
all.lines <- readLines(conn)
write(sort(all.lines), stdout())
close(conn)

在这种情况下,我能明确回忆起我为什么这么喜欢R语言(即你就能明白R的强大之处)!接下来,我将使用mapper的“测试”版来处理单个文件:

代码语言:txt
复制
./map.test.R | ./map.output.sorter.R | ./reduce.R

并为单个NIPS文件获得如下输出(缩写):

代码语言:txt
复制
ALABAMA 72699735.21
ALASKA 6883209.62
ARIZONA 287482321.1
ARKANSAS 21579003.98
CALIFORNIA 1811342276.77
...
VIRGIN ISLANDS 1021750
WASHINGTON 239648997.97
WEST VIRGINIA 9925894.94
WISCONSIN 72752945.87
WYOMING 6232557.56

使用R在Hadoop中进行流式传输

现在我们有了一个mapper和一个reducer,我们可以在Hadoop中处理整个数据集。我将处理与我之前的Hadoop-Java-Pig那个帖子中相同的数据集,即2012年8月23日至12月26日的NIPS数据。正如在那篇文章中所展示的,我以伪分布模式运行Hadoop,使用来自HDFS的数据。当然,这里的区别在于我指定了使用流式处理,并提供了我的mapper和Reducer R脚本。我从Hadoop主目录启动:

代码语言:txt
复制
bin / hadoop jar $ HADOOP\_PREFIX / contrib / streaming / hadoop-streaming-1.1.0.jar -input / user / hduser / fannie-mae-nips -output / user / hduser / fannie-mae-nips -r-output -mapper /home/hduser/RScripts/map.R -reducer /home/hduser/RScripts/reduce.R

那么,我的努力取得了什么结果?从HDFS复制出我的结果文件:

代码语言:txt
复制
bin / hadoop dfs -copyToLocal / user / hduser / fannie-mae-nips -r-output / part-00000 rResults.txt

产生以下输出(这里是缩写):

代码语言:txt
复制
ALABAMA 3242681838.89999
ALASKA 841797447.200001
ARIZONA 9340767235.06001
ARKANSAS 1452136751.9
CALIFORNIA 89114642822.0799
...
VERMONT 553060435.67
VIRGIN ISLANDS 33604327.46
VIRGINIA 12706719836.48
WASHINGTON 13194248475.54
WEST VIRGINIA 486889587.57
WISCONSIN 8140391871.79
WYOMING 720905726.84

我仍然保存着使用Java和Pig在这个相同的数据集上的输出; 仔细阅读此输出将凸显出以下输出(请注意,由于数字以不同的格式输出,因此我没有区分这些文件):

代码语言:txt
复制
ALABAMA 3.242681838899994E9
ALASKA 8.417974472000003E8
ARIZONA 9.340767235060005E9
ARKANSAS 1.452136751900001E9
CALIFORNIA 8.91146428220799E10
....
VERMONT 5.530604356700001E8
VIRGIN ISLANDS 3.360432746000001E7
VIRGINIA 1.2706719836479996E10
WASHINGTON 1.319424847554002E10
WEST VIRGINIA 4.868895875700002E8
WISCONSIN 8.140391871790002E9
WYOMING 7.209057268400007E8

因此,我成功地使用R和Hadoop流处理复制了使用Java和Pig进行的示例。

关于Hadoop和R的最终评论

如果你完全熟悉R,你就会明白R并不是一种你为了分割输出和数字求和而选择的语言; 该语言及其库包含丰富的功能。这篇文章的重点主要是过一遍R与Hadoop流处理的机械式细节(即使用R与流处理的固定步骤)。R真正发光的地方在于,如果是一些“繁重的工作”,R很容易就能将其分解为Mapper风格和Reducer风格的任务。例如,如果您正针对庞大的数据集进行线性回归操作,使用了大量的变量,或者如果您正在对大型数据集执行Shapiro-Wilk测试,则可以将作业分解为并行任务,最后将它们与Reducer相结合,这将成为Hadoop/R协同工作的一个很好的例子。有关R中的并行计算的更多信息,请查阅 R in a Nutshell,特别是他在本章最后的注明的“在哪里了解更多”部分。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Hadoop/R Integration I: Streaming
  • Hadoop/R 集成 I:流处理
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档