Skip to content

Commit

Permalink
handle executor removal (#621)
Browse files Browse the repository at this point in the history
If an executor is removed, attempt to reassign all unassigned tasks to
surviving executors.
  • Loading branch information
maxkozlovsky authored May 24, 2024
1 parent e57a5f6 commit c0822dc
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 1 deletion.
4 changes: 3 additions & 1 deletion src/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,9 @@ impl Coordinator {
indexify_internal_api::ChangeType::NewContent => {
self.scheduler.create_new_tasks(change).await?
}
indexify_internal_api::ChangeType::ExecutorRemoved => {}
indexify_internal_api::ChangeType::ExecutorRemoved => {
self.scheduler.handle_executor_removed(change).await?
}
indexify_internal_api::ChangeType::NewGargabeCollectionTask => {}
}
}
Expand Down
14 changes: 14 additions & 0 deletions src/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,20 @@ impl Scheduler {
}
}

pub async fn handle_executor_removed(&self, state_change: StateChange) -> Result<()> {
let tasks = self.shared_state.unassigned_tasks().await?;
let plan = self.allocate_tasks(tasks).await?.0;
if !plan.is_empty() {
self.shared_state
.commit_task_assignments(plan, state_change.id)
.await
} else {
self.shared_state
.mark_change_events_as_processed(vec![state_change], Vec::new())
.await
}
}

pub async fn create_new_tasks(&self, state_change: StateChange) -> Result<()> {
let mut tasks: Vec<internal_api::Task> = Vec::new();
let content = match self
Expand Down

0 comments on commit c0822dc

Please sign in to comment.