Skip to content

Commit

Permalink
Bug fixes (#1072)
Browse files Browse the repository at this point in the history
* add check to ensure we don't update tasks if the invocation was deleted

* checking if graph context is available

* making the scheduler move on if tasks are not found
  • Loading branch information
diptanu authored Dec 1, 2024
1 parent 363c87a commit 922d329
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 20 deletions.
35 changes: 31 additions & 4 deletions server/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,40 @@ impl Scheduler {
let task = self
.indexify_state
.reader()
.get_task_from_finished_event(task_finished_event)?
.ok_or(anyhow!("task not found {}", task_finished_event.task_id))?;
.get_task_from_finished_event(task_finished_event)
.map_err(|e| {
error!("error getting task from finished event: {:?}", e);
e
})?;
if task.is_none() {
error!(
"task not found for task finished event: {}",
task_finished_event.task_id
);
continue;
}
let task =
task.ok_or(anyhow!("task not found: {}", task_finished_event.task_id))?;
let compute_graph = self
.indexify_state
.reader()
.get_compute_graph(&task.namespace, &task.compute_graph_name)?
.ok_or(anyhow!("compute graph not found"))?;
.get_compute_graph(&task.namespace, &task.compute_graph_name)
.map_err(|e| {
error!("error getting compute graph: {:?}", e);
e
})?;
if compute_graph.is_none() {
error!(
"compute graph not found: {:?} {:?}",
task.namespace, task.compute_graph_name
);
continue;
}
let compute_graph = compute_graph.ok_or(anyhow!(
"compute graph not found: {:?} {:?}",
task.namespace,
task.compute_graph_name
))?;
Some(
handle_task_finished(self.indexify_state.clone(), task, compute_graph)
.await?,
Expand Down
14 changes: 14 additions & 0 deletions server/task_scheduler/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,20 @@ pub struct TaskCreationResult {
pub invocation_id: String,
}

impl TaskCreationResult {
pub fn no_tasks(namespace: &str, compute_graph: &str, invocation_id: &str) -> Self {
Self {
namespace: namespace.to_string(),
compute_graph: compute_graph.to_string(),
tasks: vec![],
new_reduction_tasks: vec![],
processed_reduction_tasks: vec![],
invocation_finished: false,
invocation_id: invocation_id.to_string(),
}
}
}

pub struct FilteredExecutors {
pub executors: Vec<ExecutorId>,
pub diagnostic_msgs: Vec<String>,
Expand Down
40 changes: 24 additions & 16 deletions server/task_scheduler/src/task_creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,32 +61,40 @@ pub async fn handle_task_finished(
&task.namespace,
&task.compute_graph_name,
&task.invocation_id,
)?
.ok_or(anyhow!(
"invocation context not found for invocation_id {}",
task.invocation_id
))?;
)
.map_err(|e| {
anyhow!(
"error getting invocation context for invocation {}: {:?}",
task.invocation_id,
e
)
})?;
if invocation_ctx.is_none() {
return Ok(TaskCreationResult::no_tasks(
&task.namespace,
&task.compute_graph_name,
&task.invocation_id,
));
}
let invocation_ctx = invocation_ctx.ok_or(anyhow!(
"invocation context not found for invocation_id {}",
task.invocation_id
))?;

if task.outcome == TaskOutcome::Failure {
let mut invocation_finished = false;
if invocation_ctx.outstanding_tasks == 0 {
invocation_finished = true;
}

info!(
"Task failed, graph invocation: {:?} {}",
task.compute_graph_name, invocation_finished
);

return Ok(TaskCreationResult {
namespace: task.namespace.clone(),
compute_graph: task.compute_graph_name.clone(),
invocation_id: task.invocation_id.clone(),
tasks: vec![],
invocation_finished,
new_reduction_tasks: vec![],
processed_reduction_tasks: vec![],
});
return Ok(TaskCreationResult::no_tasks(
&task.namespace,
&task.compute_graph_name,
&task.invocation_id,
));
}
let mut new_tasks = vec![];
let mut new_reduction_tasks = vec![];
Expand Down

0 comments on commit 922d329

Please sign in to comment.