Skip to content

Commit

Permalink
Merge pull request #2997 from Bravo555/improve/operation-handler-deco…
Browse files Browse the repository at this point in the history
…upling

Improve c8y_mapper_ext operation handler API
  • Loading branch information
Bravo555 authored Jul 23, 2024
2 parents 64945fc + d74a8c8 commit d8b4ab4
Show file tree
Hide file tree
Showing 12 changed files with 1,237 additions and 768 deletions.
14 changes: 0 additions & 14 deletions crates/core/c8y_api/src/smartrest/smartrest_serializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use mqtt_channel::MqttMessage;
use serde::ser::SerializeSeq;
use serde::Serialize;
use serde::Serializer;
use tedge_api::mqtt_topics::OperationType;
use tedge_api::SoftwareListCommand;
use tedge_api::SoftwareModule;
use tedge_config::TopicPrefix;
Expand Down Expand Up @@ -103,19 +102,6 @@ impl From<CumulocitySupportedOperations> for &'static str {
}
}

impl From<CumulocitySupportedOperations> for OperationType {
fn from(value: CumulocitySupportedOperations) -> Self {
match value {
CumulocitySupportedOperations::C8ySoftwareUpdate => OperationType::ConfigSnapshot,
CumulocitySupportedOperations::C8yLogFileRequest => OperationType::LogUpload,
CumulocitySupportedOperations::C8yRestartRequest => OperationType::Restart,
CumulocitySupportedOperations::C8yUploadConfigFile => OperationType::ConfigSnapshot,
CumulocitySupportedOperations::C8yDownloadConfigFile => OperationType::ConfigUpdate,
CumulocitySupportedOperations::C8yFirmware => OperationType::FirmwareUpdate,
}
}
}

pub fn declare_supported_operations(ops: &[&str]) -> String {
format!("114,{}", fields_to_csv_string(ops))
}
Expand Down
94 changes: 30 additions & 64 deletions crates/extensions/c8y_mapper_ext/src/operations/config_snapshot.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use super::error::OperationError;
use super::EntityTarget;
use super::OperationContext;
use super::OperationOutcome;
use crate::converter::CumulocityConverter;
use crate::error::ConversionError;
use crate::error::CumulocityMapperError;
use anyhow::Context;
use c8y_api::json_c8y_deserializer::C8yUploadConfigFile;
use c8y_api::smartrest::smartrest_serializer::fail_operation;
use c8y_api::smartrest::smartrest_serializer::set_operation_executing;
use c8y_api::smartrest::smartrest_serializer::CumulocitySupportedOperations;
use camino::Utf8PathBuf;
use std::borrow::Cow;
Expand All @@ -19,11 +19,9 @@ use tedge_api::mqtt_topics::EntityFilter;
use tedge_api::mqtt_topics::EntityTopicId;
use tedge_api::mqtt_topics::MqttSchema;
use tedge_api::mqtt_topics::OperationType;
use tedge_api::workflow::GenericCommandState;
use tedge_api::Jsonify;
use tedge_downloader_ext::DownloadRequest;
use tedge_mqtt_ext::MqttMessage;
use tedge_mqtt_ext::QoS;
use tedge_mqtt_ext::TopicFilter;
use tracing::log::warn;

