From 7f71ee4bd215c809f86bb99d1223c6f85cbb1348 Mon Sep 17 00:00:00 2001 From: Paul-Nicolas Madelaine Date: Sat, 30 Sep 2023 14:26:59 +0200 Subject: [PATCH] server: graceful shutdown --- typhon/src/lib.rs | 12 ++++++++++++ typhon/src/main.rs | 13 ++++++++++--- typhon/src/tasks.rs | 30 ++++++++++++++++++++++++++++-- 3 files changed, 50 insertions(+), 5 deletions(-) diff --git a/typhon/src/lib.rs b/typhon/src/lib.rs index c6aa36e2..d9bc9e8b 100644 --- a/typhon/src/lib.rs +++ b/typhon/src/lib.rs @@ -284,3 +284,15 @@ pub fn log_event(event: Event) { LISTENERS.lock().await.log(event); }); } + +pub async fn shutdown() { + let res = tokio::signal::ctrl_c().await; + eprintln!("Typhon is shutting down..."); + let _ = res.map_err(|e| log::error!("{}", e)); + tokio::join!( + EVALUATIONS.shutdown(), + JOBS_BUILD.shutdown(), + JOBS_POST.shutdown(), + JOBS_PRE.shutdown(), + ); +} diff --git a/typhon/src/main.rs b/typhon/src/main.rs index 232b31fd..71975ce7 100644 --- a/typhon/src/main.rs +++ b/typhon/src/main.rs @@ -30,8 +30,15 @@ async fn main() -> std::io::Result<()> { .expect("failed to run migrations"); // Run actix server - HttpServer::new(|| App::new().configure(typhon::api::config)) + let actix = HttpServer::new(|| App::new().configure(typhon::api::config)) .bind(("127.0.0.1", 8000))? - .run() - .await + .run(); + + // Graceful shutdown + tokio::select! { + _ = actix => panic!(), + _ = typhon::shutdown() => eprintln!("Good bye!"), + } + + Ok(()) } diff --git a/typhon/src/tasks.rs b/typhon/src/tasks.rs index 77a8c881..97967d92 100644 --- a/typhon/src/tasks.rs +++ b/typhon/src/tasks.rs @@ -4,6 +4,11 @@ use tokio::sync::Mutex; use std::collections::HashMap; use std::future::Future; +#[derive(Debug)] +pub enum Error { + ShuttingDown, +} + #[derive(Debug)] struct TaskHandle { canceler: Option>, @@ -13,6 +18,7 @@ struct TaskHandle { #[derive(Debug)] struct TasksUnwrapped { handles: HashMap, + shutdown: bool, } #[derive(Debug)] @@ -20,11 +26,12 @@ pub struct Tasks { tasks: Mutex>, } -impl Tasks { +impl Tasks { pub fn new() -> Self { Tasks { tasks: Mutex::new(TasksUnwrapped { handles: HashMap::new(), + shutdown: false, }), } } @@ -59,8 +66,11 @@ impl Tasks { id: Id, task: T, f: F, - ) -> () { + ) -> Result<(), Error> { let mut tasks = self.tasks.lock().await; + if tasks.shutdown { + return Err(Error::ShuttingDown); + } let (send, recv) = channel::<()>(); let handle = TaskHandle { canceler: Some(send), @@ -80,6 +90,7 @@ impl Tasks { } }); }); + Ok(()) } pub async fn cancel(&self, id: &Id) -> bool { @@ -92,4 +103,19 @@ impl Tasks { .flatten() .is_some() } + + pub async fn shutdown(&'static self) { + let mut tasks = self.tasks.lock().await; + tasks.shutdown = true; + let mut set = tokio::task::JoinSet::new(); + for id in tasks.handles.keys() { + set.spawn({ + let id = id.clone(); + async move { self.wait(&id).await } + }); + self.cancel(id).await; + } + drop(tasks); + while let Some(_) = set.join_next().await {} + } }