diff --git a/crates/extensions/c8y_mapper_ext/src/operations/config_snapshot.rs b/crates/extensions/c8y_mapper_ext/src/operations/config_snapshot.rs index ed583acc413..1bb17f17fa5 100644 --- a/crates/extensions/c8y_mapper_ext/src/operations/config_snapshot.rs +++ b/crates/extensions/c8y_mapper_ext/src/operations/config_snapshot.rs @@ -1,3 +1,4 @@ +use super::error::OperationError; use super::EntityTarget; use super::OperationContext; use super::OperationResult; @@ -6,7 +7,6 @@ use crate::error::ConversionError; use crate::error::CumulocityMapperError; use anyhow::Context; use c8y_api::json_c8y_deserializer::C8yUploadConfigFile; -use c8y_api::smartrest::smartrest_serializer::fail_operation; use c8y_api::smartrest::smartrest_serializer::CumulocitySupportedOperations; use camino::Utf8PathBuf; use std::borrow::Cow; @@ -92,7 +92,7 @@ impl OperationContext { entity: &EntityTarget, cmd_id: &str, message: &MqttMessage, - ) -> Result { + ) -> Result { if !self.capabilities.config_snapshot { warn!( "Received a config_snapshot command, however, config_snapshot feature is disabled" @@ -105,7 +105,9 @@ impl OperationContext { target.topic_id.clone(), cmd_id.into(), message.payload_bytes(), - )? { + ) + .context("Could not parse command as a config snapshot command")? + { Some(command) => command, None => { // The command has been fully processed @@ -150,28 +152,14 @@ impl OperationContext { .downloader .clone() .await_response((cmd_id.to_string(), download_request)) - .await?; - - match download_result { - Err(err) => { - let smartrest_error = - fail_operation( - CumulocitySupportedOperations::C8yUploadConfigFile, - &format!("tedge-mapper-c8y failed to download configuration snapshot from file-transfer service: {err}"), - ); - - let c8y_notification = MqttMessage::new(smartrest_topic, smartrest_error); + .await + .context("Unexpected ChannelError")?; - return Ok(OperationResult::Finished { - messages: vec![c8y_notification], - - }); - } - Ok(download) => download, - }; + download_result.context( "tedge-mapper-c8y failed to download configuration snapshot from file-transfer service")?; - let file_path = - Utf8PathBuf::try_from(destination_path).map_err(|e| e.into_io_error())?; + let file_path = Utf8PathBuf::try_from(destination_path) + .map_err(|e| e.into_io_error()) + .context("Could not parse destination path as utf-8")?; let event_type = command.payload.config_type.clone(); // Upload the file to C8y @@ -185,7 +173,8 @@ impl OperationContext { event_type, None, ) - .await?; + .await + .context("Could not upload config file to C8y")?; let smartrest_response = super::get_smartrest_response_for_upload_result( upload_result, @@ -199,17 +188,7 @@ impl OperationContext { messages: vec![c8y_notification], }) } - CommandStatus::Failed { reason } => { - let smartrest_operation_status = - fail_operation(CumulocitySupportedOperations::C8yUploadConfigFile, &reason); - let c8y_notification = - MqttMessage::new(smartrest_topic, smartrest_operation_status); - - Ok(OperationResult::Finished { - messages: vec![c8y_notification], - - }) - } + CommandStatus::Failed { reason } => Err(anyhow::anyhow!(reason).into()), _ => { // Do nothing as other components might handle those states Ok(OperationResult::Ignored) diff --git a/crates/extensions/c8y_mapper_ext/src/operations/config_update.rs b/crates/extensions/c8y_mapper_ext/src/operations/config_update.rs index 1219ab007ad..f11add4f4ee 100644 --- a/crates/extensions/c8y_mapper_ext/src/operations/config_update.rs +++ b/crates/extensions/c8y_mapper_ext/src/operations/config_update.rs @@ -1,8 +1,8 @@ use crate::converter::CumulocityConverter; use crate::error::ConversionError; use crate::error::CumulocityMapperError; +use anyhow::Context; use c8y_api::json_c8y_deserializer::C8yDownloadConfigFile; -use c8y_api::smartrest::smartrest_serializer::fail_operation; use c8y_api::smartrest::smartrest_serializer::succeed_operation_no_payload; use c8y_api::smartrest::smartrest_serializer::CumulocitySupportedOperations; use std::sync::Arc; @@ -22,6 +22,7 @@ use tedge_mqtt_ext::MqttMessage; use tedge_mqtt_ext::TopicFilter; use tracing::log::warn; +use super::error::OperationError; use super::EntityTarget; use super::OperationContext; use super::OperationResult; @@ -51,7 +52,7 @@ impl OperationContext { target: &EntityTarget, cmd_id: &str, message: &MqttMessage, - ) -> Result { + ) -> Result { if !self.capabilities.config_update { warn!("Received a config_update command, however, config_update feature is disabled"); return Ok(OperationResult::Ignored); @@ -61,7 +62,9 @@ impl OperationContext { target.topic_id.clone(), cmd_id.into(), message.payload_bytes(), - )? { + ) + .context("Could not parse command as a config update command")? + { Some(command) => command, None => { // The command has been fully processed @@ -83,17 +86,7 @@ impl OperationContext { messages: vec![c8y_notification], }) } - CommandStatus::Failed { reason } => { - let smartrest_operation_status = fail_operation( - CumulocitySupportedOperations::C8yDownloadConfigFile, - &reason, - ); - let c8y_notification = MqttMessage::new(sm_topic, smartrest_operation_status); - - Ok(OperationResult::Finished { - messages: vec![c8y_notification], - }) - } + CommandStatus::Failed { reason } => Err(anyhow::anyhow!(reason).into()), _ => { Ok(OperationResult::Ignored) // Do nothing as other components might handle those states } diff --git a/crates/extensions/c8y_mapper_ext/src/operations/error.rs b/crates/extensions/c8y_mapper_ext/src/operations/error.rs new file mode 100644 index 00000000000..49790b694fb --- /dev/null +++ b/crates/extensions/c8y_mapper_ext/src/operations/error.rs @@ -0,0 +1,58 @@ +use std::error::Error; +use std::fmt::Display; + +/// An error type for operation handlers. +/// +/// This error type wraps an [`anyhow::Error`] and provides a display implementation that assumes +/// that source errors print their own causes after `:` character. This is in order to ensure that +/// failures are printed properly in Cumulocity web interface. +#[derive(Debug)] +pub(crate) struct OperationError(anyhow::Error); + +impl Display for OperationError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.0.fmt(f)?; + + if let Some(source) = self.0.source() { + write!(f, ": {}", source)?; + } + + Ok(()) + } +} + +impl Error for OperationError { + fn source(&self) -> Option<&(dyn Error + 'static)> { + self.0.source() + } +} + +impl From for OperationError { + fn from(error: anyhow::Error) -> Self { + Self(error) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use anyhow::Context; + + #[test] + fn separates_error_levels_on_a_single_line() { + let example_io_err: Result<(), std::io::Error> = Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "Example io error", + )); + + let operation_error: OperationError = example_io_err + .context("Could not perform io operation") + .unwrap_err() + .into(); + + assert_eq!( + &operation_error.to_string(), + "Could not perform io operation: Example io error" + ); + } +} diff --git a/crates/extensions/c8y_mapper_ext/src/operations/firmware_update.rs b/crates/extensions/c8y_mapper_ext/src/operations/firmware_update.rs index 6b9e2dd69c4..56921721416 100644 --- a/crates/extensions/c8y_mapper_ext/src/operations/firmware_update.rs +++ b/crates/extensions/c8y_mapper_ext/src/operations/firmware_update.rs @@ -1,11 +1,12 @@ +use super::error::OperationError; use super::EntityTarget; use super::OperationContext; use super::OperationResult; use crate::converter::CumulocityConverter; use crate::error::ConversionError; use crate::error::CumulocityMapperError; +use anyhow::Context; use c8y_api::json_c8y_deserializer::C8yFirmware; -use c8y_api::smartrest::smartrest_serializer::fail_operation; use c8y_api::smartrest::smartrest_serializer::succeed_operation_no_payload; use c8y_api::smartrest::smartrest_serializer::CumulocitySupportedOperations; use tedge_api::commands::FirmwareInfo; @@ -87,7 +88,7 @@ impl OperationContext { target: &EntityTarget, cmd_id: &str, message: &MqttMessage, - ) -> Result { + ) -> Result { if !self.capabilities.firmware_update { warn!( "Received a firmware_update command, however, firmware_update feature is disabled" @@ -99,7 +100,9 @@ impl OperationContext { target.topic_id.clone(), cmd_id.into(), message.payload_bytes(), - )? { + ) + .context("Could not parse command as a firmware update command")? + { Some(command) => command, None => { // The command has been fully processed @@ -138,15 +141,7 @@ impl OperationContext { messages: vec![c8y_notification, twin_metadata], }) } - CommandStatus::Failed { reason } => { - let smartrest_operation_status = - fail_operation(CumulocitySupportedOperations::C8yFirmware, &reason); - let c8y_notification = MqttMessage::new(sm_topic, smartrest_operation_status); - - Ok(OperationResult::Finished { - messages: vec![c8y_notification], - }) - } + CommandStatus::Failed { reason } => Err(anyhow::anyhow!(reason).into()), _ => Ok(OperationResult::Ignored), } } diff --git a/crates/extensions/c8y_mapper_ext/src/operations/log_upload.rs b/crates/extensions/c8y_mapper_ext/src/operations/log_upload.rs index c77d62653b8..79f2d487f40 100644 --- a/crates/extensions/c8y_mapper_ext/src/operations/log_upload.rs +++ b/crates/extensions/c8y_mapper_ext/src/operations/log_upload.rs @@ -1,3 +1,4 @@ +use super::error::OperationError; use super::EntityTarget; use super::OperationContext; use crate::converter::CumulocityConverter; @@ -6,7 +7,6 @@ use crate::error::CumulocityMapperError; use crate::operations::OperationResult; use anyhow::Context; use c8y_api::json_c8y_deserializer::C8yLogfileRequest; -use c8y_api::smartrest::smartrest_serializer::fail_operation; use c8y_api::smartrest::smartrest_serializer::CumulocitySupportedOperations; use camino::Utf8PathBuf; use tedge_api::commands::CommandStatus; @@ -90,7 +90,7 @@ impl OperationContext { target: &EntityTarget, cmd_id: &str, message: &MqttMessage, - ) -> Result { + ) -> Result { if !self.capabilities.log_upload { warn!("Received a log_upload command, however, log_upload feature is disabled"); return Ok(OperationResult::Ignored); @@ -100,7 +100,9 @@ impl OperationContext { target.topic_id.clone(), cmd_id.into(), message.payload_bytes(), - )? { + ) + .context("Could not parse command as a log upload command")? + { Some(command) => command, None => { // The command has been fully processed @@ -128,30 +130,17 @@ impl OperationContext { .clone() .await_response((cmd_id.into(), download_request)) .await - .map_err(CumulocityMapperError::ChannelError)?; - - let download_response = match download_result { - Err(err) => { - let smartrest_error = fail_operation( - CumulocitySupportedOperations::C8yLogFileRequest, - &format!( - "tedge-mapper-c8y failed to download log from file transfer service: {err}", - ), - ); - - let c8y_notification = MqttMessage::new(smartrest_topic, smartrest_error); - return Ok(OperationResult::Finished { - messages: vec![c8y_notification], - - }); - } - Ok(download) => download, - }; + .context("Unexpected ChannelError")?; + + let download_response = download_result.context( + "tedge-mapper-c8y failed to download log from file transfer service", + )?; let file_path = Utf8PathBuf::try_from(download_response.file_path) - .map_err(|e| e.into_io_error())?; - let response = &LogUploadCmdPayload::from_json(message.payload_str()?)?; - let event_type = response.log_type.clone(); + .map_err(|e| e.into_io_error()) + .context("Could not parse file path as Utf-8")?; + + let event_type = &command.payload.log_type; let (binary_upload_event_url, upload_result) = self .upload_file( @@ -163,7 +152,8 @@ impl OperationContext { event_type.clone(), None, ) - .await?; + .await + .context("Could not upload log file to C8y")?; let smartrest_response = super::get_smartrest_response_for_upload_result( upload_result, @@ -179,22 +169,14 @@ impl OperationContext { &OperationType::LogUpload, &command.clone().into_generic_command(&self.mqtt_schema), ) - .await?; + .await + .context("Could not upload operation log")?; Ok(OperationResult::Finished { messages: vec![c8y_notification], }) } - CommandStatus::Failed { reason } => { - let smartrest_operation_status = - fail_operation(CumulocitySupportedOperations::C8yLogFileRequest, &reason); - let c8y_notification = - MqttMessage::new(smartrest_topic, smartrest_operation_status); - Ok(OperationResult::Finished { - messages: vec![c8y_notification], - - }) - } + CommandStatus::Failed { reason } => Err(anyhow::anyhow!(reason).into()), _ => { // Do nothing as other components might handle those states Ok(OperationResult::Ignored) diff --git a/crates/extensions/c8y_mapper_ext/src/operations/mod.rs b/crates/extensions/c8y_mapper_ext/src/operations/mod.rs index 3c4981f2aff..e3f3e18d017 100644 --- a/crates/extensions/c8y_mapper_ext/src/operations/mod.rs +++ b/crates/extensions/c8y_mapper_ext/src/operations/mod.rs @@ -30,6 +30,7 @@ use c8y_api::smartrest::smartrest_serializer::CumulocitySupportedOperations; use c8y_auth_proxy::url::ProxyUrlGenerator; use c8y_http_proxy::handle::C8YHttpProxy; use camino::Utf8Path; +use error::OperationError; use std::collections::HashMap; use std::sync::Arc; use tedge_actors::ClientMessageBox; @@ -54,6 +55,7 @@ use tracing::error; pub mod config_snapshot; pub mod config_update; +mod error; pub mod firmware_update; pub mod log_upload; mod restart; @@ -227,47 +229,70 @@ impl RunningOperation { } }; - let Some(c8y_operation) = to_c8y_operation(&operation) else { - debug!( - topic = message.topic.name, - ?operation, - "ignoring local-only operation" - ); - return; - }; + let operation_result = match operation { + OperationType::Health | OperationType::Custom(_) => { + debug!( + topic = message.topic.name, + ?operation, + "ignoring local-only operation" + ); + Ok(OperationResult::Ignored) + } - let operation_result = match c8y_operation { - CumulocitySupportedOperations::C8yRestartRequest => { + OperationType::Restart => { context - .publish_restart_operation_status(&entity, &cmd_id, message) + .publish_restart_operation_status(&entity, &cmd_id, &message) .await } - CumulocitySupportedOperations::C8ySoftwareList => { - context + // SoftwareList is not a regular operation: it doesn't update its status and doesn't report any + // failures; it just maps local software list to c8y software list payloads and sends it via MQTT + // Smartrest 2.0/HTTP + OperationType::SoftwareList => { + let result = context .publish_software_list(&entity, &cmd_id, &message) - .await + .await; + + let mut mqtt_publisher = context.mqtt_publisher.clone(); + match result { + Err(err) => { + error!("Fail to list installed software packages: {err}"); + } + Ok(OperationResult::Finished { messages }) => { + for message in messages { + mqtt_publisher.send(message).await.unwrap(); + } + } + // command is not yet finished, avoid clearing the command topic + Ok(_) => { + continue; + } + } + + clear_command_topic(command, &mut mqtt_publisher).await; + rx.close(); + continue; } - CumulocitySupportedOperations::C8ySoftwareUpdate => { + OperationType::SoftwareUpdate => { context .publish_software_update_status(&entity, &cmd_id, &message) .await } - CumulocitySupportedOperations::C8yLogFileRequest => { + OperationType::LogUpload => { context .handle_log_upload_state_change(&entity, &cmd_id, &message) .await } - CumulocitySupportedOperations::C8yUploadConfigFile => { + OperationType::ConfigSnapshot => { context .handle_config_snapshot_state_change(&entity, &cmd_id, &message) .await } - CumulocitySupportedOperations::C8yDownloadConfigFile => { + OperationType::ConfigUpdate => { context .handle_config_update_state_change(&entity, &cmd_id, &message) .await } - CumulocitySupportedOperations::C8yFirmware => { + OperationType::FirmwareUpdate => { context .handle_firmware_update_state_change(&entity, &cmd_id, &message) .await @@ -275,6 +300,10 @@ impl RunningOperation { }; let mut mqtt_publisher = context.mqtt_publisher.clone(); + + // at this point all local operations that are not regular c8y operations should be handled above + let c8y_operation = to_c8y_operation(&operation).unwrap(); + match to_response( operation_result, c8y_operation, @@ -304,13 +333,7 @@ impl RunningOperation { mqtt_publisher.send(message).await.unwrap(); } - // clear command topic - let command = command.clear(); - let clearing_message = command.into_message(); - assert!(clearing_message.payload_bytes().is_empty()); - assert!(clearing_message.retain); - assert_eq!(clearing_message.qos, QoS::AtLeastOnce); - mqtt_publisher.send(clearing_message).await.unwrap(); + clear_command_topic(command, &mut mqtt_publisher).await; rx.close(); } @@ -322,6 +345,18 @@ impl RunningOperation { } } +async fn clear_command_topic( + command: GenericCommandState, + mqtt_publisher: &mut LoggingSender, +) { + let command = command.clear(); + let clearing_message = command.into_message(); + assert!(clearing_message.payload_bytes().is_empty()); + assert!(clearing_message.retain); + assert_eq!(clearing_message.qos, QoS::AtLeastOnce); + mqtt_publisher.send(clearing_message).await.unwrap(); +} + /// Result of an update of operation's state. /// /// When a new MQTT message is received with an updated state of the operation, the mapper needs to @@ -345,7 +380,7 @@ enum OperationResult { /// Converts operation result to valid C8y response. fn to_response( - result: Result, + result: Result, operation_type: CumulocitySupportedOperations, smartrest_publish_topic: &Topic, ) -> OperationResult { @@ -378,7 +413,9 @@ fn to_c8y_operation(operation_type: &OperationType) -> Option Some(CumulocitySupportedOperations::C8yDownloadConfigFile), OperationType::FirmwareUpdate => Some(CumulocitySupportedOperations::C8yFirmware), OperationType::SoftwareUpdate => Some(CumulocitySupportedOperations::C8ySoftwareUpdate), - OperationType::SoftwareList => Some(CumulocitySupportedOperations::C8ySoftwareList), + // software list is not an c8y, only a fragment, but is a local operation that is spawned as + // part of C8y_SoftwareUpdate operation + OperationType::SoftwareList => None, // local-only operation, not always invoked by c8y, handled in other codepath OperationType::Health => None, // other custom operations, no c8y equivalent diff --git a/crates/extensions/c8y_mapper_ext/src/operations/restart.rs b/crates/extensions/c8y_mapper_ext/src/operations/restart.rs index f71c21b449f..4faf9254da5 100644 --- a/crates/extensions/c8y_mapper_ext/src/operations/restart.rs +++ b/crates/extensions/c8y_mapper_ext/src/operations/restart.rs @@ -1,11 +1,11 @@ +use anyhow::Context; use c8y_api::smartrest; use c8y_api::smartrest::smartrest_serializer::CumulocitySupportedOperations; use tedge_api::CommandStatus; use tedge_api::RestartCommand; use tedge_mqtt_ext::MqttMessage; -use crate::error::ConversionError; - +use super::error::OperationError; use super::EntityTarget; use super::OperationContext; use super::OperationResult; @@ -15,13 +15,15 @@ impl OperationContext { &self, target: &EntityTarget, cmd_id: &str, - message: MqttMessage, - ) -> Result { + message: &MqttMessage, + ) -> Result { let command = match RestartCommand::try_from_bytes( target.topic_id.clone(), cmd_id.to_owned(), message.payload_bytes(), - )? { + ) + .context("Could not parse command as a restart command")? + { Some(command) => command, None => { // The command has been fully processed @@ -43,14 +45,7 @@ impl OperationContext { }) } CommandStatus::Failed { ref reason } => { - let smartrest_set_operation = smartrest::smartrest_serializer::fail_operation( - CumulocitySupportedOperations::C8yRestartRequest, - &format!("Restart Failed: {reason}"), - ); - - Ok(OperationResult::Finished { - messages: vec![MqttMessage::new(topic, smartrest_set_operation)], - }) + Err(anyhow::anyhow!("Restart Failed: {reason}").into()) } _ => { // The other states are ignored diff --git a/crates/extensions/c8y_mapper_ext/src/operations/software_list.rs b/crates/extensions/c8y_mapper_ext/src/operations/software_list.rs index ccd74eaa988..f261144314d 100644 --- a/crates/extensions/c8y_mapper_ext/src/operations/software_list.rs +++ b/crates/extensions/c8y_mapper_ext/src/operations/software_list.rs @@ -1,13 +1,12 @@ +use anyhow::Context; use c8y_api::json_c8y::C8yUpdateSoftwareListResponse; use c8y_api::smartrest; use tedge_api::CommandStatus; use tedge_api::SoftwareListCommand; use tedge_config::SoftwareManagementApiFlag; use tedge_mqtt_ext::MqttMessage; -use tracing::error; - -use crate::error::ConversionError; +use super::error::OperationError; use super::EntityTarget; use super::OperationContext; use super::OperationResult; @@ -20,12 +19,14 @@ impl OperationContext { target: &EntityTarget, cmd_id: &str, message: &MqttMessage, - ) -> Result { + ) -> Result { let command = match SoftwareListCommand::try_from_bytes( target.topic_id.clone(), cmd_id.to_owned(), message.payload_bytes(), - )? { + ) + .context("Could not parse command as software list command")? + { Some(command) => command, None => { // The command has been fully processed @@ -44,7 +45,8 @@ impl OperationContext { c8y_software_list, target.external_id.as_ref().to_string(), ) - .await?; + .await + .context("Could not send software list via http")?; return Ok(OperationResult::Finished { messages: vec![] }); } @@ -63,10 +65,7 @@ impl OperationContext { Ok(OperationResult::Finished { messages }) } - CommandStatus::Failed { reason } => { - error!("Fail to list installed software packages: {reason}"); - Ok(OperationResult::Finished { messages: vec![] }) - } + CommandStatus::Failed { reason } => Err(anyhow::anyhow!("{reason}").into()), CommandStatus::Init | CommandStatus::Scheduled diff --git a/crates/extensions/c8y_mapper_ext/src/operations/software_update.rs b/crates/extensions/c8y_mapper_ext/src/operations/software_update.rs index e6cd9a250b9..75aa9eba7ab 100644 --- a/crates/extensions/c8y_mapper_ext/src/operations/software_update.rs +++ b/crates/extensions/c8y_mapper_ext/src/operations/software_update.rs @@ -1,3 +1,4 @@ +use anyhow::Context; use c8y_api::smartrest; use c8y_api::smartrest::smartrest_serializer::CumulocitySupportedOperations; use tedge_api::mqtt_topics::EntityTopicId; @@ -6,8 +7,7 @@ use tedge_api::SoftwareListCommand; use tedge_api::SoftwareUpdateCommand; use tedge_mqtt_ext::MqttMessage; -use crate::error::ConversionError; - +use super::error::OperationError; use super::EntityTarget; use super::OperationContext; use super::OperationResult; @@ -18,12 +18,14 @@ impl OperationContext { target: &EntityTarget, cmd_id: &str, message: &MqttMessage, - ) -> Result { + ) -> Result { let command = match SoftwareUpdateCommand::try_from_bytes( target.topic_id.clone(), cmd_id.to_string(), message.payload_bytes(), - )? { + ) + .context("Could not parse command as a software update command")? + { Some(command) => command, None => { // The command has been fully processed @@ -51,6 +53,7 @@ impl OperationContext { ], }) } + // TODO(marcel): use simpler error handling once software list request extracted to converter CommandStatus::Failed { reason } => { let smartrest_set_operation = smartrest::smartrest_serializer::fail_operation( CumulocitySupportedOperations::C8ySoftwareUpdate,