qizhi的任务调度机制(二)

进入调度函数

CYQ February 7, 2019 views 7625 words

上次讲到selectSingleNode这个函数进行了节点选择,具体看看它的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public synchronized SelectionResult selectSingleNode(String taskRoleName) throws NotAvailableException {
    SelectionResult results = select(taskRoleName);
    if (results.getNodeHosts().size() > 1) {
      // Random pick a host from the result set to avoid conflicted requests for concurrent container requests from different jobs
      ResourceDescriptor optimizedRequestResource = results.getOptimizedResource();
      String candidateNode = results.getNodeHosts().get(CommonUtils.getRandomNumber(0, results.getNodeHosts().size() - 1));
      optimizedRequestResource.setGpuAttribute(results.getGpuAttribute(candidateNode));

      // re-create single node result object.
      SelectionResult result = new SelectionResult();
      result.addSelection(candidateNode, results.getGpuAttribute(candidateNode), results.getOverlapPorts());
      result.setOptimizedResource(optimizedRequestResource);
      return result;
    }
    return results;
  }

注意这一段代码加了互斥锁,因此同时至多进行一个任务的资源分配调度。首先会调用select函数并且返回一个SelectionResult,进去看一下这个数据结构就知道,重要的是Map<String, Long>和ResourceDescriptor,前者表示如果选择某个节点,该节点的gpuAttribute会变成什么样(位掩码),后者则是其他请求资源(CPU,内存等)。

之所以分开这两部分,是因为当前qizhi使用的Hadoop 2.7.2还不支持GPU资源的分配,因此这一部分实际上不由Hadoop维护,而是yarn来跟踪。

之后会从所有候选节点中随机选择一个节点,返回只有一个节点的SelectionResult,这里的随机显然是需要修改的,不然碰到GPU的分布式训练任务时,如果子任务运气不好,没能分配到同一台机器,子任务GPU之间的通信开销会非常大。

当然还有另一种思路,让select返回的就只有一个节点,这样随机来随机去就只有一个节点可供选择,就不用动这里的代码了。

说了这么多,应该进到select里面去了。

select()

这里面会根据历史数据和用户配置初步生成一些资源的选择。通俗来说,如果用户自己拿捏得好节点、GPU、端口等,那当然不需要集群再去操心。

如果用户比较懒,把这些地方都留空了,那么就完全依赖接下来要进到的内层select函数里面去了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@VisibleForTesting
  public synchronized SelectionResult select(ResourceDescriptor requestResource, String requestNodeLabel, String requestNodeGpuType,
      int startStatesTaskCount, List<ValueRange> reusePorts, Map<String, NodeConfiguration> configuredNodes) throws NotAvailableException {

    LOGGER.logInfo(
        "select: Request: Resource: [%s], NodeLabel: [%s], NodeGpuType: [%s], StartStatesTaskCount: [%d], ReusePorts: [%s]",
        requestResource, requestNodeLabel, requestNodeGpuType, startStatesTaskCount, ValueRangeUtils.toString(reusePorts));
    
    initFilteredNodes();
    filterNodesByNodeLabel(requestNodeLabel);
    filterNodesByGpuType(configuredNodes, requestNodeGpuType);
    if (!conf.getAmAllowNoneGpuJobOnGpuNode()) {
      int jobTotalRequestGpu = requestManager.getTotalGpuCount();
      filterNodesForNoneGpuJob(jobTotalRequestGpu);
    }
    
    ResourceDescriptor optimizedRequestResource = YamlUtils.deepCopy(requestResource, ResourceDescriptor.class);
    if (ValueRangeUtils.getValueNumber(reusePorts) > 0) {
      LOGGER.logInfo(
          "select: reuse pre-selected ports: [%s]", ValueRangeUtils.toString(reusePorts));
      optimizedRequestResource.setPortRanges(reusePorts);
    }
    if (optimizedRequestResource.getPortNumber() > 0 && ValueRangeUtils.getValueNumber(optimizedRequestResource.getPortRanges()) <= 0) {
      //If port is required and the portRange is not set in previous steps, allocate port ranges from all candidate nodes.
      List<ValueRange> portRanges = selectPortsFromFilteredNodes(optimizedRequestResource);
      LOGGER.logInfo(
          "select: select ports from all filteredNodes  :  [%s]", ValueRangeUtils.toString(portRanges));
      if (ValueRangeUtils.getValueNumber(portRanges) == optimizedRequestResource.getPortNumber()) {
        optimizedRequestResource.setPortRanges(portRanges);
      }
    }
    
    filterNodesByResource(optimizedRequestResource, conf.getAmSkipLocalTriedResource());
    
    filterNodesByRackSelectionPolicy(optimizedRequestResource, startStatesTaskCount);
    if (filteredNodes.size() < 1) {
      // Don't have candidate nodes for this request.
      if (requestNodeGpuType != null) {
        // GpuType relax is not supported in yarn, the gpuType is specified, abort this request and try later.
        throw new NotAvailableException(String.format("Don't have enough nodes to meet GpuType request: optimizedRequestResource: [%s], NodeGpuType: [%s], NodeLabel: [%s]",
            optimizedRequestResource, requestNodeGpuType, requestNodeLabel));
      }
      if (optimizedRequestResource.getPortNumber() > 0 && ValueRangeUtils.getValueNumber(optimizedRequestResource.getPortRanges()) <= 0) {
        // Port relax is not supported in yarn, The portNumber is specified, but the port range is not selected, abort this request and try later.
        throw new NotAvailableException(String.format("Don't have enough nodes to meet Port request: optimizedRequestResource: [%s], NodeGpuType: [%s], NodeLabel: [%s]",
            optimizedRequestResource, requestNodeGpuType, requestNodeLabel));
      }
    }
    SelectionResult selectionResult = selectNodes(optimizedRequestResource, startStatesTaskCount);
    //If port is not previous selected, select ports from the selectionResult.
    List<ValueRange> portRanges = selectPorts(selectionResult, optimizedRequestResource);
    optimizedRequestResource.setPortRanges(portRanges);
    selectionResult.setOptimizedResource(optimizedRequestResource);
    return selectionResult;
  }

