diff --git a/server/src/routes/invoke.rs b/server/src/routes/invoke.rs index bc8fb535c..2c35108ba 100644 --- a/server/src/routes/invoke.rs +++ b/server/src/routes/invoke.rs @@ -223,7 +223,7 @@ pub async fn invoke_with_object( if let Some(rx) = rx.as_mut() { loop { if let Ok(ev) = rx.recv().await { - if ev.invocation_id() == id || ev.invocation_id() == "" { + if ev.invocation_id() == id { yield Event::default().json_data(ev.clone()); if let InvocationStateChangeEvent::InvocationFinished(InvocationFinishedEvent{ id }) = ev { diff --git a/server/src/scheduler.rs b/server/src/scheduler.rs index e8e0ffddf..4aa155be4 100644 --- a/server/src/scheduler.rs +++ b/server/src/scheduler.rs @@ -49,7 +49,7 @@ impl Scheduler { let mut processed_state_change_ids = vec![]; let mut new_reduction_tasks = vec![]; let mut processed_reduction_tasks = vec![]; - let mut diagnostic_msgs = vec![]; + let mut placement_diagnostics = vec![]; let requires_task_allocation = state_change.change_type == ChangeType::TaskCreated || state_change.change_type == ChangeType::ExecutorAdded || state_change.change_type == ChangeType::ExecutorRemoved; @@ -86,7 +86,7 @@ impl Scheduler { if requires_task_allocation { let task_placement_result = self.task_allocator.schedule_unplaced_tasks()?; new_allocations.extend(task_placement_result.task_placements); - diagnostic_msgs.extend(task_placement_result.diagnostic_msgs); + placement_diagnostics.extend(task_placement_result.placement_diagnostics); } let scheduler_update_request = StateMachineUpdateRequest { @@ -97,7 +97,7 @@ impl Scheduler { new_reduction_tasks, processed_reduction_tasks, }, - diagnostic_msgs, + placement_diagnostics, }), state_changes_processed: processed_state_change_ids, }; diff --git a/server/state_store/src/invocation_events.rs b/server/state_store/src/invocation_events.rs index 901568d8b..361ea26fc 100644 --- a/server/state_store/src/invocation_events.rs +++ b/server/state_store/src/invocation_events.rs @@ -40,7 +40,10 @@ impl InvocationStateChangeEvent { InvocationStateChangeEvent::TaskCompleted(TaskCompleted { invocation_id, .. }) => { invocation_id.clone() } - InvocationStateChangeEvent::DiagnosticMessage(_) => "".to_string(), + InvocationStateChangeEvent::DiagnosticMessage(DiagnosticMessage { + invocation_id, + .. + }) => invocation_id.clone(), } } } @@ -64,6 +67,7 @@ pub struct TaskCreated { #[derive(Serialize, Deserialize, Debug, Clone)] pub struct DiagnosticMessage { + pub invocation_id: String, pub message: String, } diff --git a/server/state_store/src/lib.rs b/server/state_store/src/lib.rs index 56f8a975c..3147e6471 100644 --- a/server/state_store/src/lib.rs +++ b/server/state_store/src/lib.rs @@ -463,12 +463,13 @@ impl IndexifyState { error!("failed to send invocation state change: {:?}", err); } } - for diagnostic_msg in &sched_update.diagnostic_msgs { + for diagnostic in &sched_update.placement_diagnostics { if let Err(err) = self.task_event_tx .send(InvocationStateChangeEvent::DiagnosticMessage( invocation_events::DiagnosticMessage { - message: diagnostic_msg.clone(), + invocation_id: diagnostic.task.invocation_id.clone(), + message: diagnostic.message.clone(), }, )) { @@ -824,7 +825,7 @@ mod tests { executor: executor_id.clone(), }], reduction_tasks: ReductionTasks::default(), - diagnostic_msgs: vec![], + placement_diagnostics: vec![], }), state_changes_processed: vec![], }) @@ -874,7 +875,7 @@ mod tests { executor: executor_id.clone(), }], reduction_tasks: ReductionTasks::default(), - diagnostic_msgs: vec![], + placement_diagnostics: vec![], }; indexify_state diff --git a/server/state_store/src/requests.rs b/server/state_store/src/requests.rs index 431887617..50c4a64a3 100644 --- a/server/state_store/src/requests.rs +++ b/server/state_store/src/requests.rs @@ -116,6 +116,12 @@ pub struct TaskPlacement { pub executor: ExecutorId, } +#[derive(Debug)] +pub struct TaskPlacementDiagnostic { + pub task: Task, + pub message: String, +} + #[derive(Default, Debug)] pub struct ReductionTasks { pub new_reduction_tasks: Vec, @@ -126,7 +132,7 @@ pub struct SchedulerUpdateRequest { pub task_requests: Vec, pub allocations: Vec, pub reduction_tasks: ReductionTasks, - pub diagnostic_msgs: Vec, + pub placement_diagnostics: Vec, } pub struct DeleteInvocationRequest { diff --git a/server/task_scheduler/src/lib.rs b/server/task_scheduler/src/lib.rs index dd9b716ee..4f50680ac 100644 --- a/server/task_scheduler/src/lib.rs +++ b/server/task_scheduler/src/lib.rs @@ -3,7 +3,10 @@ use std::sync::Arc; use anyhow::{anyhow, Result}; use data_model::{ExecutorId, Node, ReduceTask, RuntimeInformation, Task}; use rand::seq::SliceRandom; -use state_store::{requests::TaskPlacement, IndexifyState}; +use state_store::{ + requests::{TaskPlacement, TaskPlacementDiagnostic}, + IndexifyState, +}; use tracing::{error, info, span}; pub mod task_creator; @@ -38,7 +41,7 @@ pub struct FilteredExecutors { pub struct TaskPlacementResult { pub task_placements: Vec, - pub diagnostic_msgs: Vec, + pub placement_diagnostics: Vec, } struct ScheduleTaskResult { @@ -62,7 +65,7 @@ impl TaskScheduler { fn schedule_tasks(&self, tasks: Vec) -> Result { let mut task_placements = Vec::new(); - let mut diagnostic_msgs = Vec::new(); + let mut placement_diagnostics = Vec::new(); for task in tasks { let span = span!( tracing::Level::INFO, @@ -75,13 +78,18 @@ impl TaskScheduler { let _enter = span.enter(); info!("scheduling task {:?}", task.id); - match self.schedule_task(task) { + match self.schedule_task(task.clone()) { Ok(ScheduleTaskResult { task_placements: schedule_task_placements, diagnostic_msgs: schedule_diagnostic_msgs, }) => { task_placements.extend(schedule_task_placements); - diagnostic_msgs.extend(schedule_diagnostic_msgs); + placement_diagnostics.extend(schedule_diagnostic_msgs.iter().map(|msg| { + TaskPlacementDiagnostic { + task: task.clone(), + message: msg.clone(), + } + })); } Err(err) => { error!("failed to schedule task, skipping: {:?}", err); @@ -90,7 +98,7 @@ impl TaskScheduler { } Ok(TaskPlacementResult { task_placements, - diagnostic_msgs, + placement_diagnostics, }) }