Skip to content

Commit

Permalink
feat(server): prevent allocating to unregistered executors
Browse files Browse the repository at this point in the history
  • Loading branch information
seriousben committed Dec 20, 2024
1 parent 208c2e8 commit 03cb0c3
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 3 deletions.
6 changes: 5 additions & 1 deletion server/data_model/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ impl ExecutorId {
pub fn get(&self) -> &str {
&self.0
}

pub fn executor_key(&self) -> String {
self.0.to_string()
}
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash)]
Expand Down Expand Up @@ -911,7 +915,7 @@ pub struct ExecutorMetadata {

impl ExecutorMetadata {
pub fn key(&self) -> String {
format!("{}", self.id)
self.id.executor_key()
}
}

Expand Down
12 changes: 10 additions & 2 deletions server/state_store/src/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,6 @@ pub(crate) fn delete_input_data_object(
Ok(())
}

// TODO: Do this in a transaction.
pub(crate) fn create_or_update_compute_graph(
db: Arc<TransactionDB>,
txn: &Transaction<TransactionDB>,
Expand Down Expand Up @@ -735,7 +734,16 @@ pub fn allocate_tasks(
executor_id: &ExecutorId,
sm_metrics: Arc<StateStoreMetrics>,
) -> Result<()> {
// TODO: check if executor is registered
if txn
.get_for_update_cf(
&IndexifyObjectsColumns::Executors.cf_db(&db),
executor_id.executor_key(),
false,
)?
.is_none()
{
return Err(anyhow!("executor not found: {}", executor_id));
}
txn.put_cf(
&IndexifyObjectsColumns::TaskAllocations.cf_db(&db),
task.make_allocation_key(executor_id),
Expand Down

0 comments on commit 03cb0c3

Please sign in to comment.