diff --git a/crates/common/tedge_config/src/system_services/managers/config.rs b/crates/common/tedge_config/src/system_services/managers/config.rs index b6a8f388b7c..4a0b947809c 100644 --- a/crates/common/tedge_config/src/system_services/managers/config.rs +++ b/crates/common/tedge_config/src/system_services/managers/config.rs @@ -3,6 +3,7 @@ use camino::Utf8Path; use serde::Deserialize; use std::collections::HashMap; use std::fs; +use std::time::Duration; pub const SERVICE_CONFIG_FILE: &str = "system.toml"; const REBOOT_COMMAND: &[&str] = &["init", "6"]; @@ -32,6 +33,23 @@ pub struct InitConfig { #[serde(deny_unknown_fields)] pub struct SystemSpecificCommands { pub reboot: Vec, + #[serde(default = "SystemSpecificCommands::default_reboot_timeout_seconds")] + pub reboot_timeout_seconds: u64, +} + +impl SystemSpecificCommands { + pub fn default_reboot_timeout_seconds() -> u64 { + // The linux shutdown command only supports triggering the shutdown immediately + // or in minutes, a delay in seconds is not supported. Using a shell script to delay + // the call to shutdown is generally not very reliable. + // Choose a sensible default that won't timeout if 'shutdown -r' is used + // (with some buffer), e.g. 2 x default interval (60 seconds) + 120 + } + + pub fn reboot_timeout(&self) -> Duration { + Duration::from_secs(self.reboot_timeout_seconds) + } } impl Default for SystemSpecificCommands { @@ -41,6 +59,7 @@ impl Default for SystemSpecificCommands { .iter() .map(|value| String::from(*value)) .collect::>(), + reboot_timeout_seconds: SystemSpecificCommands::default_reboot_timeout_seconds(), } } } diff --git a/crates/core/c8y_api/src/json_c8y.rs b/crates/core/c8y_api/src/json_c8y.rs index d078fa1d72c..b5a88ab0e24 100644 --- a/crates/core/c8y_api/src/json_c8y.rs +++ b/crates/core/c8y_api/src/json_c8y.rs @@ -498,7 +498,7 @@ mod tests { let expected_json = r#"{"c8y_SoftwareList":[{"name":"a","version":"::debian","url":""},{"name":"b","version":"1.0::debian","url":""},{"name":"c","version":"::debian","url":"https://foobar.io/c.deb"},{"name":"d","version":"beta::debian","url":"https://foobar.io/d.deb"},{"name":"m","version":"::apama","url":"https://foobar.io/m.epl"}]}"#; assert_eq!(c8y_software_list, expected_struct); - assert_eq!(c8y_software_list.to_json().unwrap(), expected_json); + assert_eq!(c8y_software_list.to_json(), expected_json); } #[test] @@ -518,7 +518,7 @@ mod tests { let expected_json = r#"{"c8y_SoftwareList":[]}"#; assert_eq!(c8y_software_list, expected_struct); - assert_eq!(c8y_software_list.to_json().unwrap(), expected_json); + assert_eq!(c8y_software_list.to_json(), expected_json); } #[test] diff --git a/crates/core/c8y_api/src/smartrest/smartrest_deserializer.rs b/crates/core/c8y_api/src/smartrest/smartrest_deserializer.rs index d5f1bd95818..f964ab11e4d 100644 --- a/crates/core/c8y_api/src/smartrest/smartrest_deserializer.rs +++ b/crates/core/c8y_api/src/smartrest/smartrest_deserializer.rs @@ -542,7 +542,7 @@ mod tests { .from_smartrest(&smartrest) .unwrap() .to_thin_edge_json_with_id("123"); - let output_json = software_update_request.unwrap().to_json().unwrap(); + let output_json = software_update_request.unwrap().to_json(); let expected_json = json!({ "id": "123", diff --git a/crates/core/c8y_api/src/smartrest/topic.rs b/crates/core/c8y_api/src/smartrest/topic.rs index f6acf95d177..f3c36f8b5f4 100644 --- a/crates/core/c8y_api/src/smartrest/topic.rs +++ b/crates/core/c8y_api/src/smartrest/topic.rs @@ -19,6 +19,18 @@ pub enum C8yTopic { } impl C8yTopic { + /// Return the c8y SmartRest response topic for the given entity + pub fn smartrest_response_topic(entity: &EntityMetadata) -> Option { + match entity.r#type { + EntityType::MainDevice => Some(C8yTopic::upstream_topic()), + EntityType::ChildDevice | EntityType::Service => { + Self::ChildSmartRestResponse(entity.entity_id.clone()) + .to_topic() + .ok() + } + } + } + pub fn to_topic(&self) -> Result { Topic::new(self.to_string().as_str()) } diff --git a/crates/core/tedge_agent/src/agent.rs b/crates/core/tedge_agent/src/agent.rs index f9557ac4fcc..92fe7fe380f 100644 --- a/crates/core/tedge_agent/src/agent.rs +++ b/crates/core/tedge_agent/src/agent.rs @@ -85,7 +85,8 @@ impl AgentConfig { .with_ip_address(http_bind_address); // Restart config - let restart_config = RestartManagerConfig::from_tedge_config(tedge_config_location)?; + let restart_config = + RestartManagerConfig::from_tedge_config(&mqtt_device_topic_id, tedge_config_location)?; // Software update config let sw_update_config = SoftwareManagerConfig::from_tedge_config(tedge_config_location)?; @@ -170,6 +171,8 @@ impl Agent { // Converter actor let converter_actor_builder = TedgeOperationConverterBuilder::new( + self.config.mqtt_topic_root.as_ref(), + self.config.mqtt_device_topic_id.clone(), &mut software_update_builder, &mut restart_actor_builder, &mut mqtt_actor_builder, diff --git a/crates/core/tedge_agent/src/file_transfer_server/http_rest.rs b/crates/core/tedge_agent/src/file_transfer_server/http_rest.rs index 5e43d89ad17..51033b09a27 100644 --- a/crates/core/tedge_agent/src/file_transfer_server/http_rest.rs +++ b/crates/core/tedge_agent/src/file_transfer_server/http_rest.rs @@ -271,8 +271,8 @@ mod test { assert_eq!(actual_file_name, expected_file_name); } - const VALID_TEST_URI: &str = "http://127.0.0.1:3000/tedge/file-transfer/another/dir/test-file"; - const INVALID_TEST_URI: &str = "http://127.0.0.1:3000/wrong/place/test-file"; + const VALID_TEST_URI: &str = "http://127.0.0.1:3333/tedge/file-transfer/another/dir/test-file"; + const INVALID_TEST_URI: &str = "http://127.0.0.1:3333/wrong/place/test-file"; #[test_case(hyper::Method::GET, VALID_TEST_URI, hyper::StatusCode::OK)] #[test_case(hyper::Method::GET, INVALID_TEST_URI, hyper::StatusCode::NOT_FOUND)] @@ -348,7 +348,7 @@ mod test { let tempdir_path = ttd.utf8_path_buf(); let http_config = HttpConfig::default() .with_data_dir(tempdir_path) - .with_port(3000); + .with_port(3333); let server = http_file_transfer_server(&http_config).unwrap(); (ttd, server) } diff --git a/crates/core/tedge_agent/src/restart_manager/actor.rs b/crates/core/tedge_agent/src/restart_manager/actor.rs index 747f3c8c350..b00f1665218 100644 --- a/crates/core/tedge_agent/src/restart_manager/actor.rs +++ b/crates/core/tedge_agent/src/restart_manager/actor.rs @@ -16,10 +16,11 @@ use tedge_actors::RuntimeError; use tedge_actors::RuntimeRequest; use tedge_actors::Sender; use tedge_actors::SimpleMessageBox; -use tedge_api::OperationStatus; -use tedge_api::RestartOperationRequest; -use tedge_api::RestartOperationResponse; +use tedge_api::messages::CommandStatus; +use tedge_api::messages::RestartCommandPayload; +use tedge_api::RestartCommand; use tedge_config::system_services::SystemConfig; +use tedge_config::system_services::SystemSpecificCommands; use tokio::process::Command; use tokio::time::timeout; use tracing::error; @@ -36,7 +37,7 @@ const SUDO: &str = "echo"; pub struct RestartManagerActor { config: RestartManagerConfig, state_repository: AgentStateRepository, - message_box: SimpleMessageBox, + message_box: SimpleMessageBox, } #[async_trait] @@ -51,24 +52,53 @@ impl Actor for RestartManagerActor { } while let Some(request) = self.message_box.recv().await { - let executing_response = self.update_state_repository(&request).await; + if request.status() != CommandStatus::Init { + continue; + } + let executing_response = self.update_state_repository(request.clone()).await; + let ready = executing_response.status() == CommandStatus::Executing; self.message_box.send(executing_response).await?; + if !ready { + info!("Cannot restart"); + continue; + } + info!("Triggering a restart"); - let maybe_error = self.handle_restart_operation().await; - - match timeout(Duration::from_secs(5), self.message_box.recv_signal()).await { - Ok(Some(RuntimeRequest::Shutdown)) => { - // As expected, the restart triggered a shutdown. - return Ok(()); + let restart_timeout = self.get_restart_timeout(); + match timeout(restart_timeout, self.handle_restart_operation()).await { + Ok(Err(err)) => { + let error = format!("Fail to trigger a restart: {err}"); + error!(error); + self.handle_error(request, error).await?; + } + Err(_) => { + let error = format!( + "Restart command still running after {} seconds", + restart_timeout.as_secs() + ); + error!(error); + self.handle_error(request, error).await?; } - Ok(None) | Err(_) => { - // Something went wrong. The process should have been shutdown by the restart. - if let Err(err) = maybe_error { - error!("{}", err); + Ok(Ok(not_interrupted)) => { + if not_interrupted { + info!("The restart command has been successfully executed"); + } else { + info!("The restart command has been interrupted by a signal"); + } + match timeout(restart_timeout, self.message_box.recv_signal()).await { + Ok(Some(RuntimeRequest::Shutdown)) => { + info!("As requested, a shutdown has been triggered"); + return Ok(()); + } + Ok(None) | Err(_ /* timeout */) => { + // Something went wrong. The process should have been shutdown by the restart. + let error = "No shutdown has been triggered".to_string(); + error!(error); + self.handle_error(request, error).await?; + } } - self.handle_error(&request).await?; } - } + }; } Ok(()) @@ -78,7 +108,7 @@ impl Actor for RestartManagerActor { impl RestartManagerActor { pub fn new( config: RestartManagerConfig, - message_box: SimpleMessageBox, + message_box: SimpleMessageBox, ) -> Self { let state_repository = AgentStateRepository::new_with_file_name( config.config_dir.clone(), @@ -91,89 +121,109 @@ impl RestartManagerActor { } } - async fn process_pending_restart_operation(&mut self) -> Option { - let state: Result = self.state_repository.load().await; - - if let State { - operation_id: Some(id), - operation: Some(operation), - } = match state { - Ok(state) => state, - Err(_) => State { - operation_id: None, - operation: None, - }, - } { - self.clear_state_repository().await; - - match operation { - StateStatus::Restart(RestartOperationStatus::Restarting) => { - let status = match has_rebooted(&self.config.tmp_dir) { - Ok(true) => { - info!("Device restart successful."); - OperationStatus::Successful - } - Ok(false) => { - info!("Device failed to restart."); - OperationStatus::Failed - } - Err(err) => { - error!("Fail to detect a restart: {err}"); - OperationStatus::Failed - } - }; + async fn process_pending_restart_operation(&mut self) -> Option { + match self.state_repository.load().await { + Ok(State { + operation_id: Some(operation_id), + operation: Some(operation), + }) => { + self.clear_state_repository().await; - return Some(RestartOperationResponse { id, status }); - } - StateStatus::Restart(RestartOperationStatus::Pending) => { - error!("The agent has been restarted but not the device"); - let status = OperationStatus::Failed; - return Some(RestartOperationResponse { id, status }); - } - StateStatus::Software(_) | StateStatus::UnknownOperation => { - error!("UnknownOperation in store."); + let command = RestartCommand { + target: self.config.device_topic_id.clone(), + cmd_id: operation_id, + payload: RestartCommandPayload::default(), + }; + + match operation { + StateStatus::Restart(RestartOperationStatus::Restarting) => { + let command = match has_rebooted(&self.config.tmp_dir) { + Ok(true) => { + info!("Device restart successful"); + command.with_status(CommandStatus::Successful) + } + Ok(false) => { + let error = "Device failed to restart"; + error!(error); + command.with_error(error.to_string()) + } + Err(err) => { + let error = format!("Fail to detect a restart: {err}"); + error!(error); + command.with_error(error) + } + }; + + Some(command) + } + StateStatus::Restart(RestartOperationStatus::Pending) => { + let error = "The agent has been restarted but not the device"; + error!(error); + Some(command.with_error(error.to_string())) + } + StateStatus::Software(_) | StateStatus::UnknownOperation => { + error!("UnknownOperation in store."); + None + } } - }; + } + Err(err) => { + error!("Fail to read tedge-agent state: {err}"); + None + } + Ok(_) => None, } - None } - async fn update_state_repository( - &mut self, - request: &RestartOperationRequest, - ) -> RestartOperationResponse { - let response = RestartOperationResponse::new(request); + async fn update_state_repository(&mut self, command: RestartCommand) -> RestartCommand { let state = State { - operation_id: Some(request.id.clone()), + operation_id: Some(command.cmd_id.clone()), operation: Some(StateStatus::Restart(RestartOperationStatus::Restarting)), }; if let Err(err) = self.state_repository.store(&state).await { - error!( + let reason = format!( "Fail to update the restart state in {} due to: {}", self.state_repository.state_repo_path, err ); - return response.with_status(OperationStatus::Failed); + error!(reason); + return command.with_error(reason); } if let Err(err) = create_tmp_restart_file(&self.config.tmp_dir).await { - error!( + let reason = format!( "Fail to create a witness file in {} due to: {}", self.config.tmp_dir, err ); - return response.with_status(OperationStatus::Failed); + error!(reason); + return command.with_error(reason); } - response + command.with_status(CommandStatus::Executing) } - async fn handle_restart_operation(&mut self) -> Result<(), RestartManagerError> { - let commands = self.get_restart_operation_commands().await?; + /// Run the restart command + /// + /// Returns: + /// - `Ok(true)` if all the commands run successfully. + /// - `Ok(false)` if one of the commands has been interrupted by a signal. + /// - `Err(_)` if one the commands cannot be launched or failed. + async fn handle_restart_operation(&mut self) -> Result { + let commands = self.get_restart_operation_commands()?; + let mut not_interrupted = true; for mut command in commands { + if let Some(cmd) = command.as_std().get_program().to_str() { + info!("Restarting: {cmd}"); + } match command.status().await { Ok(status) => { - if !status.success() { - return Err(RestartManagerError::CommandFailed); + if status.code().is_none() { + // This might the result of the reboot - hence not considered as an error + not_interrupted = false; + } else if !status.success() { + return Err(RestartManagerError::CommandFailed { + command: format!("{command:?}"), + }); } } Err(e) => { @@ -182,17 +232,16 @@ impl RestartManagerActor { } } - Ok(()) + Ok(not_interrupted) } async fn handle_error( &mut self, - request: &RestartOperationRequest, + command: RestartCommand, + reason: String, ) -> Result<(), ChannelError> { self.clear_state_repository().await; - let status = OperationStatus::Failed; - let response = RestartOperationResponse::new(request).with_status(status); - self.message_box.send(response).await?; + self.message_box.send(command.with_error(reason)).await?; Ok(()) } @@ -205,7 +254,7 @@ impl RestartManagerActor { } } - async fn get_restart_operation_commands(&self) -> Result, RestartManagerError> { + fn get_restart_operation_commands(&self) -> Result, RestartManagerError> { let mut vec = vec![]; // reading `config_dir` to get the restart command or defaulting to `["init", "6"]' @@ -237,4 +286,10 @@ impl RestartManagerActor { Ok(vec) } + + fn get_restart_timeout(&self) -> Duration { + SystemConfig::try_new(&self.config.config_dir) + .map(|config| config.system.reboot_timeout()) + .unwrap_or_else(|_| SystemSpecificCommands::default().reboot_timeout()) + } } diff --git a/crates/core/tedge_agent/src/restart_manager/builder.rs b/crates/core/tedge_agent/src/restart_manager/builder.rs index 4ad43bb0fca..ec1be48effb 100644 --- a/crates/core/tedge_agent/src/restart_manager/builder.rs +++ b/crates/core/tedge_agent/src/restart_manager/builder.rs @@ -8,12 +8,11 @@ use tedge_actors::RuntimeRequest; use tedge_actors::RuntimeRequestSink; use tedge_actors::ServiceProvider; use tedge_actors::SimpleMessageBoxBuilder; -use tedge_api::RestartOperationRequest; -use tedge_api::RestartOperationResponse; +use tedge_api::RestartCommand; pub struct RestartManagerBuilder { config: RestartManagerConfig, - message_box: SimpleMessageBoxBuilder, + message_box: SimpleMessageBoxBuilder, } impl RestartManagerBuilder { @@ -27,14 +26,12 @@ impl RestartManagerBuilder { } } -impl ServiceProvider - for RestartManagerBuilder -{ +impl ServiceProvider for RestartManagerBuilder { fn connect_consumer( &mut self, config: NoConfig, - response_sender: DynSender, - ) -> DynSender { + response_sender: DynSender, + ) -> DynSender { self.message_box.connect_consumer(config, response_sender) } } diff --git a/crates/core/tedge_agent/src/restart_manager/config.rs b/crates/core/tedge_agent/src/restart_manager/config.rs index 3b76aa22208..cc13add6e1c 100644 --- a/crates/core/tedge_agent/src/restart_manager/config.rs +++ b/crates/core/tedge_agent/src/restart_manager/config.rs @@ -1,19 +1,28 @@ use camino::Utf8PathBuf; +use tedge_api::mqtt_topics::EntityTopicId; + #[derive(Debug, Clone)] pub struct RestartManagerConfig { + pub device_topic_id: EntityTopicId, pub tmp_dir: Utf8PathBuf, pub config_dir: Utf8PathBuf, } impl RestartManagerConfig { - pub fn new(tmp_dir: &Utf8PathBuf, config_dir: &Utf8PathBuf) -> Self { + pub fn new( + device_topic_id: &EntityTopicId, + tmp_dir: &Utf8PathBuf, + config_dir: &Utf8PathBuf, + ) -> Self { Self { + device_topic_id: device_topic_id.clone(), tmp_dir: tmp_dir.clone(), config_dir: config_dir.clone(), } } pub fn from_tedge_config( + device_topic_id: &EntityTopicId, tedge_config_location: &tedge_config::TEdgeConfigLocation, ) -> Result { let config_repository = @@ -23,6 +32,6 @@ impl RestartManagerConfig { let tmp_dir = tedge_config.tmp.path.clone(); let config_dir = tedge_config_location.tedge_config_root_path.clone(); - Ok(Self::new(&tmp_dir, &config_dir)) + Ok(Self::new(device_topic_id, &tmp_dir, &config_dir)) } } diff --git a/crates/core/tedge_agent/src/restart_manager/error.rs b/crates/core/tedge_agent/src/restart_manager/error.rs index 333abb26608..7337d205d2a 100644 --- a/crates/core/tedge_agent/src/restart_manager/error.rs +++ b/crates/core/tedge_agent/src/restart_manager/error.rs @@ -4,8 +4,8 @@ pub enum RestartManagerError { #[error(transparent)] FromIo(#[from] std::io::Error), - #[error("Command returned non 0 exit code.")] - CommandFailed, + #[error("Command returned non 0 exit code: {command}")] + CommandFailed { command: String }, #[error("Failed parsing /proc/uptime")] UptimeParserError, diff --git a/crates/core/tedge_agent/src/restart_manager/tests.rs b/crates/core/tedge_agent/src/restart_manager/tests.rs index 8b3dcaae829..e4245f8c454 100644 --- a/crates/core/tedge_agent/src/restart_manager/tests.rs +++ b/crates/core/tedge_agent/src/restart_manager/tests.rs @@ -11,9 +11,10 @@ use tedge_actors::Sender; use tedge_actors::ServiceConsumer; use tedge_actors::SimpleMessageBox; use tedge_actors::SimpleMessageBoxBuilder; -use tedge_api::OperationStatus; -use tedge_api::RestartOperationRequest; -use tedge_api::RestartOperationResponse; +use tedge_api::messages::CommandStatus; +use tedge_api::messages::RestartCommandPayload; +use tedge_api::mqtt_topics::EntityTopicId; +use tedge_api::RestartCommand; use tedge_test_utils::fs::TempTedgeDir; const TEST_TIMEOUT_MS: Duration = Duration::from_millis(5000); @@ -30,9 +31,13 @@ async fn test_pending_restart_operation() -> Result<(), DynError> { let mut converter_box = spawn_restart_manager(&temp_dir).await?; converter_box - .assert_received([RestartOperationResponse { - id: "1234".to_string(), - status: OperationStatus::Successful, + .assert_received([RestartCommand { + target: EntityTopicId::default_main_device(), + cmd_id: "1234".to_string(), + payload: RestartCommandPayload { + status: CommandStatus::Successful, + reason: "".to_string(), + }, }]) .await; @@ -51,9 +56,13 @@ async fn test_pending_restart_operation_failed() -> Result<(), DynError> { let mut converter_box = spawn_restart_manager(&temp_dir).await?; converter_box - .assert_received([RestartOperationResponse { - id: "1234".to_string(), - status: OperationStatus::Failed, + .assert_received([RestartCommand { + target: EntityTopicId::default_main_device(), + cmd_id: "1234".to_string(), + payload: RestartCommandPayload { + status: CommandStatus::Failed, + reason: "The agent has been restarted but not the device".to_string(), + }, }]) .await; @@ -72,9 +81,13 @@ async fn test_pending_restart_operation_successful() -> Result<(), DynError> { let mut converter_box = spawn_restart_manager(&temp_dir).await?; converter_box - .assert_received([RestartOperationResponse { - id: "1234".to_string(), - status: OperationStatus::Successful, + .assert_received([RestartCommand { + target: EntityTopicId::default_main_device(), + cmd_id: "1234".to_string(), + payload: RestartCommandPayload { + status: CommandStatus::Successful, + reason: "".to_string(), + }, }]) .await; @@ -91,13 +104,18 @@ async fn test_new_restart_operation() -> Result<(), DynError> { // Simulate RestartOperationRequest converter_box - .send(RestartOperationRequest { - id: "random".to_string(), + .send(RestartCommand { + target: EntityTopicId::default_main_device(), + cmd_id: "1234".to_string(), + payload: RestartCommandPayload { + status: CommandStatus::Init, + reason: "".to_string(), + }, }) .await?; - let status = converter_box.recv().await.unwrap().status; - assert_eq!(status, OperationStatus::Executing); + let status = converter_box.recv().await.unwrap().status(); + assert_eq!(status, CommandStatus::Executing); // Check the agent restart temp file is created assert!(temp_dir.path().join("tedge_agent_restart").exists()); @@ -107,16 +125,15 @@ async fn test_new_restart_operation() -> Result<(), DynError> { async fn spawn_restart_manager( tmp_dir: &TempTedgeDir, -) -> Result< - TimedMessageBox>, - DynError, -> { - let mut converter_builder: SimpleMessageBoxBuilder< - RestartOperationResponse, - RestartOperationRequest, - > = SimpleMessageBoxBuilder::new("Converter", 5); - - let config = RestartManagerConfig::new(&tmp_dir.utf8_path_buf(), &tmp_dir.utf8_path_buf()); +) -> Result>, DynError> { + let mut converter_builder: SimpleMessageBoxBuilder = + SimpleMessageBoxBuilder::new("Converter", 5); + + let config = RestartManagerConfig::new( + &EntityTopicId::default_main_device(), + &tmp_dir.utf8_path_buf(), + &tmp_dir.utf8_path_buf(), + ); let mut restart_actor_builder = RestartManagerBuilder::new(config); converter_builder.set_connection(&mut restart_actor_builder); diff --git a/crates/core/tedge_agent/src/tedge_operation_converter/actor.rs b/crates/core/tedge_agent/src/tedge_operation_converter/actor.rs index 31c81807741..7e127c3ffbf 100644 --- a/crates/core/tedge_agent/src/tedge_operation_converter/actor.rs +++ b/crates/core/tedge_agent/src/tedge_operation_converter/actor.rs @@ -1,6 +1,5 @@ use crate::software_manager::actor::SoftwareRequest; use crate::software_manager::actor::SoftwareResponse; -use crate::tedge_operation_converter::error::TedgeOperationConverterError; use async_trait::async_trait; use log::error; use tedge_actors::fan_in_message_type; @@ -10,9 +9,12 @@ use tedge_actors::LoggingSender; use tedge_actors::MessageReceiver; use tedge_actors::RuntimeError; use tedge_actors::Sender; +use tedge_api::mqtt_topics::Channel; +use tedge_api::mqtt_topics::EntityTopicId; +use tedge_api::mqtt_topics::MqttSchema; +use tedge_api::mqtt_topics::OperationType; use tedge_api::Jsonify; -use tedge_api::RestartOperationRequest; -use tedge_api::RestartOperationResponse; +use tedge_api::RestartCommand; use tedge_api::SoftwareListRequest; use tedge_api::SoftwareListResponse; use tedge_api::SoftwareUpdateRequest; @@ -20,12 +22,14 @@ use tedge_api::SoftwareUpdateResponse; use tedge_mqtt_ext::MqttMessage; use tedge_mqtt_ext::Topic; -fan_in_message_type!(AgentInput[MqttMessage, SoftwareResponse, RestartOperationResponse] : Debug); +fan_in_message_type!(AgentInput[MqttMessage, SoftwareResponse, RestartCommand] : Debug); pub struct TedgeOperationConverterActor { + mqtt_schema: MqttSchema, + device_topic_id: EntityTopicId, input_receiver: LoggingReceiver, software_sender: LoggingSender, - restart_sender: LoggingSender, + restart_sender: LoggingSender, mqtt_publisher: LoggingSender, } @@ -36,6 +40,8 @@ impl Actor for TedgeOperationConverterActor { } async fn run(&mut self) -> Result<(), RuntimeError> { + self.publish_operation_capabilities().await?; + while let Some(input) = self.input_receiver.recv().await { match input { AgentInput::MqttMessage(message) => { @@ -47,8 +53,8 @@ impl Actor for TedgeOperationConverterActor { AgentInput::SoftwareResponse(SoftwareResponse::SoftwareUpdateResponse(res)) => { self.process_software_update_response(res).await?; } - AgentInput::RestartOperationResponse(res) => { - self.process_restart_response(res).await?; + AgentInput::RestartCommand(cmd) => { + self.process_restart_response(cmd).await?; } } } @@ -58,12 +64,16 @@ impl Actor for TedgeOperationConverterActor { impl TedgeOperationConverterActor { pub fn new( + mqtt_schema: MqttSchema, + device_topic_id: EntityTopicId, input_receiver: LoggingReceiver, software_sender: LoggingSender, - restart_sender: LoggingSender, + restart_sender: LoggingSender, mqtt_publisher: LoggingSender, ) -> Self { Self { + mqtt_schema, + device_topic_id, input_receiver, software_sender, restart_sender, @@ -71,10 +81,13 @@ impl TedgeOperationConverterActor { } } - async fn process_mqtt_message( - &mut self, - message: MqttMessage, - ) -> Result<(), TedgeOperationConverterError> { + async fn publish_operation_capabilities(&mut self) -> Result<(), RuntimeError> { + let restart_capability = + RestartCommand::capability_message(&self.mqtt_schema, &self.device_topic_id); + Ok(self.mqtt_publisher.send(restart_capability).await?) + } + + async fn process_mqtt_message(&mut self, message: MqttMessage) -> Result<(), RuntimeError> { match message.topic.name.as_str() { "tedge/commands/req/software/list" => { match SoftwareListRequest::from_slice(message.payload_bytes()) { @@ -96,15 +109,34 @@ impl TedgeOperationConverterActor { Err(err) => error!("Incorrect software update request payload: {err}"), } } - "tedge/commands/req/control/restart" => { - match RestartOperationRequest::from_slice(message.payload_bytes()) { - Ok(request) => { - self.restart_sender.send(request).await?; - } - Err(err) => error!("Incorrect restart request payload: {err}"), + _ => { + // Not a tedge/commands ! + } + } + + if message.topic.name.as_str().starts_with("tedge") { + return Ok(()); + } + + match self.mqtt_schema.entity_channel_of(&message.topic) { + Ok(( + target, + Channel::Command { + operation: OperationType::Restart, + cmd_id, + }, + )) => match RestartCommand::try_from(target, cmd_id, message.payload_bytes()) { + Ok(Some(cmd)) => { + self.restart_sender.send(cmd).await?; } + Ok(None) => { + // The command has been fully processed + } + Err(err) => error!("Incorrect restart request payload: {err}"), + }, + _ => { + log::error!("Unknown command channel: {}", message.topic.name); } - _ => unreachable!(), } Ok(()) } @@ -112,10 +144,10 @@ impl TedgeOperationConverterActor { async fn process_software_list_response( &mut self, response: SoftwareListResponse, - ) -> Result<(), TedgeOperationConverterError> { + ) -> Result<(), RuntimeError> { let message = MqttMessage::new( &Topic::new_unchecked("tedge/commands/res/software/list"), - response.to_bytes()?, + response.to_bytes(), ); self.mqtt_publisher.send(message).await?; Ok(()) @@ -124,10 +156,10 @@ impl TedgeOperationConverterActor { async fn process_software_update_response( &mut self, response: SoftwareUpdateResponse, - ) -> Result<(), TedgeOperationConverterError> { + ) -> Result<(), RuntimeError> { let message = MqttMessage::new( &Topic::new_unchecked("tedge/commands/res/software/update"), - response.to_bytes()?, + response.to_bytes(), ); self.mqtt_publisher.send(message).await?; Ok(()) @@ -135,12 +167,9 @@ impl TedgeOperationConverterActor { async fn process_restart_response( &mut self, - response: RestartOperationResponse, - ) -> Result<(), TedgeOperationConverterError> { - let message = MqttMessage::new( - &Topic::new_unchecked("tedge/commands/res/control/restart"), - response.to_bytes()?, - ); + response: RestartCommand, + ) -> Result<(), RuntimeError> { + let message = response.command_message(&self.mqtt_schema); self.mqtt_publisher.send(message).await?; Ok(()) } diff --git a/crates/core/tedge_agent/src/tedge_operation_converter/builder.rs b/crates/core/tedge_agent/src/tedge_operation_converter/builder.rs index e45a3304c98..d659c51f39c 100644 --- a/crates/core/tedge_agent/src/tedge_operation_converter/builder.rs +++ b/crates/core/tedge_agent/src/tedge_operation_converter/builder.rs @@ -12,29 +12,34 @@ use tedge_actors::NoConfig; use tedge_actors::RuntimeRequest; use tedge_actors::RuntimeRequestSink; use tedge_actors::ServiceProvider; -use tedge_api::RestartOperationRequest; -use tedge_api::RestartOperationResponse; +use tedge_api::mqtt_topics::ChannelFilter::Command; +use tedge_api::mqtt_topics::EntityFilter; +use tedge_api::mqtt_topics::EntityTopicId; +use tedge_api::mqtt_topics::MqttSchema; +use tedge_api::mqtt_topics::OperationType; +use tedge_api::RestartCommand; use tedge_mqtt_ext::MqttMessage; use tedge_mqtt_ext::TopicFilter; pub struct TedgeOperationConverterBuilder { + mqtt_schema: MqttSchema, + device_topic_id: EntityTopicId, input_receiver: LoggingReceiver, software_sender: LoggingSender, - restart_sender: LoggingSender, + restart_sender: LoggingSender, mqtt_publisher: LoggingSender, signal_sender: mpsc::Sender, } impl TedgeOperationConverterBuilder { pub fn new( + mqtt_topic_root: &str, + device_topic_id: EntityTopicId, software_actor: &mut impl ServiceProvider, - restart_actor: &mut impl ServiceProvider< - RestartOperationRequest, - RestartOperationResponse, - NoConfig, - >, + restart_actor: &mut impl ServiceProvider, mqtt_actor: &mut impl ServiceProvider, ) -> Self { + let mqtt_schema = MqttSchema::with_root(mqtt_topic_root.to_string()); let (input_sender, input_receiver) = mpsc::channel(10); let (signal_sender, signal_receiver) = mpsc::channel(10); @@ -51,11 +56,15 @@ impl TedgeOperationConverterBuilder { let restart_sender = restart_actor.connect_consumer(NoConfig, input_sender.clone().into()); let restart_sender = LoggingSender::new("RestartSender".into(), restart_sender); - let mqtt_publisher = - mqtt_actor.connect_consumer(Self::subscriptions(), input_sender.into()); + let mqtt_publisher = mqtt_actor.connect_consumer( + Self::subscriptions(&mqtt_schema, &device_topic_id), + input_sender.into(), + ); let mqtt_publisher = LoggingSender::new("MqttPublisher".into(), mqtt_publisher); Self { + mqtt_schema, + device_topic_id, input_receiver, software_sender, restart_sender, @@ -64,14 +73,24 @@ impl TedgeOperationConverterBuilder { } } - pub fn subscriptions() -> TopicFilter { - vec![ - "tedge/commands/req/software/list", - "tedge/commands/req/software/update", - "tedge/commands/req/control/restart", - ] - .try_into() - .expect("Infallible") + pub fn subscriptions(mqtt_schema: &MqttSchema, device_topic_id: &EntityTopicId) -> TopicFilter { + let mut topics: TopicFilter = [mqtt_schema.topics( + EntityFilter::Entity(device_topic_id), + Command(OperationType::Restart), + )] + .into_iter() + .collect(); + + topics.add_all( + vec![ + "tedge/commands/req/software/list", + "tedge/commands/req/software/update", + ] + .try_into() + .expect("Infallible"), + ); + + topics } } @@ -90,6 +109,8 @@ impl Builder for TedgeOperationConverterBuilder { fn build(self) -> TedgeOperationConverterActor { TedgeOperationConverterActor::new( + self.mqtt_schema, + self.device_topic_id, self.input_receiver, self.software_sender, self.restart_sender, diff --git a/crates/core/tedge_agent/src/tedge_operation_converter/error.rs b/crates/core/tedge_agent/src/tedge_operation_converter/error.rs deleted file mode 100644 index 22eb69b66e8..00000000000 --- a/crates/core/tedge_agent/src/tedge_operation_converter/error.rs +++ /dev/null @@ -1,17 +0,0 @@ -use tedge_actors::RuntimeError; - -#[derive(Debug, thiserror::Error)] -#[allow(clippy::enum_variant_names)] -pub enum TedgeOperationConverterError { - #[error(transparent)] - FromSerdeJson(#[from] serde_json::Error), - - #[error(transparent)] - FromChannelError(#[from] tedge_actors::ChannelError), -} - -impl From for RuntimeError { - fn from(error: TedgeOperationConverterError) -> Self { - RuntimeError::ActorError(Box::new(error)) - } -} diff --git a/crates/core/tedge_agent/src/tedge_operation_converter/mod.rs b/crates/core/tedge_agent/src/tedge_operation_converter/mod.rs index 89486268fff..0c10086537a 100644 --- a/crates/core/tedge_agent/src/tedge_operation_converter/mod.rs +++ b/crates/core/tedge_agent/src/tedge_operation_converter/mod.rs @@ -1,6 +1,5 @@ pub mod actor; pub mod builder; -pub mod error; #[cfg(test)] mod tests; diff --git a/crates/core/tedge_agent/src/tedge_operation_converter/tests.rs b/crates/core/tedge_agent/src/tedge_operation_converter/tests.rs index 4e0d1df5ac5..28178dc3337 100644 --- a/crates/core/tedge_agent/src/tedge_operation_converter/tests.rs +++ b/crates/core/tedge_agent/src/tedge_operation_converter/tests.rs @@ -11,11 +11,13 @@ use tedge_actors::MessageReceiver; use tedge_actors::Sender; use tedge_actors::SimpleMessageBox; use tedge_actors::SimpleMessageBoxBuilder; +use tedge_api::messages::CommandStatus; +use tedge_api::messages::RestartCommandPayload; use tedge_api::messages::SoftwareModuleAction; use tedge_api::messages::SoftwareModuleItem; use tedge_api::messages::SoftwareRequestResponseSoftwareList; -use tedge_api::RestartOperationRequest; -use tedge_api::RestartOperationResponse; +use tedge_api::mqtt_topics::EntityTopicId; +use tedge_api::RestartCommand; use tedge_api::SoftwareListRequest; use tedge_api::SoftwareListResponse; use tedge_api::SoftwareUpdateRequest; @@ -28,7 +30,8 @@ const TEST_TIMEOUT_MS: Duration = Duration::from_millis(5000); #[tokio::test] async fn convert_incoming_software_list_request() -> Result<(), DynError> { // Spawn incoming mqtt message converter - let (mut software_box, _restart_box, mut mqtt_box) = spawn_mqtt_operation_converter().await?; + let (mut software_box, _restart_box, mut mqtt_box) = + spawn_mqtt_operation_converter("device/main//").await?; // Simulate SoftwareList MQTT message received. let mqtt_message = MqttMessage::new( @@ -49,7 +52,8 @@ async fn convert_incoming_software_list_request() -> Result<(), DynError> { #[tokio::test] async fn convert_incoming_software_update_request() -> Result<(), DynError> { // Spawn incoming mqtt message converter - let (mut software_box, _restart_box, mut mqtt_box) = spawn_mqtt_operation_converter().await?; + let (mut software_box, _restart_box, mut mqtt_box) = + spawn_mqtt_operation_converter("device/main//").await?; // Simulate SoftwareUpdate MQTT message received. let mqtt_message = MqttMessage::new( @@ -84,20 +88,28 @@ async fn convert_incoming_software_update_request() -> Result<(), DynError> { #[tokio::test] async fn convert_incoming_restart_request() -> Result<(), DynError> { + let target_device = "device/child-foo//"; + // Spawn incoming mqtt message converter - let (_software_box, mut restart_box, mut mqtt_box) = spawn_mqtt_operation_converter().await?; + let (_software_box, mut restart_box, mut mqtt_box) = + spawn_mqtt_operation_converter(target_device).await?; // Simulate Restart MQTT message received. let mqtt_message = MqttMessage::new( - &Topic::new_unchecked("tedge/commands/req/control/restart"), - r#"{"id": "random"}"#, + &Topic::new_unchecked(&format!("te/{target_device}/cmd/restart/random")), + r#"{"status": "init"}"#, ); mqtt_box.send(mqtt_message).await?; // Assert RestartOperationRequest restart_box - .assert_received([RestartOperationRequest { - id: "random".to_string(), + .assert_received([RestartCommand { + target: target_device.parse()?, + cmd_id: "random".to_string(), + payload: RestartCommandPayload { + status: CommandStatus::Init, + reason: "".to_string(), + }, }]) .await; @@ -107,7 +119,11 @@ async fn convert_incoming_restart_request() -> Result<(), DynError> { #[tokio::test] async fn convert_outgoing_software_list_response() -> Result<(), DynError> { // Spawn outgoing mqtt message converter - let (mut software_box, _restart_box, mut mqtt_box) = spawn_mqtt_operation_converter().await?; + let (mut software_box, _restart_box, mut mqtt_box) = + spawn_mqtt_operation_converter("device/main//").await?; + + // Skip capabilities messages + mqtt_box.skip(1).await; // Simulate SoftwareList response message received. let software_list_request = SoftwareListRequest::new_with_id("1234"); @@ -124,10 +140,31 @@ async fn convert_outgoing_software_list_response() -> Result<(), DynError> { Ok(()) } +#[tokio::test] +async fn publish_capabilities_on_start() -> Result<(), DynError> { + // Spawn outgoing mqtt message converter + let (_software_box, _restart_box, mut mqtt_box) = + spawn_mqtt_operation_converter("device/child//").await?; + + mqtt_box + .assert_received([MqttMessage::new( + &Topic::new_unchecked("te/device/child///cmd/restart"), + "{}", + ) + .with_retain()]) + .await; + + Ok(()) +} + #[tokio::test] async fn convert_outgoing_software_update_response() -> Result<(), DynError> { // Spawn outgoing mqtt message converter - let (mut software_box, _restart_box, mut mqtt_box) = spawn_mqtt_operation_converter().await?; + let (mut software_box, _restart_box, mut mqtt_box) = + spawn_mqtt_operation_converter("device/main//").await?; + + // Skip capabilities messages + mqtt_box.skip(1).await; // Simulate SoftwareUpdate response message received. let software_update_request = SoftwareUpdateRequest::new_with_id("1234"); @@ -147,10 +184,21 @@ async fn convert_outgoing_software_update_response() -> Result<(), DynError> { #[tokio::test] async fn convert_outgoing_restart_response() -> Result<(), DynError> { // Spawn outgoing mqtt message converter - let (_software_box, mut restart_box, mut mqtt_box) = spawn_mqtt_operation_converter().await?; - - // Simulate SoftwareList response message received. - let executing_response = RestartOperationResponse::new(&RestartOperationRequest::default()); + let (_software_box, mut restart_box, mut mqtt_box) = + spawn_mqtt_operation_converter("device/main//").await?; + + // Skip capabilities messages + mqtt_box.skip(1).await; + + // Simulate Restart response message received. + let executing_response = RestartCommand { + target: EntityTopicId::default_main_device(), + cmd_id: "abc".to_string(), + payload: RestartCommandPayload { + status: CommandStatus::Executing, + reason: "".to_string(), + }, + }; restart_box.send(executing_response).await?; let (topic, payload) = mqtt_box @@ -158,30 +206,32 @@ async fn convert_outgoing_restart_response() -> Result<(), DynError> { .await .map(|msg| (msg.topic, msg.payload)) .expect("MqttMessage"); - assert_eq!(topic.name, "tedge/commands/res/control/restart"); + assert_eq!(topic.name, "te/device/main///cmd/restart/abc"); assert!(format!("{:?}", payload).contains(r#"status":"executing"#)); Ok(()) } -async fn spawn_mqtt_operation_converter() -> Result< +async fn spawn_mqtt_operation_converter( + device_topic_id: &str, +) -> Result< ( TimedMessageBox>, - TimedMessageBox>, + TimedMessageBox>, TimedMessageBox>, ), DynError, > { let mut software_builder: SimpleMessageBoxBuilder = SimpleMessageBoxBuilder::new("Software", 5); - let mut restart_builder: SimpleMessageBoxBuilder< - RestartOperationRequest, - RestartOperationResponse, - > = SimpleMessageBoxBuilder::new("Restart", 5); + let mut restart_builder: SimpleMessageBoxBuilder = + SimpleMessageBoxBuilder::new("Restart", 5); let mut mqtt_builder: SimpleMessageBoxBuilder = SimpleMessageBoxBuilder::new("MQTT", 5); let converter_actor_builder = TedgeOperationConverterBuilder::new( + "te", + device_topic_id.parse().expect("Invalid topic id"), &mut software_builder, &mut restart_builder, &mut mqtt_builder, diff --git a/crates/core/tedge_api/src/lib.rs b/crates/core/tedge_api/src/lib.rs index 95aabf4692e..1b86c511f54 100644 --- a/crates/core/tedge_api/src/lib.rs +++ b/crates/core/tedge_api/src/lib.rs @@ -22,8 +22,7 @@ pub use messages::control_filter_topic; pub use messages::software_filter_topic; pub use messages::Jsonify; pub use messages::OperationStatus; -pub use messages::RestartOperationRequest; -pub use messages::RestartOperationResponse; +pub use messages::RestartCommand; pub use messages::SoftwareListRequest; pub use messages::SoftwareListResponse; pub use messages::SoftwareRequestResponse; @@ -37,6 +36,8 @@ pub const DEFAULT_FILE_TRANSFER_DIR_NAME: &str = "file-transfer"; #[cfg(test)] mod tests { use super::*; + use crate::mqtt_topics::EntityTopicId; + use crate::mqtt_topics::MqttSchema; use mqtt_channel::Topic; use regex::Regex; @@ -61,12 +62,11 @@ mod tests { Topic::new_unchecked("tedge/commands/res/software/update") ); assert_eq!( - RestartOperationRequest::topic(), - Topic::new_unchecked("tedge/commands/req/control/restart") - ); - assert_eq!( - RestartOperationResponse::topic(), - Topic::new_unchecked("tedge/commands/res/control/restart") + RestartCommand::new(EntityTopicId::default_main_device()) + .with_id("abc".to_string()) + .command_message(&MqttSchema::default()) + .topic, + Topic::new_unchecked("te/device/main///cmd/restart/abc") ); } @@ -75,7 +75,7 @@ mod tests { let request = SoftwareListRequest::new_with_id("1"); let expected_json = r#"{"id":"1"}"#; - let actual_json = request.to_json().expect("Failed to serialize"); + let actual_json = request.to_json(); assert_eq!(actual_json, expected_json); } @@ -161,7 +161,7 @@ mod tests { {"name":"m","url":"https://foobar.io/m.epl"} ]} ]}"#; - let actual_json = response.to_json().expect("Failed to serialize"); + let actual_json = response.to_json(); assert_eq!(actual_json, remove_whitespace(expected_json)); } @@ -245,7 +245,7 @@ mod tests { "reason": "Request_timed-out" }"#; - let actual_json = response.to_json().expect("Failed to serialize"); + let actual_json = response.to_json(); assert_eq!(actual_json, remove_whitespace(expected_json)); } @@ -348,7 +348,7 @@ mod tests { } ] }"#; - let actual_json = request.to_json().expect("Failed to serialize"); + let actual_json = request.to_json(); assert_eq!(actual_json, remove_whitespace(expected_json)); } @@ -423,7 +423,7 @@ mod tests { } ] }"#; - let actual_json = request.to_json().expect("Failed to serialize"); + let actual_json = request.to_json(); assert_eq!(actual_json, remove_whitespace(expected_json)); } @@ -495,7 +495,7 @@ mod tests { } ] }"#; - let actual_json = request.to_json().expect("Failed to serialize"); + let actual_json = request.to_json(); assert_eq!(actual_json, remove_whitespace(expected_json)); } @@ -611,7 +611,7 @@ mod tests { "status": "executing" }"#; - let actual_json = response.to_json().expect("Failed to serialize"); + let actual_json = response.to_json(); assert_eq!(actual_json, remove_whitespace(expected_json)); } @@ -708,7 +708,7 @@ mod tests { ] }"#; - let actual_json = response.to_json().expect("Failed to serialize"); + let actual_json = response.to_json(); assert_eq!(actual_json, remove_whitespace(expected_json)); } @@ -831,7 +831,7 @@ mod tests { ] }"#; - let actual_json = response.to_json().expect("Failed to serialize"); + let actual_json = response.to_json(); assert_eq!( remove_whitespace(&actual_json), remove_whitespace(expected_json) diff --git a/crates/core/tedge_api/src/messages.rs b/crates/core/tedge_api/src/messages.rs index 4f36925620d..2edb94eb2b0 100644 --- a/crates/core/tedge_api/src/messages.rs +++ b/crates/core/tedge_api/src/messages.rs @@ -1,6 +1,12 @@ use crate::error::SoftwareError; +use crate::mqtt_topics::Channel; +use crate::mqtt_topics::EntityTopicId; +use crate::mqtt_topics::MqttSchema; +use crate::mqtt_topics::OperationType; use crate::software::*; use download::DownloadInfo; +use mqtt_channel::Message; +use mqtt_channel::QoS; use mqtt_channel::Topic; use nanoid::nanoid; use serde::Deserialize; @@ -11,8 +17,6 @@ const SOFTWARE_LIST_REQUEST_TOPIC: &str = "tedge/commands/req/software/list"; const SOFTWARE_LIST_RESPONSE_TOPIC: &str = "tedge/commands/res/software/list"; const SOFTWARE_UPDATE_REQUEST_TOPIC: &str = "tedge/commands/req/software/update"; const SOFTWARE_UPDATE_RESPONSE_TOPIC: &str = "tedge/commands/res/software/update"; -const DEVICE_RESTART_REQUEST_TOPIC: &str = "tedge/commands/req/control/restart"; -const DEVICE_RESTART_RESPONSE_TOPIC: &str = "tedge/commands/res/control/restart"; /// All the messages are serialized using json. pub trait Jsonify<'a> @@ -27,12 +31,12 @@ where serde_json::from_slice(bytes) } - fn to_json(&self) -> Result { - serde_json::to_string(self) + fn to_json(&self) -> String { + serde_json::to_string(self).unwrap() // all thin-edge data can be serialized to json } - fn to_bytes(&self) -> Result, serde_json::Error> { - serde_json::to_vec(self) + fn to_bytes(&self) -> Vec { + serde_json::to_vec(self).unwrap() // all thin-edge data can be serialized to json } } @@ -473,65 +477,143 @@ impl From for Option { } } -#[derive(Debug, Deserialize, Serialize, Eq, PartialEq)] -pub enum RestartOperation { - Request(RestartOperationRequest), - Response(RestartOperationResponse), +/// Command to restart a device +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct RestartCommand { + pub target: EntityTopicId, + pub cmd_id: String, + pub payload: RestartCommandPayload, } -/// Message payload definition for restart operation request. -#[derive(Debug, Deserialize, Serialize, Eq, PartialEq)] -#[serde(deny_unknown_fields)] -#[serde(rename_all = "camelCase")] -pub struct RestartOperationRequest { - pub id: String, -} +impl RestartCommand { + /// Create a [RestartCommand] to restart a target device. + /// + /// - use a fresh cmd id + /// - set the status to [CommandStatus::Init] + pub fn new(target: EntityTopicId) -> Self { + let cmd_id = nanoid!(); + let payload = RestartCommandPayload::default(); + RestartCommand { + target, + cmd_id, + payload, + } + } -impl<'a> Jsonify<'a> for RestartOperationRequest {} + /// A new command with a given cmd id + pub fn with_id(self, cmd_id: String) -> Self { + Self { cmd_id, ..self } + } -impl Default for RestartOperationRequest { - fn default() -> RestartOperationRequest { - let id = nanoid!(); - RestartOperationRequest { id } + /// Return the RestartCommand received on a topic + pub fn try_from( + target: EntityTopicId, + cmd_id: String, + bytes: &[u8], + ) -> Result, serde_json::Error> { + if bytes.is_empty() { + Ok(None) + } else { + let payload = RestartCommandPayload::from_slice(bytes)?; + Ok(Some(RestartCommand { + target, + cmd_id, + payload, + })) + } } -} -impl RestartOperationRequest { - pub fn new_with_id(id: &str) -> RestartOperationRequest { - RestartOperationRequest { id: id.to_string() } + /// Return the current status of the command + pub fn status(&self) -> CommandStatus { + self.payload.status } - pub fn topic() -> Topic { - Topic::new_unchecked(DEVICE_RESTART_REQUEST_TOPIC) + /// Return the reason why this command failed + pub fn reason(&self) -> &str { + &self.payload.reason } -} -#[derive(Debug, Deserialize, Serialize, Eq, PartialEq)] -pub struct RestartOperationResponse { - pub id: String, - pub status: OperationStatus, -} + /// Set the status of the command + pub fn with_status(mut self, status: CommandStatus) -> Self { + self.payload.status = status; + self + } -impl<'a> Jsonify<'a> for RestartOperationResponse {} + /// Set the failure reason of the command + pub fn with_error(mut self, reason: String) -> Self { + self.payload.status = CommandStatus::Failed; + self.payload.reason = reason; + self + } -impl RestartOperationResponse { - pub fn new(req: &RestartOperationRequest) -> Self { - Self { - id: req.id.clone(), - status: OperationStatus::Executing, + /// Return the MQTT topic identifier of the target + fn topic_id(&self) -> &EntityTopicId { + &self.target + } + + /// Return the MQTT channel for this command + fn channel(&self) -> Channel { + Channel::Command { + operation: OperationType::Restart, + cmd_id: self.cmd_id.clone(), } } - pub fn with_status(self, status: OperationStatus) -> Self { - Self { status, ..self } + /// Return the MQTT topic for this command + fn topic(&self, schema: &MqttSchema) -> Topic { + schema.topic_for(self.topic_id(), &self.channel()) } - pub fn topic() -> Topic { - Topic::new_unchecked(DEVICE_RESTART_RESPONSE_TOPIC) + /// Return the MQTT message to register `restart` as a supported command on a given target device + pub fn capability_message(schema: &MqttSchema, target: &EntityTopicId) -> Message { + let meta_topic = schema.topic_for( + target, + &Channel::CommandMetadata { + operation: OperationType::Restart, + }, + ); + let payload = "{}"; + Message::new(&meta_topic, payload) + .with_retain() + .with_qos(QoS::AtLeastOnce) } - pub fn status(&self) -> OperationStatus { - self.status + /// Return the MQTT message for this command + pub fn command_message(&self, schema: &MqttSchema) -> Message { + let topic = self.topic(schema); + let payload = self.payload.to_bytes(); + Message::new(&topic, payload) + .with_qos(QoS::AtLeastOnce) + .with_retain() + } + + /// Return the MQTT message to clear this command + pub fn clearing_message(&self, schema: &MqttSchema) -> Message { + let topic = self.topic(schema); + Message::new(&topic, vec![]) + .with_qos(QoS::AtLeastOnce) + .with_retain() + } +} + +/// Command to restart a device +#[derive(Debug, Clone, Deserialize, Serialize, Eq, PartialEq)] +#[serde(rename_all = "camelCase")] +pub struct RestartCommandPayload { + pub status: CommandStatus, + + #[serde(default, skip_serializing_if = "String::is_empty")] + pub reason: String, +} + +impl<'a> Jsonify<'a> for RestartCommandPayload {} + +impl Default for RestartCommandPayload { + fn default() -> Self { + RestartCommandPayload { + status: CommandStatus::Init, + reason: String::new(), + } } } @@ -600,7 +682,7 @@ mod tests { }; let expected_json = r#"{"id":"1234"}"#; - let actual_json = request.to_json().expect("Failed to serialize"); + let actual_json = request.to_json(); assert_eq!(actual_json, expected_json); @@ -652,7 +734,7 @@ mod tests { let expected_json = r#"{"id":"1234","updateList":[{"type":"debian","modules":[{"name":"debian1","version":"0.0.1","action":"install"},{"name":"debian2","version":"0.0.2","action":"install"}]},{"type":"docker","modules":[{"name":"docker1","version":"0.0.1","url":"test.com","action":"remove"}]}]}"#; - let actual_json = request.to_json().expect("Fail to serialize the request"); + let actual_json = request.to_json(); assert_eq!(actual_json, expected_json); let parsed_request = @@ -672,7 +754,7 @@ mod tests { let expected_json = r#"{"id":"1234","status":"successful","currentSoftwareList":[]}"#; - let actual_json = request.to_json().expect("Fail to serialize the request"); + let actual_json = request.to_json(); assert_eq!(actual_json, expected_json); let parsed_request = SoftwareRequestResponse::from_json(&actual_json) @@ -705,7 +787,7 @@ mod tests { let expected_json = r#"{"id":"1234","status":"successful","currentSoftwareList":[{"type":"debian","modules":[{"name":"debian1","version":"0.0.1"}]}]}"#; - let actual_json = request.to_json().expect("Fail to serialize the request"); + let actual_json = request.to_json(); assert_eq!(actual_json, expected_json); let parsed_request = SoftwareRequestResponse::from_json(&actual_json) diff --git a/crates/core/tedge_api/src/topic.rs b/crates/core/tedge_api/src/topic.rs index f0cf3a02a86..40b0fd920e9 100644 --- a/crates/core/tedge_api/src/topic.rs +++ b/crates/core/tedge_api/src/topic.rs @@ -5,7 +5,6 @@ use std::convert::TryFrom; pub enum ResponseTopic { SoftwareListResponse, SoftwareUpdateResponse, - RestartResponse, } impl ResponseTopic { @@ -13,7 +12,6 @@ impl ResponseTopic { match self { Self::SoftwareListResponse => r#"tedge/commands/res/software/list"#, Self::SoftwareUpdateResponse => r#"tedge/commands/res/software/update"#, - Self::RestartResponse => r#"tedge/commands/res/control/restart"#, } } } @@ -25,7 +23,6 @@ impl TryFrom for ResponseTopic { match value.as_str() { r#"tedge/commands/res/software/list"# => Ok(ResponseTopic::SoftwareListResponse), r#"tedge/commands/res/software/update"# => Ok(ResponseTopic::SoftwareUpdateResponse), - r#"tedge/commands/res/control/restart"# => Ok(ResponseTopic::RestartResponse), err => Err(TopicError::UnknownTopic { topic: err.to_string(), }), @@ -45,7 +42,6 @@ impl TryFrom<&str> for ResponseTopic { pub enum RequestTopic { SoftwareListRequest, SoftwareUpdateRequest, - RestartRequest, } impl RequestTopic { @@ -53,7 +49,6 @@ impl RequestTopic { match self { Self::SoftwareListRequest => r#"tedge/commands/req/software/list"#, Self::SoftwareUpdateRequest => r#"tedge/commands/req/software/update"#, - Self::RestartRequest => r#"tedge/commands/req/control/restart"#, } } } diff --git a/crates/extensions/c8y_mapper_ext/src/config.rs b/crates/extensions/c8y_mapper_ext/src/config.rs index 99c475816a8..57b2d6785f3 100644 --- a/crates/extensions/c8y_mapper_ext/src/config.rs +++ b/crates/extensions/c8y_mapper_ext/src/config.rs @@ -6,7 +6,11 @@ use camino::Utf8PathBuf; use std::net::IpAddr; use std::path::Path; use std::path::PathBuf; +use tedge_api::mqtt_topics::ChannelFilter::Command; +use tedge_api::mqtt_topics::ChannelFilter::CommandMetadata; +use tedge_api::mqtt_topics::EntityFilter::AnyEntity; use tedge_api::mqtt_topics::MqttSchema; +use tedge_api::mqtt_topics::OperationType; use tedge_api::topic::ResponseTopic; use tedge_api::DEFAULT_FILE_TRANSFER_DIR_NAME; use tedge_config::ConfigNotSet; @@ -98,6 +102,8 @@ impl C8yMapperConfig { let mut topics = Self::default_internal_topic_filter(&config_dir)?; // Add feature topic filters + topics.add_all(mqtt_schema.topics(AnyEntity, Command(OperationType::Restart))); + topics.add_all(mqtt_schema.topics(AnyEntity, CommandMetadata(OperationType::Restart))); if capabilities.log_management { topics.add_all(crate::log_upload::log_upload_topic_filter(&mqtt_schema)); } @@ -133,7 +139,6 @@ impl C8yMapperConfig { C8yTopic::SmartRestRequest.to_string().as_str(), ResponseTopic::SoftwareListResponse.as_str(), ResponseTopic::SoftwareUpdateResponse.as_str(), - ResponseTopic::RestartResponse.as_str(), ] .try_into() .expect("topics that mapper should subscribe to"); diff --git a/crates/extensions/c8y_mapper_ext/src/converter.rs b/crates/extensions/c8y_mapper_ext/src/converter.rs index c688dba6da1..4e575f17acc 100644 --- a/crates/extensions/c8y_mapper_ext/src/converter.rs +++ b/crates/extensions/c8y_mapper_ext/src/converter.rs @@ -58,6 +58,7 @@ use tedge_api::entity_store::EntityRegistrationMessage; use tedge_api::entity_store::EntityType; use tedge_api::event::error::ThinEdgeJsonDeserializerError; use tedge_api::event::ThinEdgeEvent; +use tedge_api::messages::CommandStatus; use tedge_api::mqtt_topics::Channel; use tedge_api::mqtt_topics::EntityTopicId; use tedge_api::mqtt_topics::MqttSchema; @@ -68,8 +69,7 @@ use tedge_api::DownloadInfo; use tedge_api::EntityStore; use tedge_api::Jsonify; use tedge_api::OperationStatus; -use tedge_api::RestartOperationRequest; -use tedge_api::RestartOperationResponse; +use tedge_api::RestartCommand; use tedge_api::SoftwareListRequest; use tedge_api::SoftwareListResponse; use tedge_api::SoftwareUpdateResponse; @@ -77,6 +77,7 @@ use tedge_config::TEdgeConfigError; use tedge_mqtt_ext::Message; use tedge_mqtt_ext::MqttMessage; use tedge_mqtt_ext::Topic; +use tedge_utils::file::create_directory_with_defaults; use tedge_utils::file::create_file_with_defaults; use tedge_utils::size_threshold::SizeThreshold; use thiserror::Error; @@ -415,9 +416,7 @@ impl CumulocityConverter { "528" if device_id == self.device_name => { self.forward_software_request(payload).await } - "510" if device_id == self.device_name => { - Self::forward_restart_request(payload) - } + "510" => self.forward_restart_request(payload), template if device_id == self.device_name => { self.forward_operation_request(payload, template).await } @@ -473,16 +472,24 @@ impl CumulocityConverter { Ok(vec![Message::new( &topic, - software_update_request.to_json().unwrap(), + software_update_request.to_json(), )]) } - fn forward_restart_request(smartrest: &str) -> Result, CumulocityMapperError> { - let topic = Topic::new(RequestTopic::RestartRequest.as_str())?; - let _ = SmartRestRestartRequest::from_smartrest(smartrest)?; - - let request = RestartOperationRequest::default(); - Ok(vec![Message::new(&topic, request.to_json()?)]) + fn forward_restart_request( + &mut self, + smartrest: &str, + ) -> Result, CumulocityMapperError> { + let request = SmartRestRestartRequest::from_smartrest(smartrest)?; + let device_id = &request.device; + let target = self.entity_store.get_by_id(device_id).ok_or_else(|| { + CumulocityMapperError::UnknownDevice { + device_id: device_id.to_owned(), + } + })?; + let command = RestartCommand::new(target.topic_id.clone()); + let message = command.command_message(&self.mqtt_schema); + Ok(vec![message]) } async fn forward_operation_request( @@ -704,11 +711,14 @@ impl CumulocityConverter { } }; - let entity = self.entity_store.get(&source).unwrap(); - if let Some(message) = external_device_registration_message(entity) { - self.children - .insert(entity.entity_id.to_string(), Operations::default()); - registration_messages.push(message); + if let Some(entity) = self.entity_store.get(&source) { + if let Some(message) = external_device_registration_message(entity) { + self.children + .insert(entity.entity_id.to_string(), Operations::default()); + registration_messages.push(message); + } + } else { + error!("Cannot auto-register entity with non-standard MQTT identifier: {source}"); } } } @@ -732,6 +742,17 @@ impl CumulocityConverter { vec![] } + Channel::CommandMetadata { + operation: OperationType::Restart, + } => self.register_restart_operation(&source).await?, + Channel::Command { + operation: OperationType::Restart, + cmd_id, + } => { + self.publish_restart_operation_status(&source, cmd_id, message) + .await? + } + Channel::CommandMetadata { operation: OperationType::LogUpload, } => self.convert_log_metadata(&source, message)?, @@ -782,9 +803,6 @@ impl CumulocityConverter { ) .await?) } - Ok(MapperSubscribeTopic::ResponseTopic(ResponseTopic::RestartResponse)) => { - Ok(publish_restart_operation_status(message.payload_str()?).await?) - } Ok(MapperSubscribeTopic::C8yTopic(_)) => self.parse_c8y_topics(message).await, _ => { error!("Unsupported topic: {}", message.topic.name); @@ -890,7 +908,7 @@ fn create_device_data_fragments( fn create_get_software_list_message() -> Result { let request = SoftwareListRequest::default(); let topic = Topic::new(RequestTopic::SoftwareListRequest.as_str())?; - let payload = request.to_json().unwrap(); + let payload = request.to_json(); Ok(Message::new(&topic, payload)) } @@ -969,35 +987,94 @@ fn create_inventory_fragments_message( Ok(Message::new(&topic, ops_msg.to_string())) } -async fn publish_restart_operation_status( - json_response: &str, -) -> Result, CumulocityMapperError> { - let response = RestartOperationResponse::from_json(json_response)?; - let topic = C8yTopic::SmartRestResponse.to_topic()?; +impl CumulocityConverter { + async fn register_restart_operation( + &self, + target: &EntityTopicId, + ) -> Result, ConversionError> { + match self.entity_store.get(target) { + None => { + error!("Fail to register `restart` operation for unknown device: {target}"); + Ok(vec![]) + } + Some(device) => { + let ops_dir = match device.r#type { + EntityType::MainDevice => self.ops_dir.clone(), + EntityType::ChildDevice => self.ops_dir.clone().join(&device.entity_id), + EntityType::Service => { + error!("Unsupported `restart` operation for a service: {target}"); + return Ok(vec![]); + } + }; + let ops_file = ops_dir.join("c8y_Restart"); + create_directory_with_defaults(&ops_dir)?; + create_file_with_defaults(ops_file, None)?; + let device_operations = create_supported_operations(&ops_dir)?; + Ok(vec![device_operations]) + } + } + } - match response.status() { - OperationStatus::Executing => { - let smartrest_set_operation = SmartRestSetOperationToExecuting::new( - CumulocitySupportedOperations::C8yRestartRequest, - ) - .to_smartrest()?; + async fn publish_restart_operation_status( + &mut self, + target: &EntityTopicId, + cmd_id: &str, + message: &Message, + ) -> Result, CumulocityMapperError> { + let command = match RestartCommand::try_from( + target.clone(), + cmd_id.to_owned(), + message.payload_bytes(), + )? { + Some(command) => command, + None => { + // The command has been fully processed + return Ok(vec![]); + } + }; + let topic = self + .entity_store + .get(target) + .and_then(C8yTopic::smartrest_response_topic) + .ok_or_else(|| CumulocityMapperError::UnregisteredDevice { + topic_id: target.to_string(), + })?; - Ok(vec![Message::new(&topic, smartrest_set_operation)]) - } - OperationStatus::Successful => { - let smartrest_set_operation = SmartRestSetOperationToSuccessful::new( - CumulocitySupportedOperations::C8yRestartRequest, - ) - .to_smartrest()?; - Ok(vec![Message::new(&topic, smartrest_set_operation)]) - } - OperationStatus::Failed => { - let smartrest_set_operation = SmartRestSetOperationToFailed::new( - CumulocitySupportedOperations::C8yRestartRequest, - "Restart Failed".into(), - ) - .to_smartrest()?; - Ok(vec![Message::new(&topic, smartrest_set_operation)]) + match command.status() { + CommandStatus::Executing => { + let smartrest_set_operation = SmartRestSetOperationToExecuting::new( + CumulocitySupportedOperations::C8yRestartRequest, + ) + .to_smartrest()?; + + Ok(vec![Message::new(&topic, smartrest_set_operation)]) + } + CommandStatus::Successful => { + let smartrest_set_operation = SmartRestSetOperationToSuccessful::new( + CumulocitySupportedOperations::C8yRestartRequest, + ) + .to_smartrest()?; + + Ok(vec![ + command.clearing_message(&self.mqtt_schema), + Message::new(&topic, smartrest_set_operation), + ]) + } + CommandStatus::Failed => { + let smartrest_set_operation = SmartRestSetOperationToFailed::new( + CumulocitySupportedOperations::C8yRestartRequest, + format!("Restart Failed: {}", command.reason()), + ) + .to_smartrest()?; + Ok(vec![ + command.clearing_message(&self.mqtt_schema), + Message::new(&topic, smartrest_set_operation), + ]) + } + _ => { + // The other states are ignored + Ok(vec![]) + } } } } @@ -1119,7 +1196,6 @@ fn get_inventory_fragments( async fn create_tedge_agent_supported_ops(ops_dir: PathBuf) -> Result<(), ConversionError> { create_file_with_defaults(ops_dir.join("c8y_SoftwareUpdate"), None)?; - create_file_with_defaults(ops_dir.join("c8y_Restart"), None)?; Ok(()) } diff --git a/crates/extensions/c8y_mapper_ext/src/log_upload.rs b/crates/extensions/c8y_mapper_ext/src/log_upload.rs index 3fe713d6b18..9c1bfe8baa9 100644 --- a/crates/extensions/c8y_mapper_ext/src/log_upload.rs +++ b/crates/extensions/c8y_mapper_ext/src/log_upload.rs @@ -82,7 +82,7 @@ impl CumulocityConverter { }; // Command messages must be retained - Ok(vec![Message::new(&topic, request.to_json()?).with_retain()]) + Ok(vec![Message::new(&topic, request.to_json()).with_retain()]) } /// Address a received log_upload command. If its status is diff --git a/crates/extensions/c8y_mapper_ext/src/tests.rs b/crates/extensions/c8y_mapper_ext/src/tests.rs index 452bb97e66d..669964676ff 100644 --- a/crates/extensions/c8y_mapper_ext/src/tests.rs +++ b/crates/extensions/c8y_mapper_ext/src/tests.rs @@ -885,6 +885,23 @@ async fn mapper_dynamically_updates_supported_operations_for_tedge_device() { .expect("Send failed"); // Expect smartrest message on `c8y/s/us` with expected payload "114,c8y_TestOp1,c8y_TestOp2,c8y_TestOp3". + assert_received_contains_str( + &mut mqtt, + [( + "c8y/s/us", + "114,c8y_SoftwareUpdate,c8y_TestOp1,c8y_TestOp2,c8y_TestOp3", + )], + ) + .await; + + // Then the agent start adding it's own set of capabilities + mqtt.send( + MqttMessage::new(&Topic::new_unchecked("te/device/main///cmd/restart"), "{}").with_retain(), + ) + .await + .expect("Send failed"); + + // Expect an update list of capabilities with agent capabilities assert_received_contains_str( &mut mqtt, [( @@ -933,6 +950,41 @@ async fn mapper_dynamically_updates_supported_operations_for_child_device() { )], ) .await; + + // Then the agent start on the child device adding it's own set of capabilities + mqtt.send( + MqttMessage::new( + &Topic::new_unchecked("te/device/child1///cmd/restart"), + "{}", + ) + .with_retain(), + ) + .await + .expect("Send failed"); + + // Being new, the child device is first registered on the bus + let child_metadata = mqtt.recv().await.unwrap(); + assert_eq!(child_metadata.topic.name, "te/device/child1//"); + assert_eq!( + child_metadata.payload_str().unwrap(), + r#"{ "@type":"child-device", "@id":"child1"}"# + ); + let child_c8y_registration = mqtt.recv().await.unwrap(); + assert_eq!(child_c8y_registration.topic.name, "c8y/s/us"); + assert_eq!( + child_c8y_registration.payload_str().unwrap(), + "101,child1,child1,thin-edge.io-child" + ); + + // Expect an update list of capabilities with agent capabilities + assert_received_contains_str( + &mut mqtt, + [( + "c8y/s/us/child1", + "114,c8y_ChildTestOp1,c8y_ChildTestOp2,c8y_ChildTestOp3,c8y_Restart", + )], + ) + .await; } #[tokio::test] diff --git a/crates/extensions/tedge_log_manager/src/actor.rs b/crates/extensions/tedge_log_manager/src/actor.rs index 9aa804369d5..0a0d0e1cdf4 100644 --- a/crates/extensions/tedge_log_manager/src/actor.rs +++ b/crates/extensions/tedge_log_manager/src/actor.rs @@ -240,11 +240,8 @@ impl LogManagerActor { topic: &Topic, request: &LogUploadCmdPayload, ) -> Result<(), ChannelError> { - match request_into_message(topic, request) { - Ok(message) => self.mqtt_publisher.send(message).await?, - Err(err) => error!("Fail to build a message for {:?}: {err}", request), - } - Ok(()) + let message = request_into_message(topic, request); + self.mqtt_publisher.send(message).await } } @@ -260,9 +257,6 @@ fn request_from_message( } } -fn request_into_message( - topic: &Topic, - request: &LogUploadCmdPayload, -) -> Result { - Ok(MqttMessage::new(topic, request.to_json()?).with_retain()) +fn request_into_message(topic: &Topic, request: &LogUploadCmdPayload) -> MqttMessage { + MqttMessage::new(topic, request.to_json()).with_retain() } diff --git a/tests/RobotFramework/libraries/ThinEdgeIO/ThinEdgeIO.py b/tests/RobotFramework/libraries/ThinEdgeIO/ThinEdgeIO.py index 8c09a924e9f..5b4b8e29e18 100644 --- a/tests/RobotFramework/libraries/ThinEdgeIO/ThinEdgeIO.py +++ b/tests/RobotFramework/libraries/ThinEdgeIO/ThinEdgeIO.py @@ -587,6 +587,35 @@ def register_child( device.assert_command(" && ".join(cmd)) + @keyword("Set Restart Command") + def set_restart_command(self, command: str, **kwargs): + """Set the restart command used by thin-edge.io to restart the device. + + *Examples:* + | `Set Restart Command` | ["/sbin/reboot"] | + | `Set Restart Command` | ["/usr/bin/on_shutdown.sh", "300"] | + """ + self.execute_command( + f"sed -i 's|reboot =.*|reboot = {command}|g' /etc/tedge/system.toml", + **kwargs, + ) + + @keyword("Set Restart Timeout") + def set_restart_timeout(self, value: Union[str,int], **kwargs): + """Set the restart timeout interval in seconds for how long thin-edge.io + should wait to for a device restart to happen. + + Use a value of "default" if you want to revert to the default thin-edge.io timeout setting. + + *Examples:* + | `Set Restart Timeout` | 60 | + | `Set Restart Timeout` | default | + """ + if str(value).lower() == 'default': + command = "sed -i -e '/reboot_timeout_seconds/d' /etc/tedge/system.toml" + else: + command = f"sed -i -e '/reboot_timeout_seconds/d' -e '/reboot =/a reboot_timeout_seconds = {value}' /etc/tedge/system.toml", + self.execute_command(command, **kwargs,) def to_date(value: relativetime_) -> datetime: if isinstance(value, datetime): diff --git a/tests/RobotFramework/tests/cumulocity/restart/on_shutdown.sh b/tests/RobotFramework/tests/cumulocity/restart/on_shutdown.sh index df36a12ba27..6a2e03cccbb 100755 --- a/tests/RobotFramework/tests/cumulocity/restart/on_shutdown.sh +++ b/tests/RobotFramework/tests/cumulocity/restart/on_shutdown.sh @@ -1,9 +1,27 @@ #!/bin/sh set -e -tedge mqtt pub --qos 0 tedge/events/boot_event "$(printf '{"text": "Warning device is about to reboot!", "type": "device_boot"}')" 2>/dev/null -sleep 5 -if command -v sudo >/dev/null 2>&1; then - sudo shutdown -r now +TARGET="$(tedge config get mqtt.topic_root)/$(tedge config get mqtt.device_topic_id)" +tedge mqtt pub --qos 0 "$TARGET/e/device_boot" '{"text": "Warning device is about to reboot!"}' 2>/dev/null + +# Configurable delay before rebooting +DELAY=1 +if [ $# -gt 0 ] && [ "$1" -gt 0 ]; then + DELAY="$1" +fi + +echo "Sleeping $DELAY seconds" +sleep "$DELAY" + +use_sudo() { + command -v sudo >/dev/null 2>&1 && [ "$(id -u)" != "0" ] +} + +# Note: Delaying the shutdown using 'shutdown -r +1' does not work when using systemd inside a container +REBOOT="shutdown -r now" + +if use_sudo; then + # shellcheck disable=SC2086 + sudo $REBOOT else - shutdown -r now + $REBOOT fi diff --git a/tests/RobotFramework/tests/cumulocity/restart/on_shutdown_no_reboot.sh b/tests/RobotFramework/tests/cumulocity/restart/on_shutdown_no_reboot.sh new file mode 100755 index 00000000000..c6c3f8c64d7 --- /dev/null +++ b/tests/RobotFramework/tests/cumulocity/restart/on_shutdown_no_reboot.sh @@ -0,0 +1,11 @@ +#!/bin/sh +set -e + +# Configurable delay before rebooting +DELAY=1 +if [ $# -gt 0 ] && [ "$1" -gt 0 ]; then + DELAY="$1" +fi + +echo "Sleeping $DELAY seconds" +sleep "$DELAY" diff --git a/tests/RobotFramework/tests/cumulocity/restart/on_shutdown_signal_interrupt.sh b/tests/RobotFramework/tests/cumulocity/restart/on_shutdown_signal_interrupt.sh new file mode 100755 index 00000000000..bd0a2d74c12 --- /dev/null +++ b/tests/RobotFramework/tests/cumulocity/restart/on_shutdown_signal_interrupt.sh @@ -0,0 +1,19 @@ +#!/bin/bash +set -e + +# Simulate the script being stopped via a signal +{ + sleep 2 + echo "Simulating SIGTERM interrupt" + kill -SIGTERM $$ +} & + +# Configurable delay before rebooting +DELAY=30 +if [ $# -gt 0 ] && [ "$1" -gt 0 ]; then + DELAY="$1" +fi + +echo "Sleeping $DELAY seconds" +sleep "$DELAY" +reboot diff --git a/tests/RobotFramework/tests/cumulocity/restart/on_startup.sh b/tests/RobotFramework/tests/cumulocity/restart/on_startup.sh index ce81a5ce6dc..f917df68e17 100755 --- a/tests/RobotFramework/tests/cumulocity/restart/on_startup.sh +++ b/tests/RobotFramework/tests/cumulocity/restart/on_startup.sh @@ -1,10 +1,12 @@ #!/bin/sh set -e +TARGET="$(tedge config get mqtt.topic_root)/$(tedge config get mqtt.device_topic_id)" + # Wait for publish to be successful as the mosquitto client can take a while to start while true do - if ! tedge mqtt pub --retain --qos 0 tedge/events/boot_event "$(printf '{"text": "device booted up 🟢 %s", "type": "device_boot"}' "$(uname -a)")" 2>/dev/null + if ! tedge mqtt pub --retain --qos 0 "$TARGET/e/device_boot" "$(printf '{"text": "device booted up 🟢 %s"}' "$(uname -a)")" 2>/dev/null then sleep 1 else diff --git a/tests/RobotFramework/tests/cumulocity/restart/restart_device.robot b/tests/RobotFramework/tests/cumulocity/restart/restart_device.robot index bb775d6890e..281ddaaf01c 100644 --- a/tests/RobotFramework/tests/cumulocity/restart/restart_device.robot +++ b/tests/RobotFramework/tests/cumulocity/restart/restart_device.robot @@ -2,7 +2,6 @@ Resource ../../../resources/common.resource Library Cumulocity Library ThinEdgeIO -Library DebugLibrary Test Tags theme:c8y theme:troubleshooting Test Setup Custom Setup @@ -12,6 +11,7 @@ Test Teardown Custom Teardown Supports restarting the device [Documentation] Use a longer timeout period to allow the device time to restart and allow the initial token fetching process to fail at least one (due to the 60 seconds retry window) + Cumulocity.Should Contain Supported Operations c8y_Restart ${operation}= Cumulocity.Restart Device Operation Should Be SUCCESSFUL ${operation} timeout=180 @@ -37,9 +37,9 @@ Custom Setup Device Should Exist ${DEVICE_SN} Transfer To Device ${CURDIR}/*.sh /usr/bin/ Transfer To Device ${CURDIR}/*.service /etc/systemd/system/ - Execute Command apt-get install -y systemd-sysv && chmod a+x /usr/bin/*.sh && chmod 644 /etc/systemd/system/*.service && systemctl enable on_startup.service + Execute Command chmod a+x /usr/bin/*.sh && chmod 644 /etc/systemd/system/*.service && systemctl enable on_startup.service Execute Command cmd=echo 'tedge ALL = (ALL) NOPASSWD: /usr/bin/tedge, /etc/tedge/sm-plugins/[a-zA-Z0-9]*, /bin/sync, /sbin/init, /sbin/shutdown, /usr/bin/on_shutdown.sh' > /etc/sudoers.d/tedge - Execute Command cmd=sed -i 's|reboot =.*|reboot = ["/usr/bin/on_shutdown.sh"]|g' /etc/tedge/system.toml + Set Restart Command ["/usr/bin/on_shutdown.sh"] Custom Teardown # Restore sudo in case if the tests are run on a device (and not in a container) diff --git a/tests/RobotFramework/tests/cumulocity/restart/restart_device_child.robot b/tests/RobotFramework/tests/cumulocity/restart/restart_device_child.robot new file mode 100644 index 00000000000..9ce1e0d27dd --- /dev/null +++ b/tests/RobotFramework/tests/cumulocity/restart/restart_device_child.robot @@ -0,0 +1,110 @@ +*** Settings *** +Resource ../../../resources/common.resource +Library Cumulocity +Library ThinEdgeIO + +Suite Setup Custom Setup +Test Teardown Test Teardown + +Test Tags theme:c8y theme:troubleshooting theme:childdevices + + +*** Test Cases *** +Support restarting the device + Set Restart Command ["/usr/bin/on_shutdown.sh", "1"] + Set Restart Timeout default + Cumulocity.Should Contain Supported Operations c8y_Restart + ${operation}= Cumulocity.Restart Device + Operation Should Be SUCCESSFUL ${operation} timeout=180 + +Restart operation should be set to failed when an non-existent command is configured + Set Restart Command ["/usr/bin/on_shutdown_does_not_exist.sh"] + Set Restart Timeout 60 + ${operation}= Cumulocity.Restart Device + Operation Should Be FAILED ${operation} failure_reason=Restart Failed: Fail to trigger a restart.* timeout=30 + +Restart operation should be set to failed when command is not allowed by sudo + Set Restart Command ["/sbin/reboot"] + Set Restart Timeout 60 + ${operation}= Cumulocity.Restart Device + Operation Should Be FAILED ${operation} failure_reason=Restart Failed: Fail to trigger a restart.* timeout=30 + +Restart operation should be set to failed when the command does not restart the device + Set Restart Command ["/usr/bin/on_shutdown_no_reboot.sh"] + Set Restart Timeout 30 + ${operation}= Cumulocity.Restart Device + Operation Should Be FAILED ${operation} failure_reason=Restart Failed: No shutdown has been triggered.* timeout=60 + +Restart operation should be set to failed if the restart command times out + [Documentation] tedge should protect against commands which don't finish within a given time period (protect against hanging scripts) + Set Restart Command ["/usr/bin/on_shutdown.sh", "300"] + Set Restart Timeout 5 + ${operation}= Cumulocity.Restart Device + Operation Should Be FAILED ${operation} failure_reason=Restart Failed: Restart command still running after 5 seconds timeout=30 + +Restart operation should be set to failed when the command has been killed by a signal + [Documentation] If the restart command is killed, assume it did trigger a restart, and wait for the restart. Only fail the + ... the operation if the "wait for restart" logic does not detect a restart. This is because sometimes a shutdown + ... can trigger the script to be killed before it has a chance to exit successfully. + Set Restart Command ["/usr/bin/on_shutdown_signal_interrupt.sh"] + Set Restart Timeout 30 + ${operation}= Cumulocity.Restart Device + Operation Should Be FAILED ${operation} failure_reason=Restart Failed: No shutdown has been triggered timeout=60 + +Default restart timeout supports the default 60 second delay of the linux shutdown command + [Documentation] The shutdown -r command performs a device restart 60 seconds after being called. thin-edge.io should + ... support this setting out of the box + Set Restart Command ["shutdown", "-r"] + Set Restart Timeout default + ${operation}= Cumulocity.Restart Device + Operation Should Be SUCCESSFUL ${operation} timeout=180 + +*** Keywords *** +Setup Child Device + ThinEdgeIO.Set Device Context ${CHILD_SN} + Execute Command sudo dpkg -i packages/tedge_*.deb + + Execute Command sudo tedge config set mqtt.client.host ${PARENT_IP} + Execute Command sudo tedge config set mqtt.client.port 1883 + Execute Command sudo tedge config set mqtt.topic_root te + Execute Command sudo tedge config set mqtt.device_topic_id "device/${CHILD_SN}//" + + # Install plugin after the default settings have been updated to prevent it from starting up as the main plugin + Execute Command sudo dpkg -i packages/tedge-agent*.deb + Execute Command sudo systemctl enable tedge-agent + Execute Command sudo systemctl start tedge-agent + + Transfer To Device ${CURDIR}/*.sh /usr/bin/ + Transfer To Device ${CURDIR}/*.service /etc/systemd/system/ + Execute Command cmd=chmod a+x /usr/bin/*.sh && chmod 644 /etc/systemd/system/*.service && systemctl enable on_startup.service + Execute Command cmd=echo 'tedge ALL = (ALL) NOPASSWD: /usr/bin/tedge, /etc/tedge/sm-plugins/[a-zA-Z0-9]*, /bin/sync, /sbin/init, /sbin/shutdown, /usr/bin/on_shutdown.sh, /usr/bin/on_shutdown_no_reboot.sh, /usr/bin/on_shutdown_does_not_exist.sh, /usr/bin/on_shutdown_does_not_exist.sh, /usr/bin/on_shutdown_signal_interrupt.sh, !/sbin/reboot' > /etc/sudoers.d/tedge + Set Restart Command ["/usr/bin/on_shutdown.sh"] + Set Restart Timeout default + + # WORKAROUND: Uncomment next line once https://github.com/thin-edge/thin-edge.io/issues/2253 has been resolved + # ThinEdgeIO.Service Health Status Should Be Up tedge-agent device=${CHILD_SN} + +Custom Setup + # Parent + ${parent_sn}= Setup skip_bootstrap=${True} + Set Suite Variable $PARENT_SN ${parent_sn} + Execute Command test -f ./bootstrap.sh && ./bootstrap.sh --no-connect || true + + ${parent_ip}= Get IP Address + Set Suite Variable $PARENT_IP ${parent_ip} + Execute Command sudo tedge config set c8y.enable.log_management true + Execute Command sudo tedge config set mqtt.external.bind.address ${PARENT_IP} + Execute Command sudo tedge config set mqtt.external.bind.port 1883 + + ThinEdgeIO.Connect Mapper c8y + ThinEdgeIO.Service Health Status Should Be Up tedge-mapper-c8y + + # Child + ${child_sn}= Setup skip_bootstrap=${True} + Set Suite Variable $CHILD_SN ${child_sn} + Setup Child Device + Cumulocity.Device Should Exist ${CHILD_SN} + +Test Teardown + Get Logs name=${PARENT_SN} + Get Logs name=${CHILD_SN} diff --git a/tests/images/debian-systemd/debian-systemd.dockerfile b/tests/images/debian-systemd/debian-systemd.dockerfile index 7306571dc0d..f2abed35936 100644 --- a/tests/images/debian-systemd/debian-systemd.dockerfile +++ b/tests/images/debian-systemd/debian-systemd.dockerfile @@ -10,6 +10,8 @@ RUN apt-get -y update \ apt-transport-https \ ca-certificates \ systemd \ + dbus \ + systemd-sysv \ ssh \ mosquitto \ mosquitto-clients \