From 717c94ba7b706a8124e8fcc01384f7d8530d5ac6 Mon Sep 17 00:00:00 2001 From: Krzysztof Piotrowski Date: Wed, 31 Jul 2024 18:07:53 +0000 Subject: [PATCH] feat: handle state change for device profile Signed-off-by: Krzysztof Piotrowski --- .../src/operations/handlers/device_profile.rs | 477 ++++++++++++++++++ .../src/operations/handlers/mod.rs | 3 +- 2 files changed, 479 insertions(+), 1 deletion(-) diff --git a/crates/extensions/c8y_mapper_ext/src/operations/handlers/device_profile.rs b/crates/extensions/c8y_mapper_ext/src/operations/handlers/device_profile.rs index 13e9d9ea407..373da33ed6a 100644 --- a/crates/extensions/c8y_mapper_ext/src/operations/handlers/device_profile.rs +++ b/crates/extensions/c8y_mapper_ext/src/operations/handlers/device_profile.rs @@ -1,3 +1,62 @@ +use anyhow::Context; +use c8y_api::smartrest; +use c8y_api::smartrest::smartrest_serializer::CumulocitySupportedOperations; +use tedge_api::device_profile::DeviceProfileCmd; +use tedge_api::CommandStatus; +use tedge_mqtt_ext::MqttMessage; +use tracing::warn; + +use super::EntityTarget; +use super::OperationContext; +use super::OperationError; +use super::OperationOutcome; + +impl OperationContext { + pub async fn handle_device_profile_state_change( + &self, + target: &EntityTarget, + cmd_id: &str, + message: &MqttMessage, + ) -> Result { + if !self.capabilities.device_profile { + warn!("Received a device_profile command, however, device_profile feature is disabled"); + return Ok(OperationOutcome::Ignored); + } + + let command = match DeviceProfileCmd::try_from_bytes( + target.topic_id.to_owned(), + cmd_id.into(), + message.payload_bytes(), + ) + .context("Could not parse command as a device profile command")? + { + Some(command) => command, + None => { + // The command has been fully processed + return Ok(OperationOutcome::Ignored); + } + }; + + let sm_topic = &target.smartrest_publish_topic; + + match command.status() { + CommandStatus::Executing => Ok(OperationOutcome::Executing), + CommandStatus::Successful => { + let smartrest_set_operation = + smartrest::smartrest_serializer::succeed_operation_no_payload( + CumulocitySupportedOperations::C8yDeviceProfile, + ); + + Ok(OperationOutcome::Finished { + messages: vec![MqttMessage::new(sm_topic, smartrest_set_operation)], + }) + } + CommandStatus::Failed { reason } => Err(anyhow::anyhow!(reason).into()), + _ => Ok(OperationOutcome::Ignored), + } + } +} + #[cfg(test)] mod tests { use crate::tests::skip_init_messages; @@ -9,6 +68,7 @@ mod tests { use std::time::Duration; use tedge_actors::test_helpers::MessageReceiverExt; use tedge_actors::Sender; + use tedge_mqtt_ext::test_helpers::assert_received_contains_str; use tedge_mqtt_ext::test_helpers::assert_received_includes_json; use tedge_mqtt_ext::MqttMessage; use tedge_mqtt_ext::Topic; @@ -723,4 +783,421 @@ mod tests { ) .await; } + + #[tokio::test] + async fn handle_config_update_executing_and_failed_cmd_for_main_device() { + let ttd = TempTedgeDir::new(); + let test_handle = spawn_c8y_mapper_actor(&ttd, true).await; + let TestHandle { mqtt, .. } = test_handle; + let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); + + skip_init_messages(&mut mqtt).await; + + // Simulate config_snapshot command with "executing" state + mqtt.send(MqttMessage::new( + &Topic::new_unchecked("te/device/main///cmd/device_profile/c8y-mapper-123456"), + json!({ + "status": "executing", + "name": "test-profile", + "operations": [ + { + "operation": "firmware_update", + "skip": false, + "payload": { + "name": "test-firmware", + "version": "1.0", + "url": "http://www.my.url" + } + }, + { + "operation": "software_update", + "skip": false, + "payload": { + "updateList": [ + { + "type": "apt", + "modules": [ + { + "name": "test-software-1", + "version": "latest", + "action": "install" + }, + { + "name": "test-software-2", + "version": "latest", + "action": "install" + } + ] + } + ] + } + }, + { + "operation": "config_update", + "skip": false, + "payload": { + "type": "path/config/test-software-1", + "remoteUrl":"http://www.my.url" + } + } + ] + }) + .to_string(), + )) + .await + .expect("Send failed"); + + // Expect `501` smartrest message on `c8y/s/us`. + assert_received_contains_str(&mut mqtt, [("c8y/s/us", "501,c8y_DeviceProfile")]).await; + + // Simulate config_snapshot command with "failed" state + mqtt.send(MqttMessage::new( + &Topic::new_unchecked("te/device/main///cmd/device_profile/c8y-mapper-123456"), + json!({ + "status": "failed", + "reason": "Something went wrong", + "name": "test-profile", + "operations": [ + { + "operation": "firmware_update", + "skip": false, + "payload": { + "name": "test-firmware", + "version": "1.0", + "url": "http://www.my.url" + } + }, + { + "operation": "software_update", + "skip": false, + "payload": { + "updateList": [ + { + "type": "apt", + "modules": [ + { + "name": "test-software-1", + "version": "latest", + "action": "install" + }, + { + "name": "test-software-2", + "version": "latest", + "action": "install" + } + ] + } + ] + } + }, + { + "operation": "config_update", + "skip": false, + "payload": { + "type": "path/config/test-software-1", + "remoteUrl":"http://www.my.url" + } + } + ] + }) + .to_string(), + )) + .await + .expect("Send failed"); + + // Expect `502` smartrest message on `c8y/s/us`. + assert_received_contains_str( + &mut mqtt, + [("c8y/s/us", "502,c8y_DeviceProfile,Something went wrong")], + ) + .await; + } + + #[tokio::test] + async fn handle_config_update_executing_and_failed_cmd_for_child_device() { + let ttd = TempTedgeDir::new(); + let test_handle = spawn_c8y_mapper_actor(&ttd, true).await; + let TestHandle { mqtt, .. } = test_handle; + let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); + + skip_init_messages(&mut mqtt).await; + + // The child device must be registered first + mqtt.send(MqttMessage::new( + &Topic::new_unchecked("te/device/child1//"), + r#"{ "@type":"child-device", "@id":"child1" }"#, + )) + .await + .expect("fail to register the child-device"); + + mqtt.skip(1).await; // Skip child device registration messages + + // Simulate config_snapshot command with "executing" state + mqtt.send(MqttMessage::new( + &Topic::new_unchecked("te/device/child1///cmd/device_profile/c8y-mapper-123456"), + json!({ + "status": "executing", + "name": "test-profile", + "operations": [ + { + "operation": "firmware_update", + "skip": false, + "payload": { + "name": "test-firmware", + "version": "1.0", + "url": "http://www.my.url" + } + }, + { + "operation": "software_update", + "skip": false, + "payload": { + "updateList": [ + { + "type": "apt", + "modules": [ + { + "name": "test-software-1", + "version": "latest", + "action": "install" + }, + { + "name": "test-software-2", + "version": "latest", + "action": "install" + } + ] + } + ] + } + }, + { + "operation": "config_update", + "skip": false, + "payload": { + "type": "path/config/test-software-1", + "remoteUrl":"http://www.my.url" + } + } + ] + }) + .to_string(), + )) + .await + .expect("Send failed"); + + // Expect `501` smartrest message on `c8y/s/us`. + assert_received_contains_str(&mut mqtt, [("c8y/s/us/child1", "501,c8y_DeviceProfile")]) + .await; + + // Simulate config_snapshot command with "failed" state + mqtt.send(MqttMessage::new( + &Topic::new_unchecked("te/device/child1///cmd/device_profile/c8y-mapper-123456"), + json!({ + "status": "failed", + "reason": "Something went wrong", + "name": "test-profile", + "operations": [ + { + "operation": "firmware_update", + "skip": false, + "payload": { + "name": "test-firmware", + "version": "1.0", + "url": "http://www.my.url" + } + }, + { + "operation": "software_update", + "skip": false, + "payload": { + "updateList": [ + { + "type": "apt", + "modules": [ + { + "name": "test-software-1", + "version": "latest", + "action": "install" + }, + { + "name": "test-software-2", + "version": "latest", + "action": "install" + } + ] + } + ] + } + }, + { + "operation": "config_update", + "skip": false, + "payload": { + "type": "path/config/test-software-1", + "remoteUrl":"http://www.my.url" + } + } + ] + }) + .to_string(), + )) + .await + .expect("Send failed"); + + // Expect `502` smartrest message on `c8y/s/us`. + assert_received_contains_str( + &mut mqtt, + [( + "c8y/s/us/child1", + "502,c8y_DeviceProfile,Something went wrong", + )], + ) + .await; + } + + #[tokio::test] + async fn handle_device_profile_successful_cmd_for_main_device() { + let ttd = TempTedgeDir::new(); + let test_handle = spawn_c8y_mapper_actor(&ttd, true).await; + let TestHandle { mqtt, .. } = test_handle; + let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); + + skip_init_messages(&mut mqtt).await; + + // Simulate config_update command with "successful" state + mqtt.send(MqttMessage::new( + &Topic::new_unchecked("te/device/main///cmd/device_profile/c8y-mapper-123456"), + json!({ + "status": "successful", + "name": "test-profile", + "operations": [ + { + "operation": "firmware_update", + "skip": false, + "payload": { + "name": "test-firmware", + "version": "1.0", + "url": "http://www.my.url" + } + }, + { + "operation": "software_update", + "skip": false, + "payload": { + "updateList": [ + { + "type": "apt", + "modules": [ + { + "name": "test-software-1", + "version": "latest", + "action": "install" + }, + { + "name": "test-software-2", + "version": "latest", + "action": "install" + } + ] + } + ] + } + }, + { + "operation": "config_update", + "skip": false, + "payload": { + "type": "path/config/test-software-1", + "remoteUrl":"http://www.my.url" + } + } + ] + }) + .to_string(), + )) + .await + .expect("Send failed"); + + // Expect `503` smartrest message on `c8y/s/us`. + assert_received_contains_str(&mut mqtt, [("c8y/s/us", "503,c8y_DeviceProfile")]).await; + } + + #[tokio::test] + async fn handle_device_profile_successful_cmd_for_child_device() { + let ttd = TempTedgeDir::new(); + let test_handle = spawn_c8y_mapper_actor(&ttd, true).await; + let TestHandle { mqtt, .. } = test_handle; + let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); + + skip_init_messages(&mut mqtt).await; + + // The child device must be registered first + mqtt.send(MqttMessage::new( + &Topic::new_unchecked("te/device/child1//"), + r#"{ "@type":"child-device", "@id":"child1" }"#, + )) + .await + .expect("fail to register the child-device"); + + mqtt.skip(1).await; // Skip child device registration messages + + // Simulate config_update command with "successful" state + mqtt.send(MqttMessage::new( + &Topic::new_unchecked("te/device/child1///cmd/device_profile/c8y-mapper-123456"), + json!({ + "status": "successful", + "name": "test-profile", + "operations": [ + { + "operation": "firmware_update", + "skip": false, + "payload": { + "name": "test-firmware", + "version": "1.0", + "url": "http://www.my.url" + } + }, + { + "operation": "software_update", + "skip": false, + "payload": { + "updateList": [ + { + "type": "apt", + "modules": [ + { + "name": "test-software-1", + "version": "latest", + "action": "install" + }, + { + "name": "test-software-2", + "version": "latest", + "action": "install" + } + ] + } + ] + } + }, + { + "operation": "config_update", + "skip": false, + "payload": { + "type": "path/config/test-software-1", + "remoteUrl":"http://www.my.url" + } + } + ] + }) + .to_string(), + )) + .await + .expect("Send failed"); + + // Expect `503` smartrest message on `c8y/s/us`. + assert_received_contains_str(&mut mqtt, [("c8y/s/us/child1", "503,c8y_DeviceProfile")]) + .await; + } } diff --git a/crates/extensions/c8y_mapper_ext/src/operations/handlers/mod.rs b/crates/extensions/c8y_mapper_ext/src/operations/handlers/mod.rs index 6e580d4cb5b..3506e587755 100644 --- a/crates/extensions/c8y_mapper_ext/src/operations/handlers/mod.rs +++ b/crates/extensions/c8y_mapper_ext/src/operations/handlers/mod.rs @@ -136,7 +136,8 @@ impl OperationContext { .await } OperationType::DeviceProfile => { - Ok(OperationOutcome::Ignored) // to do handle state change for device profile + self.handle_device_profile_state_change(&entity, &cmd_id, &message) + .await } };