Skip to content

Commit

Permalink
perf(restarts): Optimized RestartStageHandler. (#4621)
Browse files Browse the repository at this point in the history
Co-authored-by: Valerii Sitkevich <[email protected]>
Co-authored-by: Jason <[email protected]>
  • Loading branch information
3 people authored Jan 29, 2024
1 parent a587515 commit 8c0c189
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ public List<StageExecution> allDownstreamStages() {
List<StageExecution> children = new ArrayList<>();

if (execution != null) {
HashSet<String> visited = new HashSet<>();
List<StageExecution> notVisited = new ArrayList<>(getExecution().getStages());
LinkedList<StageExecution> queue = new LinkedList<>();

queue.push(this);
Expand All @@ -563,11 +563,12 @@ public List<StageExecution> allDownstreamStages() {
}

first = false;
visited.add(stage.getRefId());
notVisited.remove(stage);

List<StageExecution> childStages = stage.downstreamStages();

childStages.stream().filter(s -> !visited.contains(s.getRefId())).forEach(queue::add);
notVisited.stream()
.filter(
s -> s.getRequisiteStageRefIds().contains(stage.getRefId()) && !queue.contains(s))
.forEach(queue::add);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,57 @@ class StageSpec extends Specification {
descendants.find {it.refId == "7"} != null
}

def "descendents do not multiply"() {
given:
def pipeline = pipeline {
stage {
refId = "0"
}
stage {
refId = "1"
requisiteStageRefIds = ["0"]
}
stage {
refId = "2"
requisiteStageRefIds = ["0"]
}
stage {
refId = "3"
requisiteStageRefIds = ["0"]
}
stage {
refId = "4"
requisiteStageRefIds = ["1", "2", "3"]
}
stage {
refId = "5"
requisiteStageRefIds = ["1", "2", "3"]
}
stage {
refId = "6"
requisiteStageRefIds = ["1", "2", "3"]
}
stage {
refId = "7"
requisiteStageRefIds = ["4", "5", "6"]
}
}

def stage = pipeline.stageByRef("0")

when:
def descendants = stage.allDownstreamStages()

then:
descendants.size() == 7
descendants.find {it.refId == "2"} != null
descendants.find {it.refId == "3"} != null
descendants.find {it.refId == "4"} != null
descendants.find {it.refId == "5"} != null
descendants.find {it.refId == "6"} != null
descendants.find {it.refId == "7"} != null
}

def "should not fail on no descendents"() {
given:
def pipeline = pipeline {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class RestartStageHandler(
if (topStage.status.isComplete || topStage.status == NOT_STARTED) {
topStage.addRestartDetails(message.user)
topStage.reset()
topStage.resetChildren()
if (stage.execution.shouldQueue()) {
// this pipeline is already running and has limitConcurrent = true
if (topStage.execution.status == NOT_STARTED) {
Expand Down Expand Up @@ -126,8 +127,10 @@ class RestartStageHandler(

removeSynthetics()
}
}

downstreamStages().forEach { it.reset() }
private fun StageExecution.resetChildren() {
allDownstreamStages().forEach { it.reset() }
}

private fun StageExecution.removeSynthetics() {
Expand Down

0 comments on commit 8c0c189

Please sign in to comment.