Skip to content

Commit

Permalink
fix(server): stop returning diagnostic messages of all invocations being placed (#1132)
Browse files Browse the repository at this point in the history
  • Loading branch information
seriousben authored Dec 21, 2024
1 parent 15f6b4f commit 74700c7
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 16 deletions.
2 changes: 1 addition & 1 deletion server/src/routes/invoke.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ pub async fn invoke_with_object(
if let Some(rx) = rx.as_mut() {
loop {
if let Ok(ev) = rx.recv().await {
if ev.invocation_id() == id || ev.invocation_id() == "" {
if ev.invocation_id() == id {
yield Event::default().json_data(ev.clone());

if let InvocationStateChangeEvent::InvocationFinished(InvocationFinishedEvent{ id }) = ev {
Expand Down
6 changes: 3 additions & 3 deletions server/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ impl Scheduler {
let mut processed_state_change_ids = vec![];
let mut new_reduction_tasks = vec![];
let mut processed_reduction_tasks = vec![];
let mut diagnostic_msgs = vec![];
let mut placement_diagnostics = vec![];
let requires_task_allocation = state_change.change_type == ChangeType::TaskCreated ||
state_change.change_type == ChangeType::ExecutorAdded ||
state_change.change_type == ChangeType::ExecutorRemoved;
Expand Down Expand Up @@ -86,7 +86,7 @@ impl Scheduler {
if requires_task_allocation {
let task_placement_result = self.task_allocator.schedule_unplaced_tasks()?;
new_allocations.extend(task_placement_result.task_placements);
diagnostic_msgs.extend(task_placement_result.diagnostic_msgs);
placement_diagnostics.extend(task_placement_result.placement_diagnostics);
}

let scheduler_update_request = StateMachineUpdateRequest {
Expand All @@ -97,7 +97,7 @@ impl Scheduler {
new_reduction_tasks,
processed_reduction_tasks,
},
diagnostic_msgs,
placement_diagnostics,
}),
state_changes_processed: processed_state_change_ids,
};
Expand Down
6 changes: 5 additions & 1 deletion server/state_store/src/invocation_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ impl InvocationStateChangeEvent {
InvocationStateChangeEvent::TaskCompleted(TaskCompleted { invocation_id, .. }) => {
invocation_id.clone()
}
InvocationStateChangeEvent::DiagnosticMessage(_) => "".to_string(),
InvocationStateChangeEvent::DiagnosticMessage(DiagnosticMessage {
invocation_id,
..
}) => invocation_id.clone(),
}
}
}
Expand All @@ -64,6 +67,7 @@ pub struct TaskCreated {

/// Payload of the `InvocationStateChangeEvent::DiagnosticMessage` variant: a
/// diagnostic text tied to one specific invocation.
///
/// `InvocationStateChangeEvent::invocation_id()` returns a clone of
/// `invocation_id` for this variant, so the event carries a real invocation id
/// like the other variants do.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct DiagnosticMessage {
/// Id of the invocation this diagnostic belongs to.
pub invocation_id: String,
/// The diagnostic text itself.
pub message: String,
}

Expand Down
9 changes: 5 additions & 4 deletions server/state_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -463,12 +463,13 @@ impl IndexifyState {
error!("failed to send invocation state change: {:?}", err);
}
}
for diagnostic_msg in &sched_update.diagnostic_msgs {
for diagnostic in &sched_update.placement_diagnostics {
if let Err(err) =
self.task_event_tx
.send(InvocationStateChangeEvent::DiagnosticMessage(
invocation_events::DiagnosticMessage {
message: diagnostic_msg.clone(),
invocation_id: diagnostic.task.invocation_id.clone(),
message: diagnostic.message.clone(),
},
))
{
Expand Down Expand Up @@ -824,7 +825,7 @@ mod tests {
executor: executor_id.clone(),
}],
reduction_tasks: ReductionTasks::default(),
diagnostic_msgs: vec![],
placement_diagnostics: vec![],
}),
state_changes_processed: vec![],
})
Expand Down Expand Up @@ -874,7 +875,7 @@ mod tests {
executor: executor_id.clone(),
}],
reduction_tasks: ReductionTasks::default(),
diagnostic_msgs: vec![],
placement_diagnostics: vec![],
};

indexify_state
Expand Down
8 changes: 7 additions & 1 deletion server/state_store/src/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,12 @@ pub struct TaskPlacement {
pub executor: ExecutorId,
}

/// A diagnostic message produced while scheduling a task, kept together with
/// the task it refers to.
///
/// Carried in `SchedulerUpdateRequest::placement_diagnostics`. Keeping the
/// whole `Task` (rather than a bare string) lets consumers read fields such as
/// `task.invocation_id` when forwarding the message.
#[derive(Debug)]
pub struct TaskPlacementDiagnostic {
/// The task the diagnostic is about.
pub task: Task,
/// The diagnostic text reported for `task`.
pub message: String,
}

#[derive(Default, Debug)]
pub struct ReductionTasks {
pub new_reduction_tasks: Vec<ReduceTask>,
Expand All @@ -126,7 +132,7 @@ pub struct SchedulerUpdateRequest {
pub task_requests: Vec<CreateTasksRequest>,
pub allocations: Vec<TaskPlacement>,
pub reduction_tasks: ReductionTasks,
pub diagnostic_msgs: Vec<String>,
pub placement_diagnostics: Vec<TaskPlacementDiagnostic>,
}

pub struct DeleteInvocationRequest {
Expand Down
20 changes: 14 additions & 6 deletions server/task_scheduler/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ use std::sync::Arc;
use anyhow::{anyhow, Result};
use data_model::{ExecutorId, Node, ReduceTask, RuntimeInformation, Task};
use rand::seq::SliceRandom;
use state_store::{requests::TaskPlacement, IndexifyState};
use state_store::{
requests::{TaskPlacement, TaskPlacementDiagnostic},
IndexifyState,
};
use tracing::{error, info, span};

pub mod task_creator;
Expand Down Expand Up @@ -38,7 +41,7 @@ pub struct FilteredExecutors {

pub struct TaskPlacementResult {
pub task_placements: Vec<TaskPlacement>,
pub diagnostic_msgs: Vec<String>,
pub placement_diagnostics: Vec<TaskPlacementDiagnostic>,
}

struct ScheduleTaskResult {
Expand All @@ -62,7 +65,7 @@ impl TaskScheduler {

fn schedule_tasks(&self, tasks: Vec<Task>) -> Result<TaskPlacementResult> {
let mut task_placements = Vec::new();
let mut diagnostic_msgs = Vec::new();
let mut placement_diagnostics = Vec::new();
for task in tasks {
let span = span!(
tracing::Level::INFO,
Expand All @@ -75,13 +78,18 @@ impl TaskScheduler {
let _enter = span.enter();

info!("scheduling task {:?}", task.id);
match self.schedule_task(task) {
match self.schedule_task(task.clone()) {
Ok(ScheduleTaskResult {
task_placements: schedule_task_placements,
diagnostic_msgs: schedule_diagnostic_msgs,
}) => {
task_placements.extend(schedule_task_placements);
diagnostic_msgs.extend(schedule_diagnostic_msgs);
placement_diagnostics.extend(schedule_diagnostic_msgs.iter().map(|msg| {
TaskPlacementDiagnostic {
task: task.clone(),
message: msg.clone(),
}
}));
}
Err(err) => {
error!("failed to schedule task, skipping: {:?}", err);
Expand All @@ -90,7 +98,7 @@ impl TaskScheduler {
}
Ok(TaskPlacementResult {
task_placements,
diagnostic_msgs,
placement_diagnostics,
})
}

Expand Down

0 comments on commit 74700c7

Please sign in to comment.