Skip to content

Commit

Permalink
optimit code
Browse files Browse the repository at this point in the history
  • Loading branch information
zeyu10 committed Oct 30, 2024
1 parent c669b70 commit 78489ea
Showing 1 changed file with 34 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,11 @@ public static DAGWalkHelper getInstance() {
}

public Set<TaskInfo> getReadyToRunTasks(Collection<TaskInfo> taskInfos) {
// 根据依赖关系获取准备运行的任务
Set<TaskInfo> readyToRunTasks = getReadyToRunTasksByDependencies(taskInfos);
boolean isKeyMode = isKeyMode(taskInfos);
// 根据依赖关系获取准备运行的非流式输入任务
Set<TaskInfo> readyToRunTasks = getReadyToRunBlockInputTasks(taskInfos, isKeyMode);
// 添加准备运行的流式输入任务
addReadyToRunStreamInputTasks(taskInfos, readyToRunTasks);
addReadyToRunStreamInputTasks(taskInfos, readyToRunTasks, isKeyMode);
return readyToRunTasks;
}

Expand All @@ -64,44 +65,26 @@ public Set<TaskInfo> getReadyToRunTasks(Collection<TaskInfo> taskInfos) {
* 1. 当前任务不为空且状态为未开始
* 2. 依赖任务全部完成(如果在关键路径模式下,则包括关键路径完成)
*/
private Set<TaskInfo> getReadyToRunTasksByDependencies(Collection<TaskInfo> taskInfos) {
boolean isKeyMode = isKeyMode(taskInfos);
private Set<TaskInfo> getReadyToRunBlockInputTasks(Collection<TaskInfo> taskInfos, boolean isKeyMode) {
return taskInfos.stream()
.filter(taskInfo -> taskInfo != null && taskInfo.getTaskStatus() == TaskStatus.NOT_STARTED)
.filter(Objects::nonNull)
.filter(taskInfo -> taskInfo.getTaskStatus() == TaskStatus.NOT_STARTED)
.filter(taskInfo -> TaskInputOutputType.getTypeByValue(taskInfo.getTask().getInputType()) == TaskInputOutputType.BLOCK)
.filter(taskInfo -> isDependenciesAllSuccessOrSkip(taskInfo, isKeyMode))
.collect(Collectors.toSet());
}

/**
* 判断依赖的所有任务是否都已完成
* 1. 如果没有依赖,说明依赖的所有任务都已完成
* 2. 流式输入任务,有任意依赖的 block 输出任务完成或关键路径下完成
* 3. 非流式输入任务,所有依赖的 block 输出任务是否都已经完成,或在关键路径模式下关键路径完成或跳过
* 非流式输入任务,所有依赖的 block 输出任务都已经完成,或在关键路径模式下关键路径完成或跳过,则任务可以运行
*/
private boolean isDependenciesAllSuccessOrSkip(TaskInfo taskInfo, boolean isKeyMode) {
// 1. 没有依赖视为依赖均已完成
if (CollectionUtils.isEmpty(taskInfo.getDependencies())) {
return true;
}
TaskInputOutputType inputType = TaskInputOutputType.getTypeByValue(taskInfo.getTask().getInputType());
boolean isTaskKeyMode = isKeyMode && taskInfo.getTask().isKeyCallback();
if (inputType == TaskInputOutputType.STREAM) {
// 2. 流式输入任务,任意依赖的流式任务开始执行或者非流式任务已完成(非关键路径模式下完成或跳过,或者在关键路径模式下关键路径完成或跳过)
return taskInfo.getDependencies().stream().anyMatch(dependency -> {
TaskInputOutputType dependencyOutputType = TaskInputOutputType.getTypeByValue(dependency.getTask().getOutputType());
boolean streamDependencyStarted = false;
boolean blockDependencyFinished = false;
if (dependencyOutputType == TaskInputOutputType.STREAM) {
streamDependencyStarted = dependency.getTaskStatus() != TaskStatus.NOT_STARTED;
} else {
blockDependencyFinished = isTaskSuccessOrSkip(dependency, isTaskKeyMode);
}
return streamDependencyStarted || blockDependencyFinished;
});
} else {
// 3. 非流式输入任务,所有依赖任务是否都已完成(非关键路径模式下完成或跳过,或者在关键路径模式下关键路径完成或跳过)
return taskInfo.getDependencies().stream().allMatch(dependency -> isTaskSuccessOrSkip(dependency, isTaskKeyMode));
}
// 2. 非流式输入任务,所有依赖任务是否都已完成(非关键路径模式下完成或跳过,或者在关键路径模式下关键路径完成或跳过)
return taskInfo.getDependencies().stream().allMatch(dependency -> isTaskSuccessOrSkip(dependency, isTaskKeyMode));
}

private boolean isTaskSuccessOrSkip(TaskInfo taskInfo, boolean isTaskKeyMode) {
Expand All @@ -115,23 +98,40 @@ private boolean isTaskSuccessOrSkip(TaskInfo taskInfo, boolean isTaskKeyMode) {
* @param taskInfos 所有任务的集合
* @param readyToRunTasks 已准备运行的任务集合
*/
private void addReadyToRunStreamInputTasks(Collection<TaskInfo> taskInfos, Set<TaskInfo> readyToRunTasks) {
private void addReadyToRunStreamInputTasks(Collection<TaskInfo> taskInfos, Set<TaskInfo> readyToRunTasks, boolean isKeyMode) {
Set<TaskInfo> readyToRunStreamTasks = new HashSet<>();
taskInfos.stream().filter(Objects::nonNull).filter(taskInfo -> taskInfo.getTaskStatus() == TaskStatus.NOT_STARTED)
.filter(taskInfo -> TaskInputOutputType.getTypeByValue(taskInfo.getTask().getInputType()) == TaskInputOutputType.STREAM)
.forEach(taskInfo -> {
boolean needRun = taskInfo.getDependencies().stream().anyMatch(dependency -> {
TaskInputOutputType dependencyOutputType = TaskInputOutputType.getTypeByValue(dependency.getTask().getOutputType());
// 如果依赖任务是流输出类型且准备运行,则将当前任务添加到准备运行的流任务集合中
return dependencyOutputType == TaskInputOutputType.STREAM && readyToRunTasks.contains(dependency);
});
boolean isTaskKeyMode = isKeyMode && taskInfo.getTask().isKeyCallback();
boolean needRun = taskInfo.getDependencies().stream()
.anyMatch(dependency -> isStreamTaskReadyToRun(readyToRunTasks, dependency, isTaskKeyMode));
if (needRun) {
readyToRunStreamTasks.add(taskInfo);
}
});
readyToRunTasks.addAll(readyToRunStreamTasks);
}

/**
* 判断流式输入任务是否可执行,只要有任何依赖符合以下任一条件,流式输入任务就可以执行
* 1. 任意依赖的流式输出任务开始执行或者准备被执行
* 2. 任意依赖的非流式输出任务执行完成(包括关键路径模式下关键路径执行完成)
*/
private boolean isStreamTaskReadyToRun(Set<TaskInfo> readyToRunTasks, TaskInfo dependency, boolean isTaskKeyMode) {
TaskInputOutputType dependencyOutputType = TaskInputOutputType.getTypeByValue(dependency.getTask().getOutputType());
boolean streamDependencyStarted = false;
boolean blockDependencyFinished = false;
if (dependencyOutputType == TaskInputOutputType.STREAM) {
// 1. 任意依赖的流式输出任务开始执行或者准备被执行
streamDependencyStarted = dependency.getTaskStatus() != TaskStatus.NOT_STARTED || readyToRunTasks.contains(dependency);
} else {
// 2. 任意依赖的非流式输出任务执行完成(包括关键路径模式下关键路径执行完成)
blockDependencyFinished = isTaskSuccessOrSkip(dependency, isTaskKeyMode);
}
return streamDependencyStarted || blockDependencyFinished;
}

private boolean isKeyMode(Collection<TaskInfo> allTasks) {
return allTasks.stream().map(TaskInfo::getTaskStatus).anyMatch(TaskStatus::isKeyModeStatus);
}
Expand Down

0 comments on commit 78489ea

Please sign in to comment.