diff --git a/server/data_model/src/lib.rs b/server/data_model/src/lib.rs index 0f0156037..bd2d48025 100644 --- a/server/data_model/src/lib.rs +++ b/server/data_model/src/lib.rs @@ -67,6 +67,10 @@ impl ExecutorId { pub fn get(&self) -> &str { &self.0 } + + pub fn executor_key(&self) -> String { + self.0.to_string() + } } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash)] @@ -911,7 +915,7 @@ pub struct ExecutorMetadata { impl ExecutorMetadata { pub fn key(&self) -> String { - format!("{}", self.id) + self.id.executor_key() } } diff --git a/server/state_store/src/state_machine.rs b/server/state_store/src/state_machine.rs index 57a461861..1dae24557 100644 --- a/server/state_store/src/state_machine.rs +++ b/server/state_store/src/state_machine.rs @@ -413,7 +413,6 @@ pub(crate) fn delete_input_data_object( Ok(()) } -// TODO: Do this in a transaction. pub(crate) fn create_or_update_compute_graph( db: Arc, txn: &Transaction, @@ -735,7 +734,16 @@ pub fn allocate_tasks( executor_id: &ExecutorId, sm_metrics: Arc, ) -> Result<()> { - // TODO: check if executor is registered + if txn + .get_for_update_cf( + &IndexifyObjectsColumns::Executors.cf_db(&db), + executor_id.executor_key(), + false, + )? + .is_none() + { + return Err(anyhow!("executor not found: {}", executor_id)); + } txn.put_cf( &IndexifyObjectsColumns::TaskAllocations.cf_db(&db), task.make_allocation_key(executor_id),