这个函数会对节点进行很多次筛选,一个函数一个函数来看干了啥。

initFilteredNodes

初始化filteredNodes为所有节点,这是当然,因为一开始没有一个节点被筛。

filterNodesByNodeLabel

如果用户指定了运行节点的NodeLabel,那当然听用户的,不符合要求的节点筛掉。不过就现在的情况来看,用这个功能筛节点的人不多,毕竟人得先知道有哪些NodeLabel。

filterNodesByGpuType

用户可以指定GPU类型,目前北京这边全是K-80没得选

filterNodesForNoneGpuJob

集群默认有一个配置项,如果任务不需要GPU,就跳过GPU节点。这个想法是好的,避免CPU任务占住GPU节点,但是代码实现逻辑根本不可能出现这种情况,因为不用GPU的任务根本就不会调用select函数。

接下来是一段端口号的分配代码,没有写函数,直接inline

filterNodesByResource

这里根据具体每个节点资源是否满足需求来进一步筛选,甚至还有一个剔除曾经尝试失败的节点的选项。这个需求很硬,不可能改动。

filterNodesByRackSelectionPolicy

这是一个留空的方法,TODO的意思是根据节点的GPU使用情况筛选,这是一个可以下功夫的点。

1
2
3
private void filterNodesByRackSelectionPolicy(ResourceDescriptor requestResource, int startStatesTaskCount) {
    //TODO: Node GPU policy filter the nodes;
  }

接下来又是选端口的代码,跳过。

跳过之后就不再是筛选函数了,换句话说,剩下的节点都满足要求,怎么选就是一个策略问题了。进入更核心的的一个资源调度函数:

selectNodes

1
2
3
4
  private SelectionResult selectNodes(ResourceDescriptor requestResource, int startStatesTaskCount) {
    //TODO: apply other node selection policy in the future;
    return selectNodesByJobPacking(requestResource, startStatesTaskCount);
  }

这里可以实现很多个筛选策略,我们先看默认策略selectNodesByJobPacking

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
  //Default Node Selection strategy.
  private SelectionResult selectNodesByJobPacking(ResourceDescriptor requestResource, int startStatesTaskCount) {
    int requestNumber = startStatesTaskCount * conf.getAmSearchNodeBufferFactor();
    List<Node> candidateNodes = new ArrayList<Node>();
    SelectionResult result = new SelectionResult();

    for (String nodeName : filteredNodes) {
      candidateNodes.add(allNodes.get(nodeName));
    }
    Collections.sort(candidateNodes);
    for (int i = 0; i < requestNumber && i < candidateNodes.size(); i++) {
      Node node = candidateNodes.get(i);
      Long gpuAttribute = requestResource.getGpuAttribute();
      if (gpuAttribute == 0) {
        gpuAttribute = selectCandidateGpuAttribute(node, requestResource.getGpuNumber());
      }
      result.addSelection(node.getHost(), gpuAttribute, node.getAvailableResource().getPortRanges());
    }
    return result;
  }

这里有一个factor叫做AmSearchNodeBufferFactor,默认值为2,大致意思是当前TaskRole剩下$k$个Task没有被分配时,应该选择$2k$个节点备用。这是一个很迷的操作,因为无论选择多少个节点,这次函数调用也只会分配一个Task的资源,当然也只可能在某一个节点申请资源,那么这个2倍的意义何在呢?需要找代码作者讨论

selectCandidateGpuAttribute是GPU选择策略,默认的naïve策略已经被我改成基于拓扑结构的调度策略了。之后再提。

因此可以看到,如果要增加机器选择的策略,在filterNodesByRackSelectionPolicy或者selectNodes中添加都是不错的选择。根据代码作者的本意,在selectNodes中加一个选择函数更加合适。