Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Improve c8y_mapper_ext operation handler API #2997

Merged
14 changes: 0 additions & 14 deletions crates/core/c8y_api/src/smartrest/smartrest_serializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use mqtt_channel::MqttMessage;
use serde::ser::SerializeSeq;
use serde::Serialize;
use serde::Serializer;
use tedge_api::mqtt_topics::OperationType;
use tedge_api::SoftwareListCommand;
use tedge_api::SoftwareModule;
use tedge_config::TopicPrefix;
Expand Down Expand Up @@ -103,19 +102,6 @@ impl From<CumulocitySupportedOperations> for &'static str {
}
}

impl From<CumulocitySupportedOperations> for OperationType {
fn from(value: CumulocitySupportedOperations) -> Self {
match value {
CumulocitySupportedOperations::C8ySoftwareUpdate => OperationType::ConfigSnapshot,
CumulocitySupportedOperations::C8yLogFileRequest => OperationType::LogUpload,
CumulocitySupportedOperations::C8yRestartRequest => OperationType::Restart,
CumulocitySupportedOperations::C8yUploadConfigFile => OperationType::ConfigSnapshot,
CumulocitySupportedOperations::C8yDownloadConfigFile => OperationType::ConfigUpdate,
CumulocitySupportedOperations::C8yFirmware => OperationType::FirmwareUpdate,
}
}
}

