diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index 109d315d67a4..284db005c86c 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -402,8 +402,7 @@ fn start_postgres( ) -> Result<(Option, StartPostgresResult)> { // We got all we need, update the state. let mut state = compute.state.lock().unwrap(); - state.status = ComputeStatus::Init; - compute.state_changed.notify_all(); + state.set_status(ComputeStatus::Init, &compute.state_changed); info!( "running compute with features: {:?}", diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index ba7b4f37df0f..3e558a7d3ca0 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -109,6 +109,18 @@ impl ComputeState { metrics: ComputeMetrics::default(), } } + + pub fn set_status(&mut self, status: ComputeStatus, state_changed: &Condvar) { + let prev = self.status; + info!("Changing compute status from {} to {}", prev, status); + self.status = status; + state_changed.notify_all(); + } + + pub fn set_failed_status(&mut self, err: anyhow::Error, state_changed: &Condvar) { + self.error = Some(format!("{err:?}")); + self.set_status(ComputeStatus::Failed, state_changed); + } } impl Default for ComputeState { @@ -303,15 +315,12 @@ impl ComputeNode { pub fn set_status(&self, status: ComputeStatus) { let mut state = self.state.lock().unwrap(); - state.status = status; - self.state_changed.notify_all(); + state.set_status(status, &self.state_changed); } pub fn set_failed_status(&self, err: anyhow::Error) { let mut state = self.state.lock().unwrap(); - state.error = Some(format!("{err:?}")); - state.status = ComputeStatus::Failed; - self.state_changed.notify_all(); + state.set_failed_status(err, &self.state_changed); } pub fn get_status(&self) -> ComputeStatus { diff --git a/compute_tools/src/configurator.rs b/compute_tools/src/configurator.rs index 7bd0e4938df8..a2043529a192 100644 --- a/compute_tools/src/configurator.rs +++ b/compute_tools/src/configurator.rs @@ -24,8 +24,7 @@ fn configurator_main_loop(compute: &Arc) { // Re-check the status after waking up if state.status == ComputeStatus::ConfigurationPending { info!("got configuration request"); - state.status = ComputeStatus::Configuration; - compute.state_changed.notify_all(); + state.set_status(ComputeStatus::Configuration, &compute.state_changed); drop(state); let mut new_status = ComputeStatus::Failed; diff --git a/compute_tools/src/http/api.rs b/compute_tools/src/http/api.rs index 43d29402bcfd..fade3bbe6d85 100644 --- a/compute_tools/src/http/api.rs +++ b/compute_tools/src/http/api.rs @@ -288,8 +288,7 @@ async fn handle_configure_request( return Err((msg, StatusCode::PRECONDITION_FAILED)); } state.pspec = Some(parsed_spec); - state.status = ComputeStatus::ConfigurationPending; - compute.state_changed.notify_all(); + state.set_status(ComputeStatus::ConfigurationPending, &compute.state_changed); drop(state); info!("set new spec and notified waiters"); } @@ -362,15 +361,15 @@ async fn handle_terminate_request(compute: &Arc) -> Result<(), (Str } if state.status != ComputeStatus::Empty && state.status != ComputeStatus::Running { let msg = format!( - "invalid compute status for termination request: {:?}", - state.status.clone() + "invalid compute status for termination request: {}", + state.status ); return Err((msg, StatusCode::PRECONDITION_FAILED)); } - state.status = ComputeStatus::TerminationPending; - compute.state_changed.notify_all(); + state.set_status(ComputeStatus::TerminationPending, &compute.state_changed); drop(state); } + forward_termination_signal(); info!("sent signal and notified waiters"); @@ -384,7 +383,8 @@ async fn handle_terminate_request(compute: &Arc) -> Result<(), (Str while state.status != ComputeStatus::Terminated { state = c.state_changed.wait(state).unwrap(); info!( - "waiting for compute to become Terminated, current status: {:?}", + "waiting for compute to become {}, current status: {:?}", + ComputeStatus::Terminated, state.status ); } diff --git a/libs/compute_api/src/responses.rs b/libs/compute_api/src/responses.rs index d05d625b0ad1..3f055b914a39 100644 --- a/libs/compute_api/src/responses.rs +++ b/libs/compute_api/src/responses.rs @@ -1,5 +1,7 @@ //! Structs representing the JSON formats used in the compute_ctl's HTTP API. +use std::fmt::Display; + use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize, Serializer}; @@ -58,6 +60,21 @@ pub enum ComputeStatus { Terminated, } +impl Display for ComputeStatus { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ComputeStatus::Empty => f.write_str("empty"), + ComputeStatus::ConfigurationPending => f.write_str("configuration-pending"), + ComputeStatus::Init => f.write_str("init"), + ComputeStatus::Running => f.write_str("running"), + ComputeStatus::Configuration => f.write_str("configuration"), + ComputeStatus::Failed => f.write_str("failed"), + ComputeStatus::TerminationPending => f.write_str("termination-pending"), + ComputeStatus::Terminated => f.write_str("terminated"), + } + } +} + fn rfc3339_serialize(x: &Option>, s: S) -> Result where S: Serializer,