From 41a8f232f9968979393605e1edc52721e3dd777f Mon Sep 17 00:00:00 2001 From: Krzysztof Piotrowski Date: Wed, 24 Jul 2024 22:43:18 +0000 Subject: [PATCH] feat: convert device profile request to tedge json format Signed-off-by: Krzysztof Piotrowski --- crates/core/tedge_api/src/device_profile.rs | 32 ++ .../c8y_mapper_ext/src/converter.rs | 13 +- .../src/operations/handlers/device_profile.rs | 409 ++++++++++++++++++ .../src/operations/handlers/mod.rs | 1 + 4 files changed, 454 insertions(+), 1 deletion(-) create mode 100644 crates/extensions/c8y_mapper_ext/src/operations/handlers/device_profile.rs diff --git a/crates/core/tedge_api/src/device_profile.rs b/crates/core/tedge_api/src/device_profile.rs index 2a0e6e6e47d..d55d3239e02 100644 --- a/crates/core/tedge_api/src/device_profile.rs +++ b/crates/core/tedge_api/src/device_profile.rs @@ -57,3 +57,35 @@ pub enum OperationPayload { #[serde(rename = "payload")] Config(ConfigInfo), } + +impl DeviceProfileCmdPayload { + pub fn add_firmware(&mut self, firmware: FirmwareInfo) { + let firmware_operation = DeviceProfileOperation { + operation: OperationType::FirmwareUpdate, + skip: false, + payload: OperationPayload::Firmware(firmware), + }; + + self.operations.push(firmware_operation); + } + + pub fn add_software(&mut self, software: SoftwareInfo) { + let software_operation = DeviceProfileOperation { + operation: OperationType::SoftwareUpdate, + skip: false, + payload: OperationPayload::Software(software), + }; + + self.operations.push(software_operation); + } + + pub fn add_config(&mut self, config: ConfigInfo) { + let config_operation = DeviceProfileOperation { + operation: OperationType::ConfigUpdate, + skip: false, + payload: OperationPayload::Config(config), + }; + + self.operations.push(config_operation); + } +} diff --git a/crates/extensions/c8y_mapper_ext/src/converter.rs b/crates/extensions/c8y_mapper_ext/src/converter.rs index fecf74d6a8f..b85704f432b 100644 --- a/crates/extensions/c8y_mapper_ext/src/converter.rs +++ b/crates/extensions/c8y_mapper_ext/src/converter.rs @@ -688,8 +688,19 @@ impl CumulocityConverter { } C8yDeviceControlOperation::DeviceProfile(request) => { if self.config.capabilities.device_profile { - vec![] + if let Some(profile_name) = extras.get("profileName") { + self.convert_device_profile_request( + device_xid, + cmd_id, + request, + serde_json::from_value(profile_name.clone())?, + )? + } else { + error!("Received a c8y_DeviceProfile without a profile name"); + vec![] + } } else { + warn!("Received a c8y_DeviceProfile operation, however, device_profile feature is disabled"); vec![] } } diff --git a/crates/extensions/c8y_mapper_ext/src/operations/handlers/device_profile.rs b/crates/extensions/c8y_mapper_ext/src/operations/handlers/device_profile.rs new file mode 100644 index 00000000000..5b8b911d558 --- /dev/null +++ b/crates/extensions/c8y_mapper_ext/src/operations/handlers/device_profile.rs @@ -0,0 +1,409 @@ +use crate::converter::CumulocityConverter; +use crate::error::CumulocityMapperError; +use c8y_api::json_c8y_deserializer::C8yDeviceProfile; +use tedge_api::device_profile::DeviceProfileCmdPayload; +use tedge_api::entity_store::EntityExternalId; +use tedge_api::mqtt_topics::Channel; +use tedge_api::mqtt_topics::OperationType; +use tedge_api::CommandStatus; +use tedge_api::Jsonify; +use tedge_mqtt_ext::MqttMessage; + +impl CumulocityConverter { + /// Convert c8y_DeviceProfile JSON over MQTT operation to ThinEdge device_profile command. + pub fn convert_device_profile_request( + &self, + device_xid: String, + cmd_id: String, + device_profile_request: C8yDeviceProfile, + profile_name: String, + ) -> Result, CumulocityMapperError> { + let entity_xid: EntityExternalId = device_xid.into(); + + let target = self.entity_store.try_get_by_external_id(&entity_xid)?; + + let channel = Channel::Command { + operation: OperationType::DeviceProfile, + cmd_id, + }; + let topic = self.mqtt_schema.topic_for(&target.topic_id, &channel); + + let mut request = DeviceProfileCmdPayload { + status: CommandStatus::Init, + name: profile_name, + operations: Vec::new(), + }; + + if let Some(firmware) = device_profile_request.firmware { + request.add_firmware(firmware.into()); + } + + if let Some(software) = device_profile_request.software { + request.add_software(software.try_into()?); + } + + for config in device_profile_request.configuration { + request.add_config(config.into()); + } + + // Command messages must be retained + Ok(vec![ + MqttMessage::new(&topic, request.to_json()).with_retain() + ]) + } +} + +#[cfg(test)] +mod tests { + use crate::tests::skip_init_messages; + use crate::tests::spawn_c8y_mapper_actor; + use crate::tests::TestHandle; + + use c8y_api::json_c8y_deserializer::C8yDeviceControlTopic; + use serde_json::json; + use std::time::Duration; + use tedge_actors::test_helpers::MessageReceiverExt; + use tedge_actors::Sender; + use tedge_mqtt_ext::test_helpers::assert_received_includes_json; + use tedge_mqtt_ext::MqttMessage; + use tedge_mqtt_ext::Topic; + use tedge_test_utils::fs::TempTedgeDir; + + const TEST_TIMEOUT_MS: Duration = Duration::from_millis(3000); + + #[tokio::test] + async fn mapper_converts_device_profile_operation_for_main_device_with_software_type() { + let ttd = TempTedgeDir::new(); + let test_handle = spawn_c8y_mapper_actor(&ttd, true).await; + let TestHandle { mqtt, .. } = test_handle; + let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); + + skip_init_messages(&mut mqtt).await; + + // Simulate c8y_DeviceProfile operation delivered via JSON over MQTT + mqtt.send(MqttMessage::new( + &C8yDeviceControlTopic::topic(&"c8y".try_into().unwrap()), + json!({ + "id": "123456", + "profileName": "test-profile", + "c8y_DeviceProfile": { + "software": [ + { + "softwareType": "apt", + "name": "test-software-1", + "action": "install", + "version": "latest", + "url": " " + }, + { + "softwareType": "apt", + "name": "test-software-2", + "action": "install", + "version": "latest", + "url": " " + } + ], + "configuration": [ + { + "name": "test-software-1", + "type": "path/config/test-software-1", + "url": "http://www.my.url" + } + ], + "firmware": { + "name": "test-firmware", + "version": "1.0", + "url": "http://www.my.url" + } + }, + "externalSource": { + "externalId": "test-device", + "type": "c8y_Serial" + } + }) + .to_string(), + )) + .await + .expect("Send failed"); + + assert_received_includes_json( + &mut mqtt, + [( + "te/device/main///cmd/device_profile/c8y-mapper-123456", + json!({ + "status": "init", + "name": "test-profile", + "operations": [ + { + "operation": "firmware_update", + "skip": false, + "payload": { + "name": "test-firmware", + "version": "1.0", + "url": "http://www.my.url" + } + }, + { + "operation": "software_update", + "skip": false, + "payload": { + "updateList": [ + { + "type": "apt", + "modules": [ + { + "name": "test-software-1", + "version": "latest", + "action": "install" + }, + { + "name": "test-software-2", + "version": "latest", + "action": "install" + } + ] + } + ] + } + }, + { + "operation": "config_update", + "skip": false, + "payload": { + "type": "path/config/test-software-1", + "remoteUrl":"http://www.my.url" + } + } + ] + }), + )], + ) + .await; + } + + #[tokio::test] + async fn mapper_converts_device_profile_operation_for_main_device_without_software_type() { + let ttd = TempTedgeDir::new(); + let test_handle = spawn_c8y_mapper_actor(&ttd, true).await; + let TestHandle { mqtt, .. } = test_handle; + let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); + + skip_init_messages(&mut mqtt).await; + + // Simulate c8y_DeviceProfile operation delivered via JSON over MQTT + mqtt.send(MqttMessage::new( + &C8yDeviceControlTopic::topic(&"c8y".try_into().unwrap()), + json!({ + "id": "123456", + "profileName": "test-profile", + "c8y_DeviceProfile": { + "software": [ + { + "name": "test-software-1", + "action": "install", + "version": "latest::apt", + "url": " " + }, + { + "name": "test-software-2", + "action": "install", + "version": "latest::apt", + "url": " " + } + ], + "configuration": [ + { + "name": "test-software-1", + "type": "path/config/test-software-1", + "url": "http://www.my.url" + } + ], + "firmware": { + "name": "test-firmware", + "version": "1.0", + "url": "http://www.my.url" + } + }, + "externalSource": { + "externalId": "test-device", + "type": "c8y_Serial" + } + }) + .to_string(), + )) + .await + .expect("Send failed"); + + assert_received_includes_json( + &mut mqtt, + [( + "te/device/main///cmd/device_profile/c8y-mapper-123456", + json!({ + "status": "init", + "name": "test-profile", + "operations": [ + { + "operation": "firmware_update", + "skip": false, + "payload": { + "name": "test-firmware", + "version": "1.0", + "url": "http://www.my.url" + } + }, + { + "operation": "software_update", + "skip": false, + "payload": { + "updateList": [ + { + "type": "apt", + "modules": [ + { + "name": "test-software-1", + "version": "latest", + "action": "install" + }, + { + "name": "test-software-2", + "version": "latest", + "action": "install" + } + ] + } + ] + } + }, + { + "operation": "config_update", + "skip": false, + "payload": { + "type": "path/config/test-software-1", + "remoteUrl":"http://www.my.url" + } + } + ] + }), + )], + ) + .await; + } + + #[tokio::test] + async fn mapper_converts_device_profile_operation_for_child_device() { + let ttd = TempTedgeDir::new(); + let test_handle = spawn_c8y_mapper_actor(&ttd, true).await; + let TestHandle { mqtt, .. } = test_handle; + let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); + + skip_init_messages(&mut mqtt).await; + + // The child device must be registered first + mqtt.send(MqttMessage::new( + &Topic::new_unchecked("te/device/child1//"), + r#"{ "@type":"child-device", "@id":"child1" }"#, + )) + .await + .expect("fail to register the child-device"); + + mqtt.skip(1).await; // Skip child device registration messages + + // Simulate c8y_DeviceProfile operation delivered via JSON over MQTT + mqtt.send(MqttMessage::new( + &C8yDeviceControlTopic::topic(&"c8y".try_into().unwrap()), + json!({ + "id": "123456", + "profileName": "test-profile", + "c8y_DeviceProfile": { + "software": [ + { + "name": "test-software-1", + "action": "install", + "version": "latest::apt", + "url": " " + }, + { + "name": "test-software-2", + "action": "install", + "version": "latest::apt", + "url": " " + } + ], + "configuration": [ + { + "name": "test-software-1", + "type": "path/config/test-software-1", + "url": "http://www.my.url" + } + ], + "firmware": { + "name": "test-firmware", + "version": "1.0", + "url": "http://www.my.url" + } + }, + "externalSource": { + "externalId": "child1", + "type": "c8y_Serial" + } + }) + .to_string(), + )) + .await + .expect("Send failed"); + + assert_received_includes_json( + &mut mqtt, + [( + "te/device/child1///cmd/device_profile/c8y-mapper-123456", + json!({ + "status": "init", + "name": "test-profile", + "operations": [ + { + "operation": "firmware_update", + "skip": false, + "payload": { + "name": "test-firmware", + "version": "1.0", + "url": "http://www.my.url" + } + }, + { + "operation": "software_update", + "skip": false, + "payload": { + "updateList": [ + { + "type": "apt", + "modules": [ + { + "name": "test-software-1", + "version": "latest", + "action": "install" + }, + { + "name": "test-software-2", + "version": "latest", + "action": "install" + } + ] + } + ] + } + }, + { + "operation": "config_update", + "skip": false, + "payload": { + "type": "path/config/test-software-1", + "remoteUrl":"http://www.my.url" + } + } + ] + }), + )], + ) + .await; + } +} diff --git a/crates/extensions/c8y_mapper_ext/src/operations/handlers/mod.rs b/crates/extensions/c8y_mapper_ext/src/operations/handlers/mod.rs index 02fc613581d..6e580d4cb5b 100644 --- a/crates/extensions/c8y_mapper_ext/src/operations/handlers/mod.rs +++ b/crates/extensions/c8y_mapper_ext/src/operations/handlers/mod.rs @@ -2,6 +2,7 @@ mod config_snapshot; mod config_update; +mod device_profile; mod firmware_update; mod log_upload; mod restart;