Skip to content

Commit

Permalink
fixup! Apply review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Rina Fujino <[email protected]>
  • Loading branch information
rina23q committed Nov 7, 2023
1 parent b61b8b6 commit 9a06cc4
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 24 deletions.
21 changes: 13 additions & 8 deletions crates/extensions/c8y_mapper_ext/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use tedge_uploader_ext::UploadResult;
use tedge_utils::file::create_directory_with_defaults;
use tedge_utils::file::FileError;
use tracing::error;
use tracing::warn;

const SYNC_WINDOW: Duration = Duration::from_secs(3);

Expand Down Expand Up @@ -239,22 +240,26 @@ impl C8yMapperActor {
) -> Result<(), RuntimeError> {
match self.converter.pending_upload_operations.remove(&cmd_id) {
None => error!("Received an upload result for the unknown command ID: {cmd_id}"),
Some((smartrest_topic, clear_cmd_topic, binary_url, operation)) => {
let serialize_result = match operation {
Some(queued_data) => {
let serialize_result = match queued_data.operation {
CumulocitySupportedOperations::C8yLogFileRequest
| CumulocitySupportedOperations::C8yUploadConfigFile => self
.get_smartrest_response_for_upload_result(
upload_result,
binary_url,
operation,
queued_data.c8y_binary_url,
queued_data.operation,
),
_other_types => return Ok(()), // unsupported
other_type => {
warn!("Received unsupported operation {other_type:?} for uploading a file");
return Ok(());
}
};

match serialize_result {
Ok(sm_payload) => {
let c8y_notification = Message::new(&smartrest_topic, sm_payload);
let clear_local_cmd = Message::new(&clear_cmd_topic, "")
Ok(sr_payload) => {
let c8y_notification =
Message::new(&queued_data.smartrest_topic, sr_payload);
let clear_local_cmd = Message::new(&queued_data.clear_cmd_topic, "")
.with_retain()
.with_qos(QoS::AtLeastOnce);
for converted_message in [c8y_notification, clear_local_cmd] {
Expand Down
11 changes: 6 additions & 5 deletions crates/extensions/c8y_mapper_ext/src/config_operations.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::converter::CumulocityConverter;
use crate::converter::UploadOperationData;
use crate::error::ConversionError;
use crate::error::CumulocityMapperError;
use c8y_api::json_c8y::C8yCreateEvent;
Expand Down Expand Up @@ -180,12 +181,12 @@ impl CumulocityConverter {

self.pending_upload_operations.insert(
cmd_id.into(),
(
UploadOperationData {
smartrest_topic,
message.topic.clone(),
binary_upload_event_url.to_string(),
CumulocitySupportedOperations::C8yUploadConfigFile,
),
clear_cmd_topic: message.topic.clone(),
c8y_binary_url: binary_upload_event_url.to_string(),
operation: CumulocitySupportedOperations::C8yUploadConfigFile,
},
);

vec![] // No mqtt message can be published in this state
Expand Down
10 changes: 8 additions & 2 deletions crates/extensions/c8y_mapper_ext/src/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,13 @@ impl CumulocityConverter {
}
}

pub struct UploadOperationData {
pub smartrest_topic: Topic,
pub clear_cmd_topic: Topic,
pub c8y_binary_url: String,
pub operation: CumulocitySupportedOperations,
}

pub struct CumulocityConverter {
pub(crate) size_threshold: SizeThreshold,
pub config: C8yMapperConfig,
Expand All @@ -181,8 +188,7 @@ pub struct CumulocityConverter {
pub auth_proxy: ProxyUrlGenerator,
pub uploader_sender: LoggingSender<IdUploadRequest>,
pub downloader_sender: LoggingSender<IdDownloadRequest>,
pub pending_upload_operations:
HashMap<CmdId, (Topic, Topic, String, CumulocitySupportedOperations)>,
pub pending_upload_operations: HashMap<CmdId, UploadOperationData>,
pub pending_download_operations: HashMap<CmdId, SmartRestOperationVariant>,
pub command_id: IdGenerator,
}
Expand Down
13 changes: 7 additions & 6 deletions crates/extensions/c8y_mapper_ext/src/log_upload.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::converter::CumulocityConverter;
use crate::converter::UploadOperationData;
use crate::error::ConversionError;
use crate::error::CumulocityMapperError;
use c8y_api::json_c8y::C8yCreateEvent;
Expand Down Expand Up @@ -93,7 +94,7 @@ impl CumulocityConverter {
/// Address a received log_upload command. If its status is
/// - "executing", it converts the message to SmartREST "Executing".
/// - "successful", it creates an event in c8y, then creates an UploadRequest for the uploader actor.
/// - "failed", it converts the message to SmartREST "Failed".
/// - "failed", it converts the message to SmartREST "Failed" with that event URL.
pub async fn handle_log_upload_state_change(
&mut self,
topic_id: &EntityTopicId,
Expand Down Expand Up @@ -157,12 +158,12 @@ impl CumulocityConverter {

self.pending_upload_operations.insert(
cmd_id.into(),
(
UploadOperationData {
smartrest_topic,
message.topic.clone(),
binary_upload_event_url.to_string(),
CumulocitySupportedOperations::C8yLogFileRequest,
),
clear_cmd_topic: message.topic.clone(),
c8y_binary_url: binary_upload_event_url.to_string(),
operation: CumulocitySupportedOperations::C8yLogFileRequest,
},
);

vec![] // No mqtt message can be published in this state
Expand Down
6 changes: 3 additions & 3 deletions crates/extensions/c8y_mapper_ext/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2436,7 +2436,7 @@ async fn handle_log_upload_executing_and_failed_cmd_for_main_device() {
&Topic::new_unchecked("te/device/main///cmd/log_upload/c8y-mapper-1234"),
json!({
"status": "executing",
"tedgeUrl": format!("http://localhost:8888/tedge/file-transfer/main/log_upload/typeA-c8y-mapper-1234"),
"tedgeUrl": format!("http://localhost:8888/tedge/file-transfer/test-device/log_upload/typeA-c8y-mapper-1234"),
"type": "typeA",
"dateFrom": "2013-06-22T17:03:14.123+02:00",
"dateTo": "2013-06-23T18:03:14.123+02:00",
Expand All @@ -2456,7 +2456,7 @@ async fn handle_log_upload_executing_and_failed_cmd_for_main_device() {
&Topic::new_unchecked("te/device/main///cmd/log_upload/c8y-mapper-1234"),
json!({
"status": "failed",
"tedgeUrl": format!("http://localhost:8888/tedge/file-transfer/main/log_upload/typeA-c8y-mapper-1234"),
"tedgeUrl": format!("http://localhost:8888/tedge/file-transfer/test-device/log_upload/typeA-c8y-mapper-1234"),
"type": "typeA",
"dateFrom": "2013-06-22T17:03:14.123+02:00",
"dateTo": "2013-06-23T18:03:14.123+02:00",
Expand Down Expand Up @@ -2585,7 +2585,7 @@ async fn handle_log_upload_successful_cmd_for_main_device() {
&Topic::new_unchecked("te/device/main///cmd/log_upload/c8y-mapper-1234"),
json!({
"status": "successful",
"tedgeUrl": "http://localhost:8888/tedge/file-transfer/main/log_upload/typeA-c8y-mapper-1234",
"tedgeUrl": "http://localhost:8888/tedge/file-transfer/test-device/log_upload/typeA-c8y-mapper-1234",
"type": "typeA",
"dateFrom": "2013-06-22T17:03:14.123+02:00",
"dateTo": "2013-06-23T18:03:14.123+02:00",
Expand Down

0 comments on commit 9a06cc4

Please sign in to comment.