Skip to content

Commit

Permalink
OperationHandler add doc and tests about panics
Browse files Browse the repository at this point in the history
Signed-off-by: Marcel Guzik <[email protected]>
  • Loading branch information
Bravo555 committed Jul 23, 2024
1 parent d9f1663 commit 900b6d9
Showing 1 changed file with 209 additions and 17 deletions.
226 changes: 209 additions & 17 deletions crates/extensions/c8y_mapper_ext/src/operations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,22 @@ impl OperationHandler {
/// message will be published to the broker by the running operation task, but this message also
/// needs to be handled when an MQTT broker echoes it back to us, so that `OperationHandler` can
/// free the data associated with the operation.
///
/// # Panics
///
/// Will panic if a task that runs the operation has panicked. The task can panic if e.g. MQTT
/// send returns an error or the task encountered any other unexpected error that makes it
/// impossible to finish handling the operation (i.e. send MQTT clearing message and report
/// operation status to c8y).
///
/// The panic in the operation task has to happen first, and then another message with the same
/// command id has to be handled for the call to `.handle()` to panic.
// but there's a problem: in practice, when a panic in a child task happens, .handle() will
// never get called for that operation again. Operation task itself sends the messages, so if
// they can't be sent over MQTT because of a panic, they won't be handled, won't be joined, so
// we will not see that an exception has occurred.
// FIXME(marcel): ensure panics are always propagated without the caller having to ask for them
pub async fn handle(&mut self, entity: EntityTarget, message: MqttMessage) {
let Ok((_, channel)) = self.context.mqtt_schema.entity_channel_of(&message.topic) else {
return;
Expand Down Expand Up @@ -180,10 +196,10 @@ impl OperationHandler {
};

if let Some(terminated_operation) = terminated_operation {
let join_result = terminated_operation.handle.await;
if let Err(err) = join_result {
error!(%topic, ?err, "operation task could not be joined");
}
terminated_operation
.handle
.await
.expect("operation task should not panic");
}
}
}
Expand Down Expand Up @@ -514,23 +530,34 @@ fn get_smartrest_response_for_upload_result(

#[cfg(test)]
mod tests {
use std::time::Duration;

use c8y_auth_proxy::url::Protocol;
use c8y_http_proxy::messages::C8YRestRequest;
use c8y_http_proxy::messages::C8YRestResult;
use tedge_actors::test_helpers::FakeServerBox;
use tedge_actors::test_helpers::FakeServerBoxBuilder;
use tedge_actors::test_helpers::MessageReceiverExt;
use tedge_actors::Builder;
use tedge_actors::MessageReceiver;
use tedge_actors::MessageSink;
use tedge_actors::SimpleMessageBox;
use tedge_actors::SimpleMessageBoxBuilder;
use tedge_api::commands::ConfigSnapshotCmd;
use tedge_api::commands::ConfigSnapshotCmdPayload;
use tedge_api::CommandStatus;
use tedge_downloader_ext::DownloadResponse;
use tedge_test_utils::fs::TempTedgeDir;
use tedge_uploader_ext::UploadResponse;

use crate::tests::test_mapper_config;

use super::*;

const TEST_TIMEOUT_MS: Duration = Duration::from_millis(3000);

#[tokio::test]
async fn ignores_messages_that_are_not_operations() {
async fn handle_ignores_messages_that_are_not_operations() {
// system under test
let mut sut = setup_operation_handler().operation_handler;
let mqtt_schema = sut.context.mqtt_schema.clone();
Expand Down Expand Up @@ -560,6 +587,169 @@ mod tests {
assert_eq!(sut.running_operations.len(), 0);
}

#[tokio::test]
async fn handle_joins_terminated_operations() {
let TestHandle {
operation_handler: mut sut,
downloader: dl,
uploader: ul,
mqtt,
c8y_proxy,
ttd: _ttd,
..
} = setup_operation_handler();

let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS);
let mut dl = dl.with_timeout(TEST_TIMEOUT_MS);
let mut ul = ul.with_timeout(TEST_TIMEOUT_MS);
let mut c8y_proxy = c8y_proxy.with_timeout(TEST_TIMEOUT_MS);

let mqtt_schema = sut.context.mqtt_schema.clone();

let entity_topic_id = EntityTopicId::default_main_device();
let entity_target = EntityTarget {
topic_id: entity_topic_id.clone(),
external_id: EntityExternalId::from("anything"),
smartrest_publish_topic: Topic::new("anything").unwrap(),
};

// spawn an operation to see if it's successfully joined when it's completed.
// particular operation used is not important, because we want to test only the handler.
// it would be even better if we could define some inline operation so test could be shorter
// TODO(marcel): don't assume operation implementations when testing the handler
let config_snapshot_operation = ConfigSnapshotCmd {
target: entity_topic_id,
cmd_id: "config-snapshot-1".to_string(),
payload: ConfigSnapshotCmdPayload {
status: CommandStatus::Successful,
tedge_url: Some("asdf".to_string()),
config_type: "typeA".to_string(),
path: None,
log_path: None,
},
};

sut.handle(
entity_target.clone(),
config_snapshot_operation.command_message(&mqtt_schema),
)
.await;
assert_eq!(sut.running_operations.len(), 1);

dl.recv()
.await
.expect("downloader should receive DownloadRequest");

dl.send((
"config-snapshot-1".to_string(),
Ok(DownloadResponse {
url: "asdf".to_string(),
file_path: "asdf".into(),
}),
))
.await
.unwrap();

c8y_proxy
.recv()
.await
.expect("C8yProxy should receive CreateEvent");

c8y_proxy
.send(Ok(c8y_http_proxy::messages::C8YRestResponse::EventId(
"asdf".to_string(),
)))
.await
.unwrap();

ul.recv()
.await
.expect("uploader should receive UploadRequest");

ul.send((
"config-snapshot-1".to_string(),
Ok(UploadResponse {
url: "asdf".to_string(),
file_path: "asdf".into(),
}),
))
.await
.unwrap();

assert_eq!(sut.running_operations.len(), 1);

// skip 503 smartrest
mqtt.skip(1).await;

let clearing_message = mqtt.recv().await.expect("MQTT should receive message");
assert_eq!(
clearing_message,
config_snapshot_operation.clearing_message(&mqtt_schema)
);

assert_eq!(sut.running_operations.len(), 1);

// finally, check that after handling clearing message, operation was joined
sut.handle(entity_target, clearing_message).await;

assert_eq!(sut.running_operations.len(), 0);
}

#[tokio::test]
#[should_panic]
async fn handle_should_panic_when_background_task_panics() {
// we're immediately dropping test's temporary directory, so we'll get an error that a
// directory for the operation could not be created
let TestHandle {
operation_handler: mut sut,
..
} = setup_operation_handler();

let mqtt_schema = sut.context.mqtt_schema.clone();

let entity_topic_id = EntityTopicId::default_main_device();
let entity_target = EntityTarget {
topic_id: entity_topic_id.clone(),
external_id: EntityExternalId::from("anything"),
smartrest_publish_topic: Topic::new("anything").unwrap(),
};

// spawn an operation to see if it's successfully joined when it's completed.
// particular operation used is not important, because we want to test only the handler.
// it would be even better if we could define some inline operation so test could be shorter
// TODO(marcel): don't assume operation implementations when testing the handler
let config_snapshot_operation = ConfigSnapshotCmd {
target: entity_topic_id,
cmd_id: "config-snapshot-1".to_string(),
payload: ConfigSnapshotCmdPayload {
status: CommandStatus::Successful,
tedge_url: Some("asdf".to_string()),
config_type: "typeA".to_string(),
path: None,
log_path: None,
},
};

sut.handle(
entity_target.clone(),
config_snapshot_operation.command_message(&mqtt_schema),
)
.await;
assert_eq!(sut.running_operations.len(), 1);

// give OperationHandler time to handle message
// TODO(marcel): remove sleeps
tokio::time::sleep(Duration::from_millis(50)).await;

// normally clearing message would be sent by operation task, but in this case we send it
// manually
sut.handle(
entity_target.clone(),
config_snapshot_operation.clearing_message(&mqtt_schema),
)
.await;
}

fn setup_operation_handler() -> TestHandle {
let ttd = TempTedgeDir::new();
let c8y_mapper_config = test_mapper_config(&ttd);
Expand Down Expand Up @@ -593,25 +783,27 @@ mod tests {
auth_proxy,
);

let _mqtt = mqtt_builder.build();
let _downloader = downloader_builder.build();
let _uploader = uploader_builder.build();
let _c8y_proxy = c8y_proxy_builder.build();
let mqtt = mqtt_builder.build();
let downloader = downloader_builder.build();
let uploader = uploader_builder.build();
let c8y_proxy = c8y_proxy_builder.build();

TestHandle {
_mqtt,
_downloader,
_uploader,
_c8y_proxy,
mqtt,
downloader,
uploader,
c8y_proxy,
operation_handler,
ttd,
}
}

struct TestHandle {
operation_handler: OperationHandler,
_mqtt: SimpleMessageBox<MqttMessage, MqttMessage>,
_c8y_proxy: FakeServerBox<C8YRestRequest, C8YRestResult>,
_uploader: FakeServerBox<IdUploadRequest, IdUploadResult>,
_downloader: FakeServerBox<IdDownloadRequest, IdDownloadResult>,
mqtt: SimpleMessageBox<MqttMessage, MqttMessage>,
c8y_proxy: FakeServerBox<C8YRestRequest, C8YRestResult>,
uploader: FakeServerBox<IdUploadRequest, IdUploadResult>,
downloader: FakeServerBox<IdDownloadRequest, IdDownloadResult>,
ttd: TempTedgeDir,
}
}

0 comments on commit 900b6d9

Please sign in to comment.