qizhi的任务调度机制(一)

调度流程初探

CYQ February 6, 2019 views 2895 words

AM在Yarn中负责应用的监控,跟踪应用执行状态,重启失败任务等,是调度工作的绝对核心。在其初始化函数中会启动NM,RM,Yarn的client,同时启动ZookeeperStore,StatusManager,requestManager,SelectionManager等服务。调度也围绕这些服务进行。

RequestManager部分

RequestManager将会定时进行周期的PullRequest。在这个函数中,会从ZookeeperStore获得新的Request信息。主要来说,依次调用以下若干个函数处理Request:

  • updateFrameworkDescriptor:当FrameworkDescriptor发生变化时进行处理。这是调度的一个重要函数,因为此处会检测到FrameworkDescriptor中taskRoles的变化,对于新任务来说,这里面更新的内容就是新增的所有任务
  • updateOverrideApplicationProgressRequest:暂不清楚,但是从函数的实现来看,做的是一个所有任务配置文件的Override,与调度关系应当不大(需要确认
  • updateMigrateTaskRequests:在AM结束容器或者用户手动结束容器时进行收尾工作(垃圾回收等),与调度无关

因此与调度相关的就是updateFrameworkDescriptor,再进到里面去看看干了啥:

首先由于Request发生了更新,需要更新requestManager自身的数据结构,更新任务信息。但是能做的也仅限于此,接下来需要通知AM发生了任务数量的变化,让AM对新任务进一步处理,具体做法是会直接调用AM的onTaskNumbersUpdated

AM部分

此处会调用transitionTaskStateQueue.queueSystemTask将一个函数的执行加入队列,这些函数的任务都涉及到任务的FSM的状态转换,此处的函数内容如下:

首先会调用StatusManager的updateTaskNumbers对其记录的数据结构进行修改,此处会把新的任务的状态初始化为空。然后启动容器选择与分配函数addContainerRequest()的无参数版本,这个函数会进一步遍历所有处于状态TASK_WAITING的任务,根据优先级,用一个循环逐个调用addContainerRequest(taskStatus)

在这个函数中一开始就会进行资源的分配与申请,即资源的调度,主要函数是setupContainerRequest,这个函数的执行如果出现异常,会直接将该任务的分配推迟setupContainerRequestRetryIntervalSec * 1000时间之后再次加入transitionTaskStateQueue。并且直接返回,这样做就是因为这个函数干的事太重要了,它真正向Hadoop发出了资源请求,并且决定了资源如何调度。

setupContainerRequest

这个函数就是真正的调度函数啦。首先,会查看当前所需的资源是否超过了集群默认配置的最大值。

1
2
3
4
5
6
7
8
9
    ResourceDescriptor requestResource = requestManager.getTaskResources().get(taskRoleName);
    ResourceDescriptor maxResource = conf.getMaxResource();
    if (!ResourceDescriptor.fitsIn(requestResource, maxResource)) {
      LOGGER.logWarning(
          "Request Resource does not fit in the Max Resource configured in current cluster, " +
              "request may fail or never get satisfied: " +
              "Request Resource: [%s], Max Resource: [%s]",
          requestResource, maxResource);
    }

超过的话当然接下来怎么调度都不够用,所以看下面的代码。

1
2
3
4
5
6
7
8
9
10
11
    if (requestResource.getGpuNumber() > 0 || requestResource.getPortNumber() > 0) {
      updateNodeReports(yarnClient.getNodeReports(NodeState.RUNNING));
      SelectionResult selectionResult = selectionManager.selectSingleNode(taskRoleName);

      ResourceDescriptor optimizedRequestResource = selectionResult.getOptimizedResource();
      if (selectionResult.getNodeHosts().size() > 0) {
        return HadoopUtils.toContainerRequest(optimizedRequestResource, requestPriority, null, selectionResult.getNodeHosts().get(0));
      }
      return HadoopUtils.toContainerRequest(optimizedRequestResource, requestPriority, requestNodeLabel, null);
    }
    return HadoopUtils.toContainerRequest(requestResource, requestPriority, requestNodeLabel, null);

如果当前的任务有端口号或者GPU需求的话,就进入内层分配,否则直接发Hadoop请求(这是什么鬼,CPU就不用调度了么😂),接下来看内层。

yarnClient获得所有正在运行的节点列表,让selectionManager调用selectSingleNode选择单个节点然后发Hadoop申请资源的请求。

打住

很明显,selectSingleNode这个函数对节点进行了选择。这里面就是我们所要找的原始调度算法代码。下一篇我会详细分析原始算法,并且提出修改的思路。