From 14eddc8e48b23df4439b8319a81bfb0edd8c3d0d Mon Sep 17 00:00:00 2001 From: Krzysztof Piotrowski Date: Wed, 24 Jul 2024 22:43:18 +0000 Subject: [PATCH] feat: convert device profile request to tedge json format Signed-off-by: Krzysztof Piotrowski --- crates/core/tedge_api/src/device_profile.rs | 32 + .../c8y_mapper_ext/src/converter.rs | 13 +- .../c8y_mapper_ext/src/operations/convert.rs | 44 ++ .../src/operations/handlers/device_profile.rs | 726 ++++++++++++++++++ .../src/operations/handlers/mod.rs | 1 + 5 files changed, 815 insertions(+), 1 deletion(-) create mode 100644 crates/extensions/c8y_mapper_ext/src/operations/handlers/device_profile.rs diff --git a/crates/core/tedge_api/src/device_profile.rs b/crates/core/tedge_api/src/device_profile.rs index 2a0e6e6e47d..d55d3239e02 100644 --- a/crates/core/tedge_api/src/device_profile.rs +++ b/crates/core/tedge_api/src/device_profile.rs @@ -57,3 +57,35 @@ pub enum OperationPayload { #[serde(rename = "payload")] Config(ConfigInfo), } + +impl DeviceProfileCmdPayload { + pub fn add_firmware(&mut self, firmware: FirmwareInfo) { + let firmware_operation = DeviceProfileOperation { + operation: OperationType::FirmwareUpdate, + skip: false, + payload: OperationPayload::Firmware(firmware), + }; + + self.operations.push(firmware_operation); + } + + pub fn add_software(&mut self, software: SoftwareInfo) { + let software_operation = DeviceProfileOperation { + operation: OperationType::SoftwareUpdate, + skip: false, + payload: OperationPayload::Software(software), + }; + + self.operations.push(software_operation); + } + + pub fn add_config(&mut self, config: ConfigInfo) { + let config_operation = DeviceProfileOperation { + operation: OperationType::ConfigUpdate, + skip: false, + payload: OperationPayload::Config(config), + }; + + self.operations.push(config_operation); + } +} diff --git a/crates/extensions/c8y_mapper_ext/src/converter.rs b/crates/extensions/c8y_mapper_ext/src/converter.rs index fecf74d6a8f..b85704f432b 100644 --- a/crates/extensions/c8y_mapper_ext/src/converter.rs +++ b/crates/extensions/c8y_mapper_ext/src/converter.rs @@ -688,8 +688,19 @@ impl CumulocityConverter { } C8yDeviceControlOperation::DeviceProfile(request) => { if self.config.capabilities.device_profile { - vec![] + if let Some(profile_name) = extras.get("profileName") { + self.convert_device_profile_request( + device_xid, + cmd_id, + request, + serde_json::from_value(profile_name.clone())?, + )? + } else { + error!("Received a c8y_DeviceProfile without a profile name"); + vec![] + } } else { + warn!("Received a c8y_DeviceProfile operation, however, device_profile feature is disabled"); vec![] } } diff --git a/crates/extensions/c8y_mapper_ext/src/operations/convert.rs b/crates/extensions/c8y_mapper_ext/src/operations/convert.rs index 7d95e737172..db155e8ec36 100644 --- a/crates/extensions/c8y_mapper_ext/src/operations/convert.rs +++ b/crates/extensions/c8y_mapper_ext/src/operations/convert.rs @@ -2,6 +2,7 @@ use std::sync::Arc; +use c8y_api::json_c8y_deserializer::C8yDeviceProfile; use c8y_api::json_c8y_deserializer::C8yDownloadConfigFile; use c8y_api::json_c8y_deserializer::C8yFirmware; use c8y_api::json_c8y_deserializer::C8yLogfileRequest; @@ -13,6 +14,7 @@ use tedge_api::commands::ConfigUpdateCmdPayload; use tedge_api::commands::FirmwareUpdateCmdPayload; use tedge_api::commands::LogMetadata; use tedge_api::commands::LogUploadCmdPayload; +use tedge_api::device_profile::DeviceProfileCmdPayload; use tedge_api::entity_store::EntityExternalId; use tedge_api::entity_store::EntityMetadata; use tedge_api::mqtt_topics::Channel; @@ -309,4 +311,46 @@ impl CumulocityConverter { Ok(messages) } + + /// Convert c8y_DeviceProfile JSON over MQTT operation to ThinEdge device_profile command. + pub fn convert_device_profile_request( + &self, + device_xid: String, + cmd_id: String, + device_profile_request: C8yDeviceProfile, + profile_name: String, + ) -> Result, CumulocityMapperError> { + let entity_xid: EntityExternalId = device_xid.into(); + + let target = self.entity_store.try_get_by_external_id(&entity_xid)?; + + let channel = Channel::Command { + operation: OperationType::DeviceProfile, + cmd_id, + }; + let topic = self.mqtt_schema.topic_for(&target.topic_id, &channel); + + let mut request = DeviceProfileCmdPayload { + status: CommandStatus::Init, + name: profile_name, + operations: Vec::new(), + }; + + if let Some(firmware) = device_profile_request.firmware { + request.add_firmware(firmware.into()); + } + + if let Some(software) = device_profile_request.software { + request.add_software(software.try_into()?); + } + + for config in device_profile_request.configuration { + request.add_config(config.into()); + } + + // Command messages must be retained + Ok(vec![ + MqttMessage::new(&topic, request.to_json()).with_retain() + ]) + } } diff --git a/crates/extensions/c8y_mapper_ext/src/operations/handlers/device_profile.rs b/crates/extensions/c8y_mapper_ext/src/operations/handlers/device_profile.rs new file mode 100644 index 00000000000..13e9d9ea407 --- /dev/null +++ b/crates/extensions/c8y_mapper_ext/src/operations/handlers/device_profile.rs @@ -0,0 +1,726 @@ +#[cfg(test)] +mod tests { + use crate::tests::skip_init_messages; + use crate::tests::spawn_c8y_mapper_actor; + use crate::tests::TestHandle; + + use c8y_api::json_c8y_deserializer::C8yDeviceControlTopic; + use serde_json::json; + use std::time::Duration; + use tedge_actors::test_helpers::MessageReceiverExt; + use tedge_actors::Sender; + use tedge_mqtt_ext::test_helpers::assert_received_includes_json; + use tedge_mqtt_ext::MqttMessage; + use tedge_mqtt_ext::Topic; + use tedge_test_utils::fs::TempTedgeDir; + + const TEST_TIMEOUT_MS: Duration = Duration::from_millis(3000); + + #[tokio::test] + async fn mapper_converts_device_profile_operation_for_main_device() { + let ttd = TempTedgeDir::new(); + let test_handle = spawn_c8y_mapper_actor(&ttd, true).await; + let TestHandle { mqtt, .. } = test_handle; + let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); + + skip_init_messages(&mut mqtt).await; + + // Simulate c8y_DeviceProfile operation delivered via JSON over MQTT + mqtt.send(MqttMessage::new( + &C8yDeviceControlTopic::topic(&"c8y".try_into().unwrap()), + json!({ + "id": "123456", + "profileName": "test-profile", + "c8y_DeviceProfile": { + "software": [ + { + "softwareType": "apt", + "name": "test-software-1", + "action": "install", + "version": "latest", + "url": " " + }, + { + "softwareType": "apt", + "name": "test-software-2", + "action": "install", + "version": "latest", + "url": " " + } + ], + "configuration": [ + { + "name": "test-software-1", + "type": "path/config/test-software-1", + "url": "http://www.my.url" + } + ], + "firmware": { + "name": "test-firmware", + "version": "1.0", + "url": "http://www.my.url" + } + }, + "externalSource": { + "externalId": "test-device", + "type": "c8y_Serial" + } + }) + .to_string(), + )) + .await + .expect("Send failed"); + + assert_received_includes_json( + &mut mqtt, + [( + "te/device/main///cmd/device_profile/c8y-mapper-123456", + json!({ + "status": "init", + "name": "test-profile", + "operations": [ + { + "operation": "firmware_update", + "skip": false, + "payload": { + "name": "test-firmware", + "version": "1.0", + "url": "http://www.my.url" + } + }, + { + "operation": "software_update", + "skip": false, + "payload": { + "updateList": [ + { + "type": "apt", + "modules": [ + { + "name": "test-software-1", + "version": "latest", + "action": "install" + }, + { + "name": "test-software-2", + "version": "latest", + "action": "install" + } + ] + } + ] + } + }, + { + "operation": "config_update", + "skip": false, + "payload": { + "type": "path/config/test-software-1", + "remoteUrl":"http://www.my.url" + } + } + ] + }), + )], + ) + .await; + } + + #[tokio::test] + async fn mapper_converts_device_profile_operation_for_child_device() { + let ttd = TempTedgeDir::new(); + let test_handle = spawn_c8y_mapper_actor(&ttd, true).await; + let TestHandle { mqtt, .. } = test_handle; + let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); + + skip_init_messages(&mut mqtt).await; + + // The child device must be registered first + mqtt.send(MqttMessage::new( + &Topic::new_unchecked("te/device/child1//"), + r#"{ "@type":"child-device", "@id":"child1" }"#, + )) + .await + .expect("fail to register the child-device"); + + mqtt.skip(1).await; // Skip child device registration messages + + // Simulate c8y_DeviceProfile operation delivered via JSON over MQTT + mqtt.send(MqttMessage::new( + &C8yDeviceControlTopic::topic(&"c8y".try_into().unwrap()), + json!({ + "id": "123456", + "profileName": "test-profile", + "c8y_DeviceProfile": { + "software": [ + { + "softwareType": "apt", + "name": "test-software-1", + "action": "install", + "version": "latest", + "url": " " + }, + { + "softwareType": "apt", + "name": "test-software-2", + "action": "install", + "version": "latest", + "url": " " + } + ], + "configuration": [ + { + "name": "test-software-1", + "type": "path/config/test-software-1", + "url": "http://www.my.url" + } + ], + "firmware": { + "name": "test-firmware", + "version": "1.0", + "url": "http://www.my.url" + } + }, + "externalSource": { + "externalId": "child1", + "type": "c8y_Serial" + } + }) + .to_string(), + )) + .await + .expect("Send failed"); + + assert_received_includes_json( + &mut mqtt, + [( + "te/device/child1///cmd/device_profile/c8y-mapper-123456", + json!({ + "status": "init", + "name": "test-profile", + "operations": [ + { + "operation": "firmware_update", + "skip": false, + "payload": { + "name": "test-firmware", + "version": "1.0", + "url": "http://www.my.url" + } + }, + { + "operation": "software_update", + "skip": false, + "payload": { + "updateList": [ + { + "type": "apt", + "modules": [ + { + "name": "test-software-1", + "version": "latest", + "action": "install" + }, + { + "name": "test-software-2", + "version": "latest", + "action": "install" + } + ] + } + ] + } + }, + { + "operation": "config_update", + "skip": false, + "payload": { + "type": "path/config/test-software-1", + "remoteUrl":"http://www.my.url" + } + } + ] + }), + )], + ) + .await; + } + + #[tokio::test] + async fn mapper_converts_device_profile_operation_with_type_in_version() { + let ttd = TempTedgeDir::new(); + let test_handle = spawn_c8y_mapper_actor(&ttd, true).await; + let TestHandle { mqtt, .. } = test_handle; + let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); + + skip_init_messages(&mut mqtt).await; + + // Simulate c8y_DeviceProfile operation delivered via JSON over MQTT + mqtt.send(MqttMessage::new( + &C8yDeviceControlTopic::topic(&"c8y".try_into().unwrap()), + json!({ + "id": "123456", + "profileName": "test-profile", + "c8y_DeviceProfile": { + "software": [ + { + "name": "test-software-1", + "action": "install", + "version": "latest::apt", + "url": " " + }, + { + "name": "test-software-2", + "action": "install", + "version": "latest::apt", + "url": " " + } + ], + "configuration": [ + { + "name": "test-software-1", + "type": "path/config/test-software-1", + "url": "http://www.my.url" + } + ], + "firmware": { + "name": "test-firmware", + "version": "1.0", + "url": "http://www.my.url" + } + }, + "externalSource": { + "externalId": "test-device", + "type": "c8y_Serial" + } + }) + .to_string(), + )) + .await + .expect("Send failed"); + + assert_received_includes_json( + &mut mqtt, + [( + "te/device/main///cmd/device_profile/c8y-mapper-123456", + json!({ + "status": "init", + "name": "test-profile", + "operations": [ + { + "operation": "firmware_update", + "skip": false, + "payload": { + "name": "test-firmware", + "version": "1.0", + "url": "http://www.my.url" + } + }, + { + "operation": "software_update", + "skip": false, + "payload": { + "updateList": [ + { + "type": "apt", + "modules": [ + { + "name": "test-software-1", + "version": "latest", + "action": "install" + }, + { + "name": "test-software-2", + "version": "latest", + "action": "install" + } + ] + } + ] + } + }, + { + "operation": "config_update", + "skip": false, + "payload": { + "type": "path/config/test-software-1", + "remoteUrl":"http://www.my.url" + } + } + ] + }), + )], + ) + .await; + } + + #[tokio::test] + async fn mapper_converts_device_profile_operation_with_missing_software_type() { + let ttd = TempTedgeDir::new(); + let test_handle = spawn_c8y_mapper_actor(&ttd, true).await; + let TestHandle { mqtt, .. } = test_handle; + let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); + + skip_init_messages(&mut mqtt).await; + + // Simulate c8y_DeviceProfile operation delivered via JSON over MQTT + mqtt.send(MqttMessage::new( + &C8yDeviceControlTopic::topic(&"c8y".try_into().unwrap()), + json!({ + "id": "123456", + "profileName": "test-profile", + "c8y_DeviceProfile": { + "software": [ + { + "name": "test-software-1", + "action": "install", + "version": "latest", + "url": " " + }, + { + "name": "test-software-2", + "action": "install", + "version": "latest", + "url": " " + } + ], + "configuration": [ + { + "name": "test-software-1", + "type": "path/config/test-software-1", + "url": "http://www.my.url" + } + ], + "firmware": { + "name": "test-firmware", + "version": "1.0", + "url": "http://www.my.url" + } + }, + "externalSource": { + "externalId": "test-device", + "type": "c8y_Serial" + } + }) + .to_string(), + )) + .await + .expect("Send failed"); + + assert_received_includes_json( + &mut mqtt, + [( + "te/device/main///cmd/device_profile/c8y-mapper-123456", + json!({ + "status": "init", + "name": "test-profile", + "operations": [ + { + "operation": "firmware_update", + "skip": false, + "payload": { + "name": "test-firmware", + "version": "1.0", + "url": "http://www.my.url" + } + }, + { + "operation": "software_update", + "skip": false, + "payload": { + "updateList": [ + { + "type": "default", + "modules": [ + { + "name": "test-software-1", + "version": "latest", + "action": "install" + }, + { + "name": "test-software-2", + "version": "latest", + "action": "install" + } + ] + } + ] + } + }, + { + "operation": "config_update", + "skip": false, + "payload": { + "type": "path/config/test-software-1", + "remoteUrl":"http://www.my.url" + } + } + ] + }), + )], + ) + .await; + } + + #[tokio::test] + async fn mapper_converts_device_profile_operation_with_missing_firmware() { + let ttd = TempTedgeDir::new(); + let test_handle = spawn_c8y_mapper_actor(&ttd, true).await; + let TestHandle { mqtt, .. } = test_handle; + let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); + + skip_init_messages(&mut mqtt).await; + + // Simulate c8y_DeviceProfile operation delivered via JSON over MQTT + mqtt.send(MqttMessage::new( + &C8yDeviceControlTopic::topic(&"c8y".try_into().unwrap()), + json!({ + "id": "123456", + "profileName": "test-profile", + "c8y_DeviceProfile": { + "software": [ + { + "softwareType": "apt", + "name": "test-software-1", + "action": "install", + "version": "latest", + "url": " " + }, + { + "softwareType": "apt", + "name": "test-software-2", + "action": "install", + "version": "latest", + "url": " " + } + ], + "configuration": [ + { + "name": "test-software-1", + "type": "path/config/test-software-1", + "url": "http://www.my.url" + } + ] + }, + "externalSource": { + "externalId": "test-device", + "type": "c8y_Serial" + } + }) + .to_string(), + )) + .await + .expect("Send failed"); + + assert_received_includes_json( + &mut mqtt, + [( + "te/device/main///cmd/device_profile/c8y-mapper-123456", + json!({ + "status": "init", + "name": "test-profile", + "operations": [ + { + "operation": "software_update", + "skip": false, + "payload": { + "updateList": [ + { + "type": "apt", + "modules": [ + { + "name": "test-software-1", + "version": "latest", + "action": "install" + }, + { + "name": "test-software-2", + "version": "latest", + "action": "install" + } + ] + } + ] + } + }, + { + "operation": "config_update", + "skip": false, + "payload": { + "type": "path/config/test-software-1", + "remoteUrl":"http://www.my.url" + } + } + ] + }), + )], + ) + .await; + } + + #[tokio::test] + async fn mapper_converts_device_profile_operation_with_missing_software() { + let ttd = TempTedgeDir::new(); + let test_handle = spawn_c8y_mapper_actor(&ttd, true).await; + let TestHandle { mqtt, .. } = test_handle; + let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); + + skip_init_messages(&mut mqtt).await; + + // Simulate c8y_DeviceProfile operation delivered via JSON over MQTT + mqtt.send(MqttMessage::new( + &C8yDeviceControlTopic::topic(&"c8y".try_into().unwrap()), + json!({ + "id": "123456", + "profileName": "test-profile", + "c8y_DeviceProfile": { + "configuration": [ + { + "name": "test-software-1", + "type": "path/config/test-software-1", + "url": "http://www.my.url" + } + ], + "firmware": { + "name": "test-firmware", + "version": "1.0", + "url": "http://www.my.url" + } + }, + "externalSource": { + "externalId": "test-device", + "type": "c8y_Serial" + } + }) + .to_string(), + )) + .await + .expect("Send failed"); + + assert_received_includes_json( + &mut mqtt, + [( + "te/device/main///cmd/device_profile/c8y-mapper-123456", + json!({ + "status": "init", + "name": "test-profile", + "operations": [ + { + "operation": "firmware_update", + "skip": false, + "payload": { + "name": "test-firmware", + "version": "1.0", + "url": "http://www.my.url" + } + }, + { + "operation": "config_update", + "skip": false, + "payload": { + "type": "path/config/test-software-1", + "remoteUrl":"http://www.my.url" + } + } + ] + }), + )], + ) + .await; + } + + #[tokio::test] + async fn mapper_converts_device_profile_operation_with_missing_configuration() { + let ttd = TempTedgeDir::new(); + let test_handle = spawn_c8y_mapper_actor(&ttd, true).await; + let TestHandle { mqtt, .. } = test_handle; + let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); + + skip_init_messages(&mut mqtt).await; + + // Simulate c8y_DeviceProfile operation delivered via JSON over MQTT + mqtt.send(MqttMessage::new( + &C8yDeviceControlTopic::topic(&"c8y".try_into().unwrap()), + json!({ + "id": "123456", + "profileName": "test-profile", + "c8y_DeviceProfile": { + "software": [ + { + "softwareType": "apt", + "name": "test-software-1", + "action": "install", + "version": "latest", + "url": " " + }, + { + "softwareType": "apt", + "name": "test-software-2", + "action": "install", + "version": "latest", + "url": " " + } + ], + "firmware": { + "name": "test-firmware", + "version": "1.0", + "url": "http://www.my.url" + } + }, + "externalSource": { + "externalId": "test-device", + "type": "c8y_Serial" + } + }) + .to_string(), + )) + .await + .expect("Send failed"); + + assert_received_includes_json( + &mut mqtt, + [( + "te/device/main///cmd/device_profile/c8y-mapper-123456", + json!({ + "status": "init", + "name": "test-profile", + "operations": [ + { + "operation": "firmware_update", + "skip": false, + "payload": { + "name": "test-firmware", + "version": "1.0", + "url": "http://www.my.url" + } + }, + { + "operation": "software_update", + "skip": false, + "payload": { + "updateList": [ + { + "type": "apt", + "modules": [ + { + "name": "test-software-1", + "version": "latest", + "action": "install" + }, + { + "name": "test-software-2", + "version": "latest", + "action": "install" + } + ] + } + ] + } + } + ] + }), + )], + ) + .await; + } +} diff --git a/crates/extensions/c8y_mapper_ext/src/operations/handlers/mod.rs b/crates/extensions/c8y_mapper_ext/src/operations/handlers/mod.rs index e489eda8340..66b7aaa294c 100644 --- a/crates/extensions/c8y_mapper_ext/src/operations/handlers/mod.rs +++ b/crates/extensions/c8y_mapper_ext/src/operations/handlers/mod.rs @@ -2,6 +2,7 @@ mod config_snapshot; mod config_update; +mod device_profile; mod firmware_update; mod log_upload; mod restart;