From e4b175c73025d4e6a568a5358b4d516b6e76a3cd Mon Sep 17 00:00:00 2001 From: Maxim Kozlovsky Date: Thu, 23 May 2024 19:14:44 -0400 Subject: [PATCH] handle executor removal If an executor is removed, attempt to reassign all unassigned tasks to surviving executors. --- src/coordinator.rs | 4 +++- src/scheduler/mod.rs | 14 ++++++++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/src/coordinator.rs b/src/coordinator.rs index fb8febe73..2a855cb4d 100644 --- a/src/coordinator.rs +++ b/src/coordinator.rs @@ -535,7 +535,9 @@ impl Coordinator { indexify_internal_api::ChangeType::NewContent => { self.scheduler.create_new_tasks(change).await? } - indexify_internal_api::ChangeType::ExecutorRemoved => {} + indexify_internal_api::ChangeType::ExecutorRemoved => { + self.scheduler.handle_executor_removed(change).await? + } indexify_internal_api::ChangeType::NewGargabeCollectionTask => {} } } diff --git a/src/scheduler/mod.rs b/src/scheduler/mod.rs index 96d4d722c..f981b6108 100644 --- a/src/scheduler/mod.rs +++ b/src/scheduler/mod.rs @@ -71,6 +71,20 @@ impl Scheduler { } } + pub async fn handle_executor_removed(&self, state_change: StateChange) -> Result<()> { + let tasks = self.shared_state.unassigned_tasks().await?; + let plan = self.allocate_tasks(tasks).await?.0; + if !plan.is_empty() { + self.shared_state + .commit_task_assignments(plan, state_change.id) + .await + } else { + self.shared_state + .mark_change_events_as_processed(vec![state_change], Vec::new()) + .await + } + } + pub async fn create_new_tasks(&self, state_change: StateChange) -> Result<()> { let mut tasks: Vec = Vec::new(); let content = match self