Skip to content

Commit

Permalink
fix(#3297): Collect multiple smartrest messages from c8y devicecontro…
Browse files Browse the repository at this point in the history
…l topic

c8y/devicecontrol/notifications topic, as many others, can contain in
its smartrest payload multiple records. For now ensure that multiple
records are parsed from this topic in particular, as C8y sends multiple
operations in a single message if e.g. device is offline, multiple
operations are started, and then the device connects, it then receives
all those operations in a single MQTT message.

But a longer term solution would be to refactor the code to ensure that
we're supporting collecting multiple smartrest messages from all the
possible MQTT topics.

Signed-off-by: Marcel Guzik <[email protected]>
  • Loading branch information
Bravo555 committed Dec 16, 2024
1 parent dbb71b3 commit 1ef4041
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 16 deletions.
44 changes: 28 additions & 16 deletions crates/extensions/c8y_mapper_ext/src/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -632,24 +632,36 @@ impl CumulocityConverter {
&mut self,
message: &MqttMessage,
) -> Result<Vec<MqttMessage>, ConversionError> {
let operation = C8yOperation::from_json(message.payload.as_str()?)?;
let device_xid = operation.external_source.external_id;
let cmd_id = self.command_id.new_id_with_str(&operation.op_id);
// Smartrest messages on c8y/devicecontrol/notifications can contain multiple operations in a single MQTT
// message, so split them
let operation_payloads = collect_smartrest_messages(message.payload_str()?);

let mut output = vec![];
for operation_payload in operation_payloads {
let operation = C8yOperation::from_json(operation_payload.as_str())?;
let device_xid = operation.external_source.external_id;
let cmd_id = self.command_id.new_id_with_str(&operation.op_id);

if self.active_commands.contains(&cmd_id) {
info!("{cmd_id} is already addressed");
return Ok(vec![]);
}

if self.active_commands.contains(&cmd_id) {
info!("{cmd_id} is already addressed");
return Ok(vec![]);
}
// wrap operation payload in a dummy MqttMessage wrapper because the code below assumes 1 MQTT message = 1 operation
// TODO: refactor to avoid this immediate step and extra copies
let operation_message = MqttMessage::new(&message.topic, operation_payload);

let result = self
.process_json_over_mqtt(
device_xid,
operation.op_id.clone(),
&operation.extras,
message,
)
.await;
let output = self.handle_c8y_operation_result(&result, Some(operation.op_id));
let result = self
.process_json_over_mqtt(
device_xid,
operation.op_id.clone(),
&operation.extras,
&operation_message,
)
.await;
let result = self.handle_c8y_operation_result(&result, Some(operation.op_id));
output.extend(result);
}

Ok(output)
}
Expand Down
78 changes: 78 additions & 0 deletions crates/extensions/c8y_mapper_ext/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2017,6 +2017,84 @@ async fn json_custom_operation_status_update_with_operation_id() {
assert_received_contains_str(&mut mqtt, [("c8y/s/us", "506,1234,\"do something\n\"")]).await;
}

#[tokio::test]
async fn json_custom_operation_status_multiple_operations_in_one_mqtt_message() {
let ttd = TempTedgeDir::new();
ttd.dir("operations")
.dir("c8y")
.file("c8y_Command")
.with_raw_content(
r#"[exec]
command = "echo ${.payload.c8y_Command.text}"
on_fragment = "c8y_Command"
"#,
);

let config = C8yMapperConfig {
smartrest_use_operation_id: true,
..test_mapper_config(&ttd)
};
let test_handle = spawn_c8y_mapper_actor_with_config(&ttd, config, true).await;
let TestHandle { mqtt, http, .. } = test_handle;
spawn_dummy_c8y_http_proxy(http);

let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS);

skip_init_messages(&mut mqtt).await;

// Simulate c8y_Command SmartREST request
let operation_1 = json!({
"status":"PENDING",
"id": "111",
"c8y_Command": {
"text": "do something 1"
},
"externalSource":{
"externalId":"test-device",
"type":"c8y_Serial"
}
})
.to_string();
let operation_2 = json!({
"status":"PENDING",
"id": "222",
"c8y_Command": {
"text": "do something 2"
},
"externalSource":{
"externalId":"test-device",
"type":"c8y_Serial"
}
})
.to_string();
let operation_3 = json!({
"status":"PENDING",
"id": "333",
"c8y_Command": {
"text": "do something 3"
},
"externalSource":{
"externalId":"test-device",
"type":"c8y_Serial"
}
})
.to_string();

let input_message = MqttMessage::new(
&Topic::new_unchecked("c8y/devicecontrol/notifications"),
[operation_1, operation_2, operation_3].join("\n"),
);
mqtt.send(input_message).await.expect("Send failed");

assert_received_contains_str(&mut mqtt, [("c8y/s/us", "504,111")]).await;
assert_received_contains_str(&mut mqtt, [("c8y/s/us", "504,222")]).await;
assert_received_contains_str(&mut mqtt, [("c8y/s/us", "504,333")]).await;

assert_received_contains_str(&mut mqtt, [("c8y/s/us", "506,111,\"do something 1\n\"")]).await;
assert_received_contains_str(&mut mqtt, [("c8y/s/us", "506,222,\"do something 2\n\"")]).await;
assert_received_contains_str(&mut mqtt, [("c8y/s/us", "506,333,\"do something 3\n\"")]).await;
}

#[tokio::test]
async fn json_custom_operation_status_update_with_operation_name() {
let ttd = TempTedgeDir::new();
Expand Down

0 comments on commit 1ef4041

Please sign in to comment.