From 2fc59ce0eddc9c7b0d5812f058c3e5efb5e9ec5e Mon Sep 17 00:00:00 2001 From: Rina Fujino Date: Wed, 20 Dec 2023 12:01:30 +0100 Subject: [PATCH] Revert "Merge pull request #2482 from rina23q/improve/1718/get-operation-from-json-over-mqtt" This reverts commit 9e204819055dabe4841cd19528809b7fc5336e9a, reversing changes made to 71ad70a0b4a4b7411c4352e24b78400feb757bc4. --- Cargo.lock | 1 + .../core/c8y_api/src/json_c8y_deserializer.rs | 705 ------------------ crates/core/c8y_api/src/lib.rs | 1 - .../src/smartrest/smartrest_deserializer.rs | 394 +++++++++- .../src/cli/connect/bridge_config_c8y.rs | 2 - crates/core/tedge_api/src/mqtt_topics.rs | 4 - crates/extensions/c8y_mapper_ext/Cargo.toml | 1 + .../src/compatibility_adapter.rs | 9 +- .../extensions/c8y_mapper_ext/src/config.rs | 2 - .../c8y_mapper_ext/src/converter.rs | 305 +++----- crates/extensions/c8y_mapper_ext/src/error.rs | 5 - .../src/operations/config_snapshot.rs | 146 ++-- .../src/operations/config_update.rs | 164 ++-- .../src/operations/firmware_update.rs | 64 +- .../src/operations/log_upload.rs | 186 +++-- crates/extensions/c8y_mapper_ext/src/tests.rs | 158 ++-- 16 files changed, 872 insertions(+), 1275 deletions(-) delete mode 100644 crates/core/c8y_api/src/json_c8y_deserializer.rs diff --git a/Cargo.lock b/Cargo.lock index 5289244426e..057ffda8a89 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -811,6 +811,7 @@ dependencies = [ "rand", "serde", "serde_json", + "sha256", "tedge_actors", "tedge_api", "tedge_config", diff --git a/crates/core/c8y_api/src/json_c8y_deserializer.rs b/crates/core/c8y_api/src/json_c8y_deserializer.rs deleted file mode 100644 index 0b6895293c2..00000000000 --- a/crates/core/c8y_api/src/json_c8y_deserializer.rs +++ /dev/null @@ -1,705 +0,0 @@ -use crate::smartrest::smartrest_deserializer::to_datetime; -use download::DownloadInfo; -use mqtt_channel::Topic; -use serde::Deserialize; -use std::collections::HashMap; -use tedge_api::mqtt_topics::EntityTopicId; -use tedge_api::SoftwareModule; -use tedge_api::SoftwareModuleUpdate; -use tedge_api::SoftwareUpdateCommand; -use time::OffsetDateTime; - -pub struct C8yDeviceControlTopic; - -impl C8yDeviceControlTopic { - pub fn topic() -> Topic { - Topic::new_unchecked(Self::name()) - } - - pub fn accept(topic: &Topic) -> bool { - topic.name.starts_with(Self::name()) - } - - pub fn name() -> &'static str { - "c8y/devicecontrol/notifications" - } -} - -#[derive(Debug)] -pub enum C8yDeviceControlOperation { - Restart(C8yRestart), - SoftwareUpdate(C8ySoftwareUpdate), - LogfileRequest(C8yLogfileRequest), - UploadConfigFile(C8yUploadConfigFile), - DownloadConfigFile(C8yDownloadConfigFile), - Firmware(C8yFirmware), - Custom, -} - -impl C8yDeviceControlOperation { - pub fn from_json_object( - hashmap: &HashMap, - ) -> Result { - let op = if let Some(value) = hashmap.get("c8y_Restart") { - C8yDeviceControlOperation::Restart(C8yRestart::from_json_value(value.clone())?) - } else if let Some(value) = hashmap.get("c8y_SoftwareUpdate") { - C8yDeviceControlOperation::SoftwareUpdate(C8ySoftwareUpdate::from_json_value( - value.clone(), - )?) - } else if let Some(value) = hashmap.get("c8y_LogfileRequest") { - C8yDeviceControlOperation::LogfileRequest(C8yLogfileRequest::from_json_value( - value.clone(), - )?) - } else if let Some(value) = hashmap.get("c8y_UploadConfigFile") { - C8yDeviceControlOperation::UploadConfigFile(C8yUploadConfigFile::from_json_value( - value.clone(), - )?) - } else if let Some(value) = hashmap.get("c8y_DownloadConfigFile") { - C8yDeviceControlOperation::DownloadConfigFile(C8yDownloadConfigFile::from_json_value( - value.clone(), - )?) - } else if let Some(value) = hashmap.get("c8y_Firmware") { - C8yDeviceControlOperation::Firmware(C8yFirmware::from_json_value(value.clone())?) - } else { - C8yDeviceControlOperation::Custom - }; - - Ok(op) - } -} - -/// Representation of operation object received via JSON over MQTT -/// -/// A lot information come from c8y, however, we only need these items: -/// - `id`, namely c8y's operation ID, -/// - `externalSource.externalId` as device external ID, -/// - operation fragment and its contents, here "c8y_UploadConfigFile". -/// -/// ```rust -/// // Example input from c8y -/// use c8y_api::json_c8y_deserializer::{C8yOperation, C8yUploadConfigFile}; -/// -/// let data = r#" -/// { -/// "delivery": { -/// "log": [], -/// "time": "2023-02-08T06:51:19.350Z", -/// "status": "PENDING" -/// }, -/// "agentId": "22519994", -/// "creationTime": "2023-02-08T06:51:19.318Z", -/// "deviceId": "22519994", -/// "id": "522559", -/// "status": "PENDING", -/// "description": "test operation", -/// "c8y_UploadConfigFile": { -/// "type": "/etc/tedge/tedge.toml" -/// }, -/// "externalSource": { -/// "externalId": "raspberrypi_001", -/// "type": "c8y_Serial" -/// } -/// }"#; -/// -/// // Parse the data -/// let op: C8yOperation = serde_json::from_str(data).unwrap(); -/// -/// // Get data for processing command -/// let device_xid = op.external_source.external_id; -/// let operation_id = op.op_id; -/// if let Some(v) = op.extras.get("c8y_UploadConfigFile") { -/// let c8y_upload_config_file: C8yUploadConfigFile = serde_json::from_value(v.clone()).unwrap(); -/// } -/// ``` -#[derive(Debug, Deserialize, PartialEq, Eq, Clone)] -#[serde(rename_all = "camelCase")] -pub struct C8yOperation { - /// externalSource - pub external_source: ExternalSource, - - /// Operation ID - #[serde(rename = "id")] - pub op_id: String, - - #[serde(flatten)] - pub extras: std::collections::HashMap, -} - -#[derive(Debug, Deserialize, PartialEq, Eq, Clone)] -#[serde(rename_all = "camelCase")] -pub struct ExternalSource { - pub external_id: String, - #[serde(rename = "type")] - pub source_type: String, -} - -impl C8yOperation { - pub fn from_json(json_str: &str) -> Result { - serde_json::from_str(json_str) - } -} - -/// Representation of c8y_Restart JSON object -/// -/// ```rust -/// use c8y_api::json_c8y_deserializer::C8yRestart; -/// -/// // Example input from c8y -/// let data = "{}"; -/// -/// // Parse the data -/// let req: C8yRestart = serde_json::from_str(data).unwrap(); -/// ``` -#[derive(Debug, Deserialize, Eq, PartialEq)] -pub struct C8yRestart {} - -/// Representation of c8y_SoftwareUpdate JSON object -/// -/// ```rust -/// use c8y_api::json_c8y_deserializer::{C8ySoftwareUpdate, C8ySoftwareUpdateAction}; -/// -/// // Example input from c8y -/// let data = r#"[ -/// { -/// "softwareType": "dummy", -/// "name": "foo", -/// "action": "install", -/// "id": "123456", -/// "version": "2.0.0", -/// "url": "https://example.cumulocity.com/inventory/binaries/757538" -/// }, -/// { -/// "name": "bar", -/// "action": "delete", -/// "version": "1.0.1" -/// } -/// ]"#; -/// -/// // Parse the data -/// let req: C8ySoftwareUpdate = serde_json::from_str(data).unwrap(); -/// -/// let first_list = req.lists.get(0).unwrap(); -/// assert_eq!(first_list.software_type, Some("dummy".to_string())); -/// assert_eq!(first_list.name, "foo"); -/// assert_eq!(first_list.action, "install"); -/// assert_eq!(first_list.version, "2.0.0"); -/// assert_eq!(first_list.url, Some("https://example.cumulocity.com/inventory/binaries/757538".to_string())); -/// -/// let second_list = req.lists.get(1).unwrap(); -/// assert_eq!(second_list.name, "bar"); -/// assert_eq!(second_list.action, "delete"); -/// assert_eq!(second_list.version, "1.0.1"); -/// ``` -#[derive(Debug, Deserialize, Eq, PartialEq)] -#[serde(transparent)] -pub struct C8ySoftwareUpdate { - pub lists: Vec, -} - -#[derive(Debug, Deserialize, Eq, PartialEq)] -#[serde(rename_all = "camelCase")] -pub struct C8ySoftwareUpdateModule { - pub name: String, - pub action: String, - pub version: String, - // None if the action is "delete" - pub url: Option, - // None if c8y's version is old. See issue #1352 - pub software_type: Option, - // C8y's object ID of the software to be installed. We don't use this info - pub id: Option, -} - -#[derive(Debug, Deserialize, Eq, PartialEq)] -#[serde(rename_all = "camelCase")] -pub enum C8ySoftwareUpdateAction { - Install, - Delete, -} - -impl TryFrom for C8ySoftwareUpdateAction { - type Error = C8yJsonOverMqttDeserializerError; - - fn try_from(action: String) -> Result { - match action.as_str() { - "install" => Ok(Self::Install), - "delete" => Ok(Self::Delete), - param => Err(C8yJsonOverMqttDeserializerError::InvalidParameter { - parameter: param.into(), - operation: "c8y_SoftwareUpdate".into(), - hint: "It must be install or delete.".into(), - }), - } - } -} - -impl C8ySoftwareUpdate { - pub fn modules(&self) -> &Vec { - &self.lists - } - - pub fn into_software_update_command( - &self, - target: &EntityTopicId, - cmd_id: String, - ) -> Result { - let mut request = SoftwareUpdateCommand::new(target, cmd_id); - for module in self.modules() { - match module.action.clone().try_into()? { - C8ySoftwareUpdateAction::Install => { - request.add_update(SoftwareModuleUpdate::Install { - module: SoftwareModule { - module_type: module.get_module_version_and_type().1, - name: module.name.clone(), - version: module.get_module_version_and_type().0, - url: module.get_url(), - file_path: None, - }, - }); - } - C8ySoftwareUpdateAction::Delete => { - request.add_update(SoftwareModuleUpdate::Remove { - module: SoftwareModule { - module_type: module.get_module_version_and_type().1, - name: module.name.clone(), - version: module.get_module_version_and_type().0, - url: None, - file_path: None, - }, - }); - } - } - } - Ok(request) - } -} - -impl C8ySoftwareUpdateModule { - fn get_module_version_and_type(&self) -> (Option, Option) { - if self.version.is_empty() { - (None, None) // (empty) - } else { - let split = if self.version.matches("::").count() > 1 { - self.version.rsplit_once("::") - } else { - self.version.split_once("::") - }; - - match split { - Some((v, t)) => { - if v.is_empty() { - (None, Some(t.into())) // ::debian - } else if !t.is_empty() { - (Some(v.into()), Some(t.into())) // 1.0::debian - } else { - (Some(v.into()), None) - } - } - None => { - if self.version == " " { - (None, None) // as long as c8y UI forces version input - } else { - (Some(self.version.clone()), None) // 1.0 - } - } - } - } - } - - fn get_url(&self) -> Option { - match &self.url { - Some(url) if url.trim().is_empty() => None, - Some(url) => Some(DownloadInfo::new(url.as_str())), - None => None, - } - } -} - -/// Representation of c8y_LogfileRequest JSON object -/// -/// ```rust -/// use c8y_api::json_c8y_deserializer::C8yLogfileRequest; -/// -/// // Example input from c8y -/// let data = r#" -/// { -/// "searchText": "", -/// "logFile": "foobar", -/// "dateTo": "2023-11-22T22:44:34+0100", -/// "dateFrom": "2023-11-21T22:44:34+0100", -/// "maximumLines": 1000 -/// }"#; -/// -/// // Parse the data -/// let req: C8yLogfileRequest = serde_json::from_str(data).unwrap(); -/// ``` -#[derive(Debug, Deserialize, Eq, PartialEq)] -#[serde(rename_all = "camelCase")] -pub struct C8yLogfileRequest { - pub search_text: String, - pub log_file: String, - #[serde(deserialize_with = "to_datetime")] - pub date_to: OffsetDateTime, - #[serde(deserialize_with = "to_datetime")] - pub date_from: OffsetDateTime, - pub maximum_lines: usize, -} - -/// Representation of c8y_UploadConfigFile JSON object -/// -/// ```rust -/// use c8y_api::json_c8y_deserializer::C8yUploadConfigFile; -/// -/// // Example input from c8y -/// let data = r#" -/// { -/// "type": "/etc/tedge/tedge.toml" -/// }"#; -/// -/// // Parse the data -/// let req: C8yUploadConfigFile = serde_json::from_str(data).unwrap(); -/// ``` -#[derive(Debug, Deserialize, Eq, PartialEq)] -#[serde(rename_all = "camelCase")] -pub struct C8yUploadConfigFile { - #[serde(rename = "type")] - pub config_type: String, -} - -/// Representation of c8y_DownloadConfigFile JSON object -/// -/// ```rust -/// use c8y_api::json_c8y_deserializer::C8yDownloadConfigFile; -/// -/// // Example input from c8y -/// let data = r#" -/// { -/// "type": "/etc/tedge/tedge.toml", -/// "url": "https://example.cumulocity.com/inventory/binaries/757538" -/// }"#; -/// -/// // Parse the data -/// let req: C8yDownloadConfigFile = serde_json::from_str(data).unwrap(); -/// ``` -#[derive(Debug, Deserialize, Eq, PartialEq)] -#[serde(rename_all = "camelCase")] -pub struct C8yDownloadConfigFile { - #[serde(rename = "type")] - pub config_type: String, - pub url: String, -} - -/// Representation of c8y_Firmware JSON object -/// -/// ```rust -/// use c8y_api::json_c8y_deserializer::C8yFirmware; -/// -/// // Example input from c8y -/// let data = r#" -/// { -/// "name": "foo", -/// "version": "1.0.2", -/// "url": "https://dummy.url/firmware.zip" -/// }"#; -/// -/// // Parse the data -/// let req: C8yFirmware = serde_json::from_str(data).unwrap(); -/// ``` -#[derive(Debug, Deserialize, Eq, PartialEq)] -#[serde(rename_all = "camelCase")] -pub struct C8yFirmware { - pub name: String, - pub version: String, - pub url: String, -} - -pub trait C8yDeviceControlOperationHelper { - fn from_json_value(value: serde_json::Value) -> Result - where - Self: Sized + serde::de::DeserializeOwned, - { - serde_json::from_value(value.clone()) - } -} - -impl C8yDeviceControlOperationHelper for C8yRestart {} -impl C8yDeviceControlOperationHelper for C8ySoftwareUpdate {} -impl C8yDeviceControlOperationHelper for C8yLogfileRequest {} -impl C8yDeviceControlOperationHelper for C8yUploadConfigFile {} -impl C8yDeviceControlOperationHelper for C8yDownloadConfigFile {} -impl C8yDeviceControlOperationHelper for C8yFirmware {} - -#[derive(thiserror::Error, Debug)] -pub enum C8yJsonOverMqttDeserializerError { - #[error("Parameter {parameter} is not recognized. {hint}")] - InvalidParameter { - operation: String, - parameter: String, - hint: String, - }, -} - -#[cfg(test)] -mod tests { - use crate::json_c8y_deserializer::C8yDeviceControlOperationHelper; - use crate::json_c8y_deserializer::C8yOperation; - use crate::json_c8y_deserializer::C8ySoftwareUpdate; - use crate::json_c8y_deserializer::C8ySoftwareUpdateModule; - use assert_json_diff::assert_json_eq; - use serde_json::json; - use tedge_api::mqtt_topics::EntityTopicId; - use tedge_api::Jsonify; - use tedge_api::SoftwareModule; - use tedge_api::SoftwareModuleUpdate; - use tedge_api::SoftwareUpdateCommand; - - #[test] - fn verify_get_module_version_and_type() { - let mut module = C8ySoftwareUpdateModule { - name: "software1".into(), - version: "".into(), - url: None, - software_type: None, - action: "install".into(), - id: None, - }; // "" - assert_eq!(module.get_module_version_and_type(), (None, None)); - - module.version = " ".into(); // " " (space) - assert_eq!(module.get_module_version_and_type(), (None, None)); - - module.version = "::debian".into(); - assert_eq!( - module.get_module_version_and_type(), - (None, Some("debian".to_string())) - ); - - module.version = "1.0.0::debian".into(); - assert_eq!( - module.get_module_version_and_type(), - (Some("1.0.0".to_string()), Some("debian".to_string())) - ); - - module.version = "1.0.0::1::debian".into(); - assert_eq!( - module.get_module_version_and_type(), - (Some("1.0.0::1".to_string()), Some("debian".to_string())) - ); - - module.version = "1.0.0::1::".into(); - assert_eq!( - module.get_module_version_and_type(), - (Some("1.0.0::1".to_string()), None) - ); - - module.version = "1.0.0".into(); - assert_eq!( - module.get_module_version_and_type(), - (Some("1.0.0".to_string()), None) - ); - } - - #[test] - fn deserialize_incorrect_software_update_action() { - let device = EntityTopicId::default_main_device(); - - let data = json!([ - { - "name": "bar", - "action": "unknown", - "version": "1.0.1" - } - ]); - - assert!(serde_json::from_str::(&data.to_string()) - .unwrap() - .into_software_update_command(&device, "123".to_string()) - .is_err()); - } - - #[test] - fn from_json_over_mqtt_update_software_to_software_update_cmd() { - let json_over_mqtt_payload = json!( - { - "delivery": { - "log": [], - "time": "2023-02-08T06:51:19.350Z", - "status": "PENDING" - }, - "agentId": "22519994", - "creationTime": "2023-02-08T06:51:19.318Z", - "deviceId": "22519994", - "id": "522559", - "status": "PENDING", - "description": "test operation", - "c8y_SoftwareUpdate": [ - { - "softwareType": "dummy", - "name": "software1", - "action": "install", - "id": "123456", - "version": "version1::debian", - "url": "url1" - }, - { - "name": "software2", - "action": "delete", - "version": "" - } - ], - "externalSource": { - "externalId": "external_id", - "type": "c8y_Serial" - } - }); - - let op: C8yOperation = serde_json::from_str(&json_over_mqtt_payload.to_string()).unwrap(); - let req = C8ySoftwareUpdate::from_json_value( - op.extras - .get("c8y_SoftwareUpdate") - .expect("c8y_SoftwareUpdate field is missing") - .to_owned(), - ) - .expect("Failed to deserialize"); - let device = EntityTopicId::default_main_device(); - let thin_edge_json = req - .into_software_update_command(&device, "123".to_string()) - .unwrap(); - - let mut expected_thin_edge_json = SoftwareUpdateCommand::new(&device, "123".to_string()); - expected_thin_edge_json.add_update(SoftwareModuleUpdate::install(SoftwareModule { - module_type: Some("debian".to_string()), - name: "software1".to_string(), - version: Some("version1".to_string()), - url: Some("url1".into()), - file_path: None, - })); - expected_thin_edge_json.add_update(SoftwareModuleUpdate::remove(SoftwareModule { - module_type: Some("".to_string()), - name: "software2".to_string(), - version: None, - url: None, - file_path: None, - })); - - assert_eq!(thin_edge_json, expected_thin_edge_json); - } - - #[test] - fn from_c8y_json_to_thin_edge_software_update_json() { - let data = json!([ - { - "name": "nodered", - "action": "install", - "version": "1.0.0::debian", - "url": "" - }, - { - "name": "collectd", - "action": "install", - "version": "5.7::debian", - "url": "https://collectd.org/download/collectd-tarballs/collectd-5.12.0.tar.bz2" - }, - { - "name": "nginx", - "action": "install", - "version": "1.21.0::docker", - "url": "" - }, - { - "name": "mongodb", - "action": "delete", - "version": "4.4.6::docker" - } - ]); - - let req: C8ySoftwareUpdate = serde_json::from_str(&data.to_string()).unwrap(); - - let software_update_request = req - .into_software_update_command(&EntityTopicId::default_main_device(), "123".to_string()) - .unwrap(); - - let output_json = software_update_request.payload.to_json(); - - let expected_json = json!({ - "status": "init", - "updateList": [ - { - "type": "debian", - "modules": [ - { - "name": "nodered", - "version": "1.0.0", - "action": "install" - }, - { - "name": "collectd", - "version": "5.7", - "url": "https://collectd.org/download/collectd-tarballs/collectd-5.12.0.tar.bz2", - "action": "install" - } - ] - }, - { - "type": "docker", - "modules": [ - { - "name": "nginx", - "version": "1.21.0", - "action": "install" - }, - { - "name": "mongodb", - "version": "4.4.6", - "action": "remove" - } - ] - } - ] - }); - assert_json_eq!( - serde_json::from_str::(output_json.as_str()).unwrap(), - expected_json - ); - } - - #[test] - fn access_c8y_software_update_modules() { - let data = json!([ - { - "name": "software1", - "action": "install", - "version": "version1", - "url": "url1" - }, - { - "name": "software2", - "action": "delete", - "version": "" - } - ]); - - let update_software = serde_json::from_str::(&data.to_string()).unwrap(); - - let expected_vec = vec![ - C8ySoftwareUpdateModule { - name: "software1".into(), - version: "version1".into(), - url: Some("url1".into()), - software_type: None, - action: "install".into(), - id: None, - }, - C8ySoftwareUpdateModule { - name: "software2".into(), - version: "".into(), - url: None, - software_type: None, - action: "delete".into(), - id: None, - }, - ]; - - assert_eq!(update_software.modules(), &expected_vec); - } -} diff --git a/crates/core/c8y_api/src/lib.rs b/crates/core/c8y_api/src/lib.rs index 891db60bdf8..d1d6ffb1e7e 100644 --- a/crates/core/c8y_api/src/lib.rs +++ b/crates/core/c8y_api/src/lib.rs @@ -1,6 +1,5 @@ pub mod http_proxy; pub mod json_c8y; -pub mod json_c8y_deserializer; pub mod smartrest; pub mod utils; diff --git a/crates/core/c8y_api/src/smartrest/smartrest_deserializer.rs b/crates/core/c8y_api/src/smartrest/smartrest_deserializer.rs index f097dbb94af..c4411c40828 100644 --- a/crates/core/c8y_api/src/smartrest/smartrest_deserializer.rs +++ b/crates/core/c8y_api/src/smartrest/smartrest_deserializer.rs @@ -1,14 +1,170 @@ use crate::smartrest::error::SmartRestDeserializerError; use csv::ReaderBuilder; +use download::DownloadInfo; use serde::de::Error; use serde::Deserialize; use serde::Deserializer; use serde::Serialize; +use std::convert::TryFrom; +use std::convert::TryInto; use std::fmt::Display; use std::fmt::Formatter; +use tedge_api::mqtt_topics::EntityTopicId; +use tedge_api::SoftwareModule; +use tedge_api::SoftwareModuleUpdate; +use tedge_api::SoftwareUpdateCommand; use time::format_description; use time::OffsetDateTime; +#[derive(Debug)] +enum CumulocitySoftwareUpdateActions { + Install, + Delete, +} + +impl TryFrom for CumulocitySoftwareUpdateActions { + type Error = SmartRestDeserializerError; + + fn try_from(action: String) -> Result { + match action.as_str() { + "install" => Ok(Self::Install), + "delete" => Ok(Self::Delete), + param => Err(SmartRestDeserializerError::InvalidParameter { + parameter: param.into(), + operation: "c8y_SoftwareUpdate".into(), + hint: "It must be install or delete.".into(), + }), + } + } +} + +#[derive(Debug, Deserialize, Serialize, Eq, PartialEq)] +pub struct SmartRestUpdateSoftware { + pub message_id: String, + pub external_id: String, + pub update_list: Vec, +} + +#[derive(Debug, Deserialize, Serialize, Eq, PartialEq)] +pub struct SmartRestUpdateSoftwareModule { + pub software: String, + pub version: Option, + pub url: Option, + pub action: String, +} + +impl Default for SmartRestUpdateSoftware { + fn default() -> Self { + Self { + message_id: "528".into(), + external_id: "".into(), + update_list: vec![], + } + } +} + +impl SmartRestUpdateSoftware { + pub fn from_smartrest(smartrest: &str) -> Result { + let mut message_id = smartrest.to_string(); + message_id.truncate(3); + + let mut rdr = ReaderBuilder::new() + .has_headers(false) + .flexible(true) + .from_reader(smartrest.as_bytes()); + let mut record: Self = Self::default(); + + for result in rdr.deserialize() { + record = result?; + } + + Ok(record) + } + + pub fn modules(&self) -> &Vec { + &self.update_list + } + + pub fn into_software_update_command( + &self, + target: &EntityTopicId, + cmd_id: String, + ) -> Result { + let mut request = SoftwareUpdateCommand::new(target, cmd_id); + for module in self.modules() { + match module.action.clone().try_into()? { + CumulocitySoftwareUpdateActions::Install => { + request.add_update(SoftwareModuleUpdate::Install { + module: SoftwareModule { + module_type: module.get_module_version_and_type().1, + name: module.software.clone(), + version: module.get_module_version_and_type().0, + url: module.get_url(), + file_path: None, + }, + }); + } + CumulocitySoftwareUpdateActions::Delete => { + request.add_update(SoftwareModuleUpdate::Remove { + module: SoftwareModule { + module_type: module.get_module_version_and_type().1, + name: module.software.clone(), + version: module.get_module_version_and_type().0, + url: None, + file_path: None, + }, + }); + } + } + } + Ok(request) + } +} + +impl SmartRestUpdateSoftwareModule { + fn get_module_version_and_type(&self) -> (Option, Option) { + let split; + match &self.version { + Some(version) => { + if version.matches("::").count() > 1 { + split = version.rsplit_once("::"); + } else { + split = version.split_once("::"); + } + + match split { + Some((v, t)) => { + if v.is_empty() { + (None, Some(t.into())) // ::debian + } else if !t.is_empty() { + (Some(v.into()), Some(t.into())) // 1.0::debian + } else { + (Some(v.into()), None) + } + } + None => { + if version == " " { + (None, None) // as long as c8y UI forces version input + } else { + (Some(version.into()), None) // 1.0 + } + } + } + } + + None => (None, None), // (empty) + } + } + + fn get_url(&self) -> Option { + match &self.url { + Some(url) if url.trim().is_empty() => None, + Some(url) => Some(DownloadInfo::new(url.as_str())), + None => None, + } + } +} + fn fix_timezone_offset(time: &str) -> String { let str_size = time.len(); let split = time.split(['+', '-']).last(); @@ -20,7 +176,7 @@ fn fix_timezone_offset(time: &str) -> String { } } -pub fn to_datetime<'de, D>(deserializer: D) -> Result +fn to_datetime<'de, D>(deserializer: D) -> Result where D: Deserializer<'de>, { @@ -70,6 +226,16 @@ pub trait SmartRestRequestGeneric { } } +#[derive(Debug)] +pub enum SmartRestOperationVariant { + Restart(SmartRestRestartRequest), + SoftwareUpdate(SmartRestUpdateSoftware), + LogfileRequest(SmartRestLogRequest), + UploadConfigFile(SmartRestConfigUploadRequest), + DownloadConfigFile(SmartRestConfigDownloadRequest), + Firmware(SmartRestFirmwareRequest), +} + #[derive(Debug, Deserialize, Serialize, Eq, PartialEq)] pub struct SmartRestLogRequest { pub message_id: String, @@ -85,6 +251,14 @@ pub struct SmartRestLogRequest { impl SmartRestRequestGeneric for SmartRestLogRequest {} +#[derive(Debug, Deserialize, Serialize, Eq, PartialEq)] +pub struct SmartRestRestartRequest { + pub message_id: String, + pub device: String, +} + +impl SmartRestRequestGeneric for SmartRestRestartRequest {} + #[derive(Debug, Deserialize, Serialize, Eq, PartialEq)] pub struct SmartRestConfigUploadRequest { pub message_id: String, @@ -186,6 +360,9 @@ impl Default for AvailableChildDevices { #[cfg(test)] mod tests { use super::*; + use assert_json_diff::*; + use serde_json::json; + use tedge_api::*; use test_case::test_case; #[test] @@ -213,10 +390,216 @@ mod tests { assert_matches::assert_matches!(jwt, Err(SmartRestDeserializerError::InvalidMessageId(42))); } + #[test] + fn verify_get_module_version_and_type() { + let mut module = SmartRestUpdateSoftwareModule { + software: "software1".into(), + version: None, + url: None, + action: "install".into(), + }; // "" + assert_eq!(module.get_module_version_and_type(), (None, None)); + + module.version = Some(" ".into()); // " " (space) + assert_eq!(module.get_module_version_and_type(), (None, None)); + + module.version = Some("::debian".into()); + assert_eq!( + module.get_module_version_and_type(), + (None, Some("debian".to_string())) + ); + + module.version = Some("1.0.0::debian".into()); + assert_eq!( + module.get_module_version_and_type(), + (Some("1.0.0".to_string()), Some("debian".to_string())) + ); + + module.version = Some("1.0.0::1::debian".into()); + assert_eq!( + module.get_module_version_and_type(), + (Some("1.0.0::1".to_string()), Some("debian".to_string())) + ); + + module.version = Some("1.0.0::1::".into()); + assert_eq!( + module.get_module_version_and_type(), + (Some("1.0.0::1".to_string()), None) + ); + + module.version = Some("1.0.0".into()); + assert_eq!( + module.get_module_version_and_type(), + (Some("1.0.0".to_string()), None) + ); + } + + #[test] + fn deserialize_smartrest_update_software() { + let smartrest = + String::from("528,external_id,software1,version1,url1,install,software2,,,delete"); + let update_software = SmartRestUpdateSoftware::from_smartrest(&smartrest).unwrap(); + + let expected_update_software = SmartRestUpdateSoftware { + message_id: "528".into(), + external_id: "external_id".into(), + update_list: vec![ + SmartRestUpdateSoftwareModule { + software: "software1".into(), + version: Some("version1".into()), + url: Some("url1".into()), + action: "install".into(), + }, + SmartRestUpdateSoftwareModule { + software: "software2".into(), + version: None, + url: None, + action: "delete".into(), + }, + ], + }; + + assert_eq!(update_software, expected_update_software); + } + #[test] fn deserialize_incorrect_smartrest_message_id() { let smartrest = String::from("516,external_id"); - assert!(SmartRestConfigUploadRequest::from_smartrest(&smartrest).is_err()); + assert!(SmartRestUpdateSoftware::from_smartrest(&smartrest).is_err()); + } + + #[test] + fn deserialize_incorrect_smartrest_action() { + let device = EntityTopicId::default_main_device(); + let smartrest = + String::from("528,external_id,software1,version1,url1,action,software2,,,remove"); + assert!(SmartRestUpdateSoftware::from_smartrest(&smartrest) + .unwrap() + .into_software_update_command(&device, "123".to_string()) + .is_err()); + } + + #[test] + fn from_smartrest_update_software_to_software_update_request() { + let smartrest_obj = SmartRestUpdateSoftware { + message_id: "528".into(), + external_id: "external_id".into(), + update_list: vec![ + SmartRestUpdateSoftwareModule { + software: "software1".into(), + version: Some("version1::debian".into()), + url: Some("url1".into()), + action: "install".into(), + }, + SmartRestUpdateSoftwareModule { + software: "software2".into(), + version: None, + url: None, + action: "delete".into(), + }, + ], + }; + let device = EntityTopicId::default_main_device(); + let thin_edge_json = smartrest_obj + .into_software_update_command(&device, "123".to_string()) + .unwrap(); + + let mut expected_thin_edge_json = SoftwareUpdateCommand::new(&device, "123".to_string()); + expected_thin_edge_json.add_update(SoftwareModuleUpdate::install(SoftwareModule { + module_type: Some("debian".to_string()), + name: "software1".to_string(), + version: Some("version1".to_string()), + url: Some("url1".into()), + file_path: None, + })); + expected_thin_edge_json.add_update(SoftwareModuleUpdate::remove(SoftwareModule { + module_type: Some("".to_string()), + name: "software2".to_string(), + version: None, + url: None, + file_path: None, + })); + + assert_eq!(thin_edge_json, expected_thin_edge_json); + } + + #[test] + fn from_smartrest_update_software_to_json() { + let smartrest = + String::from("528,external_id,nodered,1.0.0::debian,,install,\ + collectd,5.7::debian,https://collectd.org/download/collectd-tarballs/collectd-5.12.0.tar.bz2,install,\ + nginx,1.21.0::docker,,install,mongodb,4.4.6::docker,,delete"); + let software_update_request = SmartRestUpdateSoftware::from_smartrest(&smartrest) + .unwrap() + .into_software_update_command(&EntityTopicId::default_main_device(), "123".to_string()) + .unwrap(); + + let output_json = software_update_request.payload.to_json(); + + let expected_json = json!({ + "status": "init", + "updateList": [ + { + "type": "debian", + "modules": [ + { + "name": "nodered", + "version": "1.0.0", + "action": "install" + }, + { + "name": "collectd", + "version": "5.7", + "url": "https://collectd.org/download/collectd-tarballs/collectd-5.12.0.tar.bz2", + "action": "install" + } + ] + }, + { + "type": "docker", + "modules": [ + { + "name": "nginx", + "version": "1.21.0", + "action": "install" + }, + { + "name": "mongodb", + "version": "4.4.6", + "action": "remove" + } + ] + } + ] + }); + assert_json_eq!( + serde_json::from_str::(output_json.as_str()).unwrap(), + expected_json + ); + } + + #[test] + fn access_smartrest_update_modules() { + let smartrest = + String::from("528,external_id,software1,version1,url1,install,software2,,,delete"); + let update_software = SmartRestUpdateSoftware::from_smartrest(&smartrest).unwrap(); + + let expected_vec = vec![ + SmartRestUpdateSoftwareModule { + software: "software1".into(), + version: Some("version1".into()), + url: Some("url1".into()), + action: "install".into(), + }, + SmartRestUpdateSoftwareModule { + software: "software2".into(), + version: None, + url: None, + action: "delete".into(), + }, + ]; + + assert_eq!(update_software.modules(), &expected_vec); } #[test_case("2021-09-21T11:40:27+0200", "2021-09-22T11:40:27+0200"; "c8y expected")] @@ -234,6 +617,13 @@ mod tests { assert!(log.is_ok()); } + #[test] + fn deserialize_smartrest_restart_request_operation() { + let smartrest = "510,user".to_string(); + let log = SmartRestRestartRequest::from_smartrest(&smartrest); + assert!(log.is_ok()); + } + #[test] fn deserialize_smartrest_config_upload_request() { let message_id = "526".to_string(); diff --git a/crates/core/tedge/src/cli/connect/bridge_config_c8y.rs b/crates/core/tedge/src/cli/connect/bridge_config_c8y.rs index a0ffa9354d3..8ad07f89ab7 100644 --- a/crates/core/tedge/src/cli/connect/bridge_config_c8y.rs +++ b/crates/core/tedge/src/cli/connect/bridge_config_c8y.rs @@ -57,7 +57,6 @@ impl From for BridgeConfig { r#"measurement/measurements/create out 2 c8y/ """#.into(), r#"event/events/create out 2 c8y/ """#.into(), r#"alarm/alarms/create out 2 c8y/ """#.into(), - r#"devicecontrol/notifications in 2 c8y/ """#.into(), r#"error in 2 c8y/ """#.into(), // c8y JWT token retrieval r#"s/uat out 0 c8y/ """#.into(), @@ -198,7 +197,6 @@ fn test_bridge_config_from_c8y_params() -> anyhow::Result<()> { r#"measurement/measurements/create out 2 c8y/ """#.into(), r#"event/events/create out 2 c8y/ """#.into(), r#"alarm/alarms/create out 2 c8y/ """#.into(), - r#"devicecontrol/notifications in 2 c8y/ """#.into(), r#"error in 2 c8y/ """#.into(), // c8y JWT token retrieval r#"s/uat out 0 c8y/ """#.into(), diff --git a/crates/core/tedge_api/src/mqtt_topics.rs b/crates/core/tedge_api/src/mqtt_topics.rs index 9c70c0a3e1f..8829e7d91a3 100644 --- a/crates/core/tedge_api/src/mqtt_topics.rs +++ b/crates/core/tedge_api/src/mqtt_topics.rs @@ -710,10 +710,6 @@ impl IdGenerator { ) } - pub fn new_id_with_str(&self, value: &str) -> String { - format!("{}-{}", self.prefix, value) - } - pub fn is_generator_of(&self, cmd_id: &str) -> bool { cmd_id.contains(&self.prefix) } diff --git a/crates/extensions/c8y_mapper_ext/Cargo.toml b/crates/extensions/c8y_mapper_ext/Cargo.toml index 38381ab3333..de5f18ec63d 100644 --- a/crates/extensions/c8y_mapper_ext/Cargo.toml +++ b/crates/extensions/c8y_mapper_ext/Cargo.toml @@ -22,6 +22,7 @@ logged_command = { workspace = true } plugin_sm = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } +sha256 = { workspace = true } tedge_actors = { workspace = true } tedge_api = { workspace = true } tedge_config = { workspace = true } diff --git a/crates/extensions/c8y_mapper_ext/src/compatibility_adapter.rs b/crates/extensions/c8y_mapper_ext/src/compatibility_adapter.rs index d391e9125e6..07411357d9f 100644 --- a/crates/extensions/c8y_mapper_ext/src/compatibility_adapter.rs +++ b/crates/extensions/c8y_mapper_ext/src/compatibility_adapter.rs @@ -150,12 +150,9 @@ fn convert_from_old_agent_response( if let Ok(Value::Object(response)) = serde_json::from_slice(payload) { if let Some(Value::String(cmd_id)) = response.get("id") { // The new mapper expects command ids with a specific prefix - let topic_name = if cmd_id.contains("c8y-mapper") { - format!("te/device/main///cmd/{cmd_type}/{cmd_id}") - } else { - format!("te/device/main///cmd/{cmd_type}/c8y-mapper-{cmd_id}") - }; - if let Ok(topic) = Topic::new(&topic_name) { + if let Ok(topic) = Topic::new(&format!( + "te/device/main///cmd/{cmd_type}/c8y-mapper-{cmd_id}" + )) { return Ok(Some( MqttMessage::new(&topic, payload) .with_retain() diff --git a/crates/extensions/c8y_mapper_ext/src/config.rs b/crates/extensions/c8y_mapper_ext/src/config.rs index befeb6f87ce..79ef81ef409 100644 --- a/crates/extensions/c8y_mapper_ext/src/config.rs +++ b/crates/extensions/c8y_mapper_ext/src/config.rs @@ -1,5 +1,4 @@ use crate::Capabilities; -use c8y_api::json_c8y_deserializer::C8yDeviceControlTopic; use c8y_api::smartrest::error::OperationsError; use c8y_api::smartrest::operations::Operations; use c8y_api::smartrest::topic::C8yTopic; @@ -199,7 +198,6 @@ impl C8yMapperConfig { let mut topic_filter: TopicFilter = vec![ "c8y-internal/alarms/+/+/+/+/+/a/+", C8yTopic::SmartRestRequest.to_string().as_str(), - C8yDeviceControlTopic::name(), ] .try_into() .expect("topics that mapper should subscribe to"); diff --git a/crates/extensions/c8y_mapper_ext/src/converter.rs b/crates/extensions/c8y_mapper_ext/src/converter.rs index 454dde38594..8b1de50b074 100644 --- a/crates/extensions/c8y_mapper_ext/src/converter.rs +++ b/crates/extensions/c8y_mapper_ext/src/converter.rs @@ -15,11 +15,6 @@ use anyhow::Context; use c8y_api::http_proxy::C8yEndPoint; use c8y_api::json_c8y::C8yCreateEvent; use c8y_api::json_c8y::C8yUpdateSoftwareListResponse; -use c8y_api::json_c8y_deserializer::C8yDeviceControlOperation; -use c8y_api::json_c8y_deserializer::C8yDeviceControlTopic; -use c8y_api::json_c8y_deserializer::C8yJsonOverMqttDeserializerError; -use c8y_api::json_c8y_deserializer::C8yOperation; -use c8y_api::json_c8y_deserializer::C8ySoftwareUpdate; use c8y_api::smartrest::error::OperationsError; use c8y_api::smartrest::error::SmartRestDeserializerError; use c8y_api::smartrest::inventory::child_device_creation_message; @@ -36,6 +31,8 @@ use c8y_api::smartrest::operations::Operations; use c8y_api::smartrest::operations::ResultFormat; use c8y_api::smartrest::smartrest_deserializer::AvailableChildDevices; use c8y_api::smartrest::smartrest_deserializer::SmartRestRequestGeneric; +use c8y_api::smartrest::smartrest_deserializer::SmartRestRestartRequest; +use c8y_api::smartrest::smartrest_deserializer::SmartRestUpdateSoftware; use c8y_api::smartrest::smartrest_serializer::fail_operation; use c8y_api::smartrest::smartrest_serializer::request_pending_operations; use c8y_api::smartrest::smartrest_serializer::set_operation_executing; @@ -98,10 +95,8 @@ use tedge_utils::size_threshold::SizeThreshold; use thiserror::Error; use tokio::time::Duration; use tracing::debug; -use tracing::error; -use tracing::info; +use tracing::log::error; use tracing::trace; -use tracing::warn; const C8Y_CLOUD: &str = "c8y"; const SUPPORTED_OPERATIONS_DIRECTORY: &str = "operations"; @@ -210,8 +205,6 @@ pub struct CumulocityConverter { pub pending_fts_download_operations: HashMap, pub command_id: IdGenerator, - // Keep active command IDs to avoid creation of multiple commands for an operation - pub active_commands: HashSet, } impl CumulocityConverter { @@ -299,7 +292,6 @@ impl CumulocityConverter { pending_upload_operations: HashMap::new(), pending_fts_download_operations: HashMap::new(), command_id, - active_commands: HashSet::new(), }) } @@ -557,127 +549,36 @@ impl CumulocityConverter { )) } - async fn parse_c8y_devicecontrol_topic( - &mut self, - message: &Message, - ) -> Result, ConversionError> { - let operation = C8yOperation::from_json(message.payload.as_str()?)?; - let device_xid = operation.external_source.external_id; - let cmd_id = self.command_id.new_id_with_str(&operation.op_id); - - if self.active_commands.contains(&cmd_id) { - info!("{cmd_id} is already addressed"); - return Ok(vec![]); - } - - let result = self - .process_json_over_mqtt(device_xid, cmd_id, &operation.extras) - .await; - let output = self.handle_c8y_operation_result(&result); - - Ok(output) - } - - async fn process_json_over_mqtt( - &mut self, - device_xid: String, - cmd_id: String, - extras: &HashMap, - ) -> Result, CumulocityMapperError> { - let msgs = match C8yDeviceControlOperation::from_json_object(extras)? { - C8yDeviceControlOperation::Restart(_) => { - self.forward_restart_request(device_xid, cmd_id)? - } - C8yDeviceControlOperation::SoftwareUpdate(request) => { - self.forward_software_request(device_xid, cmd_id, request) - .await? - } - C8yDeviceControlOperation::LogfileRequest(request) => { - if self.config.capabilities.log_upload { - self.convert_log_upload_request(device_xid, cmd_id, request)? - } else { - warn!("Received a c8y_LogfileRequest operation, however, log_upload feature is disabled"); - vec![] - } - } - C8yDeviceControlOperation::UploadConfigFile(request) => { - if self.config.capabilities.config_snapshot { - self.convert_config_snapshot_request(device_xid, cmd_id, request)? - } else { - warn!("Received a c8y_UploadConfigFile operation, however, config_snapshot feature is disabled"); - vec![] - } - } - C8yDeviceControlOperation::DownloadConfigFile(request) => { - if self.config.capabilities.config_update { - self.convert_config_update_request(device_xid, cmd_id, request) - .await? - } else { - warn!("Received a c8y_DownloadConfigFile operation, however, config_update feature is disabled"); - vec![] - } - } - C8yDeviceControlOperation::Firmware(request) => { - if self.config.capabilities.firmware_update { - self.convert_firmware_update_request(device_xid, cmd_id, request)? - } else { - warn!("Received a c8y_Firmware operation, however, firmware_update feature is disabled"); - vec![] - } - } - C8yDeviceControlOperation::Custom => { - // Ignores custom and static template operations unsupported by thin-edge - // However, these operations can be addressed by SmartREST that is published together with JSON over MQTT - vec![] - } - }; - - Ok(msgs) - } - - async fn parse_c8y_smartrest_topics( + async fn parse_c8y_topics( &mut self, message: &Message, ) -> Result, ConversionError> { let mut output: Vec = Vec::new(); for smartrest_message in collect_smartrest_messages(message.payload_str()?) { - let result = self.process_smartrest(smartrest_message.as_str()).await; - let mut msgs = self.handle_c8y_operation_result(&result); - output.append(&mut msgs) - } - Ok(output) - } + match &self.process_smartrest(smartrest_message.as_str()).await { + Err( + err @ CumulocityMapperError::FromSmartRestDeserializer( + SmartRestDeserializerError::InvalidParameter { operation, .. }, + ) + | err @ CumulocityMapperError::ExecuteFailed { + operation_name: operation, + .. + }, + ) => { + let topic = C8yTopic::SmartRestResponse.to_topic()?; + let msg1 = Message::new(&topic, set_operation_executing(operation)); + let msg2 = Message::new(&topic, fail_operation(operation, &err.to_string())); + error!("{err}"); + output.extend_from_slice(&[msg1, msg2]); + } + Err(err) => { + error!("{err}"); + } - fn handle_c8y_operation_result( - &mut self, - result: &Result, CumulocityMapperError>, - ) -> Vec { - match result { - Err( - err @ CumulocityMapperError::FromSmartRestDeserializer( - SmartRestDeserializerError::InvalidParameter { operation, .. }, - ) - | err @ CumulocityMapperError::FromC8yJsonOverMqttDeserializerError( - C8yJsonOverMqttDeserializerError::InvalidParameter { operation, .. }, - ) - | err @ CumulocityMapperError::ExecuteFailed { - operation_name: operation, - .. - }, - ) => { - let topic = C8yTopic::SmartRestResponse.to_topic().unwrap(); - let msg1 = Message::new(&topic, set_operation_executing(operation)); - let msg2 = Message::new(&topic, fail_operation(operation, &err.to_string())); - error!("{err}"); - vec![msg1, msg2] - } - Err(err) => { - error!("{err}"); - vec![] + Ok(msgs) => output.extend_from_slice(msgs), } - - Ok(msgs) => msgs.to_owned(), } + Ok(output) } async fn process_smartrest( @@ -687,6 +588,21 @@ impl CumulocityConverter { match get_smartrest_device_id(payload) { Some(device_id) => { match get_smartrest_template_id(payload).as_str() { + // Need a check of capabilities so that user can still use custom template if disabled + "522" if self.config.capabilities.log_upload => { + self.convert_log_upload_request(payload) + } + "524" if self.config.capabilities.config_update => { + self.convert_config_update_request(payload).await + } + "526" if self.config.capabilities.config_snapshot => { + self.convert_config_snapshot_request(payload) + } + "515" if self.config.capabilities.firmware_update => { + self.convert_firmware_update_request(payload) + } + "528" => self.forward_software_request(payload).await, + "510" => self.forward_restart_request(payload), template if device_id == self.device_name => { self.forward_operation_request(payload, template).await } @@ -715,14 +631,13 @@ impl CumulocityConverter { async fn forward_software_request( &mut self, - device_xid: String, - cmd_id: String, - software_update_request: C8ySoftwareUpdate, + smartrest: &str, ) -> Result, CumulocityMapperError> { - let entity_xid: EntityExternalId = device_xid.into(); - let target = self.entity_store.try_get_by_external_id(&entity_xid)?; - let mut command = - software_update_request.into_software_update_command(&target.topic_id, cmd_id)?; + let update_software = SmartRestUpdateSoftware::from_smartrest(smartrest)?; + let device_id = &update_software.external_id.clone().into(); + let target = self.entity_store.try_get_by_external_id(device_id)?; + let cmd_id = self.command_id.new_id(); + let mut command = update_software.into_software_update_command(&target.topic_id, cmd_id)?; command.payload.update_list.iter_mut().for_each(|modules| { modules.modules.iter_mut().for_each(|module| { @@ -744,11 +659,12 @@ impl CumulocityConverter { fn forward_restart_request( &mut self, - device_xid: String, - cmd_id: String, + smartrest: &str, ) -> Result, CumulocityMapperError> { - let entity_xid: EntityExternalId = device_xid.into(); - let target = self.entity_store.try_get_by_external_id(&entity_xid)?; + let request = SmartRestRestartRequest::from_smartrest(smartrest)?; + let device_id = &request.device.into(); + let target = self.entity_store.try_get_by_external_id(device_id)?; + let cmd_id = self.command_id.new_id(); let command = RestartCommand::new(&target.topic_id, cmd_id); let message = command.command_message(&self.mqtt_schema); Ok(vec![message]) @@ -1018,7 +934,7 @@ impl CumulocityConverter { trace!("Message content: {:?}", message.payload_str()); match self.mqtt_schema.entity_channel_of(&message.topic) { Ok((source, channel)) => self.try_convert_te_topics(source, channel, message).await, - Err(_) => self.try_convert_tedge_and_c8y_topics(message).await, + Err(_) => self.try_convert_tedge_topics(message).await, } } @@ -1122,9 +1038,8 @@ impl CumulocityConverter { self.process_alarm_messages(&source, message, alarm_type) } - Channel::Command { cmd_id, .. } if message.payload_bytes().is_empty() => { + Channel::Command { .. } if message.payload_bytes().is_empty() => { // The command has been fully processed - self.active_commands.remove(cmd_id); Ok(vec![]) } @@ -1152,38 +1067,59 @@ impl CumulocityConverter { } } - Channel::Command { operation, cmd_id } if self.command_id.is_generator_of(cmd_id) => { - self.active_commands.insert(cmd_id.clone()); - match operation { - OperationType::Restart => { - self.publish_restart_operation_status(&source, cmd_id, message) - .await - } - OperationType::SoftwareList => { - self.publish_software_list(&source, cmd_id, message).await - } - OperationType::SoftwareUpdate => { - self.publish_software_update_status(&source, cmd_id, message) - .await - } - OperationType::LogUpload => { - self.handle_log_upload_state_change(&source, cmd_id, message) - .await - } - OperationType::ConfigSnapshot => { - self.handle_config_snapshot_state_change(&source, cmd_id, message) - .await - } - OperationType::ConfigUpdate => { - self.handle_config_update_state_change(&source, cmd_id, message) - .await - } - OperationType::FirmwareUpdate => { - self.handle_firmware_update_state_change(&source, message) - .await - } - _ => Ok(vec![]), - } + Channel::Command { + operation: OperationType::Restart, + cmd_id, + } if self.command_id.is_generator_of(cmd_id) => { + self.publish_restart_operation_status(&source, cmd_id, message) + .await + } + + Channel::Command { + operation: OperationType::SoftwareList, + cmd_id, + } if self.command_id.is_generator_of(cmd_id) => { + self.publish_software_list(&source, cmd_id, message).await + } + + Channel::Command { + operation: OperationType::SoftwareUpdate, + cmd_id, + } if self.command_id.is_generator_of(cmd_id) => { + self.publish_software_update_status(&source, cmd_id, message) + .await + } + + Channel::Command { + operation: OperationType::LogUpload, + cmd_id, + } if self.command_id.is_generator_of(cmd_id) => { + self.handle_log_upload_state_change(&source, cmd_id, message) + .await + } + + Channel::Command { + operation: OperationType::ConfigSnapshot, + cmd_id, + } if self.command_id.is_generator_of(cmd_id) => { + self.handle_config_snapshot_state_change(&source, cmd_id, message) + .await + } + + Channel::Command { + operation: OperationType::ConfigUpdate, + cmd_id, + } if self.command_id.is_generator_of(cmd_id) => { + self.handle_config_update_state_change(&source, cmd_id, message) + .await + } + + Channel::Command { + operation: OperationType::FirmwareUpdate, + cmd_id, + } if self.command_id.is_generator_of(cmd_id) => { + self.handle_firmware_update_state_change(&source, message) + .await } Channel::Health => self.process_health_status_message(&source, message).await, @@ -1287,7 +1223,7 @@ impl CumulocityConverter { .with_retain() } - async fn try_convert_tedge_and_c8y_topics( + async fn try_convert_tedge_topics( &mut self, message: &Message, ) -> Result, ConversionError> { @@ -1296,10 +1232,7 @@ impl CumulocityConverter { self.alarm_converter.process_internal_alarm(message); Ok(vec![]) } - topic if C8yDeviceControlTopic::accept(topic) => { - self.parse_c8y_devicecontrol_topic(message).await - } - topic if C8yTopic::accept(topic) => self.parse_c8y_smartrest_topics(message).await, + topic if C8yTopic::accept(topic) => self.parse_c8y_topics(message).await, _ => { error!("Unsupported topic: {}", message.topic.name); Ok(vec![]) @@ -1681,7 +1614,6 @@ pub(crate) mod tests { use assert_json_diff::assert_json_eq; use assert_json_diff::assert_json_include; use assert_matches::assert_matches; - use c8y_api::json_c8y_deserializer::C8yDeviceControlTopic; use c8y_api::smartrest::operations::ResultFormat; use c8y_api::smartrest::topic::SMARTREST_PUBLISH_TOPIC; use c8y_auth_proxy::url::Protocol; @@ -2786,27 +2718,8 @@ pub(crate) mod tests { EntityFilter::Entity(&child), ChannelFilter::Command(OperationType::SoftwareUpdate), ); - let mqtt_message = MqttMessage::new( - &C8yDeviceControlTopic::topic(), - json!({ - "id": "123456", - "c8y_SoftwareUpdate": [ - { - "name": "software_a", - "action": "install", - "version": "version_a", - "url": "url_a" - } - ], - "externalSource": { - "externalId": "test-device:device:childId", - "type": "c8y_Serial" - } - }) - .to_string(), - ); let command = converter - .parse_c8y_devicecontrol_topic(&mqtt_message) + .process_smartrest("528,test-device:device:childId,software_a,version_a,url_a,install") .await .unwrap() .get(0) diff --git a/crates/extensions/c8y_mapper_ext/src/error.rs b/crates/extensions/c8y_mapper_ext/src/error.rs index 562fc115d94..9a55034c407 100644 --- a/crates/extensions/c8y_mapper_ext/src/error.rs +++ b/crates/extensions/c8y_mapper_ext/src/error.rs @@ -157,11 +157,6 @@ pub enum CumulocityMapperError { #[error(transparent)] FromSmartRestDeserializer(#[from] SmartRestDeserializerError), - #[error(transparent)] - FromC8yJsonOverMqttDeserializerError( - #[from] c8y_api::json_c8y_deserializer::C8yJsonOverMqttDeserializerError, - ), - #[error(transparent)] FromSmCumulocityMapperError(#[from] SMCumulocityMapperError), diff --git a/crates/extensions/c8y_mapper_ext/src/operations/config_snapshot.rs b/crates/extensions/c8y_mapper_ext/src/operations/config_snapshot.rs index bb83e13350e..dc9fc95c696 100644 --- a/crates/extensions/c8y_mapper_ext/src/operations/config_snapshot.rs +++ b/crates/extensions/c8y_mapper_ext/src/operations/config_snapshot.rs @@ -6,7 +6,8 @@ use crate::error::ConversionError; use crate::error::CumulocityMapperError; use crate::operations::FtsDownloadOperationType; use anyhow::Context; -use c8y_api::json_c8y_deserializer::C8yUploadConfigFile; +use c8y_api::smartrest::smartrest_deserializer::SmartRestConfigUploadRequest; +use c8y_api::smartrest::smartrest_deserializer::SmartRestRequestGeneric; use c8y_api::smartrest::smartrest_serializer::fail_operation; use c8y_api::smartrest::smartrest_serializer::set_operation_executing; use c8y_api::smartrest::smartrest_serializer::CumulocitySupportedOperations; @@ -48,17 +49,18 @@ pub fn topic_filter(mqtt_schema: &MqttSchema) -> TopicFilter { } impl CumulocityConverter { - /// Convert c8y_UploadConfigFile JSON over MQTT operation to ThinEdge config_snapshot command + /// Convert c8y_UploadConfigFile SmartREST request to ThinEdge config_snapshot command. + /// Command ID is generated here, but it should be replaced by c8y's operation ID in the future. pub fn convert_config_snapshot_request( &self, - device_xid: String, - cmd_id: String, - config_upload_request: C8yUploadConfigFile, + smartrest: &str, ) -> Result, CumulocityMapperError> { + let snapshot_request = SmartRestConfigUploadRequest::from_smartrest(smartrest)?; let target = self .entity_store - .try_get_by_external_id(&device_xid.into())?; + .try_get_by_external_id(&snapshot_request.device.clone().into())?; + let cmd_id = self.command_id.new_id(); let channel = Channel::Command { operation: OperationType::ConfigSnapshot, cmd_id: cmd_id.clone(), @@ -70,14 +72,14 @@ impl CumulocityConverter { "http://{}/tedge/file-transfer/{}/config_snapshot/{}-{}", &self.config.tedge_http_host, target.external_id.as_ref(), - config_upload_request.config_type.replace('/', ":"), + snapshot_request.config_type.replace('/', ":"), cmd_id ); let request = ConfigSnapshotCmdPayload { status: CommandStatus::Init, tedge_url, - config_type: config_upload_request.config_type, + config_type: snapshot_request.config_type, path: None, }; @@ -262,15 +264,17 @@ mod tests { use crate::tests::skip_init_messages; use crate::tests::spawn_c8y_mapper_actor; use crate::tests::spawn_dummy_c8y_http_proxy; - use c8y_api::json_c8y_deserializer::C8yDeviceControlTopic; + use c8y_api::smartrest::topic::C8yTopic; use serde_json::json; use std::time::Duration; use tedge_actors::test_helpers::MessageReceiverExt; use tedge_actors::MessageReceiver; use tedge_actors::Sender; + use tedge_api::mqtt_topics::Channel; + use tedge_api::mqtt_topics::MqttSchema; + use tedge_api::mqtt_topics::OperationType; use tedge_downloader_ext::DownloadResponse; use tedge_mqtt_ext::test_helpers::assert_received_contains_str; - use tedge_mqtt_ext::test_helpers::assert_received_includes_json; use tedge_mqtt_ext::MqttMessage; use tedge_mqtt_ext::Topic; use tedge_test_utils::fs::TempTedgeDir; @@ -279,47 +283,56 @@ mod tests { const TEST_TIMEOUT_MS: Duration = Duration::from_millis(5000); #[tokio::test] - async fn mapper_converts_config_upload_op_to_config_snapshot_cmd_for_main_device() { + async fn mapper_converts_smartrest_config_upload_req_to_config_snapshot_cmd_for_main_device() { let ttd = TempTedgeDir::new(); let (mqtt, _http, _fs, _timer, _ul, _dl) = spawn_c8y_mapper_actor(&ttd, true).await; let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); skip_init_messages(&mut mqtt).await; - // Simulate c8y_UploadConfigFile operation delivered via JSON over MQTT + // Simulate c8y_UploadConfigFile SmartREST request mqtt.send(MqttMessage::new( - &C8yDeviceControlTopic::topic(), - json!({ - "id": "123456", - "c8y_UploadConfigFile": { - "type": "path/config/A" - }, - "externalSource": { - "externalId": "test-device", - "type": "c8y_Serial" - } - }) - .to_string(), + &C8yTopic::downstream_topic(), + "526,test-device,path/config/A", )) .await .expect("Send failed"); - assert_received_includes_json( - &mut mqtt, - [( - "te/device/main///cmd/config_snapshot/c8y-mapper-123456", - json!({ - "status": "init", - "tedgeUrl": "http://localhost:8888/tedge/file-transfer/test-device/config_snapshot/path:config:A-c8y-mapper-123456", - "type": "path/config/A", - }), - )], - ) - .await; + let (topic, received_json) = mqtt + .recv() + .await + .map(|msg| { + ( + msg.topic, + serde_json::from_str::(msg.payload.as_str().expect("UTF8")) + .expect("JSON"), + ) + }) + .unwrap(); + + let mqtt_schema = MqttSchema::default(); + let (entity, channel) = mqtt_schema.entity_channel_of(&topic).unwrap(); + assert_eq!(entity, "device/main//"); + + if let Channel::Command { + operation: OperationType::ConfigSnapshot, + cmd_id, + } = channel + { + // Validate the payload JSON + let expected_json = json!({ + "status": "init", + "tedgeUrl": format!("http://localhost:8888/tedge/file-transfer/test-device/config_snapshot/path:config:A-{cmd_id}"), + "type": "path/config/A", + }); + assert_json_diff::assert_json_include!(actual: received_json, expected: expected_json); + } else { + panic!("Unexpected response on channel: {:?}", topic) + } } #[tokio::test] - async fn mapper_converts_config_upload_op_to_config_snapshot_cmd_for_child_device() { + async fn mapper_converts_smartrest_config_upload_req_to_config_snapshot_cmd_for_child_device() { let ttd = TempTedgeDir::new(); let (mqtt, _http, _fs, _timer, _ul, _dl) = spawn_c8y_mapper_actor(&ttd, true).await; let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); @@ -336,36 +349,45 @@ mod tests { mqtt.skip(2).await; // Skip the mapped child device registration message - // Simulate c8y_UploadConfigFile operation delivered via JSON over MQTT + // Simulate c8y_UploadConfigFile SmartREST request mqtt.send(MqttMessage::new( - &C8yDeviceControlTopic::topic(), - json!({ - "id": "123456", - "c8y_UploadConfigFile": { - "type": "configA" - }, - "externalSource": { - "externalId": "child1", - "type": "c8y_Serial" - } - }) - .to_string(), + &C8yTopic::downstream_topic(), + "526,child1,configA", )) .await .expect("Send failed"); - assert_received_includes_json( - &mut mqtt, - [( - "te/device/child1///cmd/config_snapshot/c8y-mapper-123456", - json!({ - "status": "init", - "tedgeUrl": "http://localhost:8888/tedge/file-transfer/child1/config_snapshot/configA-c8y-mapper-123456", - "type": "configA", - }), - )], - ) - .await; + let (topic, received_json) = mqtt + .recv() + .await + .map(|msg| { + ( + msg.topic, + serde_json::from_str::(msg.payload.as_str().expect("UTF8")) + .expect("JSON"), + ) + }) + .unwrap(); + + let mqtt_schema = MqttSchema::default(); + let (entity, channel) = mqtt_schema.entity_channel_of(&topic).unwrap(); + assert_eq!(entity, "device/child1//"); + + if let Channel::Command { + operation: OperationType::ConfigSnapshot, + cmd_id, + } = channel + { + // Validate the payload JSON + let expected_json = json!({ + "status": "init", + "tedgeUrl": format!("http://localhost:8888/tedge/file-transfer/child1/config_snapshot/configA-{cmd_id}"), + "type": "configA", + }); + assert_json_diff::assert_json_include!(actual: received_json, expected: expected_json); + } else { + panic!("Unexpected response on channel: {:?}", topic) + } } #[tokio::test] diff --git a/crates/extensions/c8y_mapper_ext/src/operations/config_update.rs b/crates/extensions/c8y_mapper_ext/src/operations/config_update.rs index 33086eca750..dde2e9c7bcb 100644 --- a/crates/extensions/c8y_mapper_ext/src/operations/config_update.rs +++ b/crates/extensions/c8y_mapper_ext/src/operations/config_update.rs @@ -1,13 +1,13 @@ use crate::converter::CumulocityConverter; use crate::error::ConversionError; use crate::error::CumulocityMapperError; -use c8y_api::json_c8y_deserializer::C8yDownloadConfigFile; +use c8y_api::smartrest::smartrest_deserializer::SmartRestConfigDownloadRequest; +use c8y_api::smartrest::smartrest_deserializer::SmartRestRequestGeneric; use c8y_api::smartrest::smartrest_serializer::fail_operation; use c8y_api::smartrest::smartrest_serializer::set_operation_executing; use c8y_api::smartrest::smartrest_serializer::succeed_operation_no_payload; use c8y_api::smartrest::smartrest_serializer::CumulocitySupportedOperations; use std::sync::Arc; -use tedge_api::entity_store::EntityExternalId; use tedge_api::entity_store::EntityMetadata; use tedge_api::messages::CommandStatus; use tedge_api::messages::ConfigUpdateCmdPayload; @@ -98,15 +98,16 @@ impl CumulocityConverter { /// command channel. pub async fn convert_config_update_request( &mut self, - device_xid: String, - cmd_id: String, - config_download_request: C8yDownloadConfigFile, + smartrest: &str, ) -> Result, CumulocityMapperError> { - let entity_xid: EntityExternalId = device_xid.into(); - let target = self.entity_store.try_get_by_external_id(&entity_xid)?; + let smartrest = SmartRestConfigDownloadRequest::from_smartrest(smartrest)?; + let target = self + .entity_store + .try_get_by_external_id(&smartrest.device.clone().into())?; - let message = - self.create_config_update_cmd(cmd_id.into(), &config_download_request, target); + let cmd_id = self.command_id.new_id(); + + let message = self.create_config_update_cmd(cmd_id.into(), &smartrest, target); Ok(message) } @@ -128,7 +129,7 @@ impl CumulocityConverter { fn create_config_update_cmd( &self, cmd_id: Arc, - config_download_request: &C8yDownloadConfigFile, + smartrest: &SmartRestConfigDownloadRequest, target: &EntityMetadata, ) -> Vec { let channel = Channel::Command { @@ -139,16 +140,16 @@ impl CumulocityConverter { let proxy_url = self .c8y_endpoint - .maybe_tenant_url(&config_download_request.url) + .maybe_tenant_url(&smartrest.url) .map(|cumulocity_url| self.auth_proxy.proxy_url(cumulocity_url).into()); - let remote_url = proxy_url.unwrap_or(config_download_request.url.to_string()); + let remote_url = proxy_url.unwrap_or(smartrest.url.to_string()); let request = ConfigUpdateCmdPayload { status: CommandStatus::Init, tedge_url: None, remote_url, - config_type: config_download_request.config_type.clone(), + config_type: smartrest.config_type.clone(), path: None, }; @@ -161,13 +162,17 @@ impl CumulocityConverter { mod tests { use crate::tests::skip_init_messages; use crate::tests::spawn_c8y_mapper_actor; - use c8y_api::json_c8y_deserializer::C8yDeviceControlTopic; + use c8y_api::smartrest::topic::C8yTopic; use serde_json::json; + use sha256::digest; use std::time::Duration; use tedge_actors::test_helpers::MessageReceiverExt; + use tedge_actors::MessageReceiver; use tedge_actors::Sender; + use tedge_api::mqtt_topics::Channel; + use tedge_api::mqtt_topics::MqttSchema; + use tedge_api::mqtt_topics::OperationType; use tedge_mqtt_ext::test_helpers::assert_received_contains_str; - use tedge_mqtt_ext::test_helpers::assert_received_includes_json; use tedge_mqtt_ext::MqttMessage; use tedge_mqtt_ext::Topic; use tedge_test_utils::fs::TempTedgeDir; @@ -175,48 +180,57 @@ mod tests { const TEST_TIMEOUT_MS: Duration = Duration::from_millis(5000); #[tokio::test] - async fn mapper_converts_config_download_op_for_main_device() { + async fn mapper_converts_smartrest_config_download_req_with_new_download_for_main_device() { let ttd = TempTedgeDir::new(); let (mqtt, _http, _fs, _timer, _ul, _dl) = spawn_c8y_mapper_actor(&ttd, true).await; let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); skip_init_messages(&mut mqtt).await; - // Simulate c8y_DownloadConfigFile operation delivered via JSON over MQTT + // Simulate c8y_DownloadConfigFile SmartREST request mqtt.send(MqttMessage::new( - &C8yDeviceControlTopic::topic(), - json!({ - "id": "123456", - "c8y_DownloadConfigFile": { - "type": "path/config/A", - "url": "http://www.my.url" - }, - "externalSource": { - "externalId": "test-device", - "type": "c8y_Serial" - } - }) - .to_string(), + &C8yTopic::downstream_topic(), + "524,test-device,http://www.my.url,path/config/A", )) .await .expect("Send failed"); - assert_received_includes_json( - &mut mqtt, - [( - "te/device/main///cmd/config_update/c8y-mapper-123456", - json!({ - "status": "init", - "remoteUrl": "http://www.my.url", - "type": "path/config/A", - }), - )], - ) - .await; + // New config_update command should be published + let (topic, received_json) = mqtt + .recv() + .await + .map(|msg| { + ( + msg.topic, + serde_json::from_str::(msg.payload.as_str().expect("UTF8")) + .expect("JSON"), + ) + }) + .unwrap(); + + let mqtt_schema = MqttSchema::default(); + let (entity, channel) = mqtt_schema.entity_channel_of(&topic).unwrap(); + assert_eq!(entity, "device/main//"); + + let Channel::Command { + operation: OperationType::ConfigUpdate, + .. + } = channel + else { + panic!("Unexpected response on channel: {:?}", topic) + }; + + // Validate the payload JSON + let expected_json = json!({ + "status": "init", + "remoteUrl": "http://www.my.url", + "type": "path/config/A", + }); + assert_json_diff::assert_json_include!(actual: received_json, expected: expected_json); } #[tokio::test] - async fn mapper_converts_config_download_op_for_child_device() { + async fn mapper_converts_smartrest_config_download_req_for_child_device() { let ttd = TempTedgeDir::new(); let (mqtt, _http, _fs, _timer, _ul, _dl) = spawn_c8y_mapper_actor(&ttd, true).await; let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); @@ -233,37 +247,49 @@ mod tests { mqtt.skip(2).await; // Skip child device registration messages - // Simulate c8y_DownloadConfigFile operation delivered via JSON over MQTT + // Cache is already available + ttd.dir("cache").file(&digest("http://www.my.url")); + + // Simulate c8y_DownloadConfigFile SmartREST request mqtt.send(MqttMessage::new( - &C8yDeviceControlTopic::topic(), - json!({ - "id": "123456", - "c8y_DownloadConfigFile": { - "type": "configA", - "url": "http://www.my.url" - }, - "externalSource": { - "externalId": "child1", - "type": "c8y_Serial" - } - }) - .to_string(), + &C8yTopic::downstream_topic(), + "524,child1,http://www.my.url,configA", )) .await .expect("Send failed"); - assert_received_includes_json( - &mut mqtt, - [( - "te/device/child1///cmd/config_update/c8y-mapper-123456", - json!({ - "status": "init", - "remoteUrl": "http://www.my.url", - "type": "configA", - }), - )], - ) - .await; + // New config_update command should be published + let (topic, received_json) = mqtt + .recv() + .await + .map(|msg| { + ( + msg.topic, + serde_json::from_str::(msg.payload.as_str().expect("UTF8")) + .expect("JSON"), + ) + }) + .unwrap(); + + let mqtt_schema = MqttSchema::default(); + let (entity, channel) = mqtt_schema.entity_channel_of(&topic).unwrap(); + assert_eq!(entity, "device/child1//"); + + let Channel::Command { + operation: OperationType::ConfigUpdate, + .. + } = channel + else { + panic!("Unexpected response on channel: {:?}", topic) + }; + + // Validate the payload JSON + let expected_json = json!({ + "status": "init", + "remoteUrl": "http://www.my.url", + "type": "configA", + }); + assert_json_diff::assert_json_include!(actual: received_json, expected: expected_json); } #[tokio::test] diff --git a/crates/extensions/c8y_mapper_ext/src/operations/firmware_update.rs b/crates/extensions/c8y_mapper_ext/src/operations/firmware_update.rs index d9d411c473d..6bc9ad4c255 100644 --- a/crates/extensions/c8y_mapper_ext/src/operations/firmware_update.rs +++ b/crates/extensions/c8y_mapper_ext/src/operations/firmware_update.rs @@ -1,12 +1,12 @@ use crate::converter::CumulocityConverter; use crate::error::ConversionError; use crate::error::CumulocityMapperError; -use c8y_api::json_c8y_deserializer::C8yFirmware; +use c8y_api::smartrest::smartrest_deserializer::SmartRestFirmwareRequest; +use c8y_api::smartrest::smartrest_deserializer::SmartRestRequestGeneric; use c8y_api::smartrest::smartrest_serializer::fail_operation; use c8y_api::smartrest::smartrest_serializer::set_operation_executing; use c8y_api::smartrest::smartrest_serializer::succeed_operation_no_payload; use c8y_api::smartrest::smartrest_serializer::CumulocitySupportedOperations; -use tedge_api::entity_store::EntityExternalId; use tedge_api::entity_store::EntityType; use tedge_api::messages::FirmwareInfo; use tedge_api::messages::FirmwareUpdateCmdPayload; @@ -36,20 +36,22 @@ pub fn firmware_update_topic_filter(mqtt_schema: &MqttSchema) -> TopicFilter { } impl CumulocityConverter { - /// Convert c8y_Firmware JSON over MQTT operation to ThinEdge firmware_update command. + /// Convert c8y_Firmware SmartREST request to ThinEdge firmware_update command. + /// Command ID is generated here, but it should be replaced by c8y's operation ID in the future. pub fn convert_firmware_update_request( &self, - device_xid: String, - cmd_id: String, - firmware_request: C8yFirmware, + smartrest: &str, ) -> Result, CumulocityMapperError> { - let entity_xid: EntityExternalId = device_xid.into(); + let firmware_request = SmartRestFirmwareRequest::from_smartrest(smartrest)?; - let target = self.entity_store.try_get_by_external_id(&entity_xid)?; + let target = self + .entity_store + .try_get_by_external_id(&firmware_request.device.clone().into())?; + let cmd_id = self.command_id.new_id(); let channel = Channel::Command { operation: OperationType::FirmwareUpdate, - cmd_id, + cmd_id: cmd_id.clone(), }; let topic = self.mqtt_schema.topic_for(&target.topic_id, &channel); @@ -176,7 +178,7 @@ impl CumulocityConverter { #[cfg(test)] mod tests { use crate::tests::*; - use c8y_api::json_c8y_deserializer::C8yDeviceControlTopic; + use c8y_api::smartrest::topic::C8yTopic; use serde_json::json; use std::time::Duration; use tedge_actors::test_helpers::MessageReceiverExt; @@ -253,29 +255,17 @@ mod tests { } #[tokio::test] - async fn mapper_converts_firmware_op_to_firmware_update_cmd_for_main_device() { + async fn mapper_converts_smartrest_firmware_req_to_firmware_update_cmd_for_main_device() { let cfg_dir = TempTedgeDir::new(); let (mqtt, _http, _fs, _timer, _ul, _dl) = spawn_c8y_mapper_actor(&cfg_dir, true).await; let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); skip_init_messages(&mut mqtt).await; - // Simulate c8y_Firmware operation delivered via JSON over MQTT + // Simulate c8y_Firmware SmartREST request mqtt.send(MqttMessage::new( - &C8yDeviceControlTopic::topic(), - json!({ - "id": "123456", - "c8y_Firmware": { - "name": "myFirmware", - "version": "1.0", - "url": "http://www.my.url" - }, - "externalSource": { - "externalId": "test-device", - "type": "c8y_Serial" - } - }) - .to_string(), + &C8yTopic::downstream_topic(), + "515,test-device,myFirmware,1.0,http://www.my.url", )) .await .expect("Send failed"); @@ -296,7 +286,7 @@ mod tests { } #[tokio::test] - async fn mapper_converts_firmware_op_to_firmware_update_cmd_for_child_device() { + async fn mapper_converts_smartrest_firmware_req_to_firmware_update_cmd_for_child_device() { let cfg_dir = TempTedgeDir::new(); let (mqtt, _http, _fs, _timer, _ul, _dl) = spawn_c8y_mapper_actor(&cfg_dir, true).await; let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); @@ -313,22 +303,10 @@ mod tests { mqtt.skip(3).await; // Skip entity registration, mapping and installed firmware messages - // Simulate c8y_Firmware operation delivered via JSON over MQTT + // Simulate c8y_Firmware SmartREST request mqtt.send(MqttMessage::new( - &C8yDeviceControlTopic::topic(), - json!({ - "id": "123456", - "c8y_Firmware": { - "name": "myFirmware", - "version": "1.0", - "url": "http://www.my.url" - }, - "externalSource": { - "externalId": "test-device:device:child1", - "type": "c8y_Serial" - } - }) - .to_string(), + &C8yTopic::downstream_topic(), + "515,test-device:device:child1,myFirmware,1.0,http://www.my.url", )) .await .expect("Send failed"); @@ -336,7 +314,7 @@ mod tests { assert_received_includes_json( &mut mqtt, [( - "te/device/child1///cmd/firmware_update/c8y-mapper-123456", + "te/device/child1///cmd/firmware_update/+", json!({ "status": "init", "name": "myFirmware", diff --git a/crates/extensions/c8y_mapper_ext/src/operations/log_upload.rs b/crates/extensions/c8y_mapper_ext/src/operations/log_upload.rs index 34f961ac8d2..6ea777b7518 100644 --- a/crates/extensions/c8y_mapper_ext/src/operations/log_upload.rs +++ b/crates/extensions/c8y_mapper_ext/src/operations/log_upload.rs @@ -6,7 +6,8 @@ use crate::converter::UploadOperationData; use crate::error::ConversionError; use crate::error::CumulocityMapperError; use anyhow::Context; -use c8y_api::json_c8y_deserializer::C8yLogfileRequest; +use c8y_api::smartrest::smartrest_deserializer::SmartRestLogRequest; +use c8y_api::smartrest::smartrest_deserializer::SmartRestRequestGeneric; use c8y_api::smartrest::smartrest_serializer::fail_operation; use c8y_api::smartrest::smartrest_serializer::set_operation_executing; use c8y_api::smartrest::smartrest_serializer::CumulocitySupportedOperations; @@ -50,17 +51,18 @@ pub fn log_upload_topic_filter(mqtt_schema: &MqttSchema) -> TopicFilter { } impl CumulocityConverter { - /// Convert c8y_LogfileRequest operation to a ThinEdge log_upload command + /// Convert a SmartREST logfile request to a Thin Edge log_upload command pub fn convert_log_upload_request( &self, - device_xid: String, - cmd_id: String, - log_request: C8yLogfileRequest, + smartrest: &str, ) -> Result, CumulocityMapperError> { + let log_request = SmartRestLogRequest::from_smartrest(smartrest)?; + let device_external_id = log_request.device.into(); let target = self .entity_store - .try_get_by_external_id(&device_xid.into())?; + .try_get_by_external_id(&device_external_id)?; + let cmd_id = self.command_id.new_id(); let channel = Channel::Command { operation: OperationType::LogUpload, cmd_id: cmd_id.clone(), @@ -71,18 +73,18 @@ impl CumulocityConverter { "http://{}/tedge/file-transfer/{}/log_upload/{}-{}", &self.config.tedge_http_host, target.external_id.as_ref(), - log_request.log_file, + log_request.log_type, cmd_id ); let request = LogUploadCmdPayload { status: CommandStatus::Init, tedge_url, - log_type: log_request.log_file, + log_type: log_request.log_type, date_from: log_request.date_from, date_to: log_request.date_to, - search_text: Some(log_request.search_text).filter(|s| !s.is_empty()), - lines: log_request.maximum_lines, + search_text: log_request.search_text, + lines: log_request.lines, }; // Command messages must be retained @@ -292,7 +294,7 @@ impl CumulocityConverter { mod tests { use super::*; use crate::tests::*; - use c8y_api::json_c8y_deserializer::C8yDeviceControlTopic; + use c8y_api::smartrest::topic::C8yTopic; use serde_json::json; use std::time::Duration; use tedge_actors::test_helpers::MessageReceiverExt; @@ -314,43 +316,56 @@ mod tests { skip_init_messages(&mut mqtt).await; - // Simulate c8y_LogfileRequest JSON over MQTT request + // Simulate c8y_LogfileRequest SmartREST request mqtt.send(MqttMessage::new( - &C8yDeviceControlTopic::topic(), - json!({ - "id": "123456", - "c8y_LogfileRequest": { - "searchText": "ERROR", - "logFile": "logfileA", - "dateTo": "2023-11-29T16:33:50+0100", - "dateFrom": "2023-11-28T16:33:50+0100", - "maximumLines": 1000 - }, - "externalSource": { - "externalId": "test-device", - "type": "c8y_Serial" - } - }) - .to_string(), + &C8yTopic::downstream_topic(), + "522,test-device,logfileA,2013-06-22T17:03:14.123+02:00,2013-06-23T18:03:14.123+02:00,ERROR,1000", )) - .await - .expect("Send failed"); + .await + .expect("Send failed"); - assert_received_includes_json( - &mut mqtt, - [( - "te/device/main///cmd/log_upload/c8y-mapper-123456", - json!({ - "status": "init", - "tedgeUrl": "http://localhost:8888/tedge/file-transfer/test-device/log_upload/logfileA-c8y-mapper-123456", - "type": "logfileA", - "dateFrom": "2023-11-28T16:33:50+01:00", - "dateTo": "2023-11-29T16:33:50+01:00", - "searchText": "ERROR", - "lines": 1000 - }), - )], - ).await; + let (topic, received_json) = mqtt + .recv() + .await + .map(|msg| { + ( + msg.topic, + serde_json::from_str::(msg.payload.as_str().expect("UTF8")) + .expect("JSON"), + ) + }) + .unwrap(); + + let mqtt_schema = MqttSchema::default(); + let (entity, channel) = mqtt_schema.entity_channel_of(&topic).unwrap(); + assert_eq!(entity, "device/main//"); + + if let Channel::Command { + operation: OperationType::LogUpload, + cmd_id, + } = channel + { + // Validate the topic name + assert_eq!( + topic.name, + format!("te/device/main///cmd/log_upload/{cmd_id}") + ); + + // Validate the payload JSON + let expected_json = json!({ + "status": "init", + "tedgeUrl": format!("http://localhost:8888/tedge/file-transfer/test-device/log_upload/logfileA-{cmd_id}"), + "type": "logfileA", + "dateFrom": "2013-06-22T17:03:14.123+02:00", + "dateTo": "2013-06-23T18:03:14.123+02:00", + "searchText": "ERROR", + "lines": 1000 + }); + + assert_json_diff::assert_json_include!(actual: received_json, expected: expected_json); + } else { + panic!("Unexpected response on channel: {:?}", topic) + } } #[tokio::test] @@ -371,43 +386,56 @@ mod tests { mqtt.skip(3).await; //Skip entity registration, mapping and supported log types messages - // Simulate c8y_LogfileRequest JSON over MQTT request + // Simulate c8y_LogfileRequest SmartREST request mqtt.send(MqttMessage::new( - &C8yDeviceControlTopic::topic(), - json!({ - "id": "123456", - "c8y_LogfileRequest": { - "searchText": "ERROR", - "logFile": "logfileA", - "dateTo": "2023-11-29T16:33:50+0100", - "dateFrom": "2023-11-28T16:33:50+0100", - "maximumLines": 1000 - }, - "externalSource": { - "externalId": "test-device:device:DeviceSerial", - "type": "c8y_Serial" - } - }) - .to_string(), + &C8yTopic::downstream_topic(), + "522,test-device:device:DeviceSerial,logfileA,2013-06-22T17:03:14.123+02:00,2013-06-23T18:03:14.123+02:00,ERROR,1000", )) - .await - .expect("Send failed"); + .await + .expect("Send failed"); - assert_received_includes_json( - &mut mqtt, - [( - "te/device/DeviceSerial///cmd/log_upload/c8y-mapper-123456", - json!({ - "status": "init", - "tedgeUrl": "http://localhost:8888/tedge/file-transfer/test-device:device:DeviceSerial/log_upload/logfileA-c8y-mapper-123456", - "type": "logfileA", - "dateFrom": "2023-11-28T16:33:50+01:00", - "dateTo": "2023-11-29T16:33:50+01:00", - "searchText": "ERROR", - "lines": 1000 - }), - )], - ).await; + let (topic, received_json) = mqtt + .recv() + .await + .map(|msg| { + ( + msg.topic, + serde_json::from_str::(msg.payload.as_str().expect("UTF8")) + .expect("JSON"), + ) + }) + .unwrap(); + + let mqtt_schema = MqttSchema::default(); + let (entity, channel) = mqtt_schema.entity_channel_of(&topic).unwrap(); + assert_eq!(entity, "device/DeviceSerial//"); + + if let Channel::Command { + operation: OperationType::LogUpload, + cmd_id, + } = channel + { + // Validate the topic name + assert_eq!( + topic.name, + format!("te/device/DeviceSerial///cmd/log_upload/{cmd_id}") + ); + + // Validate the payload JSON + let expected_json = json!({ + "status": "init", + "tedgeUrl": format!("http://localhost:8888/tedge/file-transfer/test-device:device:DeviceSerial/log_upload/logfileA-{cmd_id}"), + "type": "logfileA", + "dateFrom": "2013-06-22T17:03:14.123+02:00", + "dateTo": "2013-06-23T18:03:14.123+02:00", + "searchText": "ERROR", + "lines": 1000 + }); + + assert_json_diff::assert_json_include!(actual: received_json, expected: expected_json); + } else { + panic!("Unexpected response on channel: {:?}", topic) + } } #[tokio::test] diff --git a/crates/extensions/c8y_mapper_ext/src/tests.rs b/crates/extensions/c8y_mapper_ext/src/tests.rs index 34a9fddf2a2..5bd6ac0b139 100644 --- a/crates/extensions/c8y_mapper_ext/src/tests.rs +++ b/crates/extensions/c8y_mapper_ext/src/tests.rs @@ -8,7 +8,6 @@ use crate::actor::IdUploadRequest; use crate::actor::IdUploadResult; use crate::Capabilities; use assert_json_diff::assert_json_include; -use c8y_api::json_c8y_deserializer::C8yDeviceControlTopic; use c8y_api::smartrest::topic::C8yTopic; use c8y_auth_proxy::url::Protocol; use c8y_http_proxy::messages::C8YRestRequest; @@ -282,7 +281,7 @@ async fn service_registration_mapping() { #[tokio::test] async fn mapper_publishes_software_update_request() { - // The test assures c8y mapper correctly receives software update request from JSON over MQTT + // The test assures SM Mapper correctly receives software update request smartrest message on `c8y/s/ds` // and converts it to thin-edge json message published on `te/device/main///cmd/software_update/+`. let cfg_dir = TempTedgeDir::new(); let (mqtt, http, _fs, _timer, _ul, _dl) = spawn_c8y_mapper_actor(&cfg_dir, true).await; @@ -292,25 +291,10 @@ async fn mapper_publishes_software_update_request() { skip_init_messages(&mut mqtt).await; - // Simulate c8y_SoftwareUpdate JSON over MQTT request + // Simulate c8y_SoftwareUpdate SmartREST request mqtt.send(MqttMessage::new( - &C8yDeviceControlTopic::topic(), - json!({ - "id": "123456", - "c8y_SoftwareUpdate": [ - { - "name": "nodered", - "action": "install", - "version": "1.0.0::debian", - "url": "" - } - ], - "externalSource": { - "externalId": "test-device", - "type": "c8y_Serial" - } - }) - .to_string(), + &C8yTopic::downstream_topic(), + "528,test-device,nodered,1.0.0::debian,,install", )) .await .expect("Send failed"); @@ -318,7 +302,7 @@ async fn mapper_publishes_software_update_request() { assert_received_includes_json( &mut mqtt, [( - "te/device/main///cmd/software_update/c8y-mapper-123456", + "te/device/main///cmd/software_update/+", json!({ "status": "init", "updateList": [ @@ -439,10 +423,10 @@ async fn mapper_publishes_software_update_failed_status_onto_c8y_topic() { #[tokio::test] async fn mapper_publishes_software_update_request_with_wrong_action() { - // The test assures c8y-mapper correctly receives software update request via JSON over MQTT - // Then the c8y-mapper finds out that wrong action as part of the update request. - // Then c8y-mapper publishes an operation status message as executing `501,c8y_SoftwareUpdate' - // Then c8y-mapper publishes an operation status message as failed `502,c8y_SoftwareUpdate,Action remove is not recognized. It must be install or delete.` on `c8/s/us`. + // The test assures SM Mapper correctly receives software update request smartrest message on `c8y/s/ds` + // Then the SM Mapper finds out that wrong action as part of the update request. + // Then SM Mapper publishes an operation status message as executing `501,c8y_SoftwareUpdate' + // Then SM Mapper publishes an operation status message as failed `502,c8y_SoftwareUpdate,Action remove is not recognized. It must be install or delete.` on `c8/s/us`. // Then the subscriber that subscribed for messages on `c8/s/us` receives these messages and verifies them. let cfg_dir = TempTedgeDir::new(); @@ -451,27 +435,11 @@ async fn mapper_publishes_software_update_request_with_wrong_action() { let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); skip_init_messages(&mut mqtt).await; - // Publish a c8y_SoftwareUpdate via JSON over MQTT that contains a wrong action `remove`, that is not known by c8y. - mqtt.send(MqttMessage::new( - &C8yDeviceControlTopic::topic(), - json!({ - "id": "123456", - "c8y_SoftwareUpdate": [ - { - "name": "nodered", - "action": "remove", - "version": "1.0.0::debian" - } - ], - "externalSource": { - "externalId": "test-device", - "type": "c8y_Serial" - } - }) - .to_string(), - )) - .await - .expect("Send failed"); + // Prepare and publish a c8y_SoftwareUpdate smartrest request on `c8y/s/ds` that contains a wrong action `remove`, that is not known by c8y. + let smartrest = r#"528,test-device,nodered,1.0.0::debian,,remove"#; + mqtt.send(MqttMessage::new(&C8yTopic::downstream_topic(), smartrest)) + .await + .expect("Send failed"); // Expect a 501 (executing) followed by a 502 (failed) assert_received_contains_str( @@ -1303,8 +1271,8 @@ async fn c8y_mapper_alarm_complex_text_fragment_in_payload_failed() { } #[tokio::test] -async fn mapper_handles_multiple_modules_in_update_list_sm_requests() { - // The test assures if Mapper can handle multiple update modules received via JSON over MQTT +async fn mapper_handles_multiline_sm_requests() { + // The test assures if Mapper can handle multiline smartrest messages arrived on `c8y/s/ds` let cfg_dir = TempTedgeDir::new(); let (mqtt, http, _fs, _timer, _ul, _dl) = spawn_c8y_mapper_actor(&cfg_dir, true).await; @@ -1313,60 +1281,52 @@ async fn mapper_handles_multiple_modules_in_update_list_sm_requests() { let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); skip_init_messages(&mut mqtt).await; - // Publish multiple modules software update via JSON over MQTT. - mqtt.send(MqttMessage::new( - &C8yDeviceControlTopic::topic(), - json!({ - "id": "123456", - "c8y_SoftwareUpdate": [ - { - "name": "nodered", - "action": "install", - "version": "1.0.0::debian", - "url": "" - }, - { - "name": "rolldice", - "action": "install", - "version": "2.0.0::debian", - "url": "" - } - ], - "externalSource": { - "externalId": "test-device", - "type": "c8y_Serial" - } - }) - .to_string(), - )) - .await - .expect("Send failed"); + // Prepare and publish multiline software update smartrest requests on `c8y/s/ds`. + let smartrest = "528,test-device,nodered,1.0.0::debian,,install\n528,test-device,rolldice,2.0.0::debian,,install".to_string(); + mqtt.send(MqttMessage::new(&C8yTopic::downstream_topic(), smartrest)) + .await + .expect("Send failed"); assert_received_includes_json( &mut mqtt, - [( - "te/device/main///cmd/software_update/c8y-mapper-123456", - json!({ - "status": "init", - "updateList": [ - { - "type": "debian", - "modules": [ - { - "name": "nodered", - "version": "1.0.0", - "action": "install" - }, - { - "name": "rolldice", - "version": "2.0.0", - "action": "install" - } - ] - } - ] - }), - )], + [ + ( + "te/device/main///cmd/software_update/+", + json!({ + "status": "init", + "updateList": [ + { + "type": "debian", + "modules": [ + { + "name": "nodered", + "version": "1.0.0", + "action": "install" + } + ] + } + ] + }), + ), + ( + "te/device/main///cmd/software_update/+", + json!({ + "status": "init", + "updateList": [ + { + "type": "debian", + "modules": [ + { + "name": "rolldice", + "version": "2.0.0", + "action": "install" + } + ] + } + ] + }), + ), + ], ) .await; }