Expand Down Expand Up @@ -91,42 +89,37 @@ impl OperationContext {
/// - "failed", it converts the message to SmartREST "Failed".
pub async fn handle_config_snapshot_state_change(
&self,
entity: EntityTarget,
entity: &EntityTarget,
cmd_id: &str,
message: &MqttMessage,
) -> Result<(Vec<MqttMessage>, Option<GenericCommandState>), ConversionError> {
) -> Result<OperationOutcome, OperationError> {
if !self.capabilities.config_snapshot {
warn!(
"Received a config_snapshot command, however, config_snapshot feature is disabled"
);
return Ok((vec![], None));
return Ok(OperationOutcome::Ignored);
}
let target = entity;
let topic_id = &target.topic_id;

let command = match ConfigSnapshotCmd::try_from_bytes(
topic_id.clone(),
target.topic_id.clone(),
cmd_id.into(),
message.payload_bytes(),
)? {
)
.context("Could not parse command as a config snapshot command")?
{
Some(command) => command,
None => {
// The command has been fully processed
return Ok((vec![], None));
return Ok(OperationOutcome::Ignored);
}
};

let smartrest_topic = target.smartrest_publish_topic;
let smartrest_topic = &target.smartrest_publish_topic;
let cmd_id = command.cmd_id.as_str();

let messages = match command.status() {
CommandStatus::Executing => {
let smartrest_operation_status =
set_operation_executing(CumulocitySupportedOperations::C8yUploadConfigFile);
vec![MqttMessage::new(
&smartrest_topic,
smartrest_operation_status,
)]
}
match command.status() {
CommandStatus::Executing => Ok(OperationOutcome::Executing),
CommandStatus::Successful => {
// Send a request to the Downloader to download the file asynchronously from FTS
let config_filename = format!(
Expand Down Expand Up @@ -159,28 +152,14 @@ impl OperationContext {
.downloader
.clone()
.await_response((cmd_id.to_string(), download_request))
.await?;

match download_result {
Err(err) => {
let smartrest_error =
fail_operation(
CumulocitySupportedOperations::C8yUploadConfigFile,
&format!("tedge-mapper-c8y failed to download configuration snapshot from file-transfer service: {err}"),
);
.await
.context("Unexpected ChannelError")?;

let c8y_notification = MqttMessage::new(&smartrest_topic, smartrest_error);
let clean_operation = MqttMessage::new(&message.topic, "")
.with_retain()
.with_qos(QoS::AtLeastOnce);

return Ok((vec![c8y_notification, clean_operation], None));
}
Ok(download) => download,
};
download_result.context( "tedge-mapper-c8y failed to download configuration snapshot from file-transfer service")?;

let file_path =
Utf8PathBuf::try_from(destination_path).map_err(|e| e.into_io_error())?;
let file_path = Utf8PathBuf::try_from(destination_path)
.map_err(|e| e.into_io_error())
.context("Could not parse destination path as utf-8")?;
let event_type = command.payload.config_type.clone();

// Upload the file to C8y
Expand All @@ -194,40 +173,27 @@ impl OperationContext {
event_type,
None,
)
.await?;
.await
.context("Could not upload config file to C8y")?;

let smartrest_response = super::get_smartrest_response_for_upload_result(
upload_result,
c8y_binary_url.as_str(),
CumulocitySupportedOperations::C8yUploadConfigFile,
);

let c8y_notification = MqttMessage::new(&smartrest_topic, smartrest_response);
let clear_local_cmd = MqttMessage::new(&message.topic, "")
.with_retain()
.with_qos(QoS::AtLeastOnce);
let c8y_notification = MqttMessage::new(smartrest_topic, smartrest_response);

vec![c8y_notification, clear_local_cmd]
}
CommandStatus::Failed { reason } => {
let smartrest_operation_status =
fail_operation(CumulocitySupportedOperations::C8yUploadConfigFile, &reason);
let c8y_notification =
MqttMessage::new(&smartrest_topic, smartrest_operation_status);
let clear_local_cmd = MqttMessage::new(&message.topic, "")
.with_retain()
.with_qos(QoS::AtLeastOnce);
vec![c8y_notification, clear_local_cmd]
Ok(OperationOutcome::Finished {
messages: vec![c8y_notification],
})
}
CommandStatus::Failed { reason } => Err(anyhow::anyhow!(reason).into()),
_ => {
vec![] // Do nothing as other components might handle those states
// Do nothing as other components might handle those states
Ok(OperationOutcome::Ignored)
}
};

Ok((
messages,
Some(command.into_generic_command(&self.mqtt_schema)),
))
}
}
}

Expand Down
61 changes: 20 additions & 41 deletions crates/extensions/c8y_mapper_ext/src/operations/config_update.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use crate::converter::CumulocityConverter;
use crate::error::ConversionError;
use crate::error::CumulocityMapperError;
use anyhow::Context;
use c8y_api::json_c8y_deserializer::C8yDownloadConfigFile;
use c8y_api::smartrest::smartrest_serializer::fail_operation;
use c8y_api::smartrest::smartrest_serializer::set_operation_executing;
use c8y_api::smartrest::smartrest_serializer::succeed_operation_no_payload;
use c8y_api::smartrest::smartrest_serializer::CumulocitySupportedOperations;
use std::sync::Arc;
Expand All @@ -18,15 +17,15 @@ use tedge_api::mqtt_topics::EntityFilter;
use tedge_api::mqtt_topics::EntityTopicId;
use tedge_api::mqtt_topics::MqttSchema;
use tedge_api::mqtt_topics::OperationType;
use tedge_api::workflow::GenericCommandState;
use tedge_api::Jsonify;
use tedge_mqtt_ext::MqttMessage;
use tedge_mqtt_ext::QoS;
use tedge_mqtt_ext::TopicFilter;
use tracing::log::warn;

use super::error::OperationError;
use super::EntityTarget;
use super::OperationContext;
use super::OperationOutcome;

pub fn topic_filter(mqtt_schema: &MqttSchema) -> TopicFilter {
[
Expand All @@ -50,68 +49,48 @@ impl OperationContext {
/// - "failed", it converts the message to SmartREST "Failed".
pub async fn handle_config_update_state_change(
&self,
target: EntityTarget,
target: &EntityTarget,
cmd_id: &str,
message: &MqttMessage,
) -> Result<(Vec<MqttMessage>, Option<GenericCommandState>), ConversionError> {
) -> Result<OperationOutcome, OperationError> {
if !self.capabilities.config_update {
warn!("Received a config_update command, however, config_update feature is disabled");
return Ok((vec![], None));
return Ok(OperationOutcome::Ignored);
}

let command = match ConfigUpdateCmd::try_from_bytes(
target.topic_id.clone(),
cmd_id.into(),
message.payload_bytes(),
)? {
)
.context("Could not parse command as a config update command")?
{
Some(command) => command,
None => {
// The command has been fully processed
return Ok((vec![], None));
return Ok(OperationOutcome::Ignored);
}
};

let sm_topic = target.smartrest_publish_topic;

let messages = match command.status() {
CommandStatus::Executing => {
let smartrest_operation_status =
set_operation_executing(CumulocitySupportedOperations::C8yDownloadConfigFile);
let sm_topic = &target.smartrest_publish_topic;

vec![MqttMessage::new(&sm_topic, smartrest_operation_status)]
}
match command.status() {
CommandStatus::Executing => Ok(OperationOutcome::Executing),
CommandStatus::Successful => {
let smartrest_operation_status = succeed_operation_no_payload(
CumulocitySupportedOperations::C8yDownloadConfigFile,
);
let c8y_notification = MqttMessage::new(&sm_topic, smartrest_operation_status);
let clear_local_cmd = MqttMessage::new(&message.topic, "")
.with_retain()
.with_qos(QoS::AtLeastOnce);
let c8y_notification = MqttMessage::new(sm_topic, smartrest_operation_status);

vec![c8y_notification, clear_local_cmd]
}
CommandStatus::Failed { reason } => {
let smartrest_operation_status = fail_operation(
CumulocitySupportedOperations::C8yDownloadConfigFile,
&reason,
);
let c8y_notification = MqttMessage::new(&sm_topic, smartrest_operation_status);
let clear_local_cmd = MqttMessage::new(&message.topic, "")
.with_retain()
.with_qos(QoS::AtLeastOnce);

vec![c8y_notification, clear_local_cmd]
Ok(OperationOutcome::Finished {
messages: vec![c8y_notification],
})
}
CommandStatus::Failed { reason } => Err(anyhow::anyhow!(reason).into()),
_ => {
vec![] // Do nothing as other components might handle those states
Ok(OperationOutcome::Ignored) // Do nothing as other components might handle those states
}
};

Ok((
messages,
Some(command.into_generic_command(&self.mqtt_schema)),
))
}
}
}

Expand Down
58 changes: 58 additions & 0 deletions crates/extensions/c8y_mapper_ext/src/operations/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use std::error::Error;
use std::fmt::Display;

/// An error type for operation handlers.
///
/// This error type wraps an [`anyhow::Error`] and provides a display implementation that assumes
/// that source errors print their own causes after `:` character. This is in order to ensure that
/// failures are printed properly in Cumulocity web interface.
#[derive(Debug)]
pub(crate) struct OperationError(anyhow::Error);

impl Display for OperationError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)?;

if let Some(source) = self.0.source() {
write!(f, ": {}", source)?;
}

Ok(())
}
}

impl Error for OperationError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
self.0.source()
}
}

impl From<anyhow::Error> for OperationError {
fn from(error: anyhow::Error) -> Self {
Self(error)
}
}

#[cfg(test)]
mod tests {
use super::*;
use anyhow::Context;

#[test]
fn separates_error_levels_on_a_single_line() {
let example_io_err: Result<(), std::io::Error> = Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"Example io error",
));

let operation_error: OperationError = example_io_err
.context("Could not perform io operation")
.unwrap_err()
.into();

assert_eq!(
&operation_error.to_string(),
"Could not perform io operation: Example io error"
);
}
}
Loading

0 comments on commit d8b4ab4

Please sign in to comment.