JobTracker会接受TaskTracker的心跳,并处理。不多说,直接上源码
public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,
boolean restarted,
boolean initialContact,
boolean acceptNewTasks,
short responseId)
1 首先检查heartbeat是否来自自己的host列表,否则抛出异常。
如果不再Host列表或者在排除Host列表中,退出心跳处理。
return (inHostsList(status) && !inExcludedHostsList(status));
2 判断是否在黑名单、灰名单、默认名单,并从这些名单中删除
黑名单、灰名单主要是Hadoop的容错机制,在此不做过多解释,可以单写一篇文章。
faultyTrackers.markTrackerHealthy(status.getHost());
3 根据trackerName获取上一个heartbeat response
HeartbeatResponse prevHeartbeatResponse =
trackerToHeartbeatResponseMap.get(trackerName);
4 如果上一个heartbeat 为null,让Tasktracker重新初始化 如果是第一个 response 从recoveryMap中移除
if (prevHeartbeatResponse == null) {
// This is the first heartbeat from the old tracker to the newly
// started JobTracker
if (hasRestarted()) {
addRestartInfo = true;
// inform the recovery manager about this tracker joining back
recoveryManager.unMarkTracker(trackerName);
} else {
// Jobtracker might have restarted but no recovery is needed
// otherwise this code should not be reached
LOG.warn("Serious problem, cannot find record of 'previous' " +
"heartbeat for '" + trackerName +
"'; reinitializing the tasktracker");
return new HeartbeatResponse(responseId,
new TaskTrackerAction[] {new ReinitTrackerAction()});
}
}
如果重发的responseId,丢弃掉。
if (prevHeartbeatResponse.getResponseId() != responseId) {
LOG.info("Ignoring 'duplicate' heartbeat from '" +
trackerName + "'; resending the previous 'lost' response");
return prevHeartbeatResponse;
}
5 处理heartbeat 首先 updateTaskTrackerStatus 如果是被遗忘的tasktracker 加入队列中;更新任务状态;更新健康节点状态;
private synchronized boolean processHeartbeat(
TaskTrackerStatus trackerStatus,
boolean initialContact,
long timeStamp) throws UnknownHostException {
//主要集中在此不分析那么详细了
}
6 检查新Task是否执行,如果没有执行,加入执行队列
if (recoveryManager.shouldSchedule() && acceptNewTasks && !isBlacklisted) {
TaskTrackerStatus taskTrackerStatus = getTaskTrackerStatus(trackerName);
if (taskTrackerStatus == null) {
LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
} else {
List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
if (tasks == null ) {
tasks = taskScheduler.assignTasks(taskTrackers.get(trackerName));
}
//添加Task
if (tasks != null) {
for (Task task : tasks) {
expireLaunchingTasks.addNewTask(task.getTaskID());
if(LOG.isDebugEnabled()) {
LOG.debug(trackerName + " -> LaunchTask: " + task.getTaskID());
}
actions.add(new LaunchTaskAction(task));
}
}
}
}
7 检查Task是否杀死
List<TaskTrackerAction> killTasksList = getTasksToKill(trackerName);
if (killTasksList != null) {
actions.addAll(killTasksList);
}
8 检查 task 是否cleanup
List<TaskTrackerAction> killJobsList = getJobsForCleanup(trackerName);
if (killJobsList != null) {
actions.addAll(killJobsList);
}
9 检查task 的output是否可以save
List<TaskTrackerAction> commitTasksList = getTasksToSave(status);
if (commitTasksList != null) {
actions.addAll(commitTasksList);
}
10 计算下一次heartbeat的时间间隔
int nextInterval = getNextHeartbeatInterval();
response.setHeartbeatInterval(nextInterval);
response.setActions(
actions.toArray(new TaskTrackerAction[actions.size()]));
11 更新heartbeatMap,并remove掉Marked已经处理掉的heartbeat
// 更新Map
trackerToHeartbeatResponseMap.put(trackerName, response);
//清除处理完成的心跳
removeMarkedTasks(trackerName);
不对之处欢迎讨论。
=================参考====
hadoop源码。
分享到:
相关推荐
可以导进去编程的时候了解实现 hadoop-core-0.20.2 源码 hadoop-2.5.1-src.tar.gz 源码
赠送jar包:hadoop-mapreduce-client-jobclient-2.6.5.jar; 赠送原API文档:hadoop-mapreduce-client-jobclient-2.6.5-javadoc.jar; 赠送源代码:hadoop-mapreduce-client-jobclient-2.6.5-sources.jar; 赠送...
Eclipse集成Hadoop2.10.0的插件,使用`ant`对hadoop的jar包进行打包并适应Eclipse加载,所以参数里有hadoop和eclipse的目录. 必须注意对于不同的hadoop版本,` HADDOP_INSTALL_PATH/share/hadoop/common/lib`下的jar包...
赠送jar包:hadoop-yarn-client-2.6.5.jar; 赠送原API文档:hadoop-yarn-client-2.6.5-javadoc.jar; 赠送源代码:hadoop-yarn-client-2.6.5-sources.jar; 赠送Maven依赖信息文件:hadoop-yarn-client-2.6.5.pom;...
赠送jar包:hadoop-mapreduce-client-common-2.6.5.jar; 赠送原API文档:hadoop-mapreduce-client-common-2.6.5-javadoc.jar; 赠送源代码:hadoop-mapreduce-client-common-2.6.5-sources.jar; 赠送Maven依赖信息...
hadoop-annotations-3.1.1.jar hadoop-common-3.1.1.jar hadoop-mapreduce-client-core-3.1.1.jar hadoop-yarn-api-3.1.1.jar hadoop-auth-3.1.1.jar hadoop-hdfs-3.1.1.jar hadoop-mapreduce-client-hs-3.1.1.jar ...
赠送jar包:hadoop-yarn-common-2.6.5.jar 赠送原API文档:hadoop-yarn-common-2.6.5-javadoc.jar 赠送源代码:hadoop-yarn-common-2.6.5-sources.jar 包含翻译后的API文档:hadoop-yarn-common-2.6.5-javadoc-...
hadoop-eclipse-plugin-2.7.3和2.7.7的jar包 hadoop-eclipse-plugin-2.7.3和2.7.7的jar包 hadoop-eclipse-plugin-2.7.3和2.7.7的jar包 hadoop-eclipse-plugin-2.7.3和2.7.7的jar包
赠送jar包:hadoop-yarn-server-resourcemanager-2.6.0.jar; 赠送原API文档:hadoop-yarn-server-resourcemanager-2.6.0-javadoc.jar; 赠送源代码:hadoop-yarn-server-resourcemanager-2.6.0-sources.jar; 赠送...
hadoop-eclipse-plugin-2.7.4.jar和hadoop-eclipse-plugin-2.7.3.jar还有hadoop-eclipse-plugin-2.6.0.jar的插件都在这打包了,都可以用。
hadoop-eclipse-plugin-1.2.1hadoop-eclipse-plugin-1.2.1hadoop-eclipse-plugin-1.2.1hadoop-eclipse-plugin-1.2.1
赠送jar包:hadoop-yarn-api-2.5.1.jar; 赠送原API文档:hadoop-yarn-api-2.5.1-javadoc.jar; 赠送源代码:hadoop-yarn-api-2.5.1-sources.jar; 赠送Maven依赖信息文件:hadoop-yarn-api-2.5.1.pom; 包含翻译后...
hadoop-eclipse-plugin-3.1.1, hadoop eclipse 插件 3.1.1
赠送jar包:hadoop-hdfs-client-2.9.1.jar 赠送原API文档:hadoop-hdfs-client-2.9.1-javadoc.jar 赠送源代码:hadoop-hdfs-client-2.9.1-sources.jar 包含翻译后的API文档:hadoop-hdfs-client-2.9.1-javadoc-...
hadoop-mapreduce-examples-2.7.1.jar
赠送jar包:hadoop-hdfs-client-2.9.1.jar; 赠送原API文档:hadoop-hdfs-client-2.9.1-javadoc.jar; 赠送源代码:hadoop-hdfs-client-2.9.1-sources.jar; 赠送Maven依赖信息文件:hadoop-hdfs-client-2.9.1.pom;...
赠送jar包:hadoop-auth-2.5.1.jar; 赠送原API文档:hadoop-auth-2.5.1-javadoc.jar; 赠送源代码:hadoop-auth-2.5.1-sources.jar; 赠送Maven依赖信息文件:hadoop-auth-2.5.1.pom; 包含翻译后的API文档:hadoop...
hadoop-eclipse-plugin-3.1.3,eclipse版本为eclipse-jee-2020-03
赠送jar包:hadoop-yarn-server-common-2.6.5.jar; 赠送原API文档:hadoop-yarn-server-common-2.6.5-javadoc.jar; 赠送源代码:hadoop-yarn-server-common-2.6.5-sources.jar; 赠送Maven依赖信息文件:hadoop-...
flink-shaded-hadoop-2-uber-2.7.5-10.0.jar