Hadoop专业解决方案-第5章 开发可靠的MapReduce应用

本章主要内容:

1、利用MRUnit创建MapReduce的单元测试。

2、MapReduce应用的本地实例。

3、理解MapReduce的调试。

4、利用MapReduce防御式程序设计。

在WOX.COM下载本章源代码

本章在wox.com网站的源码可以在www.wiley.com/go/prohadoopsolutions的源码下载标签找到。第五章的源码根据本章的内容各自分别命名放在了第五章下载目录中。

到目前为止,你应该对MapReduce体系结构,应用程序设计,和定制MapReduce扩展程序很熟悉了。本章讨论如何对测试程序通过有影响力(leveraging)的单元测试和基于Hadoop的设备创建可靠的MapReduce代码。你也可以从中学到不同的防御式编程技术来处理部分被破坏的(corrupted)数据的方法。

MapReduce应用的单元测试

缺陷(Bugs)总在代码中存在,这是一个不争的事实——你写的代码越多,你也会遇到越多的缺陷(Bugs)。即使是最伟大的程序员也极少写没有缺陷(bug)的代码。这就是为什么测试成为了代码开发中完整中的(integral)一部分,因此许多开发人员越来越倾向于测试性驱动的开发(test-driven development,TDD)。

注:这里所讨论的MRUnit和它在MapReduce工作的单元测试中的使用是由诺基亚的同事Michael Spicuzza富有创造性地实现的。

