qizhi的任务调度机制(三)

机器调度(一)

CYQ March 1, 2019 views 4564 words

已经开始改代码了,目前GPU调度的代码运行良好。调度结果符合预期,因此先卖个关子。考虑目前的难点:机器调度。

之前考虑的问题是:由于Task是逐个调度的,所以应该让一个任务尽可能调度到前面成功调度的节点上。于是就需要知道前面的任务都调到哪儿了。

StatusManager就是记录这个的,借助其记录状态的功能,可以方便的获得所有之前已经进行过调度的任务状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
    // If previous tasks get their containers requested on some nodes, the following tasks should reuse these nodes to reduce communication overheads. 
    Set<TaskState> taskStateSet = new HashSet<>();
    List<TaskStatus> containerRequestedTaskStatuses = statusManager.getTaskStatus(taskStateSet, false);
    List<Node> reusedNodes = new ArrayList<Node>();
    if (!candidateNodes.isEmpty() && !containerRequestedTaskStatuses.isEmpty()) {
      for (Node candidateNode : candidateNodes) {
        for (TaskStatus taskStatus : containerRequestedTaskStatuses) {
          String hostName = taskStatus.getContainerHost();
          LOGGER.logInfo("Select: Checking task: [%s-%s] [%s]", taskStatus.getTaskRoleName(), taskStatus.getTaskIndex(), taskStatus.getTaskState());
          if (hostName == null) {
            continue;
          }
          LOGGER.logInfo("Select: Previous requested host: [%s]", hostName);
          if (hostName.equals(candidateNode.getHost())) {
            reusedNodes.add(candidateNode);            
            break;
          }
        }
      }
    }

上面这一段是目前我的实现,首先通过getTaskStatus拿到所有任务的taskStatus,这个数据结构里面包含了其分配到的hostName。这样只要扫一遍就知道前面任务的host分布了。

然而事情没这么简单,事实上,这里得到的hostName都是空。一开始我也不清楚是为什么,后来仔细理逻辑才发现,这里的状态转移非常谨慎。什么意思呢?

当所有的任务都都通过各种调度算法得到要调度上去的机器时,机器此时只是接到了通知,哦,你要上来。所以这时候,对应的容器还机器需要分配,如果分不出来。那就算你指定要上某台机器,没容器你也没地方上。

因此这个过程中有两个状态,CONTAINER_REQUESTED,CONTAINER_ALLOCATED。

当所有任务选择好机器时,就会转移到CONTAINER_REQUESTED。然而它们的TaskStatus里面的机器有关信息并没有更新,仍然是null。因为这是一个任务和机器的双向选择,如果一方不同意,那么就不能绑定这个任务到这台机器上。

这个“绑定”,实际对应到代码中是这样一个函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
  // Should call disassociateTaskWithContainer if associateTaskWithContainer failed
  private void associateTaskWithContainer(TaskStatusLocator locator, Container container, Map<String, Ports> portDefinitions) throws Exception {
    TaskStatus taskStatus = getTaskStatus(locator);
    String containerId = container.getId().toString();

    taskStatus.setContainerId(containerId);
    taskStatus.setContainerHost(container.getNodeId().getHost());
    taskStatus.setContainerIp(
        DnsUtils.resolveExternalIPv4Address(taskStatus.getContainerHost()));
    taskStatus.setContainerLogHttpAddress(
        HadoopUtils.getContainerLogHttpAddress(container.getNodeHttpAddress(), containerId, conf.getAmUser()));
    taskStatus.setContainerConnectionLostCount(0);
    taskStatus.setContainerGpus(
        ResourceDescriptor.fromResource(container.getResource()).getGpuAttribute());

    String portString = ValueRangeUtils.convertPortRangeToPortDefinitionsString(
        ResourceDescriptor.fromResource(container.getResource()).getPortRanges(), portDefinitions);
    taskStatus.setContainerPorts(portString);

    taskStatusesesChanged.put(locator.getTaskRoleName(), true);
  }

别的不看,看这个

1
    taskStatus.setContainerHost(container.getNodeId().getHost());

这是唯一一处设置taskStatus的hostName的地方。从名字可以看出,这个函数在任务与某个容器“关联”的时候才会执行。

再找一找,巧了,这个函数只有一次执行在StatusManager的transitionTaskState中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
    if (!TaskStateDefinition.CONTAINER_ASSOCIATED_STATES.contains(srcState) &&
        TaskStateDefinition.CONTAINER_ASSOCIATED_STATES.contains(dstState)) {
      assert (event.getContainer() != null);

      String containerId = event.getContainer().getId().toString();
      try {
        associateTaskWithContainer(locator, event.getContainer(), event.getPortDefinitions());
        LOGGER.logInfo("Associated Task %s with Container %s", locator, containerId);
      } catch (Exception e) {
        disassociateTaskWithContainer(locator);

        // Mark as unchanged
        taskStatusesesChanged.put(locator.getTaskRoleName(), false);

        throw new Exception(
            String.format("Failed to associate Container %s to Task %s",
                containerId, locator), e);
      }
    }

条件说得很清楚了,只有初始状态不在CONTAINER_ASSOCIATED_STATES这个集合中,结束状态在CONTAINER_ASSOCIATED_STATES中时才会执行这个函数。

这个集合如下:

1
2
3
4
5
6
7
8
  public static final Set<TaskState> CONTAINER_ASSOCIATED_STATES = Collections.unmodifiableSet(
      new HashSet<>(Arrays.asList(
          TaskState.CONTAINER_ALLOCATED,
          TaskState.CONTAINER_LAUNCHED,
          TaskState.CONTAINER_RUNNING,
          TaskState.CONTAINER_COMPLETED,
          TaskState.TASK_COMPLETED
      )));

就很明确了,调度算法执行结束后,仅仅是从TASK_WAITING转移到了CONTAINER_REQUESTED,这个状态还没有关联容器,因此即使选好了机器,也不会记录。

那么我的工作就简单了,只要在不影响原始FSM的情况下加一个用于预测机器的数据结构就行了,只需要在调度过程中记录一下。