From d74a8c8f8eab05a7c93aea012c8d29cdf7aec6f5 Mon Sep 17 00:00:00 2001 From: Marcel Guzik Date: Fri, 19 Jul 2024 13:01:07 +0000 Subject: [PATCH] `OperationHandler` add doc and tests about panics Signed-off-by: Marcel Guzik --- .../c8y_mapper_ext/src/operations/mod.rs | 226 ++++++++++++++++-- 1 file changed, 209 insertions(+), 17 deletions(-) diff --git a/crates/extensions/c8y_mapper_ext/src/operations/mod.rs b/crates/extensions/c8y_mapper_ext/src/operations/mod.rs index e27d1ce9c7e..9879e321119 100644 --- a/crates/extensions/c8y_mapper_ext/src/operations/mod.rs +++ b/crates/extensions/c8y_mapper_ext/src/operations/mod.rs @@ -142,6 +142,22 @@ impl OperationHandler { /// message will be published to the broker by the running operation task, but this message also /// needs to be handled when an MQTT broker echoes it back to us, so that `OperationHandler` can /// free the data associated with the operation. + /// + /// # Panics + /// + /// Will panic if a task that runs the operation has panicked. The task can panic if e.g. MQTT + /// send returns an error or the task encountered any other unexpected error that makes it + /// impossible to finish handling the operation (i.e. send MQTT clearing message and report + /// operation status to c8y). + /// + /// The panic in the operation task has to happen first, and then another message with the same + /// command id has to be handled for the call to `.handle()` to panic. + + // but there's a problem: in practice, when a panic in a child task happens, .handle() will + // never get called for that operation again. Operation task itself sends the messages, so if + // they can't be sent over MQTT because of a panic, they won't be handled, won't be joined, so + // we will not see that an exception has occurred. + // FIXME(marcel): ensure panics are always propagated without the caller having to ask for them pub async fn handle(&mut self, entity: EntityTarget, message: MqttMessage) { let Ok((_, channel)) = self.context.mqtt_schema.entity_channel_of(&message.topic) else { return; @@ -180,10 +196,10 @@ impl OperationHandler { }; if let Some(terminated_operation) = terminated_operation { - let join_result = terminated_operation.handle.await; - if let Err(err) = join_result { - error!(%topic, ?err, "operation task could not be joined"); - } + terminated_operation + .handle + .await + .expect("operation task should not panic"); } } } @@ -514,23 +530,34 @@ fn get_smartrest_response_for_upload_result( #[cfg(test)] mod tests { + use std::time::Duration; + use c8y_auth_proxy::url::Protocol; use c8y_http_proxy::messages::C8YRestRequest; use c8y_http_proxy::messages::C8YRestResult; use tedge_actors::test_helpers::FakeServerBox; use tedge_actors::test_helpers::FakeServerBoxBuilder; + use tedge_actors::test_helpers::MessageReceiverExt; use tedge_actors::Builder; + use tedge_actors::MessageReceiver; use tedge_actors::MessageSink; use tedge_actors::SimpleMessageBox; use tedge_actors::SimpleMessageBoxBuilder; + use tedge_api::commands::ConfigSnapshotCmd; + use tedge_api::commands::ConfigSnapshotCmdPayload; + use tedge_api::CommandStatus; + use tedge_downloader_ext::DownloadResponse; use tedge_test_utils::fs::TempTedgeDir; + use tedge_uploader_ext::UploadResponse; use crate::tests::test_mapper_config; use super::*; + const TEST_TIMEOUT_MS: Duration = Duration::from_millis(3000); + #[tokio::test] - async fn ignores_messages_that_are_not_operations() { + async fn handle_ignores_messages_that_are_not_operations() { // system under test let mut sut = setup_operation_handler().operation_handler; let mqtt_schema = sut.context.mqtt_schema.clone(); @@ -560,6 +587,169 @@ mod tests { assert_eq!(sut.running_operations.len(), 0); } + #[tokio::test] + async fn handle_joins_terminated_operations() { + let TestHandle { + operation_handler: mut sut, + downloader: dl, + uploader: ul, + mqtt, + c8y_proxy, + ttd: _ttd, + .. + } = setup_operation_handler(); + + let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); + let mut dl = dl.with_timeout(TEST_TIMEOUT_MS); + let mut ul = ul.with_timeout(TEST_TIMEOUT_MS); + let mut c8y_proxy = c8y_proxy.with_timeout(TEST_TIMEOUT_MS); + + let mqtt_schema = sut.context.mqtt_schema.clone(); + + let entity_topic_id = EntityTopicId::default_main_device(); + let entity_target = EntityTarget { + topic_id: entity_topic_id.clone(), + external_id: EntityExternalId::from("anything"), + smartrest_publish_topic: Topic::new("anything").unwrap(), + }; + + // spawn an operation to see if it's successfully joined when it's completed. + // particular operation used is not important, because we want to test only the handler. + // it would be even better if we could define some inline operation so test could be shorter + // TODO(marcel): don't assume operation implementations when testing the handler + let config_snapshot_operation = ConfigSnapshotCmd { + target: entity_topic_id, + cmd_id: "config-snapshot-1".to_string(), + payload: ConfigSnapshotCmdPayload { + status: CommandStatus::Successful, + tedge_url: Some("asdf".to_string()), + config_type: "typeA".to_string(), + path: None, + log_path: None, + }, + }; + + sut.handle( + entity_target.clone(), + config_snapshot_operation.command_message(&mqtt_schema), + ) + .await; + assert_eq!(sut.running_operations.len(), 1); + + dl.recv() + .await + .expect("downloader should receive DownloadRequest"); + + dl.send(( + "config-snapshot-1".to_string(), + Ok(DownloadResponse { + url: "asdf".to_string(), + file_path: "asdf".into(), + }), + )) + .await + .unwrap(); + + c8y_proxy + .recv() + .await + .expect("C8yProxy should receive CreateEvent"); + + c8y_proxy + .send(Ok(c8y_http_proxy::messages::C8YRestResponse::EventId( + "asdf".to_string(), + ))) + .await + .unwrap(); + + ul.recv() + .await + .expect("uploader should receive UploadRequest"); + + ul.send(( + "config-snapshot-1".to_string(), + Ok(UploadResponse { + url: "asdf".to_string(), + file_path: "asdf".into(), + }), + )) + .await + .unwrap(); + + assert_eq!(sut.running_operations.len(), 1); + + // skip 503 smartrest + mqtt.skip(1).await; + + let clearing_message = mqtt.recv().await.expect("MQTT should receive message"); + assert_eq!( + clearing_message, + config_snapshot_operation.clearing_message(&mqtt_schema) + ); + + assert_eq!(sut.running_operations.len(), 1); + + // finally, check that after handling clearing message, operation was joined + sut.handle(entity_target, clearing_message).await; + + assert_eq!(sut.running_operations.len(), 0); + } + + #[tokio::test] + #[should_panic] + async fn handle_should_panic_when_background_task_panics() { + // we're immediately dropping test's temporary directory, so we'll get an error that a + // directory for the operation could not be created + let TestHandle { + operation_handler: mut sut, + .. + } = setup_operation_handler(); + + let mqtt_schema = sut.context.mqtt_schema.clone(); + + let entity_topic_id = EntityTopicId::default_main_device(); + let entity_target = EntityTarget { + topic_id: entity_topic_id.clone(), + external_id: EntityExternalId::from("anything"), + smartrest_publish_topic: Topic::new("anything").unwrap(), + }; + + // spawn an operation to see if it's successfully joined when it's completed. + // particular operation used is not important, because we want to test only the handler. + // it would be even better if we could define some inline operation so test could be shorter + // TODO(marcel): don't assume operation implementations when testing the handler + let config_snapshot_operation = ConfigSnapshotCmd { + target: entity_topic_id, + cmd_id: "config-snapshot-1".to_string(), + payload: ConfigSnapshotCmdPayload { + status: CommandStatus::Successful, + tedge_url: Some("asdf".to_string()), + config_type: "typeA".to_string(), + path: None, + log_path: None, + }, + }; + + sut.handle( + entity_target.clone(), + config_snapshot_operation.command_message(&mqtt_schema), + ) + .await; + assert_eq!(sut.running_operations.len(), 1); + + // give OperationHandler time to handle message + // TODO(marcel): remove sleeps + tokio::time::sleep(Duration::from_millis(50)).await; + + // normally clearing message would be sent by operation task. + // Using it here just as a dummy, to call `handle` with the same cmd-id, so that it panics + sut.handle( + entity_target.clone(), + config_snapshot_operation.clearing_message(&mqtt_schema), + ) + .await; + } + fn setup_operation_handler() -> TestHandle { let ttd = TempTedgeDir::new(); let c8y_mapper_config = test_mapper_config(&ttd); @@ -593,25 +783,27 @@ mod tests { auth_proxy, ); - let _mqtt = mqtt_builder.build(); - let _downloader = downloader_builder.build(); - let _uploader = uploader_builder.build(); - let _c8y_proxy = c8y_proxy_builder.build(); + let mqtt = mqtt_builder.build(); + let downloader = downloader_builder.build(); + let uploader = uploader_builder.build(); + let c8y_proxy = c8y_proxy_builder.build(); TestHandle { - _mqtt, - _downloader, - _uploader, - _c8y_proxy, + mqtt, + downloader, + uploader, + c8y_proxy, operation_handler, + ttd, } } struct TestHandle { operation_handler: OperationHandler, - _mqtt: SimpleMessageBox, - _c8y_proxy: FakeServerBox, - _uploader: FakeServerBox, - _downloader: FakeServerBox, + mqtt: SimpleMessageBox, + c8y_proxy: FakeServerBox, + uploader: FakeServerBox, + downloader: FakeServerBox, + ttd: TempTedgeDir, } }