Skip to content

Commit

Permalink
server: graceful shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
pnmadelaine committed Oct 1, 2023
1 parent db94556 commit ee13c9e
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 5 deletions.
12 changes: 12 additions & 0 deletions typhon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,3 +284,15 @@ pub fn log_event(event: Event) {
LISTENERS.lock().await.log(event);
});
}

pub async fn shutdown() {
let res = tokio::signal::ctrl_c().await;
eprintln!("Typhon is shutting down...");
let _ = res.map_err(|e| log::error!("{}", e));
tokio::join!(
EVALUATIONS.shutdown(),
JOBS_BUILD.shutdown(),
JOBS_BEGIN.shutdown(),
JOBS_END.shutdown(),
);
}
13 changes: 10 additions & 3 deletions typhon/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,15 @@ async fn main() -> std::io::Result<()> {
.expect("failed to run migrations");

// Run actix server
HttpServer::new(|| App::new().configure(typhon::api::config))
let actix = HttpServer::new(|| App::new().configure(typhon::api::config))
.bind(("127.0.0.1", 8000))?
.run()
.await
.run();

// Graceful shutdown
tokio::select! {
_ = actix => panic!(),
_ = typhon::shutdown() => eprintln!("Good bye!"),
}

Ok(())
}
30 changes: 28 additions & 2 deletions typhon/src/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ use tokio::sync::Mutex;
use std::collections::HashMap;
use std::future::Future;

#[derive(Debug)]
pub enum Error {
ShuttingDown,
}

#[derive(Debug)]
struct TaskHandle {
canceler: Option<Sender<()>>,
Expand All @@ -13,18 +18,20 @@ struct TaskHandle {
#[derive(Debug)]
struct TasksUnwrapped<Id> {
handles: HashMap<Id, TaskHandle>,
shutdown: bool,
}

#[derive(Debug)]
pub struct Tasks<Id> {
tasks: Mutex<TasksUnwrapped<Id>>,
}

impl<Id: std::cmp::Eq + std::hash::Hash + std::clone::Clone + Send> Tasks<Id> {
impl<Id: std::cmp::Eq + std::hash::Hash + std::clone::Clone + Send + Sync> Tasks<Id> {
pub fn new() -> Self {
Tasks {
tasks: Mutex::new(TasksUnwrapped {
handles: HashMap::new(),
shutdown: false,
}),
}
}
Expand Down Expand Up @@ -59,8 +66,11 @@ impl<Id: std::cmp::Eq + std::hash::Hash + std::clone::Clone + Send> Tasks<Id> {
id: Id,
task: T,
f: F,
) -> () {
) -> Result<(), Error> {
let mut tasks = self.tasks.lock().await;
if tasks.shutdown {
return Err(Error::ShuttingDown);
}
let (send, recv) = channel::<()>();
let handle = TaskHandle {
canceler: Some(send),
Expand All @@ -80,6 +90,7 @@ impl<Id: std::cmp::Eq + std::hash::Hash + std::clone::Clone + Send> Tasks<Id> {
}
});
});
Ok(())
}

pub async fn cancel(&self, id: &Id) -> bool {
Expand All @@ -92,4 +103,19 @@ impl<Id: std::cmp::Eq + std::hash::Hash + std::clone::Clone + Send> Tasks<Id> {
.flatten()
.is_some()
}

pub async fn shutdown(&'static self) {
let mut tasks = self.tasks.lock().await;
tasks.shutdown = true;
let mut set = tokio::task::JoinSet::new();
for id in tasks.handles.keys() {
set.spawn({
let id = id.clone();
async move { self.wait(&id).await }
});
self.cancel(id).await;
}
drop(tasks);
while let Some(_) = set.join_next().await {}
}
}

0 comments on commit ee13c9e

Please sign in to comment.