From cf2babb883dcb7239aaad75f0043721920e9cb96 Mon Sep 17 00:00:00 2001 From: Krzysztof Piotrowski Date: Wed, 24 Jul 2024 17:05:22 +0000 Subject: [PATCH 1/8] feat: add config option to disable device profile Signed-off-by: Krzysztof Piotrowski --- .../tedge_config/src/tedge_config_cli/tedge_config.rs | 4 ++++ crates/extensions/c8y_mapper_ext/src/config.rs | 1 + crates/extensions/c8y_mapper_ext/src/converter.rs | 7 +++++++ crates/extensions/c8y_mapper_ext/src/lib.rs | 2 ++ crates/extensions/c8y_mapper_ext/src/tests.rs | 7 +------ 5 files changed, 15 insertions(+), 6 deletions(-) diff --git a/crates/common/tedge_config/src/tedge_config_cli/tedge_config.rs b/crates/common/tedge_config/src/tedge_config_cli/tedge_config.rs index 64407269e7e..279c7fd09cc 100644 --- a/crates/common/tedge_config/src/tedge_config_cli/tedge_config.rs +++ b/crates/common/tedge_config/src/tedge_config_cli/tedge_config.rs @@ -503,6 +503,10 @@ define_tedge_config! { /// Enable firmware_update feature #[tedge_config(example = "true", default(value = false))] firmware_update: bool, + + /// Enable device_profile feature + #[tedge_config(example = "true", default(value = false))] + device_profile: bool, }, proxy: { diff --git a/crates/extensions/c8y_mapper_ext/src/config.rs b/crates/extensions/c8y_mapper_ext/src/config.rs index 393de82eb3a..f1c877fe645 100644 --- a/crates/extensions/c8y_mapper_ext/src/config.rs +++ b/crates/extensions/c8y_mapper_ext/src/config.rs @@ -176,6 +176,7 @@ impl C8yMapperConfig { config_snapshot: tedge_config.c8y.enable.config_snapshot, config_update: tedge_config.c8y.enable.config_update, firmware_update: tedge_config.c8y.enable.firmware_update, + device_profile: tedge_config.c8y.enable.device_profile, }; let c8y_prefix = tedge_config.c8y.bridge.topic_prefix.clone(); diff --git a/crates/extensions/c8y_mapper_ext/src/converter.rs b/crates/extensions/c8y_mapper_ext/src/converter.rs index d66302c57f7..fecf74d6a8f 100644 --- a/crates/extensions/c8y_mapper_ext/src/converter.rs +++ b/crates/extensions/c8y_mapper_ext/src/converter.rs @@ -686,6 +686,13 @@ impl CumulocityConverter { vec![] } } + C8yDeviceControlOperation::DeviceProfile(request) => { + if self.config.capabilities.device_profile { + vec![] + } else { + vec![] + } + } C8yDeviceControlOperation::Custom => { // Ignores custom and static template operations unsupported by thin-edge // However, these operations can be addressed by SmartREST that is published together with JSON over MQTT diff --git a/crates/extensions/c8y_mapper_ext/src/lib.rs b/crates/extensions/c8y_mapper_ext/src/lib.rs index 26d8c6bf723..91e3881b971 100644 --- a/crates/extensions/c8y_mapper_ext/src/lib.rs +++ b/crates/extensions/c8y_mapper_ext/src/lib.rs @@ -21,6 +21,7 @@ pub struct Capabilities { pub config_snapshot: bool, pub config_update: bool, pub firmware_update: bool, + pub device_profile: bool, } #[cfg(test)] @@ -31,6 +32,7 @@ impl Default for Capabilities { config_snapshot: true, config_update: true, firmware_update: true, + device_profile: true, } } } diff --git a/crates/extensions/c8y_mapper_ext/src/tests.rs b/crates/extensions/c8y_mapper_ext/src/tests.rs index f954d4409bb..9b8b19b6535 100644 --- a/crates/extensions/c8y_mapper_ext/src/tests.rs +++ b/crates/extensions/c8y_mapper_ext/src/tests.rs @@ -2808,12 +2808,7 @@ pub(crate) fn test_mapper_config(tmp_dir: &TempTedgeDir) -> C8yMapperConfig { C8yMapperConfig::default_internal_topic_filter(tmp_dir.path(), &"c8y".try_into().unwrap()) .unwrap(); - let capabilities = Capabilities { - log_upload: true, - config_snapshot: true, - config_update: true, - firmware_update: true, - }; + let capabilities = Capabilities::default(); let operation_topics = OperationHandler::topic_filter(&capabilities) .into_iter() From 5030a1755d4401f70a0e92aae1e29d67e226b7c8 Mon Sep 17 00:00:00 2001 From: Krzysztof Piotrowski Date: Wed, 24 Jul 2024 17:18:02 +0000 Subject: [PATCH 2/8] feat: add device profile to operation type Signed-off-by: Krzysztof Piotrowski --- crates/core/c8y_api/src/smartrest/smartrest_serializer.rs | 2 ++ crates/core/tedge_api/src/mqtt_topics.rs | 3 +++ .../extensions/c8y_mapper_ext/src/operations/handlers/mod.rs | 4 ++++ 3 files changed, 9 insertions(+) diff --git a/crates/core/c8y_api/src/smartrest/smartrest_serializer.rs b/crates/core/c8y_api/src/smartrest/smartrest_serializer.rs index c02483b79c8..65142a27e11 100644 --- a/crates/core/c8y_api/src/smartrest/smartrest_serializer.rs +++ b/crates/core/c8y_api/src/smartrest/smartrest_serializer.rs @@ -87,6 +87,7 @@ pub enum CumulocitySupportedOperations { C8yUploadConfigFile, C8yDownloadConfigFile, C8yFirmware, + C8yDeviceProfile, } impl From for &'static str { @@ -98,6 +99,7 @@ impl From for &'static str { CumulocitySupportedOperations::C8yUploadConfigFile => "c8y_UploadConfigFile", CumulocitySupportedOperations::C8yDownloadConfigFile => "c8y_DownloadConfigFile", CumulocitySupportedOperations::C8yFirmware => "c8y_Firmware", + CumulocitySupportedOperations::C8yDeviceProfile => "c8y_DeviceProfile", } } } diff --git a/crates/core/tedge_api/src/mqtt_topics.rs b/crates/core/tedge_api/src/mqtt_topics.rs index 5dd5747be8e..3a6772638d5 100644 --- a/crates/core/tedge_api/src/mqtt_topics.rs +++ b/crates/core/tedge_api/src/mqtt_topics.rs @@ -645,6 +645,7 @@ pub enum OperationType { ConfigUpdate, FirmwareUpdate, Health, + DeviceProfile, Custom(String), } @@ -686,6 +687,7 @@ impl<'a> From<&'a str> for OperationType { "config_snapshot" => OperationType::ConfigSnapshot, "config_update" => OperationType::ConfigUpdate, "firmware_update" => OperationType::FirmwareUpdate, + "device_profile" => OperationType::DeviceProfile, operation => OperationType::Custom(operation.to_string()), } } @@ -708,6 +710,7 @@ impl Display for OperationType { OperationType::ConfigUpdate => write!(f, "config_update"), OperationType::FirmwareUpdate => write!(f, "firmware_update"), OperationType::Health => write!(f, "health"), + OperationType::DeviceProfile => write!(f, "device_profile"), OperationType::Custom(operation) => write!(f, "{operation}"), } } diff --git a/crates/extensions/c8y_mapper_ext/src/operations/handlers/mod.rs b/crates/extensions/c8y_mapper_ext/src/operations/handlers/mod.rs index f2eba4e5614..e489eda8340 100644 --- a/crates/extensions/c8y_mapper_ext/src/operations/handlers/mod.rs +++ b/crates/extensions/c8y_mapper_ext/src/operations/handlers/mod.rs @@ -134,6 +134,9 @@ impl OperationContext { self.handle_firmware_update_state_change(&entity, &cmd_id, &message) .await } + OperationType::DeviceProfile => { + Ok(OperationOutcome::Ignored) // to do handle state change for device profile + } }; let mut mqtt_publisher = self.mqtt_publisher.clone(); @@ -257,6 +260,7 @@ fn to_c8y_operation(operation_type: &OperationType) -> Option Some(CumulocitySupportedOperations::C8yDownloadConfigFile), OperationType::FirmwareUpdate => Some(CumulocitySupportedOperations::C8yFirmware), OperationType::SoftwareUpdate => Some(CumulocitySupportedOperations::C8ySoftwareUpdate), + OperationType::DeviceProfile => Some(CumulocitySupportedOperations::C8yDeviceProfile), // software list is not an c8y, only a fragment, but is a local operation that is spawned as // part of C8y_SoftwareUpdate operation OperationType::SoftwareList => None, From 90d023f1b16597cc898d5cf724a84c75262685f8 Mon Sep 17 00:00:00 2001 From: Krzysztof Piotrowski Date: Tue, 6 Aug 2024 13:05:42 +0000 Subject: [PATCH 3/8] feat: subscribe to the device profile topics Signed-off-by: Krzysztof Piotrowski --- crates/extensions/c8y_mapper_ext/src/operations/handler.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/crates/extensions/c8y_mapper_ext/src/operations/handler.rs b/crates/extensions/c8y_mapper_ext/src/operations/handler.rs index a8dd1fad55e..41ce2f0121a 100644 --- a/crates/extensions/c8y_mapper_ext/src/operations/handler.rs +++ b/crates/extensions/c8y_mapper_ext/src/operations/handler.rs @@ -196,6 +196,13 @@ impl OperationHandler { ]); } + if capabilities.device_profile { + topics.extend([ + (AnyEntity, Command(OperationType::DeviceProfile)), + (AnyEntity, CommandMetadata(OperationType::DeviceProfile)), + ]); + } + topics } } From 1be499d77a22279cc05e1b7547de7b2cd874015f Mon Sep 17 00:00:00 2001 From: Krzysztof Piotrowski Date: Wed, 24 Jul 2024 17:19:51 +0000 Subject: [PATCH 4/8] feat: create command payload for device profile Signed-off-by: Krzysztof Piotrowski --- crates/core/tedge_api/src/commands.rs | 16 +++++- crates/core/tedge_api/src/device_profile.rs | 59 +++++++++++++++++++++ crates/core/tedge_api/src/lib.rs | 1 + 3 files changed, 75 insertions(+), 1 deletion(-) create mode 100644 crates/core/tedge_api/src/device_profile.rs diff --git a/crates/core/tedge_api/src/commands.rs b/crates/core/tedge_api/src/commands.rs index 0d5f268b48a..64ee2aac1a3 100644 --- a/crates/core/tedge_api/src/commands.rs +++ b/crates/core/tedge_api/src/commands.rs @@ -545,6 +545,12 @@ impl SoftwareUpdateCommand { } } +#[derive(Debug, Clone, Deserialize, Eq, PartialEq, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct SoftwareInfo { + pub update_list: Vec, +} + /// Sub list of modules grouped by plugin type. #[derive(Debug, Clone, Deserialize, Eq, PartialEq, Serialize)] #[serde(rename_all = "camelCase")] @@ -903,10 +909,18 @@ impl ConfigUpdateCmdPayload { } } +#[derive(Debug, Deserialize, Serialize, Eq, PartialEq, Clone)] +#[serde(rename_all = "camelCase")] +pub struct ConfigInfo { + #[serde(rename = "type")] + pub config_type: String, + pub remote_url: Option, +} + /// Command to update the device firmware pub type FirmwareUpdateCmd = Command; -#[derive(Debug, Deserialize, Serialize, Eq, PartialEq)] +#[derive(Debug, Deserialize, Serialize, Eq, PartialEq, Clone)] #[serde(rename_all = "camelCase")] pub struct FirmwareInfo { pub name: Option, diff --git a/crates/core/tedge_api/src/device_profile.rs b/crates/core/tedge_api/src/device_profile.rs new file mode 100644 index 00000000000..2a0e6e6e47d --- /dev/null +++ b/crates/core/tedge_api/src/device_profile.rs @@ -0,0 +1,59 @@ +use crate::commands::Command; +use crate::commands::CommandPayload; +use crate::commands::ConfigInfo; +use crate::commands::FirmwareInfo; +use crate::commands::SoftwareInfo; +use crate::mqtt_topics::OperationType; +use crate::CommandStatus; +use crate::Jsonify; + +use serde::Deserialize; +use serde::Serialize; + +/// Command for device profile +pub type DeviceProfileCmd = Command; + +#[derive(Debug, Deserialize, Serialize, Eq, PartialEq, Clone)] +#[serde(rename_all = "camelCase")] +pub struct DeviceProfileCmdPayload { + #[serde(flatten)] + pub status: CommandStatus, + pub name: String, + pub operations: Vec, +} + +impl Jsonify for DeviceProfileCmdPayload {} + +impl CommandPayload for DeviceProfileCmdPayload { + fn operation_type() -> OperationType { + OperationType::DeviceProfile + } + + fn status(&self) -> CommandStatus { + self.status.clone() + } + + fn set_status(&mut self, status: CommandStatus) { + self.status = status + } +} + +#[derive(Debug, Deserialize, Serialize, Eq, PartialEq, Clone)] +#[serde(rename_all = "camelCase")] +pub struct DeviceProfileOperation { + operation: OperationType, + skip: bool, + #[serde(flatten)] + payload: OperationPayload, +} + +#[derive(Debug, Clone, Deserialize, PartialEq, Eq, Serialize)] +#[serde(rename_all = "camelCase")] +pub enum OperationPayload { + #[serde(rename = "payload")] + Firmware(FirmwareInfo), + #[serde(rename = "payload")] + Software(SoftwareInfo), + #[serde(rename = "payload")] + Config(ConfigInfo), +} diff --git a/crates/core/tedge_api/src/lib.rs b/crates/core/tedge_api/src/lib.rs index 7f97bc97a3c..d35d8b75390 100644 --- a/crates/core/tedge_api/src/lib.rs +++ b/crates/core/tedge_api/src/lib.rs @@ -1,5 +1,6 @@ pub mod alarm; pub mod commands; +pub mod device_profile; pub mod entity_store; pub mod error; pub mod event; From c0c5a475548ae5c316a54e63dd03414ca874082f Mon Sep 17 00:00:00 2001 From: Krzysztof Piotrowski Date: Wed, 24 Jul 2024 22:41:36 +0000 Subject: [PATCH 5/8] feat: add deserializer for c8y_DeviceProfile json object Signed-off-by: Krzysztof Piotrowski --- .../core/c8y_api/src/json_c8y_deserializer.rs | 268 +++++++++++++++++- 1 file changed, 267 insertions(+), 1 deletion(-) diff --git a/crates/core/c8y_api/src/json_c8y_deserializer.rs b/crates/core/c8y_api/src/json_c8y_deserializer.rs index 9f370e55b0d..249707ee60b 100644 --- a/crates/core/c8y_api/src/json_c8y_deserializer.rs +++ b/crates/core/c8y_api/src/json_c8y_deserializer.rs @@ -3,6 +3,12 @@ use download::DownloadInfo; use mqtt_channel::Topic; use serde::Deserialize; use std::collections::HashMap; +use tedge_api::commands::ConfigInfo; +use tedge_api::commands::FirmwareInfo; +use tedge_api::commands::SoftwareInfo; +use tedge_api::commands::SoftwareModuleAction; +use tedge_api::commands::SoftwareModuleItem; +use tedge_api::commands::SoftwareRequestResponseSoftwareList; use tedge_api::mqtt_topics::EntityTopicId; use tedge_api::SoftwareModule; use tedge_api::SoftwareModuleUpdate; @@ -34,6 +40,7 @@ pub enum C8yDeviceControlOperation { UploadConfigFile(C8yUploadConfigFile), DownloadConfigFile(C8yDownloadConfigFile), Firmware(C8yFirmware), + DeviceProfile(C8yDeviceProfile), Custom, } @@ -61,6 +68,10 @@ impl C8yDeviceControlOperation { )?) } else if let Some(value) = hashmap.get("c8y_Firmware") { C8yDeviceControlOperation::Firmware(C8yFirmware::from_json_value(value.clone())?) + } else if let Some(value) = hashmap.get("c8y_DeviceProfile") { + C8yDeviceControlOperation::DeviceProfile(C8yDeviceProfile::from_json_value( + value.clone(), + )?) } else { C8yDeviceControlOperation::Custom }; @@ -197,6 +208,54 @@ pub struct C8ySoftwareUpdate { pub lists: Vec, } +impl TryFrom for SoftwareInfo { + type Error = C8yJsonOverMqttDeserializerError; + + fn try_from(value: C8ySoftwareUpdate) -> Result { + let mut software_info = SoftwareInfo { + update_list: Vec::new(), + }; + + for module in value.lists { + let plugin_type = module + .get_module_version_and_type() + .1 + .unwrap_or_else(SoftwareModule::default_type); + + let version = module.get_module_version_and_type().0; + let url = module.get_url(); + + let item = SoftwareModuleItem { + name: module.name, + version, + url, + action: match module.action.clone().try_into()? { + C8ySoftwareUpdateAction::Install => Some(SoftwareModuleAction::Install), + C8ySoftwareUpdateAction::Delete => Some(SoftwareModuleAction::Remove), + }, + reason: None, + }; + + if let Some(list) = software_info + .update_list + .iter_mut() + .find(|list| list.plugin_type == plugin_type) + { + list.modules.push(item); + } else { + software_info + .update_list + .push(SoftwareRequestResponseSoftwareList { + plugin_type, + modules: vec![item], + }); + } + } + + Ok(software_info) + } +} + #[derive(Debug, Deserialize, Eq, PartialEq)] #[serde(rename_all = "camelCase")] pub struct C8ySoftwareUpdateModule { @@ -389,6 +448,15 @@ pub struct C8yDownloadConfigFile { pub url: String, } +impl From for ConfigInfo { + fn from(value: C8yDownloadConfigFile) -> Self { + ConfigInfo { + config_type: value.config_type, + remote_url: Some(value.url), + } + } +} + /// Representation of c8y_Firmware JSON object /// /// ```rust @@ -405,7 +473,7 @@ pub struct C8yDownloadConfigFile { /// // Parse the data /// let req: C8yFirmware = serde_json::from_str(data).unwrap(); /// ``` -#[derive(Debug, Deserialize, Eq, PartialEq)] +#[derive(Debug, Deserialize, Eq, PartialEq, Clone)] #[serde(rename_all = "camelCase")] pub struct C8yFirmware { pub name: String, @@ -413,6 +481,66 @@ pub struct C8yFirmware { pub url: String, } +impl From for FirmwareInfo { + fn from(value: C8yFirmware) -> Self { + FirmwareInfo { + name: Some(value.name), + version: Some(value.version), + remote_url: Some(value.url), + } + } +} + +/// Representation of c8y_DeviceProfile JSON object +/// +/// ```rust +/// use c8y_api::json_c8y_deserializer::C8yDeviceProfile; +/// +/// // Example input from c8y +/// let data = r#" +/// { +/// "firmware": { +/// "name": "foo", +/// "version": "1.0.2", +/// "url": "https://dummy.url/firmware.zip" +/// }, +/// "software": [ +/// { +/// "softwareType": "dummy", +/// "name": "foo", +/// "action": "install", +/// "version": "2.0.0", +/// "url": "https://example.cumulocity.com/inventory/binaries/757538" +/// }, +/// { +/// "name": "bar", +/// "action": "delete", +/// "version": "1.0.1" +/// } +/// ], +/// "configuration": [ +/// { +/// "name": "tedge.toml", +/// "type": "/etc/tedge/tedge.toml", +/// "url": "https://example.cumulocity.com/inventory/binaries/757538" +/// } +/// ] +/// }"#; +/// +/// // Parse the data +/// let req: C8yDeviceProfile = serde_json::from_str(data).unwrap(); +/// ``` +#[derive(Debug, Deserialize, Eq, PartialEq)] +#[serde(rename_all = "camelCase")] +pub struct C8yDeviceProfile { + #[serde(skip_serializing_if = "Option::is_none")] + pub firmware: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub software: Option, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub configuration: Vec, +} + pub trait C8yDeviceControlOperationHelper { fn from_json_value(value: serde_json::Value) -> Result where @@ -434,6 +562,8 @@ impl C8yDeviceControlOperationHelper for C8yDownloadConfigFile {} impl C8yDeviceControlOperationHelper for C8yFirmware {} +impl C8yDeviceControlOperationHelper for C8yDeviceProfile {} + #[derive(thiserror::Error, Debug)] pub enum C8yJsonOverMqttDeserializerError { #[error("Parameter {parameter} is not recognized. {hint}")] @@ -447,12 +577,15 @@ pub enum C8yJsonOverMqttDeserializerError { #[cfg(test)] mod tests { use crate::json_c8y_deserializer::C8yDeviceControlOperationHelper; + use crate::json_c8y_deserializer::C8yDeviceProfile; use crate::json_c8y_deserializer::C8yOperation; use crate::json_c8y_deserializer::C8ySoftwareUpdate; use crate::json_c8y_deserializer::C8ySoftwareUpdateModule; use assert_json_diff::assert_json_eq; use serde_json::json; + use tedge_api::device_profile::DeviceProfileCmdPayload; use tedge_api::mqtt_topics::EntityTopicId; + use tedge_api::CommandStatus; use tedge_api::Jsonify; use tedge_api::SoftwareModule; use tedge_api::SoftwareModuleUpdate; @@ -787,4 +920,137 @@ mod tests { assert_eq!(update_software.modules(), &expected_vec); } + + #[test] + fn from_json_over_mqtt_device_profile_to_device_profile_cmd() { + let json_over_mqtt_payload = json!({ + "delivery": { + "log": [], + "time": "2024-07-22T10:26:31.457Z", + "status": "PENDING" + }, + "agentId": "98523229", + "creationTime": "2024-07-22T10:26:31.441Z", + "deviceId": "98523229", + "id": "523244", + "status": "PENDING", + "profileName": "prod-profile-v2", + "description": "Assign device profile prod-profile-v2 to device TST_char_humane_exception", + "profileId": "50523216", + "c8y_DeviceProfile": { + "software": [ + { + "name": "c8y-command-plugin", + "action": "install", + "version": "latest", + "url": " " + }, + { + "name": "collectd", + "action": "install", + "version": "latest", + "url": " " + } + ], + "configuration": [ + { + "name": "collectd-v2", + "type": "collectd.conf", + "url": "http://www.example.url/inventory/binaries/88395" + } + ], + "firmware": { + "name": "core-image-tedge-rauc", + "version": "20240430.1139", + "url": "http://www.example.url/inventory/binaries/43226" + } + }, + "externalSource": { + "externalId": "TST_char_humane_exception", + "type": "c8y_Serial" + } + }); + + let op: C8yOperation = serde_json::from_str(&json_over_mqtt_payload.to_string()).unwrap(); + let req = C8yDeviceProfile::from_json_value( + op.extras + .get("c8y_DeviceProfile") + .expect("c8y_DeviceProfile field is missing") + .to_owned(), + ) + .expect("Failed to deserialize"); + + let name = serde_json::from_value( + op.extras + .get("profileName") + .expect("profileName field is missing") + .to_owned(), + ) + .expect("failed to convert profileName to string"); + + let mut thin_edge_json = DeviceProfileCmdPayload { + status: CommandStatus::Init, + name, + operations: Vec::new(), + }; + + thin_edge_json.add_firmware(req.firmware.unwrap().into()); + thin_edge_json.add_software( + req.software + .unwrap() + .try_into() + .expect("failed to extract software info"), + ); + for config in req.configuration { + thin_edge_json.add_config(config.into()); + } + let expected_thin_edge_json = json!({ + "status": "init", + "name": "prod-profile-v2", + "operations": [ + { + "operation": "firmware_update", + "skip": false, + "payload": { + "name": "core-image-tedge-rauc", + "version": "20240430.1139", + "url": "http://www.example.url/inventory/binaries/43226" + } + }, + { + "operation": "software_update", + "skip": false, + "payload": { + "updateList": [ + { + "type": "default", + "modules": [ + { + "name": "c8y-command-plugin", + "action": "install", + "version": "latest", + }, + { + "name": "collectd", + "action": "install", + "version": "latest", + } + ] + } + ] + } + }, + { + "operation": "config_update", + "skip": false, + "payload": { + "type": "collectd.conf", + "remoteUrl": "http://www.example.url/inventory/binaries/88395" + } + } + ] + }); + + assert_eq!(thin_edge_json.to_value(), expected_thin_edge_json); + } } From 14eddc8e48b23df4439b8319a81bfb0edd8c3d0d Mon Sep 17 00:00:00 2001 From: Krzysztof Piotrowski Date: Wed, 24 Jul 2024 22:43:18 +0000 Subject: [PATCH 6/8] feat: convert device profile request to tedge json format Signed-off-by: Krzysztof Piotrowski --- crates/core/tedge_api/src/device_profile.rs | 32 + .../c8y_mapper_ext/src/converter.rs | 13 +- .../c8y_mapper_ext/src/operations/convert.rs | 44 ++ .../src/operations/handlers/device_profile.rs | 726 ++++++++++++++++++ .../src/operations/handlers/mod.rs | 1 + 5 files changed, 815 insertions(+), 1 deletion(-) create mode 100644 crates/extensions/c8y_mapper_ext/src/operations/handlers/device_profile.rs diff --git a/crates/core/tedge_api/src/device_profile.rs b/crates/core/tedge_api/src/device_profile.rs index 2a0e6e6e47d..d55d3239e02 100644 --- a/crates/core/tedge_api/src/device_profile.rs +++ b/crates/core/tedge_api/src/device_profile.rs @@ -57,3 +57,35 @@ pub enum OperationPayload { #[serde(rename = "payload")] Config(ConfigInfo), } + +impl DeviceProfileCmdPayload { + pub fn add_firmware(&mut self, firmware: FirmwareInfo) { + let firmware_operation = DeviceProfileOperation { + operation: OperationType::FirmwareUpdate, + skip: false, + payload: OperationPayload::Firmware(firmware), + }; + + self.operations.push(firmware_operation); + } + + pub fn add_software(&mut self, software: SoftwareInfo) { + let software_operation = DeviceProfileOperation { + operation: OperationType::SoftwareUpdate, + skip: false, + payload: OperationPayload::Software(software), + }; + + self.operations.push(software_operation); + } + + pub fn add_config(&mut self, config: ConfigInfo) { + let config_operation = DeviceProfileOperation { + operation: OperationType::ConfigUpdate, + skip: false, + payload: OperationPayload::Config(config), + }; + + self.operations.push(config_operation); + } +} diff --git a/crates/extensions/c8y_mapper_ext/src/converter.rs b/crates/extensions/c8y_mapper_ext/src/converter.rs index fecf74d6a8f..b85704f432b 100644 --- a/crates/extensions/c8y_mapper_ext/src/converter.rs +++ b/crates/extensions/c8y_mapper_ext/src/converter.rs @@ -688,8 +688,19 @@ impl CumulocityConverter { } C8yDeviceControlOperation::DeviceProfile(request) => { if self.config.capabilities.device_profile { - vec![] + if let Some(profile_name) = extras.get("profileName") { + self.convert_device_profile_request( + device_xid, + cmd_id, + request, + serde_json::from_value(profile_name.clone())?, + )? + } else { + error!("Received a c8y_DeviceProfile without a profile name"); + vec![] + } } else { + warn!("Received a c8y_DeviceProfile operation, however, device_profile feature is disabled"); vec![] } } diff --git a/crates/extensions/c8y_mapper_ext/src/operations/convert.rs b/crates/extensions/c8y_mapper_ext/src/operations/convert.rs index 7d95e737172..db155e8ec36 100644 --- a/crates/extensions/c8y_mapper_ext/src/operations/convert.rs +++ b/crates/extensions/c8y_mapper_ext/src/operations/convert.rs @@ -2,6 +2,7 @@ use std::sync::Arc; +use c8y_api::json_c8y_deserializer::C8yDeviceProfile; use c8y_api::json_c8y_deserializer::C8yDownloadConfigFile; use c8y_api::json_c8y_deserializer::C8yFirmware; use c8y_api::json_c8y_deserializer::C8yLogfileRequest; @@ -13,6 +14,7 @@ use tedge_api::commands::ConfigUpdateCmdPayload; use tedge_api::commands::FirmwareUpdateCmdPayload; use tedge_api::commands::LogMetadata; use tedge_api::commands::LogUploadCmdPayload; +use tedge_api::device_profile::DeviceProfileCmdPayload; use tedge_api::entity_store::EntityExternalId; use tedge_api::entity_store::EntityMetadata; use tedge_api::mqtt_topics::Channel; @@ -309,4 +311,46 @@ impl CumulocityConverter { Ok(messages) } + + /// Convert c8y_DeviceProfile JSON over MQTT operation to ThinEdge device_profile command. + pub fn convert_device_profile_request( + &self, + device_xid: String, + cmd_id: String, + device_profile_request: C8yDeviceProfile, + profile_name: String, + ) -> Result, CumulocityMapperError> { + let entity_xid: EntityExternalId = device_xid.into(); + + let target = self.entity_store.try_get_by_external_id(&entity_xid)?; + + let channel = Channel::Command { + operation: OperationType::DeviceProfile, + cmd_id, + }; + let topic = self.mqtt_schema.topic_for(&target.topic_id, &channel); + + let mut request = DeviceProfileCmdPayload { + status: CommandStatus::Init, + name: profile_name, + operations: Vec::new(), + }; + + if let Some(firmware) = device_profile_request.firmware { + request.add_firmware(firmware.into()); + } + + if let Some(software) = device_profile_request.software { + request.add_software(software.try_into()?); + } + + for config in device_profile_request.configuration { + request.add_config(config.into()); + } + + // Command messages must be retained + Ok(vec![ + MqttMessage::new(&topic, request.to_json()).with_retain() + ]) + } } diff --git a/crates/extensions/c8y_mapper_ext/src/operations/handlers/device_profile.rs b/crates/extensions/c8y_mapper_ext/src/operations/handlers/device_profile.rs new file mode 100644 index 00000000000..13e9d9ea407 --- /dev/null +++ b/crates/extensions/c8y_mapper_ext/src/operations/handlers/device_profile.rs @@ -0,0 +1,726 @@ +#[cfg(test)] +mod tests { + use crate::tests::skip_init_messages; + use crate::tests::spawn_c8y_mapper_actor; + use crate::tests::TestHandle; + + use c8y_api::json_c8y_deserializer::C8yDeviceControlTopic; + use serde_json::json; + use std::time::Duration; + use tedge_actors::test_helpers::MessageReceiverExt; + use tedge_actors::Sender; + use tedge_mqtt_ext::test_helpers::assert_received_includes_json; + use tedge_mqtt_ext::MqttMessage; + use tedge_mqtt_ext::Topic; + use tedge_test_utils::fs::TempTedgeDir; + + const TEST_TIMEOUT_MS: Duration = Duration::from_millis(3000); + + #[tokio::test] + async fn mapper_converts_device_profile_operation_for_main_device() { + let ttd = TempTedgeDir::new(); + let test_handle = spawn_c8y_mapper_actor(&ttd, true).await; + let TestHandle { mqtt, .. } = test_handle; + let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); + + skip_init_messages(&mut mqtt).await; + + // Simulate c8y_DeviceProfile operation delivered via JSON over MQTT + mqtt.send(MqttMessage::new( + &C8yDeviceControlTopic::topic(&"c8y".try_into().unwrap()), + json!({ + "id": "123456", + "profileName": "test-profile", + "c8y_DeviceProfile": { + "software": [ + { + "softwareType": "apt", + "name": "test-software-1", + "action": "install", + "version": "latest", + "url": " " + }, + { + "softwareType": "apt", + "name": "test-software-2", + "action": "install", + "version": "latest", + "url": " " + } + ], + "configuration": [ + { + "name": "test-software-1", + "type": "path/config/test-software-1", + "url": "http://www.my.url" + } + ], + "firmware": { + "name": "test-firmware", + "version": "1.0", + "url": "http://www.my.url" + } + }, + "externalSource": { + "externalId": "test-device", + "type": "c8y_Serial" + } + }) + .to_string(), + )) + .await + .expect("Send failed"); + + assert_received_includes_json( + &mut mqtt, + [( + "te/device/main///cmd/device_profile/c8y-mapper-123456", + json!({ + "status": "init", + "name": "test-profile", + "operations": [ + { + "operation": "firmware_update", + "skip": false, + "payload": { + "name": "test-firmware", + "version": "1.0", + "url": "http://www.my.url" + } + }, + { + "operation": "software_update", + "skip": false, + "payload": { + "updateList": [ + { + "type": "apt", + "modules": [ + { + "name": "test-software-1", + "version": "latest", + "action": "install" + }, + { + "name": "test-software-2", + "version": "latest", + "action": "install" + } + ] + } + ] + } + }, + { + "operation": "config_update", + "skip": false, + "payload": { + "type": "path/config/test-software-1", + "remoteUrl":"http://www.my.url" + } + } + ] + }), + )], + ) + .await; + } + + #[tokio::test] + async fn mapper_converts_device_profile_operation_for_child_device() { + let ttd = TempTedgeDir::new(); + let test_handle = spawn_c8y_mapper_actor(&ttd, true).await; + let TestHandle { mqtt, .. } = test_handle; + let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); + + skip_init_messages(&mut mqtt).await; + + // The child device must be registered first + mqtt.send(MqttMessage::new( + &Topic::new_unchecked("te/device/child1//"), + r#"{ "@type":"child-device", "@id":"child1" }"#, + )) + .await + .expect("fail to register the child-device"); + + mqtt.skip(1).await; // Skip child device registration messages + + // Simulate c8y_DeviceProfile operation delivered via JSON over MQTT + mqtt.send(MqttMessage::new( + &C8yDeviceControlTopic::topic(&"c8y".try_into().unwrap()), + json!({ + "id": "123456", + "profileName": "test-profile", + "c8y_DeviceProfile": { + "software": [ + { + "softwareType": "apt", + "name": "test-software-1", + "action": "install", + "version": "latest", + "url": " " + }, + { + "softwareType": "apt", + "name": "test-software-2", + "action": "install", + "version": "latest", + "url": " " + } + ], + "configuration": [ + { + "name": "test-software-1", + "type": "path/config/test-software-1", + "url": "http://www.my.url" + } + ], + "firmware": { + "name": "test-firmware", + "version": "1.0", + "url": "http://www.my.url" + } + }, + "externalSource": { + "externalId": "child1", + "type": "c8y_Serial" + } + }) + .to_string(), + )) + .await + .expect("Send failed"); + + assert_received_includes_json( + &mut mqtt, + [( + "te/device/child1///cmd/device_profile/c8y-mapper-123456", + json!({ + "status": "init", + "name": "test-profile", + "operations": [ + { + "operation": "firmware_update", + "skip": false, + "payload": { + "name": "test-firmware", + "version": "1.0", + "url": "http://www.my.url" + } + }, + { + "operation": "software_update", + "skip": false, + "payload": { + "updateList": [ + { + "type": "apt", + "modules": [ + { + "name": "test-software-1", + "version": "latest", + "action": "install" + }, + { + "name": "test-software-2", + "version": "latest", + "action": "install" + } + ] + } + ] + } + }, + { + "operation": "config_update", + "skip": false, + "payload": { + "type": "path/config/test-software-1", + "remoteUrl":"http://www.my.url" + } + } + ] + }), + )], + ) + .await; + } + + #[tokio::test] + async fn mapper_converts_device_profile_operation_with_type_in_version() { + let ttd = TempTedgeDir::new(); + let test_handle = spawn_c8y_mapper_actor(&ttd, true).await; + let TestHandle { mqtt, .. } = test_handle; + let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); + + skip_init_messages(&mut mqtt).await; + + // Simulate c8y_DeviceProfile operation delivered via JSON over MQTT + mqtt.send(MqttMessage::new( + &C8yDeviceControlTopic::topic(&"c8y".try_into().unwrap()), + json!({ + "id": "123456", + "profileName": "test-profile", + "c8y_DeviceProfile": { + "software": [ + { + "name": "test-software-1", + "action": "install", + "version": "latest::apt", + "url": " " + }, + { + "name": "test-software-2", + "action": "install", + "version": "latest::apt", + "url": " " + } + ], + "configuration": [ + { + "name": "test-software-1", + "type": "path/config/test-software-1", + "url": "http://www.my.url" + } + ], + "firmware": { + "name": "test-firmware", + "version": "1.0", + "url": "http://www.my.url" + } + }, + "externalSource": { + "externalId": "test-device", + "type": "c8y_Serial" + } + }) + .to_string(), + )) + .await + .expect("Send failed"); + + assert_received_includes_json( + &mut mqtt, + [( + "te/device/main///cmd/device_profile/c8y-mapper-123456", + json!({ + "status": "init", + "name": "test-profile", + "operations": [ + { + "operation": "firmware_update", + "skip": false, + "payload": { + "name": "test-firmware", + "version": "1.0", + "url": "http://www.my.url" + } + }, + { + "operation": "software_update", + "skip": false, + "payload": { + "updateList": [ + { + "type": "apt", + "modules": [ + { + "name": "test-software-1", + "version": "latest", + "action": "install" + }, + { + "name": "test-software-2", + "version": "latest", + "action": "install" + } + ] + } + ] + } + }, + { + "operation": "config_update", + "skip": false, + "payload": { + "type": "path/config/test-software-1", + "remoteUrl":"http://www.my.url" + } + } + ] + }), + )], + ) + .await; + } + + #[tokio::test] + async fn mapper_converts_device_profile_operation_with_missing_software_type() { + let ttd = TempTedgeDir::new(); + let test_handle = spawn_c8y_mapper_actor(&ttd, true).await; + let TestHandle { mqtt, .. } = test_handle; + let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); + + skip_init_messages(&mut mqtt).await; + + // Simulate c8y_DeviceProfile operation delivered via JSON over MQTT + mqtt.send(MqttMessage::new( + &C8yDeviceControlTopic::topic(&"c8y".try_into().unwrap()), + json!({ + "id": "123456", + "profileName": "test-profile", + "c8y_DeviceProfile": { + "software": [ + { + "name": "test-software-1", + "action": "install", + "version": "latest", + "url": " " + }, + { + "name": "test-software-2", + "action": "install", + "version": "latest", + "url": " " + } + ], + "configuration": [ + { + "name": "test-software-1", + "type": "path/config/test-software-1", + "url": "http://www.my.url" + } + ], + "firmware": { + "name": "test-firmware", + "version": "1.0", + "url": "http://www.my.url" + } + }, + "externalSource": { + "externalId": "test-device", + "type": "c8y_Serial" + } + }) + .to_string(), + )) + .await + .expect("Send failed"); + + assert_received_includes_json( + &mut mqtt, + [( + "te/device/main///cmd/device_profile/c8y-mapper-123456", + json!({ + "status": "init", + "name": "test-profile", + "operations": [ + { + "operation": "firmware_update", + "skip": false, + "payload": { + "name": "test-firmware", + "version": "1.0", + "url": "http://www.my.url" + } + }, + { + "operation": "software_update", + "skip": false, + "payload": { + "updateList": [ + { + "type": "default", + "modules": [ + { + "name": "test-software-1", + "version": "latest", + "action": "install" + }, + { + "name": "test-software-2", + "version": "latest", + "action": "install" + } + ] + } + ] + } + }, + { + "operation": "config_update", + "skip": false, + "payload": { + "type": "path/config/test-software-1", + "remoteUrl":"http://www.my.url" + } + } + ] + }), + )], + ) + .await; + } + + #[tokio::test] + async fn mapper_converts_device_profile_operation_with_missing_firmware() { + let ttd = TempTedgeDir::new(); + let test_handle = spawn_c8y_mapper_actor(&ttd, true).await; + let TestHandle { mqtt, .. } = test_handle; + let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); + + skip_init_messages(&mut mqtt).await; + + // Simulate c8y_DeviceProfile operation delivered via JSON over MQTT + mqtt.send(MqttMessage::new( + &C8yDeviceControlTopic::topic(&"c8y".try_into().unwrap()), + json!({ + "id": "123456", + "profileName": "test-profile", + "c8y_DeviceProfile": { + "software": [ + { + "softwareType": "apt", + "name": "test-software-1", + "action": "install", + "version": "latest", + "url": " " + }, + { + "softwareType": "apt", + "name": "test-software-2", + "action": "install", + "version": "latest", + "url": " " + } + ], + "configuration": [ + { + "name": "test-software-1", + "type": "path/config/test-software-1", + "url": "http://www.my.url" + } + ] + }, + "externalSource": { + "externalId": "test-device", + "type": "c8y_Serial" + } + }) + .to_string(), + )) + .await + .expect("Send failed"); + + assert_received_includes_json( + &mut mqtt, + [( + "te/device/main///cmd/device_profile/c8y-mapper-123456", + json!({ + "status": "init", + "name": "test-profile", + "operations": [ + { + "operation": "software_update", + "skip": false, + "payload": { + "updateList": [ + { + "type": "apt", + "modules": [ + { + "name": "test-software-1", + "version": "latest", + "action": "install" + }, + { + "name": "test-software-2", + "version": "latest", + "action": "install" + } + ] + } + ] + } + }, + { + "operation": "config_update", + "skip": false, + "payload": { + "type": "path/config/test-software-1", + "remoteUrl":"http://www.my.url" + } + } + ] + }), + )], + ) + .await; + } + + #[tokio::test] + async fn mapper_converts_device_profile_operation_with_missing_software() { + let ttd = TempTedgeDir::new(); + let test_handle = spawn_c8y_mapper_actor(&ttd, true).await; + let TestHandle { mqtt, .. } = test_handle; + let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); + + skip_init_messages(&mut mqtt).await; + + // Simulate c8y_DeviceProfile operation delivered via JSON over MQTT + mqtt.send(MqttMessage::new( + &C8yDeviceControlTopic::topic(&"c8y".try_into().unwrap()), + json!({ + "id": "123456", + "profileName": "test-profile", + "c8y_DeviceProfile": { + "configuration": [ + { + "name": "test-software-1", + "type": "path/config/test-software-1", + "url": "http://www.my.url" + } + ], + "firmware": { + "name": "test-firmware", + "version": "1.0", + "url": "http://www.my.url" + } + }, + "externalSource": { + "externalId": "test-device", + "type": "c8y_Serial" + } + }) + .to_string(), + )) + .await + .expect("Send failed"); + + assert_received_includes_json( + &mut mqtt, + [( + "te/device/main///cmd/device_profile/c8y-mapper-123456", + json!({ + "status": "init", + "name": "test-profile", + "operations": [ + { + "operation": "firmware_update", + "skip": false, + "payload": { + "name": "test-firmware", + "version": "1.0", + "url": "http://www.my.url" + } + }, + { + "operation": "config_update", + "skip": false, + "payload": { + "type": "path/config/test-software-1", + "remoteUrl":"http://www.my.url" + } + } + ] + }), + )], + ) + .await; + } + + #[tokio::test] + async fn mapper_converts_device_profile_operation_with_missing_configuration() { + let ttd = TempTedgeDir::new(); + let test_handle = spawn_c8y_mapper_actor(&ttd, true).await; + let TestHandle { mqtt, .. } = test_handle; + let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); + + skip_init_messages(&mut mqtt).await; + + // Simulate c8y_DeviceProfile operation delivered via JSON over MQTT + mqtt.send(MqttMessage::new( + &C8yDeviceControlTopic::topic(&"c8y".try_into().unwrap()), + json!({ + "id": "123456", + "profileName": "test-profile", + "c8y_DeviceProfile": { + "software": [ + { + "softwareType": "apt", + "name": "test-software-1", + "action": "install", + "version": "latest", + "url": " " + }, + { + "softwareType": "apt", + "name": "test-software-2", + "action": "install", + "version": "latest", + "url": " " + } + ], + "firmware": { + "name": "test-firmware", + "version": "1.0", + "url": "http://www.my.url" + } + }, + "externalSource": { + "externalId": "test-device", + "type": "c8y_Serial" + } + }) + .to_string(), + )) + .await + .expect("Send failed"); + + assert_received_includes_json( + &mut mqtt, + [( + "te/device/main///cmd/device_profile/c8y-mapper-123456", + json!({ + "status": "init", + "name": "test-profile", + "operations": [ + { + "operation": "firmware_update", + "skip": false, + "payload": { + "name": "test-firmware", + "version": "1.0", + "url": "http://www.my.url" + } + }, + { + "operation": "software_update", + "skip": false, + "payload": { + "updateList": [ + { + "type": "apt", + "modules": [ + { + "name": "test-software-1", + "version": "latest", + "action": "install" + }, + { + "name": "test-software-2", + "version": "latest", + "action": "install" + } + ] + } + ] + } + } + ] + }), + )], + ) + .await; + } +} diff --git a/crates/extensions/c8y_mapper_ext/src/operations/handlers/mod.rs b/crates/extensions/c8y_mapper_ext/src/operations/handlers/mod.rs index e489eda8340..66b7aaa294c 100644 --- a/crates/extensions/c8y_mapper_ext/src/operations/handlers/mod.rs +++ b/crates/extensions/c8y_mapper_ext/src/operations/handlers/mod.rs @@ -2,6 +2,7 @@ mod config_snapshot; mod config_update; +mod device_profile; mod firmware_update; mod log_upload; mod restart; From 9651797aeca8acc944df6b3bc2dcd9a52aee2c81 Mon Sep 17 00:00:00 2001 From: Krzysztof Piotrowski Date: Wed, 31 Jul 2024 18:07:53 +0000 Subject: [PATCH 7/8] feat: handle state change for device profile Signed-off-by: Krzysztof Piotrowski --- .../core/c8y_api/src/smartrest/inventory.rs | 7 + .../src/operations/handlers/device_profile.rs | 501 ++++++++++++++++++ .../src/operations/handlers/mod.rs | 3 +- 3 files changed, 510 insertions(+), 1 deletion(-) diff --git a/crates/core/c8y_api/src/smartrest/inventory.rs b/crates/core/c8y_api/src/smartrest/inventory.rs index 0bd878f6fc0..773548ef135 100644 --- a/crates/core/c8y_api/src/smartrest/inventory.rs +++ b/crates/core/c8y_api/src/smartrest/inventory.rs @@ -141,6 +141,13 @@ impl From for MqttMessage { } } +/// Create a SmartREST payload for setting/updating the current state of the target profile +/// in its own managed object. When all individual operations are finished (i.e. firmware update, software update +/// and configuration update), the `profile_executed` field should be set to `true`, otherwise it should be `false`. +pub fn set_c8y_profile_target_payload(profile_executed: bool) -> String { + fields_to_csv_string(&["121", &profile_executed.to_string()]) +} + #[derive(thiserror::Error, Debug)] #[error("Field `{field_name}` contains invalid value: {value:?}")] pub struct InvalidValueError { diff --git a/crates/extensions/c8y_mapper_ext/src/operations/handlers/device_profile.rs b/crates/extensions/c8y_mapper_ext/src/operations/handlers/device_profile.rs index 13e9d9ea407..85f5d07790d 100644 --- a/crates/extensions/c8y_mapper_ext/src/operations/handlers/device_profile.rs +++ b/crates/extensions/c8y_mapper_ext/src/operations/handlers/device_profile.rs @@ -1,3 +1,74 @@ +use anyhow::Context; +use c8y_api::smartrest; +use c8y_api::smartrest::inventory::set_c8y_profile_target_payload; +use c8y_api::smartrest::smartrest_serializer::CumulocitySupportedOperations; +use tedge_api::device_profile::DeviceProfileCmd; +use tedge_api::CommandStatus; +use tedge_mqtt_ext::MqttMessage; +use tracing::warn; + +use super::EntityTarget; +use super::OperationContext; +use super::OperationError; +use super::OperationOutcome; + +impl OperationContext { + pub async fn handle_device_profile_state_change( + &self, + target: &EntityTarget, + cmd_id: &str, + message: &MqttMessage, + ) -> Result { + if !self.capabilities.device_profile { + warn!("Received a device_profile command, however, device_profile feature is disabled"); + return Ok(OperationOutcome::Ignored); + } + + let command = match DeviceProfileCmd::try_from_bytes( + target.topic_id.to_owned(), + cmd_id.into(), + message.payload_bytes(), + ) + .context("Could not parse command as a device profile command")? + { + Some(command) => command, + None => { + // The command has been fully processed + return Ok(OperationOutcome::Ignored); + } + }; + + let sm_topic = &target.smartrest_publish_topic; + + match command.status() { + CommandStatus::Executing => { + let c8y_target_profile = + MqttMessage::new(sm_topic, set_c8y_profile_target_payload(false)); // Set target profile + + Ok(OperationOutcome::Executing { + extra_messages: vec![c8y_target_profile], + }) + } + CommandStatus::Successful => { + let c8y_target_profile = + MqttMessage::new(sm_topic, set_c8y_profile_target_payload(true)); // Set the target profile as executed + + let smartrest_set_operation = + smartrest::smartrest_serializer::succeed_operation_no_payload( + CumulocitySupportedOperations::C8yDeviceProfile, + ); + let c8y_notification = MqttMessage::new(sm_topic, smartrest_set_operation); + + Ok(OperationOutcome::Finished { + messages: vec![c8y_target_profile, c8y_notification], + }) + } + CommandStatus::Failed { reason } => Err(anyhow::anyhow!(reason).into()), + _ => Ok(OperationOutcome::Ignored), + } + } +} + #[cfg(test)] mod tests { use crate::tests::skip_init_messages; @@ -9,6 +80,7 @@ mod tests { use std::time::Duration; use tedge_actors::test_helpers::MessageReceiverExt; use tedge_actors::Sender; + use tedge_mqtt_ext::test_helpers::assert_received_contains_str; use tedge_mqtt_ext::test_helpers::assert_received_includes_json; use tedge_mqtt_ext::MqttMessage; use tedge_mqtt_ext::Topic; @@ -723,4 +795,433 @@ mod tests { ) .await; } + + #[tokio::test] + async fn handle_config_update_executing_and_failed_cmd_for_main_device() { + let ttd = TempTedgeDir::new(); + let test_handle = spawn_c8y_mapper_actor(&ttd, true).await; + let TestHandle { mqtt, .. } = test_handle; + let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); + + skip_init_messages(&mut mqtt).await; + + // Simulate config_snapshot command with "executing" state + mqtt.send(MqttMessage::new( + &Topic::new_unchecked("te/device/main///cmd/device_profile/c8y-mapper-123456"), + json!({ + "status": "executing", + "name": "test-profile", + "operations": [ + { + "operation": "firmware_update", + "skip": false, + "payload": { + "name": "test-firmware", + "version": "1.0", + "url": "http://www.my.url" + } + }, + { + "operation": "software_update", + "skip": false, + "payload": { + "updateList": [ + { + "type": "apt", + "modules": [ + { + "name": "test-software-1", + "version": "latest", + "action": "install" + }, + { + "name": "test-software-2", + "version": "latest", + "action": "install" + } + ] + } + ] + } + }, + { + "operation": "config_update", + "skip": false, + "payload": { + "type": "path/config/test-software-1", + "remoteUrl":"http://www.my.url" + } + } + ] + }) + .to_string(), + )) + .await + .expect("Send failed"); + + // Expect `501` smartrest message on `c8y/s/us`. + assert_received_contains_str(&mut mqtt, [("c8y/s/us", "501,c8y_DeviceProfile")]).await; + + // Expect `121` smartrest message on `c8y/s/us`. + assert_received_contains_str(&mut mqtt, [("c8y/s/us", "121,false")]).await; + + // Simulate config_snapshot command with "failed" state + mqtt.send(MqttMessage::new( + &Topic::new_unchecked("te/device/main///cmd/device_profile/c8y-mapper-123456"), + json!({ + "status": "failed", + "reason": "Something went wrong", + "name": "test-profile", + "operations": [ + { + "operation": "firmware_update", + "skip": false, + "payload": { + "name": "test-firmware", + "version": "1.0", + "url": "http://www.my.url" + } + }, + { + "operation": "software_update", + "skip": false, + "payload": { + "updateList": [ + { + "type": "apt", + "modules": [ + { + "name": "test-software-1", + "version": "latest", + "action": "install" + }, + { + "name": "test-software-2", + "version": "latest", + "action": "install" + } + ] + } + ] + } + }, + { + "operation": "config_update", + "skip": false, + "payload": { + "type": "path/config/test-software-1", + "remoteUrl":"http://www.my.url" + } + } + ] + }) + .to_string(), + )) + .await + .expect("Send failed"); + + // Expect `502` smartrest message on `c8y/s/us`. + assert_received_contains_str( + &mut mqtt, + [("c8y/s/us", "502,c8y_DeviceProfile,Something went wrong")], + ) + .await; + } + + #[tokio::test] + async fn handle_config_update_executing_and_failed_cmd_for_child_device() { + let ttd = TempTedgeDir::new(); + let test_handle = spawn_c8y_mapper_actor(&ttd, true).await; + let TestHandle { mqtt, .. } = test_handle; + let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); + + skip_init_messages(&mut mqtt).await; + + // The child device must be registered first + mqtt.send(MqttMessage::new( + &Topic::new_unchecked("te/device/child1//"), + r#"{ "@type":"child-device", "@id":"child1" }"#, + )) + .await + .expect("fail to register the child-device"); + + mqtt.skip(1).await; // Skip child device registration messages + + // Simulate config_snapshot command with "executing" state + mqtt.send(MqttMessage::new( + &Topic::new_unchecked("te/device/child1///cmd/device_profile/c8y-mapper-123456"), + json!({ + "status": "executing", + "name": "test-profile", + "operations": [ + { + "operation": "firmware_update", + "skip": false, + "payload": { + "name": "test-firmware", + "version": "1.0", + "url": "http://www.my.url" + } + }, + { + "operation": "software_update", + "skip": false, + "payload": { + "updateList": [ + { + "type": "apt", + "modules": [ + { + "name": "test-software-1", + "version": "latest", + "action": "install" + }, + { + "name": "test-software-2", + "version": "latest", + "action": "install" + } + ] + } + ] + } + }, + { + "operation": "config_update", + "skip": false, + "payload": { + "type": "path/config/test-software-1", + "remoteUrl":"http://www.my.url" + } + } + ] + }) + .to_string(), + )) + .await + .expect("Send failed"); + + // Expect `501` smartrest message on `c8y/s/us`. + assert_received_contains_str(&mut mqtt, [("c8y/s/us/child1", "501,c8y_DeviceProfile")]) + .await; + + // Expect `121` smartrest message on `c8y/s/us`. + assert_received_contains_str(&mut mqtt, [("c8y/s/us/child1", "121,false")]).await; + + // Simulate config_snapshot command with "failed" state + mqtt.send(MqttMessage::new( + &Topic::new_unchecked("te/device/child1///cmd/device_profile/c8y-mapper-123456"), + json!({ + "status": "failed", + "reason": "Something went wrong", + "name": "test-profile", + "operations": [ + { + "operation": "firmware_update", + "skip": false, + "payload": { + "name": "test-firmware", + "version": "1.0", + "url": "http://www.my.url" + } + }, + { + "operation": "software_update", + "skip": false, + "payload": { + "updateList": [ + { + "type": "apt", + "modules": [ + { + "name": "test-software-1", + "version": "latest", + "action": "install" + }, + { + "name": "test-software-2", + "version": "latest", + "action": "install" + } + ] + } + ] + } + }, + { + "operation": "config_update", + "skip": false, + "payload": { + "type": "path/config/test-software-1", + "remoteUrl":"http://www.my.url" + } + } + ] + }) + .to_string(), + )) + .await + .expect("Send failed"); + + // Expect `502` smartrest message on `c8y/s/us`. + assert_received_contains_str( + &mut mqtt, + [( + "c8y/s/us/child1", + "502,c8y_DeviceProfile,Something went wrong", + )], + ) + .await; + } + + #[tokio::test] + async fn handle_device_profile_successful_cmd_for_main_device() { + let ttd = TempTedgeDir::new(); + let test_handle = spawn_c8y_mapper_actor(&ttd, true).await; + let TestHandle { mqtt, .. } = test_handle; + let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); + + skip_init_messages(&mut mqtt).await; + + // Simulate config_update command with "successful" state + mqtt.send(MqttMessage::new( + &Topic::new_unchecked("te/device/main///cmd/device_profile/c8y-mapper-123456"), + json!({ + "status": "successful", + "name": "test-profile", + "operations": [ + { + "operation": "firmware_update", + "skip": false, + "payload": { + "name": "test-firmware", + "version": "1.0", + "url": "http://www.my.url" + } + }, + { + "operation": "software_update", + "skip": false, + "payload": { + "updateList": [ + { + "type": "apt", + "modules": [ + { + "name": "test-software-1", + "version": "latest", + "action": "install" + }, + { + "name": "test-software-2", + "version": "latest", + "action": "install" + } + ] + } + ] + } + }, + { + "operation": "config_update", + "skip": false, + "payload": { + "type": "path/config/test-software-1", + "remoteUrl":"http://www.my.url" + } + } + ] + }) + .to_string(), + )) + .await + .expect("Send failed"); + + // Expect `121` smartrest message on `c8y/s/us`. + assert_received_contains_str(&mut mqtt, [("c8y/s/us", "121,true")]).await; + + // Expect `503` smartrest message on `c8y/s/us`. + assert_received_contains_str(&mut mqtt, [("c8y/s/us", "503,c8y_DeviceProfile")]).await; + } + + #[tokio::test] + async fn handle_device_profile_successful_cmd_for_child_device() { + let ttd = TempTedgeDir::new(); + let test_handle = spawn_c8y_mapper_actor(&ttd, true).await; + let TestHandle { mqtt, .. } = test_handle; + let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); + + skip_init_messages(&mut mqtt).await; + + // The child device must be registered first + mqtt.send(MqttMessage::new( + &Topic::new_unchecked("te/device/child1//"), + r#"{ "@type":"child-device", "@id":"child1" }"#, + )) + .await + .expect("fail to register the child-device"); + + mqtt.skip(1).await; // Skip child device registration messages + + // Simulate config_update command with "successful" state + mqtt.send(MqttMessage::new( + &Topic::new_unchecked("te/device/child1///cmd/device_profile/c8y-mapper-123456"), + json!({ + "status": "successful", + "name": "test-profile", + "operations": [ + { + "operation": "firmware_update", + "skip": false, + "payload": { + "name": "test-firmware", + "version": "1.0", + "url": "http://www.my.url" + } + }, + { + "operation": "software_update", + "skip": false, + "payload": { + "updateList": [ + { + "type": "apt", + "modules": [ + { + "name": "test-software-1", + "version": "latest", + "action": "install" + }, + { + "name": "test-software-2", + "version": "latest", + "action": "install" + } + ] + } + ] + } + }, + { + "operation": "config_update", + "skip": false, + "payload": { + "type": "path/config/test-software-1", + "remoteUrl":"http://www.my.url" + } + } + ] + }) + .to_string(), + )) + .await + .expect("Send failed"); + + // Expect `121` smartrest message on `c8y/s/us`. + assert_received_contains_str(&mut mqtt, [("c8y/s/us/child1", "121,true")]).await; + + // Expect `503` smartrest message on `c8y/s/us`. + assert_received_contains_str(&mut mqtt, [("c8y/s/us/child1", "503,c8y_DeviceProfile")]) + .await; + } } diff --git a/crates/extensions/c8y_mapper_ext/src/operations/handlers/mod.rs b/crates/extensions/c8y_mapper_ext/src/operations/handlers/mod.rs index 66b7aaa294c..c16795f2151 100644 --- a/crates/extensions/c8y_mapper_ext/src/operations/handlers/mod.rs +++ b/crates/extensions/c8y_mapper_ext/src/operations/handlers/mod.rs @@ -136,7 +136,8 @@ impl OperationContext { .await } OperationType::DeviceProfile => { - Ok(OperationOutcome::Ignored) // to do handle state change for device profile + self.handle_device_profile_state_change(&entity, &cmd_id, &message) + .await } }; From 5358ddc728e853595d3efd6f0fabad843804c0d8 Mon Sep 17 00:00:00 2001 From: Krzysztof Piotrowski Date: Tue, 6 Aug 2024 13:06:41 +0000 Subject: [PATCH 8/8] feat: register the device profile operation Signed-off-by: Krzysztof Piotrowski --- .../c8y_mapper_ext/src/converter.rs | 2 + .../c8y_mapper_ext/src/operations/convert.rs | 21 +++++ .../src/operations/handlers/device_profile.rs | 84 +++++++++++++++++++ 3 files changed, 107 insertions(+) diff --git a/crates/extensions/c8y_mapper_ext/src/converter.rs b/crates/extensions/c8y_mapper_ext/src/converter.rs index b85704f432b..f4c55c40567 100644 --- a/crates/extensions/c8y_mapper_ext/src/converter.rs +++ b/crates/extensions/c8y_mapper_ext/src/converter.rs @@ -1159,6 +1159,7 @@ impl CumulocityConverter { OperationType::FirmwareUpdate => { self.register_firmware_update_operation(&source) } + OperationType::DeviceProfile => self.register_device_profile_operation(&source), OperationType::Custom(c8y_op_name) => { self.register_custom_operation(&source, c8y_op_name) } @@ -2770,6 +2771,7 @@ pub(crate) mod tests { #[test_case("log_upload")] #[test_case("config_snapshot")] #[test_case("config_update")] + #[test_case("device_profile")] #[test_case("custom_op")] #[tokio::test] async fn operations_not_supported_for_services(op_type: &str) { diff --git a/crates/extensions/c8y_mapper_ext/src/operations/convert.rs b/crates/extensions/c8y_mapper_ext/src/operations/convert.rs index db155e8ec36..18bfbaed606 100644 --- a/crates/extensions/c8y_mapper_ext/src/operations/convert.rs +++ b/crates/extensions/c8y_mapper_ext/src/operations/convert.rs @@ -353,4 +353,25 @@ impl CumulocityConverter { MqttMessage::new(&topic, request.to_json()).with_retain() ]) } + + /// Converts a device_profile metadata message to supported operation "c8y_DeviceProfile" + pub fn register_device_profile_operation( + &mut self, + topic_id: &EntityTopicId, + ) -> Result, ConversionError> { + if !self.config.capabilities.device_profile { + warn!("Received device_profile metadata, however, device_profile feature is disabled"); + return Ok(vec![]); + } + + match self.register_operation(topic_id, "c8y_DeviceProfile") { + Err(err) => { + error!( + "Failed to register `device_profile` operation for {topic_id} due to: {err}" + ); + Ok(vec![]) + } + Ok(messages) => Ok(messages), + } + } } diff --git a/crates/extensions/c8y_mapper_ext/src/operations/handlers/device_profile.rs b/crates/extensions/c8y_mapper_ext/src/operations/handlers/device_profile.rs index 85f5d07790d..f0535c53e08 100644 --- a/crates/extensions/c8y_mapper_ext/src/operations/handlers/device_profile.rs +++ b/crates/extensions/c8y_mapper_ext/src/operations/handlers/device_profile.rs @@ -79,6 +79,7 @@ mod tests { use serde_json::json; use std::time::Duration; use tedge_actors::test_helpers::MessageReceiverExt; + use tedge_actors::MessageReceiver; use tedge_actors::Sender; use tedge_mqtt_ext::test_helpers::assert_received_contains_str; use tedge_mqtt_ext::test_helpers::assert_received_includes_json; @@ -88,6 +89,89 @@ mod tests { const TEST_TIMEOUT_MS: Duration = Duration::from_millis(3000); + #[tokio::test] + async fn create_device_profile_operation_file_for_main_device() { + let ttd = TempTedgeDir::new(); + let test_handle = spawn_c8y_mapper_actor(&ttd, true).await; + let TestHandle { mqtt, .. } = test_handle; + let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); + + skip_init_messages(&mut mqtt).await; + + // Simulate device_profile cmd metadata message + mqtt.send(MqttMessage::new( + &Topic::new_unchecked("te/device/main///cmd/device_profile"), + "{}", + )) + .await + .expect("Send failed"); + + assert_received_contains_str(&mut mqtt, [("c8y/s/us", "114,c8y_DeviceProfile")]).await; + + // Validate if the supported operation file is created + assert!(ttd.path().join("operations/c8y/c8y_DeviceProfile").exists()); + } + + #[tokio::test] + async fn create_device_profile_operation_file_for_child_device() { + let ttd = TempTedgeDir::new(); + let test_handle = spawn_c8y_mapper_actor(&ttd, true).await; + let TestHandle { mqtt, .. } = test_handle; + let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); + + skip_init_messages(&mut mqtt).await; + + // Simulate device_profile cmd metadata message + mqtt.send(MqttMessage::new( + &Topic::new_unchecked("te/device/child1///cmd/device_profile"), + "{}", + )) + .await + .expect("Send failed"); + + // Expect auto-registration message + assert_received_includes_json( + &mut mqtt, + [( + "te/device/child1//", + json!({"@type":"child-device","@id":"test-device:device:child1"}), + )], + ) + .await; + + assert_received_contains_str( + &mut mqtt, + [ + ( + "c8y/s/us", + "101,test-device:device:child1,child1,thin-edge.io-child", + ), + ( + "c8y/s/us/test-device:device:child1", + "114,c8y_DeviceProfile", + ), + ], + ) + .await; + + // Validate if the supported operation file is created + assert!(ttd + .path() + .join("operations/c8y/test-device:device:child1/c8y_DeviceProfile") + .exists()); + + // Duplicate device_profile cmd metadata message + mqtt.send(MqttMessage::new( + &Topic::new_unchecked("te/device/child1///cmd/device_profile"), + "{}", + )) + .await + .expect("Send failed"); + + // Assert that the supported ops message is not duplicated + assert_eq!(mqtt.recv().await, None); + } + #[tokio::test] async fn mapper_converts_device_profile_operation_for_main_device() { let ttd = TempTedgeDir::new();