AM在Yarn中负责应用的监控,跟踪应用执行状态,重启失败任务等,是调度工作的绝对核心。在其初始化函数中会启动NM,RM,Yarn的client,同时启动ZookeeperStore,StatusManager,requestManager,SelectionManager等服务。调度也围绕这些服务进行。
RequestManager部分
RequestManager将会定时进行周期的PullRequest。在这个函数中,会从ZookeeperStore获得新的Request信息。主要来说,依次调用以下若干个函数处理Request:
updateFrameworkDescriptor
:当FrameworkDescriptor发生变化时进行处理。这是调度的一个重要函数,因为此处会检测到FrameworkDescriptor中taskRoles的变化,对于新任务来说,这里面更新的内容就是新增的所有任务updateOverrideApplicationProgressRequest
:暂不清楚,但是从函数的实现来看,做的是一个所有任务配置文件的Override,与调度关系应当不大(需要确认)updateMigrateTaskRequests
:在AM结束容器或者用户手动结束容器时进行收尾工作(垃圾回收等),与调度无关
因此与调度相关的就是updateFrameworkDescriptor
,再进到里面去看看干了啥:
首先由于Request发生了更新,需要更新requestManager自身的数据结构,更新任务信息。但是能做的也仅限于此,接下来需要通知AM发生了任务数量的变化,让AM对新任务进一步处理,具体做法是会直接调用AM的onTaskNumbersUpdated
AM部分
此处会调用transitionTaskStateQueue.queueSystemTask
将一个函数的执行加入队列,这些函数的任务都涉及到任务的FSM的状态转换,此处的函数内容如下:
首先会调用StatusManager的updateTaskNumbers
对其记录的数据结构进行修改,此处会把新的任务的状态初始化为空。然后启动容器选择与分配函数addContainerRequest()
的无参数版本,这个函数会进一步遍历所有处于状态TASK_WAITING的任务,根据优先级,用一个循环逐个调用addContainerRequest(taskStatus)
在这个函数中一开始就会进行资源的分配与申请,即资源的调度,主要函数是setupContainerRequest
,这个函数的执行如果出现异常,会直接将该任务的分配推迟setupContainerRequestRetryIntervalSec * 1000时间之后再次加入transitionTaskStateQueue。并且直接返回,这样做就是因为这个函数干的事太重要了,它真正向Hadoop发出了资源请求,并且决定了资源如何调度。
setupContainerRequest
这个函数就是真正的调度函数啦。首先,会查看当前所需的资源是否超过了集群默认配置的最大值。
1
2
3
4
5
6
7
8
9
ResourceDescriptor requestResource = requestManager.getTaskResources().get(taskRoleName);
ResourceDescriptor maxResource = conf.getMaxResource();
if (!ResourceDescriptor.fitsIn(requestResource, maxResource)) {
LOGGER.logWarning(
"Request Resource does not fit in the Max Resource configured in current cluster, " +
"request may fail or never get satisfied: " +
"Request Resource: [%s], Max Resource: [%s]",
requestResource, maxResource);
}
超过的话当然接下来怎么调度都不够用,所以看下面的代码。
1
2
3
4
5
6
7
8
9
10
11
if (requestResource.getGpuNumber() > 0 || requestResource.getPortNumber() > 0) {
updateNodeReports(yarnClient.getNodeReports(NodeState.RUNNING));
SelectionResult selectionResult = selectionManager.selectSingleNode(taskRoleName);
ResourceDescriptor optimizedRequestResource = selectionResult.getOptimizedResource();
if (selectionResult.getNodeHosts().size() > 0) {
return HadoopUtils.toContainerRequest(optimizedRequestResource, requestPriority, null, selectionResult.getNodeHosts().get(0));
}
return HadoopUtils.toContainerRequest(optimizedRequestResource, requestPriority, requestNodeLabel, null);
}
return HadoopUtils.toContainerRequest(requestResource, requestPriority, requestNodeLabel, null);
如果当前的任务有端口号或者GPU需求的话,就进入内层分配,否则直接发Hadoop请求(这是什么鬼,CPU就不用调度了么😂),接下来看内层。
yarnClient获得所有正在运行的节点列表,让selectionManager调用selectSingleNode
选择单个节点然后发Hadoop申请资源的请求。
打住
很明显,selectSingleNode
这个函数对节点进行了选择。这里面就是我们所要找的原始调度算法代码。下一篇我会详细分析原始算法,并且提出修改的思路。
This work is licensed under a
Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International license.