测试性驱动的程序开发(TEST-DRIVEN DEVELOPMENT

测试性驱动程序开发是一种基于开发实际代码的同时编写自动化测试代码的编程技术。这保证了你即时的测试你的代码,并保证你快速而容易地重复测试你的代码,因为这个过程是自动的。

测试性驱动程序开发涉及到了如下简短、重复的开发流程:

1、在写任何代码之前,你都要首先写它的自动化测试代码。当你写自动化测试代码的时候,你需要考虑到他所有可能的输入,错误,以及输出。这样,你需要在实际编写代码前设计你的代码习惯

2、第一次运行你的测试代码,这个测试应该是失败的——表明代码还没有准备好。

3、然后,你应该开始编程。因为这里已经有测试代码了,只要这个代码仍旧失败,那么就意味着还没有准备好。这个代码可以一直修复直到它通过所有的断言为止。

4、一旦这个代码通过了这个测试,然后通过重构,你可以将它清除干净。只要这个代码仍旧能通过测试,它意味着它仍旧能工作。你不必当心改变会引起新的缺陷(bugs)。

5、在其他的方法和程序中重新开始整个事情。

测试性驱动编程的一个基石是单元测试。尽管通过利用足够多的模拟类,利用JUnit可以从技术上测试大量的MapReduce应用程序,这里仍有一个可以选择的方法能够提供可以增加的覆盖水平。MRUnit是专门为Hadoop服务的测试框架。它是从包含对Hadoop的Cloudera的分布式开源实现开始的,它现在是Apache的一个子项目。MRUnit是基于Junit的,并允许单元测试成为映射式(mappers),简化式(reducers),和其他mapper-reducer交叉的交叉性测试,伴随着合并,客户计数,和拆分。

如第三章的解释,Eclipse提供了MapReduce的一个非常好的开发工具。在这里的讨论中,你能学会如何创建包含了MapReduce程序依赖的所有需求的pom.xml文件。Eclipse也提供了基于MRUnit的单元测试平台。

为了利用MRUnit,你应该继承在第三章中增加的MRUnit依赖的标准MapReudce Maven中的pom文件,如清单5-1所示:

注:MRUnit的jar文件,和,所以,Maven的依赖项,有如下两个版本:

mrunit-0.9.0-incubating-hadoop1.jar 是Hadoop的MapReduce的第一个版本,而mrunit-0.9.0-incubating-hadoop2.jar是工作在Hadoop的MapReudce新的版本下的。这个新版本是指从Cloudere的CDH 4而来的hadoop-2.0版本。

       有了这个工具,你可以实现包含MapReduce应用的主要元素的单元测试。第三章的单词计数的实例在这里用作测试的实例。这意味着这个实例中的mapper和reducer会作为参数传递给测试程序。

测试Mappers

使用MRUnit测试mappers非常直接,代码清单5-2非常明显的展示了其特性。

注:MRUnit支持老的(从mapred包而来)和新的(从mapreduce包而来)MapReduce APIs。当测试你的代码时,注意保证应用合适的MapDriver对象的实例。相同的ReduceDriver和MapReduceDriver对象,在以后的章节中描述。

写一个基于MRUnit的测试单元非常简单。这种简单的标志性的加强是基于流利的API风格的。为了写你的测试程序,你需要做如下的事情:

1.在测试map程序中安装MapDriver类作为map确切的参数。

2.通过使用withMapper调用来增加一个你正在测试的实例。第三章单词计数的程序的mapper程序应用在了这里。

3. 你可以使用一个可选的withConfiguration方法,将所需的配置传递给mapper。

4. 该withInput调用,你可以通过所需的键和值 - 在这种情况下,用一个任意值,并且包含一个长文本对象如“猫,猫,狗“

5. 使用withOutput调用中得到预计输出。在这个例子中,是“猫”,“猫”和“狗”三个文本对象与其出现的相应i的intWritable值 – 所有的值都为1。

6. 如果一个mapper递增计数器可选。 withCounter (组名称, expectedValue ) (清单5-2中没有显示),使你能够指定计数器的期望值。

7.最后一次调用,runtest,反馈进入mapper中指定的输入值,比较实际的输出和通过withoutput方法得到的期望输出值。

对于MapDriver类的缺陷是对于每个侧是你最后只能有单个的输入和输出。如果你想,你可以调用withInput和withOutput多次,因此,你在任何时间只是测试一组输入/输出。为了指定多个输入,你必须使用MapReducerDriver对象(将在这章稍后介绍)。

Reducers 测试

       测试Reducer和测试Mapper是一样的。可以参看清单5-3:

清单5-3

@Test

public void testReducer() throws Exception {

List<IntWritable> values = new ArrayList<IntWritable>();

values.add(new IntWritable(1));

values.add(new IntWritable(1));

new ReduceDriver<Text, IntWritable, Text, IntWritable>()

.withReducer(new WordCount.Reduce())

.withConfiguration(new Configuration())

.withInput(new Text("cat"), values)

.withOutput(new Text("cat"), new IntWritable(2))

.runTest();

}

以下是这段代码详细的解释:

  1. reducer被创建的时候一个IntWritable对象列表作为输入对象。
  2. 一个ReducerDriver需要被实例化。如同MapprtDriver一样,这也真是作为参数在reducer中测试。
  3. 你想要的一个reducer的实例是通过withReducer调用的。这个在第三章的word count实例中使用的实例将在这里使用。
  4. 一个可选的withconfiguration方法
  5. WithInput调用允许你传递输入之值给reducer。这里,你传递“cat”健和在一开始通过intwritable创建的列表。
  6. 你可以通过withOutput指定希望的输出结果。这里,你指定相同的健“cat”和intwritable代替单词“cat”的个数。
  7. 如果一个reducer是一个递增的计数器,一个可选的计数组合(组,名,期待值)(在5-3清单中未列出)可以让你指定希望得到的计数值。
  8. 最后,你调用runtest,其中反馈了reducer的指定输出,并和期望输出作对比。

Reducerdricver和mapperDriver存在相同的限制,不能接受超过一个的输入/输出对。

到目前为止,这一章节已经向你展示了如何分开测试mapper和reducer的方法,但是,也可能需要一起对它们进行交叉测试。你可以利用MapReduceDriver类来实现。Mapreducedriver类也被用来测试联合使用的问题。

交叉测试

MRUnit提供了Mapreducerdriver类来让你测试mapper和reducer共同使用的情况。MapReducerDriver类不同于MapperDriver和ReducerDriver类被参数化。首先,你参数化mapper类的输入和输出类型,然后是Reducer的输入和输出类型。因为mapper的输出类型通常是和reducer的输入类型相互匹配的,你最终得到三对参数对。补充一下,你可以提供多组的输入和指定多组的期望输出。清单5-4列出了一些实例代码:

清单5-4 一起测试mapper和reducer

@Test

public void testMapReduce() throws Exception {

new MapReduceDriver<LongWritable, Text, Text, IntWritable, Text,

IntWritable>()

.withMapper(new WordCount.Map())

.withReducer(new WordCount.Reduce())

.withConfiguration(new Configuration())

.withInput(new LongWritable(1), new Text("dog cat dog"))

.withInput(new LongWritable(2), new Text("cat mouse"))

.withOutput(new Text("cat"), new IntWritable(2))

.withOutput(new Text("dog"), new IntWritable(2))

.withOutput(new Text("mouse"), new IntWritable(1))

.runTest();

}

正如你在上面的代码中所看到的,这个安装程序和MapDriver/ReduceDriver用到的类是很相似的。你传递实例实例到mapper和reducer中。(第三章涉及到的单词计数的实例在这里也用到了。)可选的是,你可以利用withConfiguration和withCombiner来测试配置和需要的合并。

MapReduceDrive类让你能够传递多个不同的键值。这里,你传递两个记录——第一个含有一个LonWritable的随意值和以恶文本对象包含一行“dog cat dog”,第二个LongWritable对象包含一个任意值和一个文本对象包含一行“cat mouse”。

你也可以利用withoutput方法指定一个期望的输出。这里,你指定三个关键值——“cat”,“dog”,”mouse”——伴随着一致的计数2,2,和1.最后,如果mapper/reducer 是一个递增的计数器,一个可选的是,withCounter(group,name,experctedValue)(在清单5-4中没有列出来)可以让你指定这个计数器期望的值。

如果一个测试失败了,MRUnit会产生一个和清当5-5相类似的指定输出,告诉你出现了什么错误。

清单5-5:MRUnit不成功时的输出结果

13/01/05 09:56:30 ERROR mrunit.TestDriver: Missing expected output (mouse, 2)

at position 2.

13/01/05 09:56:30 ERROR mrunit.TestDriver: Received unexpected output (mouse,

1)   at position 2.

如果测试结果是成功的,你会活得一个小小的自信,mapper和reducer协同工作是成功的。

尽管MRUnit使mapper和reducer代码的单元测试变得简单了,在这里涉及的mapper和reducer实例是比较简单的。如果你的map和/或者reduce代码开始变得很复杂,从Hadoop框架获得支持分开处理,并单独测试业务逻辑是一个好的设计方法(也就是说,需要应用程序定制)。就像在交叉测试中使用MapReduceDriver一样,在你不再测试你的代码的时候也是很容易得到一个点的,而不是已经做了这件事的Hadoop框架本身。

这里所设计的单元测试是一种典型的在实现发现bugs的方法,但是这些测试不会测试基于Hadoop的已经完成的MapReduce任务。本地任务运行,在下一节中描述,能让你在本地运行Hadoop程序,在一个java虚拟机中,使如果MapReduce任务失败了更加容易调试。

用Eclipse进行本地程序测试

利用Eclipse进行Hadoop开发提供了运行完整的MapReduce本地应用程序的能力——在一个单例模式下。Hadoop分布式(Hadoop-core)伴随着本地任务远行,能让你在本地计算机上运行Hadoop,在单个的JVM中。在这种情况下,你能在map或者reduce方法内部设置断点,利用eclipse调试器,和单步执行代码来检验程序的错误。

在本地的eclipse中运行MapReduce程序不许要一些特别的配置或者设置。如图5-1,只需要右键类,选择Run As(或者Debug as)再选择Java Application就行了。

图5-1:在本地Eclipse中运行MapReduce程序

注意:尽管一个本地的任务执行可以运行完整的程序,但是它也很多限制。例如,它不能运行超过一个的Reducer。(它不支持0reducer的情况。)通常,这不是问题,因为许多程序能够只用一个reducer执行。需要注意的问题是,即使你设定了多个reducer,本地任务也会忽略这些设置,并用单个的reducer。也要注意所有的本地mapper是顺序执行的。

一个基于Eclipse的本地可执行任务可以在Linux和Windows中执行。(如果在Windows中使用mapreduce程序,你需要安装Cygwin。)默认情况下,一个本地运行的任务会使用本地的文件系统进行读和写数据。

注意这个,默认情况,本地Hadoop执行程序使用本地文件系统。(如同在第二章中描述的,HDFS实现了对本地文件系统提供支持。)这意味着,所有用于测试的数据都需要拷贝到本地,产生的结果也是本地的。

如果这个是不可选的,你可以配置本地运行程序去操作集群数据(包括HBasse)。为了通过本地执行程序进入集群,你必须要使用一个配置文件,如清单5-6中所示:

清单5-6:Hadoop访问集群数据的配置文件

<?xml version="1.0" encoding="UTF-8"?>

<configuration>

<!-- hbase access -->

<property>

<name>hbase.zookeeper.quorum</name>

<value>Comma separated list of zookeeper nodes</value>

</property>

<property>

<name>hbase.zookeeper.property.clientPort</name>

<value>zookeeper port</value>

</property>

<!-- hdfs -->

<property>

<name>fs.default.name</name>

<value>hdfs://<url>/</value>

</property>

<!-- impersonation -->

<property>

<name>hadoop.job.ugi</name>

<value>hadoop, hadoop</value>

</property>

</configuration>

这个配置文件定义了三个主要的组件——HBase的配置(定义了指向Zookeeper的连接数),HDFS的配置(定义了HBase的URL),和安全模拟(这需要如果你开发的机器和Hadoop集群属于不同的安全域,或者在你本机和Hadoop集群中你使用不同的登陆名)。将这个配置文件加到可执行的应用中使相当简单的,如同清单5-7中所示。

清单5-7:加载集群信息的配置文件

Configuration.addDefaultResource("Hadoop properiies");

Configuration conf = new Configuration();

尽管利用本地执行任务进行测试相对于单元测试来说会比较彻底,一个你必须记住的就是测试hadoop程序必须关注计算规模,不论你在本地运行多少次,直到你使用真实的数据测试代码的时候,你都不会确定它是正确工作的。

实际上,许多测验都不能被验证,包括下面几项:

1、 在程序运行的时候有多少的mapper被创建,数据在它们之间是如何被拆分的?

2、 多少真实的重洗和排序?是否必要去实现一个联合器?是否一个驻内存的联合器是可选的?

3、 是什么样的硬件/软件/网络环境?是否需要调整应用程序/集群的参数?

这意味着为了保证应用程序正常工作,测试本地任务必须在Hadoop集群上用真实的数据测试。

MapReduce可执行程序的高并发性和它依赖于大量的数据,使得测试MapReduce代码变得比较具有挑战性。在下一节,你会学到如何利用Hadoop的日志来加强Hadoop执行程序的调试。

利用日志文件测试Hadoop

日志文件被软件工程广泛的使用了,并包含如下几个重要的目标:

1、 创建一个可执行的测试应用,例如,为了执行分析,或者得到潜在的提升。

2、 收集执行的各项指标,能够用来进行实时的和事后的分析,并且能自动测试,错误校验,等等。

MapReduce本身已经记录了程序执行过程中的各项日志。本地的这些文件是受Hadoop的配置文件控制的。默认情况下,它们存放在Hadoop版本文件夹下的logs子目录下。对于单个程序最重要的日志文件是TaskTracker的日志。MapReduce任务抛出的任何异常信息都会在这些日志文件中记载。

这个log文件目录下还有一个userlogs的子目录,它包含了每个任务的日志。每个任务都会记录它的stdout和stderr信息到在这个目录下的这两个文件中。每一个应用指定的日志信息包括用户的代码也存放在这些文件里。在一个多节点的Hadoop集群上,这些日志文件没有集中汇总,你必须查看每个节点下的logs/userlosgs目录。

一个访问一个任务的所有日志的方便的方法是通过JobTracker的网页。如图5-2所示。它能让你看到这个任务的mappers和reducers的所有日志。

图5-2 任务网页

所有任务的日志信息可以通过TaskTracker的任务网页进入(任务的设置和清除日志,也包括mapper和reducer的日志一致性网页)。从这些页面,你可以导航到任务的配置页面,如图5-3所示.

图5-3 任务配置页面

这个任务配置文件包含了配置对象的文本。如果你使用了大量的自己写的配置文件(例如,当你从你的驱动器传递参数到mappers和reducers的时候)。这些页面允许你配置他们期望的值。

另外,任务的安装和清理日志,包括mapper和reducer的页面,被链接到统一的日志文件页面。如图5-4所示,日志文件包括stdout,stderr和syslog三个日志文件。

图5-4 map的日志文件

任何应用程序指定的日志都应该在这个页面上显示。因为这个页面可以实时的刷新。它可以有效的查看单个执行任务的进程(假设你的日志文件记录了合适的信息)。

通过使用MapReduce框架来运行用户提供的调试脚本会使日志运行的更加有效。这些用户指定的日志信息是对于问题信息更加重要的记载。这些脚本允许从任务的输出文件(stdout和stderr),系统日文件,和任务配置文件中挖掘数据。这些从脚本的标准输出文件的到的文件可以利用任务提供的接口来使用。

你可以对失败的任务提供map和reduce分开的脚本。你可以通过对mapred.map.task.debug.script(为了调试map任务)和mapred.reduce.task.debug.script(为了调试reduce任务)属性设置合适的值来提交调试脚本。你可以通过API来设置这些属性。对于这些脚本的参数就是任务的stdout,stderr,syslog和jobconf文件。

当决定什么样的东西需要在你的日志文件中出现,使你的决定在一个有目的的情况下进行。对于调试日志文件有如下几点建议:

1. 异常或者错误代码信息应该一直输出异常信息。

2. 任何不期望的变量的值(例如,空值)应该在执行的过程中记录日志。

3. 不可预料的执行路径应该记录日志。

4. 如果异常是发生在被包含的里面的,那么在主函数块中应该记录相关日志。

5. 太多的日志文件反而使日志无效。尽量使相关的信息放在同一个日志文件中。

尽管利用JobTracker可以很方便的查看指定任务的日志文件,但它不事后自动记录日志和挖掘数据。下一节将描述适合自动记录日志的方法。

进行中的程序日志

你可以使用广泛的方法来解决日志文件的问题,利用指定的软件(例如,适合HadoopOps的Splunk)和一个定制的日志处理程序。为了实现定制的日子处理程序,所有map和reduce产生的日志文件都应该集中到一个文件总来。你可利用清单5-8(代码文件:HadoopJobLogScraper类)所展示的那样来处理,它允许你将所有相关的任务的日志集中起来,并将它们存放到单个的文件中去。

注意:这个解决方案是一个叫Dmitry Mikhelson的诺基亚同事提供的。

LISTING 5-8: Simple log screen scraper

public class HadoopJobLogScraper{

private String _trackerURL = null;

public static void main(String[] args) throws IOException{

if (args.length != 2){

System.err.println("usage: <JobTracker URL>, <job id>");

}

String jobId = args[1];

String trackerURL = args[0];

HadoopJobLogScraper scraper = new HadoopJobLogScraper(trackerURL);

scraper.scrape(jobId, JobType.MAP);

scraper.scrape(jobId, JobType.REDUCE);

System.out.println("done");

}

public enum JobType{

MAP("map"), REDUCE("reduce");

private String urlName;

private JobType(String urlName){

this.urlName = urlName;

}

public String getUrlName(){

return urlName;

}

}

private Pattern taskDetailsUrlPattern = Pattern.compile("<a

href=\"(taskdetails\\.jsp.*?)\">(.*?)</a>");

private Pattern logUrlPattern = Pattern.compile("<a

href=\"([^\"]*)\">All</a>");

public HadoopJobLogScraper (String trackerURL){

_trackerURL = trackerURL;

}

public void scrape(String jobId, JobType type) throws IOException{

System.out.println("scraping " + jobId + " - " + type);

String jobTasksUrl = _trackerURL + "/jobtasks.jsp?jobid=" + jobId +

"&type=" + type.getUrlName() + "&pagenum=1";

String jobTasksHtml = IOUtils.toString(new

URL(jobTasksUrl).openStream());

Matcher taskDetailsUrlMatcher =

taskDetailsUrlPattern.matcher(jobTasksHtml);

File dir = new File(jobId);

if (!dir.exists()){

dir.mkdir();

}

File outFile = new File(dir, type.getUrlName());

BufferedWriter out = new BufferedWriter(new FileWriter(outFile));

while (taskDetailsUrlMatcher.find()){

out.write(taskDetailsUrlMatcher.group(2) + ":\n");

String taskDetailsUrl = new String(_trackerURL + "/" +

taskDetailsUrlMatcher.group(1));

String taskDetailsHtml = IOUtils.toString(new

URL(taskDetailsUrl).openStream());

Matcher logUrlMatcher = logUrlPattern.matcher(taskDetailsHtml);

while (logUrlMatcher.find()){

String logUrl = logUrlMatcher.group(1) +

"&plaintext=true&filter=stdout";

out.write(IOUtils.toString(new URL(logUrl).openStream()));

}

}

out.flush();

out.close();

}

}

注意:这个解决方案是基于screen scraping的,也是继承自unreliable的。在页面上的任何改动都有可能打断这个实现。

从一个作业监控URL和作业的Id通过主方法对这个作业建立屏幕抓取。让后利用它从mapper和reducer中抓取日志文件。这个抓取程序利用正则表达式来解析所有的mapper和reducer页面,读取这些页面,并打印出他们的文本。

日志不是获取MapReduce执行情况的唯一方式。接下来,你可以找到另外一种方式来获取执行情况-工作计数器。

利用工作计数器进行报表度量

另外一个hadoop指定的调试和测试的方法是利用定制度量-工作计数器。正如第三章的描述,计数器是在Hadoop中轻量级的对象可以让你追踪你感兴趣的事件在map和reduce中。

MapReduce自身记录了它每次运行的度量计数器,包括输入的记录数由mapper和reducer提供的,它从HDFS中读取的或者写入的字节数,等等。因为工作页面(图 5-1)自动更新(通过默认值,每30秒),这个计数器能够用来追踪执行的情况。这个计数器也能被用作,例如,所有输入的记录都被读取和处理了。

表5-1展示了分组名和计数器名包含在了目前Hadoop支持的个人组别中。

TABLE 5-1: Hadoop’s Built-in Counters

GROUP NAME COUNTER NAME

org.apache.hadoop.mapred.Task$Counter

MAP_INPUT_RECORDS

org.apache.hadoop.mapred.Task$Counter

MAP_OUTPUT_RECORDS

org.apache.hadoop.mapred.Task$Counter

MAP_SKIPPED_RECORDS

org.apache.hadoop.mapred.Task$Counter

MAP_INPUT_BYTES

org.apache.hadoop.mapred.Task$Counter

MAP_OUTPUT_BYTES

org.apache.hadoop.mapred.Task$Counter

COMBINE_INPUT_RECORDS

org.apache.hadoop.mapred.Task$Counter

COMBINE_OUTPUT_RECORDS

org.apache.hadoop.mapred.Task$Counter

REDUCE_INPUT_GROUPS

org.apache.hadoop.mapred.Task$Counter

REDUCE_SHUFFLE_BYTES

org.apache.hadoop.mapred.Task$Counter

REDUCE_INPUT_RECORDS

org.apache.hadoop.mapred.Task$Counter

REDUCE_OUTPUT_RECORDS

org.apache.hadoop.mapred.Task$Counter

REDUCE_SKIPPED_GROUPS

org.apache.hadoop.mapred.Task$Counter

REDUCE_SKIPPED_RECORDS

org.apache.hadoop.mapred.JobInProgress$Counter

TOTAL_LAUNCHED_MAPS

org.apache.hadoop.mapred.JobInProgress$Counter

RACK_LOCAL_MAPS

org.apache.hadoop.mapred.JobInProgress$Counter

DATA_LOCAL_MAPS

org.apache.hadoop.mapred.JobInProgress$Counter

TOTAL_LAUNCHED_REDUCES

FileSystemCounters

FILE_BYTES_READ

FileSystemCounters

HDFS_BYTES_READ

FileSystemCounters

FILE_BYTES_WRITTEN

FileSystemCounters

HDFS_BYTES_WRITTEN

你可以通过这些计数器来获得更多任务执行情况的信息——例如,mapper的input/outpu计数(MAP_INPUT_RECORDS/MAP_INPUT_RECORDS),HDFS读写的字节数(HDFS_BYTES_READ/ HDFS_BYTES_WRITTEN),等等。补充一下,你可定制应用程序的计数器-指定值-例如,中间计算的数值,或者代码分支的数量(可以在以后程序的测试和调试过程中有用)。

给mapper和reducer类传递的文本对象可以被用来更新计数器。相同的计数器变量(基于名称)被所有的mapper和reducer实例,并且通过集群的master节点合并计数,因此在这种方法下他们是“线程安全的”。5-9列表中展示了简单的代码片段显示如何来创建和使用定制计数器。

LISTING 5-9: Updating counters

……………………………………………………………………………

private static String COUNTERGROUP = "debugGroup";

private static String DEBUG1 = "debug1";

………………………………………………………………

context.getCounter(COUNTERGROUP, DEBUG1).increment(1);

在列表5-9中,如果这是第一次使用计数器,合适的计数对象会被创建为初始值为0。

每个工作的计数器个数

计数器存放在JobTracker中,这意味着如果一个工作尝试着创建一百万的计数器,JobTracker将会生成“超过内存空间”的错误。(参考第三章mapreduce推荐的设计。)为了避免这个错误,每一个工作可以创建的计数器个数被Hadoop框架所限制。

这里是Hadoop 1.0的计数器类的一些代码片段:

/** limit on the size of the name of the group **/

private static final int GROUP_NAME_LIMIT = 128;

/** limit on the size of the counter name **/

private static final int COUNTER_NAME_LIMIT = 64;

private static final JobConf conf = new JobConf();

/** limit on counters **/

public static int MAX_COUNTER_LIMIT =

conf.getInt("mapreduce.job.counters.limit", 120);

/** the max groups allowed **/

static final int MAX_GROUP_LIMIT = 50;

注意每个计数器组是没有配置的,然而计数器是配置的(在基本的cluster-wide中)。

在Hadoop 2.0中,所有的参数都是配置的。下面是MRJobconfig类的代码片段:

public static final String COUNTERS_MAX_KEY =

"mapreduce.job.counters.max";

public static final int COUNTERS_MAX_DEFAULT = 120;

public static final String COUNTER_GROUP_NAME_MAX_KEY =

"mapreduce.job.counters.group.name.max";

public static final int COUNTER_GROUP_NAME_MAX_DEFAULT = 128;

public static final String COUNTER_NAME_MAX_KEY =

"mapreduce.job.counters.counter.name.max";

public static final int COUNTER_NAME_MAX_DEFAULT = 64;

public static final String COUNTER_GROUPS_MAX_KEY =

"mapreduce.job.counters.groups.max";

public static final int COUNTER_GROUPS_MAX_DEFAULT = 50;

如果一个工作尝试创建比指定的更多地计数器,如下的一个异常将会在运行的时候抛出:

org.apache.hadoop.mapred.Counters$CountersExceededException: Error:

Exceeded limits on number of counters - Counters=xxx Limit=xxx

定制的计数器可以通过JobTracker的配置页面来指定(如图5-2),表5-10展示了简单的代码片段显示了如何打印出计数器的文本无论是运行结束的还是开始的工作。

LISTING 5-10: Printing Job’s counters

// Now lets get the counters put them in order by job_id and then print

// them out.

Counters c = job.getCounters();

// now walk through counters adding them to a sorted list.

Iterator<CounterGroup> i = c.iterator();

while (i.hasNext()){

CounterGroup cg = i.next();

System.out.println("Counter Group =:"+cg.getName());

Iterator<Counter> j = cg.iterator();

while (j.hasNext()){

Counter cnt = j.next();

System.out.println("\tCounter: "+cnt.getName()+

"=:"+cnt.getValue());

}

}

本章中介绍的日志文件和计数器都提供了工作任务执行的概况的信息。它们对于你查找那里出现了问题时有力的工具。它们被用来测试和调试用户代码,不幸的是,即使是完全正确的Hadoop应用程序也可能失败因为数据的中断。防御式编程帮助我们提供能够部分应对中断的方法。

在MapReduce中的防御式编程

应为Hadoop工作在一个大量数据输入的环境中(许多数据可以中断),经常的杀掉一个工作进程每次mapper不能处理输入的数据时或者应为数据本身的中断,或者因为map函数的bug(例如,在第三方库中,源代码是不可见的)。在这种情况下,一个标准的Hadoop恢复机制将会非常有帮助。不论你多少次尝试着阅读坏的记录,最后的结果将是相同的-map执行程序将会失败。

如果一个应用程序可以接受略过某些数据,像这样正确执行解决方案从而使整个应用程序更加稳健和可维护。不幸的是,这样的一个应用程序不是普通的一个应用程序。在这种情况下,一个异常可能发生在reader负责读取数据的过程,也可能发生在mapper处理数据的过程。想正确的处理这个情况需要如下的几点要求:

1、 注意在读取数据时候的错误,让所有读取时候的错误都能正确处理,然后文件指针移动到下一个位置。

2、 一种向mapper发送reader错误信息的机制保证mapper能正确输出信息。

3、 注意在mapper中的错误信息,保证所有的错误都能正确的处理。

4、 一个定制的OutPutFormat(类似于第四章描述的内容)能够将错误信息输出到一个错误字典中。

幸运的是,Hadoop允许你实现一个单纯的应用提供一个能略过一些记录当你确信它会引起任务中断的时候。如果这个略过模式开启的话,一个任务在这种模式下会被尝试执行多次。一旦在这种模式下,TaskTrcker决定那条记录引起这个失败。TaskTracker然后重启这个任务,但是会略过这些坏的记录。

应用程序可以通过SkipBadRecords类来控制这种特性,这个类提供了许多静态方法。工作驱动必须调用如下一个或者两个方法来打开map和reduce任务的略过记录功能:

setMapperMaxSkipRecords(Configuration conf,long maxSkipRecs)

setReducerMaxSkipGroups(Configuration conf,long maxSkipGrps)

如果最大略过数设置为0(默认情况),略过记录功能不可用。略过数依赖于程序中计数器自增记录数。你应当在每次记录被处理之后来增加计数器。如果这个不能做到(许多程序会分开来进行处理),这个框架可能会围绕坏记录来增加记录。

Hadoop利用分治法找到需要略过的记录,它每次分开执行这个有略过的任务,并决定另外一半包含坏的记录数。这个过程会迭代进行直到略过的范围在可接受的之内。这是一个相当耗费资源的操作,尤其是略过的最大值非常小的时候。它可能需要必要的增加最大值的设定来让正常的hadoop任务恢复机制接受额外的尝试。

如果略过功能是开启的,当任务失败的时候任务会向TaskTracker报告正在处理的返回记录,然后TaskTracker会再次尝试这个任务,并略过引起失败的记录。由于网络故障或者是对错误记录处理的失败,略过模式会再任务两次错误之后开启。

对于在一个记录中一致性失败的任务,TaskTracker运行多次任务尝试如下的结果:

1、 没有指定动作的失败的尝试(两次)。

2、 被TaskTracker存储的失败的记录的失败的尝试。

3、 在新的略过坏的记录失败的情况下,利用SkipBadRecords类中的setAttmptsToStartSkippint(int attemps)方法你能修改任务失败记录的数量来触发略过模式。Hadoop会将略过的记录存储在HDFS中,以便以后的分析使用。他们在_log/skip文件下以序列的方式写入。

总结

这章讨论了建造可靠的MapReduce应用程序的标准。你学到了如何利用有利的MRUnit组件来测试MapReduce应用程序的方法,并利用本地的Hadoop工作任务来调试已经完成的应用程序。你也学到了利用日志和程序计数器来查看MapReduce的执行情况。最后,你学到了如何设计,实现,和调试MapReduce,第六章讨论如何利用Apache Oozie将MapReduce程序结合在一起。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

扫码关注云+社区

领取腾讯云代金券