本文从一个bug入手,为大家展示Hadoop Speculative机制,以及编写mapreduce程序的注意点。
一个小弟来找我帮忙调试一个问题。
项目大概:用C++编写 map reduce程序,mapper会解析大文件为很多小文件。每个小文件名字是唯一确定的(小文件名字 = 大文件名字 + index),为了调试,文件名字又加入了生成文件的时间戳。
问题描述:程序运行成功,突然发现有很多“同名+不同时间戳”的文件。
这个小弟完全懵圈了,看代码没有任何异常,但就是同一个文件被生成多次。只能找我帮忙看看。
map程序示例如下(仅仅是示例)
int main(int argc, char** argv) {
string key;
while(cin >> key) {
key是文件名字,
解析这个大文件为若干小文件,然后存储
cout << key << "/t" << "1" << endl;
}
return 0;
}
reduce.cpp程序如下:
int main(int argc, char** argv) {
string key, num;
map<string, int> count;
map<string, int>::iterator it;
while(cin >> key >> num) {
it = count.find(key);
if(it != count.end()) {
it->second++;
}
else {
count.insert(make_pair(key, 1));
}
}
for(it = count.begin(); it != count.end(); it++) {
cout << it->first << "/t" << it->second << endl;
}
return 0;
}
看到这里,有的兄弟会提出疑问,这不就是wordcount的reduce代码嘛!
是的,这个就是wordcount的reduce代码。其实这个项目的reducer没有什么业务逻辑意义,业务完全在map程序中执行,或者说业务就是map的副产品而已。而这个就是最终问题所在。
其实从这个问题原因看,就是大致定位出问题点也没法找出最终原因。我们当时只能定位如下:
因为无法最终定位,所以只能从配置项,官方文档和网上资料中查找。
我们开始怀疑是Hadoop有备份或者高可用机制,这样就会启动多个task,但这样没法解释为啥有一个map node参与处理却没有参与最后输出。
最后觉得配置中有一个speculative.execution
的字样平时没怎么关注,而且看起来和执行相关。于是就去查找,结果发现就是这家伙造成的。
作业(job)提交时,会被map-reduce 框架的 JobTracker 拆成一系列的map任务、reduce任务在整个hadoop 集群的机器上执行。
由于一些原因,可能是硬件老化,软件层面的不恰当配置,程序Bug,负载不均或者其他的一些问题,导致在一个JOB下的多个TASK速度不一致,比如有的任务已经完成,但是有些任务可能只跑了10%,根据木桶原理,这些任务将成为整个JOB的短板。
Straggle(掉队者)是指那些跑的很慢但最终会成功完成的任务。一个掉队的Map任务会阻止Reduce任务开始执行。
Hadoop不会尝试去诊断或者修复这些Straggle,但是可以识别那些跑的比较慢的任务。它会在集群的其他节点上去启动这些慢任务的多个实例作为备份,这就是hadoop的推测执行(speculative execution)。
如果集群启动了推测执行,这时为了最大限度的提高短板,Hadoop会为该task启动备份任务,让speculative task与原始task同时处理一份数据,哪个先运行完,则将谁的结果作为最终结果,并且在运行完成后Kill掉另外一个任务。
推测执行(Speculative Execution)是通过利用更多的资源来换取时间的一种优化策略。
我们这个bug就是因为推测执行造成的。
Hadoop发现有的任务执行慢,就启动了备份任务。这时候两个任务都对同样的文件进行解析,生成新的文件。这样文件就被重复生成了。但是因为推测执行机制,最终只有一个任务顺利完成,这就是为什么reducer只看到一个任务执行完成。
MapReduce任务有两个参数可以控制Speculative Task:
这两个参数默认都为true。
所以我们直接修改mapred-site.xml,如下:
<property>
<name>mapreduce.map.speculative</name>
<value>false</value>
</property>
<property>
<name>mapreduce.reduce.speculative</name>
<value>false</value>
</property>
则没有重复文件输出。
map函数应该是幂等的,即同样的输入,如果map执行到一半退出,在另外一个节点重试这个map任务,则应该得到同样的业务逻辑和业务输出。
因此,编写map / reduce程序要特别小心,特别是和外部资源如数据库/文件相关时候。这时候如果误用了Speculative Task,则很容易发生重复读/写,产生异常。同时又额外消耗了节点资源。
以下 hadoop 源码分析均摘录于 Hadoop2.6.0运行mapreduce之推断(speculative)执行(上)。
这部分源码在 org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator
。
maybeScheduleASpeculation用于计算map或者reduce任务推断调度的可能性。
private int maybeScheduleASpeculation(TaskType type) {
int successes = 0;
long now = clock.getTime();
//根据当前Task的类型(map或reduce)获取相应类型任务的需要分配Container数量的缓存containerNeeds
ConcurrentMap<JobId, AtomicInteger> containerNeeds
= type == TaskType.MAP ? mapContainerNeeds : reduceContainerNeeds;
//遍历containerNeeds
for (ConcurrentMap.Entry<JobId, AtomicInteger> jobEntry : containerNeeds.entrySet()) {
// This race conditon is okay. If we skip a speculation attempt we
// should have tried because the event that lowers the number of
// containers needed to zero hasn't come through, it will next time.
// Also, if we miss the fact that the number of containers needed was
// zero but increased due to a failure it's not too bad to launch one
// container prematurely.
//如果当前Job依然有未分配Container的Task,那么跳过当前循环,继续下一次循环。这说明如果当前Job的某一类型的Task依然存在未分配Container的,则不会进行任务推断;
if (jobEntry.getValue().get() > 0) {
continue;
}
int numberSpeculationsAlready = 0;
int numberRunningTasks = 0;
// loop through the tasks of the kind
//从当前应用的上下文AppContext中获取Job,并获取此Job的所有的Task(map或者reduce)
Job job = context.getJob(jobEntry.getKey());
Map<TaskId, Task> tasks = job.getTasks(type);
//计算允许执行推断的Task数量numberAllowedSpeculativeTasks(map或者reduce)
int numberAllowedSpeculativeTasks
= (int) Math.max(minimumAllowedSpeculativeTasks,
proportionTotalTasksSpeculatable * tasks.size());
TaskId bestTaskID = null;
long bestSpeculationValue = -1L;
// this loop is potentially pricey.
// TODO track the tasks that are potentially worth looking at
//遍历Job对应的map任务或者reduce任务集合,在迭代完所有的map任务或者reduce任务后,获取这一任务集合中的推断值bestSpeculationValue最大的任务ID。
for (Map.Entry<TaskId, Task> taskEntry : tasks.entrySet()) {
//调用speculationValue方法获取每一个Task的推断值。
long mySpeculationValue = speculationValue(taskEntry.getKey(), now);
if (mySpeculationValue == ALREADY_SPECULATING) {
++numberSpeculationsAlready;
}
if (mySpeculationValue != NOT_RUNNING) {
++numberRunningTasks;
}
//获取这一任务集合中的推断值bestSpeculationValue最大的任务ID
if (mySpeculationValue > bestSpeculationValue) {
bestTaskID = taskEntry.getKey();
bestSpeculationValue = mySpeculationValue;
}
}
//再次计算numberAllowedSpeculativeTasks
numberAllowedSpeculativeTasks
= (int) Math.max(numberAllowedSpeculativeTasks,
proportionRunningTasksSpeculatable * numberRunningTasks);
// If we found a speculation target, fire it off
//如果numberAllowedSpeculativeTasks大于numberSpeculationsAlready(已经推断执行过的Task数量),则调用addSpeculativeAttempt方法将第4步中选出的任务的任务ID添加到推断尝试中。
if (bestTaskID != null
&& numberAllowedSpeculativeTasks > numberSpeculationsAlready) {
addSpeculativeAttempt(bestTaskID);
++successes;
}
}
return successes;
}
speculationValue方法主要用于估算每个任务的推断值。主要是“备份任务实例运行”的结束时间 和 “原任务实例的结束时间” 的差值,越大则调度执行的价值越大。
speculationValue方法的执行步骤如下:
private long speculationValue(TaskId taskID, long now) {
Job job = context.getJob(taskID.getJobId());
Task task = job.getTask(taskID);
Map<TaskAttemptId, TaskAttempt> attempts = task.getAttempts();
long acceptableRuntime = Long.MIN_VALUE;
long result = Long.MIN_VALUE;
//如果任务还没有被推断执行,那么调用estimator的thresholdRuntime方法获取任务可以接受的运行时长acceptableRuntime
if (!mayHaveSpeculated.contains(taskID)) {
acceptableRuntime = estimator.thresholdRuntime(taskID);
if (acceptableRuntime == Long.MAX_VALUE) {
return ON_SCHEDULE;
}
}
TaskAttemptId runningTaskAttemptID = null;
int numberRunningAttempts = 0;
for (TaskAttempt taskAttempt : attempts.values()) {
if (taskAttempt.getState() == TaskAttemptState.RUNNING
|| taskAttempt.getState() == TaskAttemptState.STARTING) {
//如果任务的运行实例数大于1,则说明此任务已经发生了推断执行,因此返回ALREADY_SPECULATING。
if (++numberRunningAttempts > 1) {
return ALREADY_SPECULATING;
}
runningTaskAttemptID = taskAttempt.getID();
//调用estimator的estimatedRuntime方法获取任务运行实例的估算运行时长estimatedRunTime。
long estimatedRunTime = estimator.estimatedRuntime(runningTaskAttemptID);
//调用estimator的attemptEnrolledTime方法获取任务实例开始运行的时间,此时间即为startTimes中缓存的start。
long taskAttemptStartTime
= estimator.attemptEnrolledTime(runningTaskAttemptID);
if (taskAttemptStartTime > now) {
// This background process ran before we could process the task
// attempt status change that chronicles the attempt start
return TOO_NEW;
}
//estimatedEndTime表示估算任务实例的运行结束时间
long estimatedEndTime = estimatedRunTime + taskAttemptStartTime;
//调用estimator的estimatedNewAttemptRuntime方法估算如果此时重新为任务启动一个实例,此实例运行结束的时间estimatedReplacementEndTime。
long estimatedReplacementEndTime
= now + estimator.estimatedNewAttemptRuntime(taskID);
float progress = taskAttempt.getProgress();
TaskAttemptHistoryStatistics data =
runningTaskAttemptStatistics.get(runningTaskAttemptID);
if (data == null) {
//如果缓存中没有任务实例的历史统计信息,那么将estimatedRunTime、任务实例进度progress,当前时间封装为历史统计信息缓存起来。
runningTaskAttemptStatistics.put(runningTaskAttemptID,
new TaskAttemptHistoryStatistics(estimatedRunTime, progress, now));
} else {
//如果缓存中存在任务实例的历史统计信息
if (estimatedRunTime == data.getEstimatedRunTime()
&& progress == data.getProgress()) {
//如果缓存的estimatedRunTime和本次估算的estimatedRunTime一样并且缓存的实例进度progress和本次获取的任务实例进度progress一样
// Previous stats are same as same stats
if (data.notHeartbeatedInAWhile(now)
|| estimator.hasStagnatedProgress(runningTaskAttemptID, now)) {
// Stats have stagnated for a while, simulate heart-beat.
TaskAttemptStatus taskAttemptStatus = new TaskAttemptStatus();
taskAttemptStatus.id = runningTaskAttemptID;
taskAttemptStatus.progress = progress;
taskAttemptStatus.taskState = taskAttempt.getState();
// Now simulate the heart-beat
//说明有一段时间没有收到心跳了,则模拟一次心跳。
handleAttempt(taskAttemptStatus);
}
} else {
//如果缓存的estimatedRunTime和本次估算的estimatedRunTime不一样或者缓存的实例进度progress和本次获取的任务实例进度progress不一样
// Stats have changed - update our data structure
//将estimatedRunTime、任务实例进度progress,当前时间更新到任务实例的历史统计信息中。
data.setEstimatedRunTime(estimatedRunTime);
data.setProgress(progress);
data.resetHeartBeatTime(now);
}
}
//如果estimatedEndTime小于当前时间,则说明任务实例的进度良好,返回PROGRESS_IS_GOOD,PROGRESS_IS_GOOD等于Long.MIN_VALUE + 3。
if (estimatedEndTime < now) {
return PROGRESS_IS_GOOD;
}
//如果estimatedReplacementEndTime大于等于estimatedEndTime,则说明即便启动备份任务实例也无济于事,因为它的结束时间达不到节省作业总运行时长的作用。
if (estimatedReplacementEndTime >= estimatedEndTime) {
return TOO_LATE_TO_SPECULATE;
}
//计算本次估算的结果值result,它等于estimatedEndTime - estimatedReplacementEndTime,当这个差值越大表示备份任务实例运行后比原任务实例的结束时间就越早,因此调度执行的价值越大。
result = estimatedEndTime - estimatedReplacementEndTime;
}
}
// If we are here, there's at most one task attempt.
if (numberRunningAttempts == 0) {
//如果numberRunningAttempts等于0,则表示当前任务还没有启动任务实例,返回NOT_RUNNING,NOT_RUNNING等于Long.MIN_VALUE + 4。
return NOT_RUNNING;
}
//重新计算acceptableRuntime,处理方式与第1步相同。
if (acceptableRuntime == Long.MIN_VALUE) {
acceptableRuntime = estimator.thresholdRuntime(taskID);
if (acceptableRuntime == Long.MAX_VALUE) {
return ON_SCHEDULE;
}
}
return result;
}