Skip to content

Commit

Permalink
test: verify that mapper still processes operations when downloads an…
Browse files Browse the repository at this point in the history
…d uploads are ongoing

Signed-off-by: Marcel Guzik <[email protected]>
  • Loading branch information
Bravo555 committed Jul 18, 2024
1 parent a79ea80 commit 105af44
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 1 deletion.
3 changes: 2 additions & 1 deletion crates/extensions/c8y_mapper_ext/src/operations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,8 @@ impl RunningOperation {

let mut mqtt_publisher = context.mqtt_publisher.clone();

// at this point all local operations that are not regular c8y operations should be handled above
// unwrap is safe: at this point all local operations that are not regular c8y
// operations should be handled above
let c8y_operation = to_c8y_operation(&operation).unwrap();

match to_response(
Expand Down
101 changes: 101 additions & 0 deletions crates/extensions/c8y_mapper_ext/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use tedge_api::mqtt_topics::MqttSchema;
use tedge_config::AutoLogUpload;
use tedge_config::SoftwareManagementApiFlag;
use tedge_config::TEdgeConfig;
use tedge_downloader_ext::DownloadResponse;
use tedge_file_system_ext::FsWatchEvent;
use tedge_mqtt_ext::test_helpers::assert_received_contains_str;
use tedge_mqtt_ext::test_helpers::assert_received_includes_json;
Expand Down Expand Up @@ -2350,6 +2351,106 @@ async fn mapper_processes_operations_concurrently() {
}
}

#[tokio::test]
async fn mapper_processes_other_operations_while_uploads_and_downloads_are_ongoing() {
let cfg_dir = TempTedgeDir::new();
let TestHandle {
mqtt,
http,
dl,
ul,
mut timer,
..
} = spawn_c8y_mapper_actor(&cfg_dir, true).await;
let mut dl = dl.with_timeout(TEST_TIMEOUT_MS);
let mut ul = ul.with_timeout(TEST_TIMEOUT_MS);

spawn_dummy_c8y_http_proxy(http);

// Complete sync phase so that alarm mapping starts
trigger_timeout(&mut timer).await;

let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS);
skip_init_messages(&mut mqtt).await;

// simulate many successful operations that needs to be handled by the mapper
mqtt.send(MqttMessage::new(
&Topic::new_unchecked("te/device/main///cmd/log_upload/c8y-mapper-1"),
json!({
"status": "successful",
"tedgeUrl": "http://localhost:8888/tedge/file-transfer/test-device/log_upload/c8y-mapper-1",
"type": "mosquitto",
"dateFrom": "2023-11-28T16:33:50+01:00",
"dateTo": "2023-11-29T16:33:50+01:00",
"searchText": "ERROR",
"lines": 1000
})
.to_string(),
))
.await.unwrap();

let (download_id, download_request) = dl
.recv()
.await
.expect("DownloadRequest for log_upload should be sent");
assert_eq!(download_id, "c8y-mapper-1");
assert_eq!(
download_request.url,
"http://localhost:8888/tedge/file-transfer/test-device/log_upload/c8y-mapper-1"
);

// here it would be good to assert that upload message hasn't been sent yet, but due to the
// behaviour of message channels it can't be easily done

dl.send((
"c8y-mapper-1".to_string(),
Ok(DownloadResponse {
url: "http://localhost:8888/tedge/file-transfer/test-device/log_upload/c8y-mapper-1"
.to_string(),
file_path: "whatever".into(),
}),
))
.await
.unwrap();

let (upload_id, _) = ul
.recv()
.await
.expect("UploadRequest for log_upload should be sent");
assert_eq!(upload_id, "c8y-mapper-1");

// now that an upload is ongoing, check that downloads can also be triggered
mqtt.send(MqttMessage::new(
&Topic::new_unchecked("te/device/main///cmd/config_snapshot/c8y-mapper-2"),
json!({
"status": "successful",
"tedgeUrl": "http://localhost:8888/tedge/file-transfer/test-device/config_snapshot/c8y-mapper-2",
"type": "typeA",
})
.to_string(),
))
.await.unwrap();

let (config_snapshot_id, _) = dl
.recv()
.await
.expect("DownloadRequest for config snapshot should be sent");
assert_eq!(config_snapshot_id, "c8y-mapper-2");

// while download and upload are ongoing, try some other operation that doesn't do download or upload
mqtt.send(MqttMessage::new(
&Topic::new_unchecked("te/device/main///cmd/restart/c8y-mapper-3"),
json!({
"status": "successful",
})
.to_string(),
))
.await
.unwrap();

assert_received_contains_str(&mut mqtt, [("c8y/s/us", "503,c8y_Restart")]).await;
}

#[tokio::test]
async fn mapper_converts_config_metadata_to_supported_op_and_types_for_main_device() {
let ttd = TempTedgeDir::new();
Expand Down

0 comments on commit 105af44

Please sign in to comment.