已经开始改代码了,目前GPU调度的代码运行良好。调度结果符合预期,因此先卖个关子。考虑目前的难点:机器调度。
之前考虑的问题是:由于Task是逐个调度的,所以应该让一个任务尽可能调度到前面成功调度的节点上。于是就需要知道前面的任务都调到哪儿了。
StatusManager就是记录这个的,借助其记录状态的功能,可以方便的获得所有之前已经进行过调度的任务状态。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// If previous tasks get their containers requested on some nodes, the following tasks should reuse these nodes to reduce communication overheads.
Set<TaskState> taskStateSet = new HashSet<>();
List<TaskStatus> containerRequestedTaskStatuses = statusManager.getTaskStatus(taskStateSet, false);
List<Node> reusedNodes = new ArrayList<Node>();
if (!candidateNodes.isEmpty() && !containerRequestedTaskStatuses.isEmpty()) {
for (Node candidateNode : candidateNodes) {
for (TaskStatus taskStatus : containerRequestedTaskStatuses) {
String hostName = taskStatus.getContainerHost();
LOGGER.logInfo("Select: Checking task: [%s-%s] [%s]", taskStatus.getTaskRoleName(), taskStatus.getTaskIndex(), taskStatus.getTaskState());
if (hostName == null) {
continue;
}
LOGGER.logInfo("Select: Previous requested host: [%s]", hostName);
if (hostName.equals(candidateNode.getHost())) {
reusedNodes.add(candidateNode);
break;
}
}
}
}
上面这一段是目前我的实现,首先通过getTaskStatus
拿到所有任务的taskStatus,这个数据结构里面包含了其分配到的hostName。这样只要扫一遍就知道前面任务的host分布了。
然而事情没这么简单,事实上,这里得到的hostName都是空。一开始我也不清楚是为什么,后来仔细理逻辑才发现,这里的状态转移非常谨慎。什么意思呢?
当所有的任务都都通过各种调度算法得到要调度上去的机器时,机器此时只是接到了通知,哦,你要上来。所以这时候,对应的容器还机器需要分配,如果分不出来。那就算你指定要上某台机器,没容器你也没地方上。
因此这个过程中有两个状态,CONTAINER_REQUESTED,CONTAINER_ALLOCATED。
当所有任务选择好机器时,就会转移到CONTAINER_REQUESTED。然而它们的TaskStatus里面的机器有关信息并没有更新,仍然是null。因为这是一个任务和机器的双向选择,如果一方不同意,那么就不能绑定这个任务到这台机器上。
这个“绑定”,实际对应到代码中是这样一个函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// Should call disassociateTaskWithContainer if associateTaskWithContainer failed
private void associateTaskWithContainer(TaskStatusLocator locator, Container container, Map<String, Ports> portDefinitions) throws Exception {
TaskStatus taskStatus = getTaskStatus(locator);
String containerId = container.getId().toString();
taskStatus.setContainerId(containerId);
taskStatus.setContainerHost(container.getNodeId().getHost());
taskStatus.setContainerIp(
DnsUtils.resolveExternalIPv4Address(taskStatus.getContainerHost()));
taskStatus.setContainerLogHttpAddress(
HadoopUtils.getContainerLogHttpAddress(container.getNodeHttpAddress(), containerId, conf.getAmUser()));
taskStatus.setContainerConnectionLostCount(0);
taskStatus.setContainerGpus(
ResourceDescriptor.fromResource(container.getResource()).getGpuAttribute());
String portString = ValueRangeUtils.convertPortRangeToPortDefinitionsString(
ResourceDescriptor.fromResource(container.getResource()).getPortRanges(), portDefinitions);
taskStatus.setContainerPorts(portString);
taskStatusesesChanged.put(locator.getTaskRoleName(), true);
}
别的不看,看这个
1
taskStatus.setContainerHost(container.getNodeId().getHost());
这是唯一一处设置taskStatus的hostName的地方。从名字可以看出,这个函数在任务与某个容器“关联”的时候才会执行。
再找一找,巧了,这个函数只有一次执行在StatusManager的transitionTaskState
中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
if (!TaskStateDefinition.CONTAINER_ASSOCIATED_STATES.contains(srcState) &&
TaskStateDefinition.CONTAINER_ASSOCIATED_STATES.contains(dstState)) {
assert (event.getContainer() != null);
String containerId = event.getContainer().getId().toString();
try {
associateTaskWithContainer(locator, event.getContainer(), event.getPortDefinitions());
LOGGER.logInfo("Associated Task %s with Container %s", locator, containerId);
} catch (Exception e) {
disassociateTaskWithContainer(locator);
// Mark as unchanged
taskStatusesesChanged.put(locator.getTaskRoleName(), false);
throw new Exception(
String.format("Failed to associate Container %s to Task %s",
containerId, locator), e);
}
}
条件说得很清楚了,只有初始状态不在CONTAINER_ASSOCIATED_STATES这个集合中,结束状态在CONTAINER_ASSOCIATED_STATES中时才会执行这个函数。
这个集合如下:
1
2
3
4
5
6
7
8
public static final Set<TaskState> CONTAINER_ASSOCIATED_STATES = Collections.unmodifiableSet(
new HashSet<>(Arrays.asList(
TaskState.CONTAINER_ALLOCATED,
TaskState.CONTAINER_LAUNCHED,
TaskState.CONTAINER_RUNNING,
TaskState.CONTAINER_COMPLETED,
TaskState.TASK_COMPLETED
)));
就很明确了,调度算法执行结束后,仅仅是从TASK_WAITING转移到了CONTAINER_REQUESTED,这个状态还没有关联容器,因此即使选好了机器,也不会记录。
那么我的工作就简单了,只要在不影响原始FSM的情况下加一个用于预测机器的数据结构就行了,只需要在调度过程中记录一下。
This work is licensed under a
Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International license.