Skip to content

Commit

Permalink
feat(shutdown): handle SIGINT/SIGTERM signals
Browse files Browse the repository at this point in the history
Gracefully shut down all the active tasks when SIGINT or SIGTERM
signals are received.

Signed-off-by: Riccardo Gallo <[email protected]>
  • Loading branch information
rgallor committed Oct 4, 2024
1 parent 8e84648 commit 1d2b0cc
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 15 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ description = "Astarte Rust SDK based data stream test."
astarte-device-sdk = { version = "0.9.0", features = ["message-hub"] }
clap = { version = "=4.4.18", features = ["derive", "env", "string"] }
color-eyre = "0.6.3"
toml = "0.8.12"
futures = "0.3.30"
rand = "0.8.5"
serde = { version = "1.0.207", features = ["derive"] }
tokio = { version = "1.37.0", features = ["rt-multi-thread", "sync", "macros", "signal"] }
tokio-stream = "0.1.15"
toml = "0.8.12"
tracing = "0.1.37"
tracing-subscriber = { version = "0.3.0", features = ["env-filter"]}
rand = "0.8.5"
serde = { version = "1.0.207", features = ["derive"] }
uuid = { version = "1.10.0", features = ["v4"] }
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@
pub mod astarte;
pub mod cli;
pub mod math;
pub mod shutdown;
38 changes: 26 additions & 12 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ use std::env::VarError;
use std::time::SystemTime;
use stream_rust_test::astarte::{send_data, ConnectionConfigBuilder, SdkConnection};
use stream_rust_test::cli::Config;
use stream_rust_test::shutdown::shutdown;
use tokio::task::JoinSet;
use tracing::{debug, error};
use tracing::{debug, error, info};
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::{fmt, EnvFilter};
Expand Down Expand Up @@ -78,17 +79,30 @@ async fn main() -> eyre::Result<()> {
tasks.spawn(send_data(client, now, cli_cfg));

// handle tasks termination
while let Some(res) = tasks.join_next().await {
match res {
Ok(Ok(())) => {}
Err(err) if err.is_cancelled() => {}
Err(err) => {
error!(error = %err, "Task panicked");
return Err(err.into());
}
Ok(Err(err)) => {
error!(error = %err, "Task returned an error");
return Err(err);
loop {
tokio::select! {
_ = shutdown()? => {
info!("CTRL C received, shutting down");
tasks.abort_all();
break;
},
opt = tasks.join_next() => {
let Some(res) = opt else {
break;
};

match res {
Ok(Ok(())) => {}
Err(err) if err.is_cancelled() => {}
Err(err) => {
error!(error = %err, "Task panicked");
return Err(err.into());
}
Ok(Err(err)) => {
error!(error = %err, "Task returned an error");
return Err(err);
}
}
}
}
}
Expand Down
51 changes: 51 additions & 0 deletions src/shutdown.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// This file is part of Astarte.
//
// Copyright 2024 SECO Mind Srl
//
// SPDX-License-Identifier: Apache-2.0

//! Define shutdown futures to stop the docker container with CTRL+C command
use color_eyre::eyre;
use color_eyre::eyre::WrapErr;
use tracing::error;

#[cfg(unix)]
/// Shut down the application in case a SIGTERM or SIGINT is received.
pub fn shutdown() -> eyre::Result<impl std::future::Future<Output = ()>> {

Check warning on line 15 in src/shutdown.rs

View check run for this annotation

Codecov / codecov/patch

src/shutdown.rs#L15

Added line #L15 was not covered by tests
use futures::FutureExt;
use tokio::signal::unix::SignalKind;

let mut term = tokio::signal::unix::signal(SignalKind::terminate())

Check warning on line 19 in src/shutdown.rs

View check run for this annotation

Codecov / codecov/patch

src/shutdown.rs#L19

Added line #L19 was not covered by tests
.wrap_err("couldn't create SIGTERM listener")?;

let future = async move {
let term = std::pin::pin!(async move {
if term.recv().await.is_none() {
error!("no more signal events can be received")

Check warning on line 25 in src/shutdown.rs

View check run for this annotation

Codecov / codecov/patch

src/shutdown.rs#L22-L25

Added lines #L22 - L25 were not covered by tests
}
});

let ctrl_c = std::pin::pin!(tokio::signal::ctrl_c().map(|res| {
if let Err(err) = res {
error!("couldn't receive SIGINT {err}");

Check warning on line 31 in src/shutdown.rs

View check run for this annotation

Codecov / codecov/patch

src/shutdown.rs#L29-L31

Added lines #L29 - L31 were not covered by tests
}
}));

futures::future::select(term, ctrl_c).await;

Check warning on line 35 in src/shutdown.rs

View check run for this annotation

Codecov / codecov/patch

src/shutdown.rs#L35

Added line #L35 was not covered by tests
};

Ok(future)
}

#[cfg(not(unix))]
/// Shut down the application in case a SIGINT is received.
pub fn shutdown() -> eyre::Result<impl std::future::Future<Output = ()>> {
use futures::FutureExt;

Ok(tokio::signal::ctrl_c().map(|res| {
if let Err(err) = res {
error!("couldn't receive SIGINT {err}");
}
}))
}

0 comments on commit 1d2b0cc

Please sign in to comment.