pub fn declare_supported_operations(ops: &[&str]) -> String {
format!("114,{}", fields_to_csv_string(ops))
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use super::error::OperationError;
use super::EntityTarget;
use super::OperationContext;
use super::OperationOutcome;
use crate::converter::CumulocityConverter;
use crate::error::ConversionError;
use crate::error::CumulocityMapperError;
use anyhow::Context;
use c8y_api::json_c8y_deserializer::C8yUploadConfigFile;
use c8y_api::smartrest::smartrest_serializer::fail_operation;
use c8y_api::smartrest::smartrest_serializer::set_operation_executing;
use c8y_api::smartrest::smartrest_serializer::CumulocitySupportedOperations;
use camino::Utf8PathBuf;
use std::borrow::Cow;
Expand All @@ -19,11 +19,9 @@ use tedge_api::mqtt_topics::EntityFilter;
use tedge_api::mqtt_topics::EntityTopicId;
use tedge_api::mqtt_topics::MqttSchema;
use tedge_api::mqtt_topics::OperationType;
use tedge_api::workflow::GenericCommandState;
use tedge_api::Jsonify;
use tedge_downloader_ext::DownloadRequest;
use tedge_mqtt_ext::MqttMessage;
use tedge_mqtt_ext::QoS;
use tedge_mqtt_ext::TopicFilter;
use tracing::log::warn;

Expand Down Expand Up @@ -91,42 +89,37 @@ impl OperationContext {
/// - "failed", it converts the message to SmartREST "Failed".
pub async fn handle_config_snapshot_state_change(
&self,
entity: EntityTarget,
entity: &EntityTarget,
cmd_id: &str,
message: &MqttMessage,
) -> Result<(Vec<MqttMessage>, Option<GenericCommandState>), ConversionError> {
) -> Result<OperationOutcome, OperationError> {
if !self.capabilities.config_snapshot {
warn!(
"Received a config_snapshot command, however, config_snapshot feature is disabled"
);
return Ok((vec![], None));
return Ok(OperationOutcome::Ignored);
}
let target = entity;
let topic_id = &target.topic_id;

let command = match ConfigSnapshotCmd::try_from_bytes(
topic_id.clone(),
target.topic_id.clone(),
cmd_id.into(),
message.payload_bytes(),
)? {
)
.context("Could not parse command as a config snapshot command")?
{
Some(command) => command,
None => {
// The command has been fully processed
return Ok((vec![], None));
return Ok(OperationOutcome::Ignored);
}
};

let smartrest_topic = target.smartrest_publish_topic;
let smartrest_topic = &target.smartrest_publish_topic;
let cmd_id = command.cmd_id.as_str();

let messages = match command.status() {
CommandStatus::Executing => {
let smartrest_operation_status =
set_operation_executing(CumulocitySupportedOperations::C8yUploadConfigFile);
vec![MqttMessage::new(
&smartrest_topic,
smartrest_operation_status,
)]
}
match command.status() {
CommandStatus::Executing => Ok(OperationOutcome::Executing),
CommandStatus::Successful => {
// Send a request to the Downloader to download the file asynchronously from FTS
let config_filename = format!(
Expand Down Expand Up @@ -159,28 +152,14 @@ impl OperationContext {
.downloader
.clone()
.await_response((cmd_id.to_string(), download_request))
.await?;

match download_result {
Err(err) => {
let smartrest_error =
fail_operation(
CumulocitySupportedOperations::C8yUploadConfigFile,
&format!("tedge-mapper-c8y failed to download configuration snapshot from file-transfer service: {err}"),
);
.await
.context("Unexpected ChannelError")?;

let c8y_notification = MqttMessage::new(&smartrest_topic, smartrest_error);
let clean_operation = MqttMessage::new(&message.topic, "")
.with_retain()
.with_qos(QoS::AtLeastOnce);

return Ok((vec![c8y_notification, clean_operation], None));
}
Ok(download) => download,
};
download_result.context( "tedge-mapper-c8y failed to download configuration snapshot from file-transfer service")?;

let file_path =
Utf8PathBuf::try_from(destination_path).map_err(|e| e.into_io_error())?;
let file_path = Utf8PathBuf::try_from(destination_path)
.map_err(|e| e.into_io_error())
.context("Could not parse destination path as utf-8")?;
let event_type = command.payload.config_type.clone();

// Upload the file to C8y
Expand All @@ -194,40 +173,27 @@ impl OperationContext {
event_type,
None,
)
.await?;
.await
.context("Could not upload config file to C8y")?;

let smartrest_response = super::get_smartrest_response_for_upload_result(
upload_result,
c8y_binary_url.as_str(),
CumulocitySupportedOperations::C8yUploadConfigFile,
);

let c8y_notification = MqttMessage::new(&smartrest_topic, smartrest_response);
let clear_local_cmd = MqttMessage::new(&message.topic, "")
.with_retain()
.with_qos(QoS::AtLeastOnce);
let c8y_notification = MqttMessage::new(smartrest_topic, smartrest_response);

vec![c8y_notification, clear_local_cmd]
}
CommandStatus::Failed { reason } => {
let smartrest_operation_status =
fail_operation(CumulocitySupportedOperations::C8yUploadConfigFile, &reason);
let c8y_notification =
MqttMessage::new(&smartrest_topic, smartrest_operation_status);
let clear_local_cmd = MqttMessage::new(&message.topic, "")
.with_retain()
.with_qos(QoS::AtLeastOnce);
vec![c8y_notification, clear_local_cmd]
Ok(OperationOutcome::Finished {
messages: vec![c8y_notification],
})
}
CommandStatus::Failed { reason } => Err(anyhow::anyhow!(reason).into()),
_ => {
vec![] // Do nothing as other components might handle those states
// Do nothing as other components might handle those states
Ok(OperationOutcome::Ignored)
}
};

Ok((
messages,
Some(command.into_generic_command(&self.mqtt_schema)),
))
}
}
}

Expand Down
61 changes: 20 additions & 41 deletions crates/extensions/c8y_mapper_ext/src/operations/config_update.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use crate::converter::CumulocityConverter;
use crate::error::ConversionError;
use crate::error::CumulocityMapperError;
use anyhow::Context;
use c8y_api::json_c8y_deserializer::C8yDownloadConfigFile;
use c8y_api::smartrest::smartrest_serializer::fail_operation;
use c8y_api::smartrest::smartrest_serializer::set_operation_executing;
use c8y_api::smartrest::smartrest_serializer::succeed_operation_no_payload;
use c8y_api::smartrest::smartrest_serializer::CumulocitySupportedOperations;
use std::sync::Arc;
Expand All @@ -18,15 +17,15 @@ use tedge_api::mqtt_topics::EntityFilter;
use tedge_api::mqtt_topics::EntityTopicId;
use tedge_api::mqtt_topics::MqttSchema;
use tedge_api::mqtt_topics::OperationType;
use tedge_api::workflow::GenericCommandState;
use tedge_api::Jsonify;
use tedge_mqtt_ext::MqttMessage;
use tedge_mqtt_ext::QoS;
use tedge_mqtt_ext::TopicFilter;
use tracing::log::warn;

use super::error::OperationError;
use super::EntityTarget;
use super::OperationContext;
use super::OperationOutcome;

pub fn topic_filter(mqtt_schema: &MqttSchema) -> TopicFilter {
[
Expand All @@ -50,68 +49,48 @@ impl OperationContext {
/// - "failed", it converts the message to SmartREST "Failed".
pub async fn handle_config_update_state_change(
&self,
target: EntityTarget,
target: &EntityTarget,
cmd_id: &str,
message: &MqttMessage,
) -> Result<(Vec<MqttMessage>, Option<GenericCommandState>), ConversionError> {
) -> Result<OperationOutcome, OperationError> {
if !self.capabilities.config_update {
warn!("Received a config_update command, however, config_update feature is disabled");
return Ok((vec![], None));
return Ok(OperationOutcome::Ignored);
}

let command = match ConfigUpdateCmd::try_from_bytes(
target.topic_id.clone(),
cmd_id.into(),
message.payload_bytes(),
)? {
)
.context("Could not parse command as a config update command")?
{
Some(command) => command,
None => {
// The command has been fully processed
return Ok((vec![], None));
return Ok(OperationOutcome::Ignored);
}
};

let sm_topic = target.smartrest_publish_topic;

let messages = match command.status() {
CommandStatus::Executing => {
let smartrest_operation_status =
set_operation_executing(CumulocitySupportedOperations::C8yDownloadConfigFile);
let sm_topic = &target.smartrest_publish_topic;

vec![MqttMessage::new(&sm_topic, smartrest_operation_status)]
}
match command.status() {
CommandStatus::Executing => Ok(OperationOutcome::Executing),
CommandStatus::Successful => {
let smartrest_operation_status = succeed_operation_no_payload(
CumulocitySupportedOperations::C8yDownloadConfigFile,
);
let c8y_notification = MqttMessage::new(&sm_topic, smartrest_operation_status);
let clear_local_cmd = MqttMessage::new(&message.topic, "")
.with_retain()
.with_qos(QoS::AtLeastOnce);
let c8y_notification = MqttMessage::new(sm_topic, smartrest_operation_status);

vec![c8y_notification, clear_local_cmd]
}
CommandStatus::Failed { reason } => {
let smartrest_operation_status = fail_operation(
CumulocitySupportedOperations::C8yDownloadConfigFile,
&reason,
);
let c8y_notification = MqttMessage::new(&sm_topic, smartrest_operation_status);
let clear_local_cmd = MqttMessage::new(&message.topic, "")
.with_retain()
.with_qos(QoS::AtLeastOnce);

vec![c8y_notification, clear_local_cmd]
Ok(OperationOutcome::Finished {
messages: vec![c8y_notification],
})
}
CommandStatus::Failed { reason } => Err(anyhow::anyhow!(reason).into()),
_ => {
vec![] // Do nothing as other components might handle those states
Ok(OperationOutcome::Ignored) // Do nothing as other components might handle those states
}
};

Ok((
messages,
Some(command.into_generic_command(&self.mqtt_schema)),
))
}
}
}

Expand Down
58 changes: 58 additions & 0 deletions crates/extensions/c8y_mapper_ext/src/operations/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use std::error::Error;
use std::fmt::Display;

/// An error type for operation handlers.
///
/// This error type wraps an [`anyhow::Error`] and provides a display implementation that assumes
/// that source errors print their own causes after `:` character. This is in order to ensure that
/// failures are printed properly in Cumulocity web interface.
#[derive(Debug)]
pub(crate) struct OperationError(anyhow::Error);

impl Display for OperationError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)?;

if let Some(source) = self.0.source() {
write!(f, ": {}", source)?;
}

Ok(())
}
}

impl Error for OperationError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
self.0.source()
}
}

impl From<anyhow::Error> for OperationError {
fn from(error: anyhow::Error) -> Self {
Self(error)
}
}

#[cfg(test)]
mod tests {
use super::*;
use anyhow::Context;

#[test]
fn separates_error_levels_on_a_single_line() {
let example_io_err: Result<(), std::io::Error> = Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"Example io error",
));

let operation_error: OperationError = example_io_err
.context("Could not perform io operation")
.unwrap_err()
.into();

assert_eq!(
&operation_error.to_string(),
"Could not perform io operation: Example io error"
);
}
}
Loading