From ff94107f21937268c1f198da1d309d54e6538094 Mon Sep 17 00:00:00 2001 From: Riccardo Gallo Date: Fri, 27 Sep 2024 17:37:31 +0200 Subject: [PATCH] feat(shutdown): handle SIGINT/SIGTERM signals Gracefully shut down all the active tasks when SIGINT or SIGTERM signals are received. Signed-off-by: Riccardo Gallo --- Cargo.lock | 1 + Cargo.toml | 7 ++++--- src/lib.rs | 1 + src/main.rs | 36 +++++++++++++++++++++++----------- src/shutdown.rs | 51 +++++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 82 insertions(+), 14 deletions(-) create mode 100644 src/shutdown.rs diff --git a/Cargo.lock b/Cargo.lock index f2d316d..256c6fb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2049,6 +2049,7 @@ dependencies = [ "astarte-device-sdk", "clap", "color-eyre", + "futures", "rand", "serde", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 7c34bee..dde0b9f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,10 +21,11 @@ description = "Astarte Rust SDK based data stream test." astarte-device-sdk = { version = "0.9.1", features = ["message-hub"] } clap = { version = "=4.4.18", features = ["derive", "env", "string"] } color-eyre = "0.6.3" -toml = "0.8.12" +futures = "0.3.30" +rand = "0.8.5" +serde = { version = "1.0.207", features = ["derive"] } tokio = { version = "1.37.0", features = ["rt-multi-thread", "sync", "macros", "signal"] } +toml = "0.8.12" tracing = "0.1.37" tracing-subscriber = { version = "0.3.0", features = ["env-filter"]} -rand = "0.8.5" -serde = { version = "1.0.207", features = ["derive"] } uuid = { version = "1.10.0", features = ["v4"] } diff --git a/src/lib.rs b/src/lib.rs index 877867b..d7bbaa3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,3 +10,4 @@ pub mod astarte; pub mod cli; pub mod math; +pub mod shutdown; diff --git a/src/main.rs b/src/main.rs index 52d7ae5..965fa66 100644 --- a/src/main.rs +++ b/src/main.rs @@ -11,6 +11,7 @@ use color_eyre::eyre::WrapErr; use std::time::SystemTime; use stream_rust_test::astarte::{send_data, ConnectionConfigBuilder, SdkConnection}; use stream_rust_test::cli::Config; +use stream_rust_test::shutdown::shutdown; use tokio::task::JoinSet; use tracing::{debug, error, info, warn}; use tracing_subscriber::layer::SubscriberExt; @@ -76,17 +77,30 @@ async fn main() -> eyre::Result<()> { tasks.spawn(send_data(client, now, cli_cfg)); // handle tasks termination - while let Some(res) = tasks.join_next().await { - match res { - Ok(Ok(())) => {} - Err(err) if err.is_cancelled() => {} - Err(err) => { - error!(error = %err, "Task panicked"); - return Err(err.into()); - } - Ok(Err(err)) => { - error!(error = %err, "Task returned an error"); - return Err(err); + loop { + tokio::select! { + _ = shutdown()? => { + info!("CTRL C received, shutting down"); + tasks.abort_all(); + break; + }, + opt = tasks.join_next() => { + let Some(res) = opt else { + break; + }; + + match res { + Ok(Ok(())) => {} + Err(err) if err.is_cancelled() => {} + Err(err) => { + error!(error = %err, "Task panicked"); + return Err(err.into()); + } + Ok(Err(err)) => { + error!(error = %err, "Task returned an error"); + return Err(err); + } + } } } } diff --git a/src/shutdown.rs b/src/shutdown.rs new file mode 100644 index 0000000..82517f6 --- /dev/null +++ b/src/shutdown.rs @@ -0,0 +1,51 @@ +// This file is part of Astarte. +// +// Copyright 2024 SECO Mind Srl +// +// SPDX-License-Identifier: Apache-2.0 + +//! Define shutdown futures to stop the docker container with CTRL+C command + +use color_eyre::eyre; +use color_eyre::eyre::WrapErr; +use tracing::error; + +#[cfg(unix)] +/// Shut down the application in case a SIGTERM or SIGINT is received. +pub fn shutdown() -> eyre::Result> { + use futures::FutureExt; + use tokio::signal::unix::SignalKind; + + let mut term = tokio::signal::unix::signal(SignalKind::terminate()) + .wrap_err("couldn't create SIGTERM listener")?; + + let future = async move { + let term = std::pin::pin!(async move { + if term.recv().await.is_none() { + error!("no more signal events can be received") + } + }); + + let ctrl_c = std::pin::pin!(tokio::signal::ctrl_c().map(|res| { + if let Err(err) = res { + error!("couldn't receive SIGINT {err}"); + } + })); + + futures::future::select(term, ctrl_c).await; + }; + + Ok(future) +} + +#[cfg(not(unix))] +/// Shut down the application in case a SIGINT is received. +pub fn shutdown() -> eyre::Result> { + use futures::FutureExt; + + Ok(tokio::signal::ctrl_c().map(|res| { + if let Err(err) = res { + error!("couldn't receive SIGINT {err}"); + } + })) +}