Skip to content

Commit

Permalink
extract crafting of error MQTT message from operation handlers
Browse files Browse the repository at this point in the history
1. Changed error type from `ConversionError` to `OperationError` which
   displays its source error in a single line
2. Added context to some operation error cases

Signed-off-by: Marcel Guzik <[email protected]>
  • Loading branch information
Bravo555 committed Jul 15, 2024
1 parent b5e0d2a commit 1603106
Show file tree
Hide file tree
Showing 9 changed files with 130 additions and 120 deletions.
49 changes: 14 additions & 35 deletions crates/extensions/c8y_mapper_ext/src/operations/config_snapshot.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use super::error::OperationError;
use super::EntityTarget;
use super::OperationContext;
use super::OperationResult;
Expand All @@ -6,7 +7,6 @@ use crate::error::ConversionError;
use crate::error::CumulocityMapperError;
use anyhow::Context;
use c8y_api::json_c8y_deserializer::C8yUploadConfigFile;
use c8y_api::smartrest::smartrest_serializer::fail_operation;
use c8y_api::smartrest::smartrest_serializer::CumulocitySupportedOperations;
use camino::Utf8PathBuf;
use std::borrow::Cow;
Expand Down Expand Up @@ -92,7 +92,7 @@ impl OperationContext {
entity: &EntityTarget,
cmd_id: &str,
message: &MqttMessage,
) -> Result<OperationResult, ConversionError> {
) -> Result<OperationResult, OperationError> {
if !self.capabilities.config_snapshot {
warn!(
"Received a config_snapshot command, however, config_snapshot feature is disabled"
Expand All @@ -105,7 +105,9 @@ impl OperationContext {
target.topic_id.clone(),
cmd_id.into(),
message.payload_bytes(),
)? {
)
.context("Could not parse command as a config snapshot command")?
{
Some(command) => command,
None => {
// The command has been fully processed
Expand Down Expand Up @@ -150,28 +152,14 @@ impl OperationContext {
.downloader
.clone()
.await_response((cmd_id.to_string(), download_request))
.await?;

match download_result {
Err(err) => {
let smartrest_error =
fail_operation(
CumulocitySupportedOperations::C8yUploadConfigFile,
&format!("tedge-mapper-c8y failed to download configuration snapshot from file-transfer service: {err}"),
);

let c8y_notification = MqttMessage::new(smartrest_topic, smartrest_error);
.await
.context("Unexpected ChannelError")?;

return Ok(OperationResult::Finished {
messages: vec![c8y_notification],

});
}
Ok(download) => download,
};
download_result.context( "tedge-mapper-c8y failed to download configuration snapshot from file-transfer service")?;

let file_path =
Utf8PathBuf::try_from(destination_path).map_err(|e| e.into_io_error())?;
let file_path = Utf8PathBuf::try_from(destination_path)
.map_err(|e| e.into_io_error())
.context("Could not parse destination path as utf-8")?;
let event_type = command.payload.config_type.clone();

// Upload the file to C8y
Expand All @@ -185,7 +173,8 @@ impl OperationContext {
event_type,
None,
)
.await?;
.await
.context("Could not upload config file to C8y")?;

let smartrest_response = super::get_smartrest_response_for_upload_result(
upload_result,
Expand All @@ -199,17 +188,7 @@ impl OperationContext {
messages: vec![c8y_notification],
})
}
CommandStatus::Failed { reason } => {
let smartrest_operation_status =
fail_operation(CumulocitySupportedOperations::C8yUploadConfigFile, &reason);
let c8y_notification =
MqttMessage::new(smartrest_topic, smartrest_operation_status);

Ok(OperationResult::Finished {
messages: vec![c8y_notification],

})
}
CommandStatus::Failed { reason } => Err(anyhow::anyhow!(reason).into()),
_ => {
// Do nothing as other components might handle those states
Ok(OperationResult::Ignored)
Expand Down
21 changes: 7 additions & 14 deletions crates/extensions/c8y_mapper_ext/src/operations/config_update.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::converter::CumulocityConverter;
use crate::error::ConversionError;
use crate::error::CumulocityMapperError;
use anyhow::Context;
use c8y_api::json_c8y_deserializer::C8yDownloadConfigFile;
use c8y_api::smartrest::smartrest_serializer::fail_operation;
use c8y_api::smartrest::smartrest_serializer::succeed_operation_no_payload;
use c8y_api::smartrest::smartrest_serializer::CumulocitySupportedOperations;
use std::sync::Arc;
Expand All @@ -22,6 +22,7 @@ use tedge_mqtt_ext::MqttMessage;
use tedge_mqtt_ext::TopicFilter;
use tracing::log::warn;

use super::error::OperationError;
use super::EntityTarget;
use super::OperationContext;
use super::OperationResult;
Expand Down Expand Up @@ -51,7 +52,7 @@ impl OperationContext {
target: &EntityTarget,
cmd_id: &str,
message: &MqttMessage,
) -> Result<OperationResult, ConversionError> {
) -> Result<OperationResult, OperationError> {
if !self.capabilities.config_update {
warn!("Received a config_update command, however, config_update feature is disabled");
return Ok(OperationResult::Ignored);
Expand All @@ -61,7 +62,9 @@ impl OperationContext {
target.topic_id.clone(),
cmd_id.into(),
message.payload_bytes(),
)? {
)
.context("Could not parse command as a config update command")?
{
Some(command) => command,
None => {
// The command has been fully processed
Expand All @@ -83,17 +86,7 @@ impl OperationContext {
messages: vec![c8y_notification],
})
}
CommandStatus::Failed { reason } => {
let smartrest_operation_status = fail_operation(
CumulocitySupportedOperations::C8yDownloadConfigFile,
&reason,
);
let c8y_notification = MqttMessage::new(sm_topic, smartrest_operation_status);

Ok(OperationResult::Finished {
messages: vec![c8y_notification],
})
}
CommandStatus::Failed { reason } => Err(anyhow::anyhow!(reason).into()),
_ => {
Ok(OperationResult::Ignored) // Do nothing as other components might handle those states
}
Expand Down
58 changes: 58 additions & 0 deletions crates/extensions/c8y_mapper_ext/src/operations/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use std::error::Error;
use std::fmt::Display;

/// An error type for operation handlers.
///
/// This error type wraps an [`anyhow::Error`] and provides a display implementation that assumes
/// that source errors print their own causes after `:` character. This is in order to ensure that
/// failures are printed properly in Cumulocity web interface.
#[derive(Debug)]
pub(crate) struct OperationError(anyhow::Error);

impl Display for OperationError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)?;

if let Some(source) = self.0.source() {
write!(f, ": {}", source)?;
}

Ok(())
}
}

impl Error for OperationError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
self.0.source()
}
}

impl From<anyhow::Error> for OperationError {
fn from(error: anyhow::Error) -> Self {
Self(error)
}
}

#[cfg(test)]
mod tests {
use super::*;
use anyhow::Context;

#[test]
fn separates_error_levels_on_a_single_line() {
let example_io_err: Result<(), std::io::Error> = Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"Example io error",
));

let operation_error: OperationError = example_io_err
.context("Could not perform io operation")
.unwrap_err()
.into();

assert_eq!(
&operation_error.to_string(),
"Could not perform io operation: Example io error"
);
}
}
19 changes: 7 additions & 12 deletions crates/extensions/c8y_mapper_ext/src/operations/firmware_update.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use super::error::OperationError;
use super::EntityTarget;
use super::OperationContext;
use super::OperationResult;
use crate::converter::CumulocityConverter;
use crate::error::ConversionError;
use crate::error::CumulocityMapperError;
use anyhow::Context;
use c8y_api::json_c8y_deserializer::C8yFirmware;
use c8y_api::smartrest::smartrest_serializer::fail_operation;
use c8y_api::smartrest::smartrest_serializer::succeed_operation_no_payload;
use c8y_api::smartrest::smartrest_serializer::CumulocitySupportedOperations;
use tedge_api::commands::FirmwareInfo;
Expand Down Expand Up @@ -87,7 +88,7 @@ impl OperationContext {
target: &EntityTarget,
cmd_id: &str,
message: &MqttMessage,
) -> Result<OperationResult, ConversionError> {
) -> Result<OperationResult, OperationError> {
if !self.capabilities.firmware_update {
warn!(
"Received a firmware_update command, however, firmware_update feature is disabled"
Expand All @@ -99,7 +100,9 @@ impl OperationContext {
target.topic_id.clone(),
cmd_id.into(),
message.payload_bytes(),
)? {
)
.context("Could not parse command as a firmware update command")?
{
Some(command) => command,
None => {
// The command has been fully processed
Expand Down Expand Up @@ -138,15 +141,7 @@ impl OperationContext {
messages: vec![c8y_notification, twin_metadata],
})
}
CommandStatus::Failed { reason } => {
let smartrest_operation_status =
fail_operation(CumulocitySupportedOperations::C8yFirmware, &reason);
let c8y_notification = MqttMessage::new(sm_topic, smartrest_operation_status);

Ok(OperationResult::Finished {
messages: vec![c8y_notification],
})
}
CommandStatus::Failed { reason } => Err(anyhow::anyhow!(reason).into()),
_ => Ok(OperationResult::Ignored),
}
}
Expand Down
56 changes: 19 additions & 37 deletions crates/extensions/c8y_mapper_ext/src/operations/log_upload.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use super::error::OperationError;
use super::EntityTarget;
use super::OperationContext;
use crate::converter::CumulocityConverter;
Expand All @@ -6,7 +7,6 @@ use crate::error::CumulocityMapperError;
use crate::operations::OperationResult;
use anyhow::Context;
use c8y_api::json_c8y_deserializer::C8yLogfileRequest;
use c8y_api::smartrest::smartrest_serializer::fail_operation;
use c8y_api::smartrest::smartrest_serializer::CumulocitySupportedOperations;
use camino::Utf8PathBuf;
use tedge_api::commands::CommandStatus;
Expand Down Expand Up @@ -90,7 +90,7 @@ impl OperationContext {
target: &EntityTarget,
cmd_id: &str,
message: &MqttMessage,
) -> Result<OperationResult, ConversionError> {
) -> Result<OperationResult, OperationError> {
if !self.capabilities.log_upload {
warn!("Received a log_upload command, however, log_upload feature is disabled");
return Ok(OperationResult::Ignored);
Expand All @@ -100,7 +100,9 @@ impl OperationContext {
target.topic_id.clone(),
cmd_id.into(),
message.payload_bytes(),
)? {
)
.context("Could not parse command as a log upload command")?
{
Some(command) => command,
None => {
// The command has been fully processed
Expand Down Expand Up @@ -128,30 +130,17 @@ impl OperationContext {
.clone()
.await_response((cmd_id.into(), download_request))
.await
.map_err(CumulocityMapperError::ChannelError)?;

let download_response = match download_result {
Err(err) => {
let smartrest_error = fail_operation(
CumulocitySupportedOperations::C8yLogFileRequest,
&format!(
"tedge-mapper-c8y failed to download log from file transfer service: {err}",
),
);

let c8y_notification = MqttMessage::new(smartrest_topic, smartrest_error);
return Ok(OperationResult::Finished {
messages: vec![c8y_notification],

});
}
Ok(download) => download,
};
.context("Unexpected ChannelError")?;

let download_response = download_result.context(
"tedge-mapper-c8y failed to download log from file transfer service",
)?;

let file_path = Utf8PathBuf::try_from(download_response.file_path)
.map_err(|e| e.into_io_error())?;
let response = &LogUploadCmdPayload::from_json(message.payload_str()?)?;
let event_type = response.log_type.clone();
.map_err(|e| e.into_io_error())
.context("Could not parse file path as Utf-8")?;

let event_type = &command.payload.log_type;

let (binary_upload_event_url, upload_result) = self
.upload_file(
Expand All @@ -163,7 +152,8 @@ impl OperationContext {
event_type.clone(),
None,
)
.await?;
.await
.context("Could not upload log file to C8y")?;

let smartrest_response = super::get_smartrest_response_for_upload_result(
upload_result,
Expand All @@ -179,22 +169,14 @@ impl OperationContext {
&OperationType::LogUpload,
&command.clone().into_generic_command(&self.mqtt_schema),
)
.await?;
.await
.context("Could not upload operation log")?;

Ok(OperationResult::Finished {
messages: vec![c8y_notification],
})
}
CommandStatus::Failed { reason } => {
let smartrest_operation_status =
fail_operation(CumulocitySupportedOperations::C8yLogFileRequest, &reason);
let c8y_notification =
MqttMessage::new(smartrest_topic, smartrest_operation_status);
Ok(OperationResult::Finished {
messages: vec![c8y_notification],

})
}
CommandStatus::Failed { reason } => Err(anyhow::anyhow!(reason).into()),
_ => {
// Do nothing as other components might handle those states
Ok(OperationResult::Ignored)
Expand Down
4 changes: 3 additions & 1 deletion crates/extensions/c8y_mapper_ext/src/operations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use c8y_api::smartrest::smartrest_serializer::CumulocitySupportedOperations;
use c8y_auth_proxy::url::ProxyUrlGenerator;
use c8y_http_proxy::handle::C8YHttpProxy;
use camino::Utf8Path;
use error::OperationError;
use std::collections::HashMap;
use std::sync::Arc;
use tedge_actors::ClientMessageBox;
Expand All @@ -54,6 +55,7 @@ use tracing::error;

pub mod config_snapshot;
pub mod config_update;
mod error;
pub mod firmware_update;
pub mod log_upload;
mod restart;
Expand Down Expand Up @@ -345,7 +347,7 @@ enum OperationResult {

/// Converts operation result to valid C8y response.
fn to_response(
result: Result<OperationResult, ConversionError>,
result: Result<OperationResult, OperationError>,
operation_type: CumulocitySupportedOperations,
smartrest_publish_topic: &Topic,
) -> OperationResult {
Expand Down
Loading

0 comments on commit 1603106

Please sign in to comment.