Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Collect multiple smartrest messages from c8y devicecontrol topic #3301

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions crates/core/c8y_api/src/smartrest/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,13 @@ pub fn get_failure_reason_for_smartrest(input: &[u8], max_size: usize) -> String
/// Split MQTT message payload to multiple SmartREST messages.
///
/// ```
/// use c8y_api::smartrest::message::collect_smartrest_messages;
/// use c8y_api::smartrest::message::collect_c8y_messages;
/// let data = "511,device,echo hello\n511,device,\"echo hello\necho world\"";
/// let messages = collect_smartrest_messages(data);
/// let messages = collect_c8y_messages(data);
/// assert_eq!(messages[0], "511,device,echo hello");
/// assert_eq!(messages[1], "511,device,\"echo hello\necho world\"");
/// ```
pub fn collect_smartrest_messages(data: &str) -> Vec<String> {
pub fn collect_c8y_messages(data: &str) -> Vec<String> {
let mut stack: Vec<char> = Vec::new();
let mut smartrest_messages: Vec<String> = Vec::new();
let mut is_inside = false; // Inside an outermost double quote block or not.
Expand Down Expand Up @@ -228,7 +228,7 @@ mod tests {
#[test]
fn split_single_smartrest_message() {
let data = r#"528,DeviceSerial,softwareA,1.0,url1,install,softwareB,2.0,url2,install"#;
let message = collect_smartrest_messages(data);
let message = collect_c8y_messages(data);
assert_eq!(
message[0],
r#"528,DeviceSerial,softwareA,1.0,url1,install,softwareB,2.0,url2,install"#
Expand All @@ -250,7 +250,7 @@ echo world"
524,DeviceSerial,"something",http://www.my.url,type
511,device,511,rina0005,echo \\\"#;

let messages = collect_smartrest_messages(data);
let messages = collect_c8y_messages(data);

assert_eq!(messages[0], r#"511,device,echo hello"#);
assert_eq!(
Expand Down
4 changes: 2 additions & 2 deletions crates/extensions/c8y_firmware_manager/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::worker::IdDownloadRequest;
use crate::worker::IdDownloadResult;
use crate::worker::OperationOutcome;
use async_trait::async_trait;
use c8y_api::smartrest::message::collect_smartrest_messages;
use c8y_api::smartrest::message::collect_c8y_messages;
use c8y_api::smartrest::message::get_smartrest_template_id;
use c8y_api::smartrest::message_ids::FIRMWARE;
use c8y_api::smartrest::smartrest_deserializer::SmartRestFirmwareRequest;
Expand Down Expand Up @@ -131,7 +131,7 @@ impl FirmwareManagerActor {
&mut self,
message: MqttMessage,
) -> Result<(), FirmwareManagementError> {
for smartrest_message in collect_smartrest_messages(message.payload_str()?) {
for smartrest_message in collect_c8y_messages(message.payload_str()?) {
let smartrest_template_id = get_smartrest_template_id(&smartrest_message);
let result = match smartrest_template_id.as_str().parse::<usize>() {
Ok(id) if id == FIRMWARE => {
Expand Down
48 changes: 30 additions & 18 deletions crates/extensions/c8y_mapper_ext/src/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use c8y_api::json_c8y_deserializer::C8ySoftwareUpdate;
use c8y_api::smartrest::error::SmartRestDeserializerError;
use c8y_api::smartrest::inventory::child_device_creation_message;
use c8y_api::smartrest::inventory::service_creation_message;
use c8y_api::smartrest::message::collect_smartrest_messages;
use c8y_api::smartrest::message::collect_c8y_messages;
use c8y_api::smartrest::message::get_failure_reason_for_smartrest;
use c8y_api::smartrest::message::get_smartrest_device_id;
use c8y_api::smartrest::message::get_smartrest_template_id;
Expand Down Expand Up @@ -632,24 +632,36 @@ impl CumulocityConverter {
&mut self,
message: &MqttMessage,
) -> Result<Vec<MqttMessage>, ConversionError> {
let operation = C8yOperation::from_json(message.payload.as_str()?)?;
let device_xid = operation.external_source.external_id;
let cmd_id = self.command_id.new_id_with_str(&operation.op_id);
// JSON over MQTT messages on c8y/devicecontrol/notifications can contain multiple operations in a single MQTT
// message, so split them
let operation_payloads = collect_c8y_messages(message.payload_str()?);

let mut output = vec![];
for operation_payload in operation_payloads {
let operation = C8yOperation::from_json(operation_payload.as_str())?;
let device_xid = operation.external_source.external_id;
let cmd_id = self.command_id.new_id_with_str(&operation.op_id);

if self.active_commands.contains(&cmd_id) {
info!("{cmd_id} is already addressed");
return Ok(vec![]);
}

if self.active_commands.contains(&cmd_id) {
info!("{cmd_id} is already addressed");
return Ok(vec![]);
}
// wrap operation payload in a dummy MqttMessage wrapper because the code below assumes 1 MQTT message = 1 operation
// TODO: refactor to avoid this intermediate step and extra copies
let operation_message = MqttMessage::new(&message.topic, operation_payload);

let result = self
.process_json_over_mqtt(
device_xid,
operation.op_id.clone(),
&operation.extras,
message,
)
.await;
let output = self.handle_c8y_operation_result(&result, Some(operation.op_id));
let result = self
.process_json_over_mqtt(
device_xid,
operation.op_id.clone(),
&operation.extras,
&operation_message,
)
.await;
let result = self.handle_c8y_operation_result(&result, Some(operation.op_id));
output.extend(result);
}

Ok(output)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My opinion for the future development.

What I don't like from the current way (due to the converter design) is, this return value Ok(output) is a vector of MQTT messages that will be published later at the same time. If 5 operations are included in one message from c8y, 5 of 504 messages will be sent together. But it's definitely out of scope to fix now.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I don't like from the current way (due to the converter design) is, this return value Ok(output) is a vector of MQTT messages that will be published later at the same time. If 5 operations are included in one message from c8y, 5 of 504 messages will be sent together.

What don't you like in the current design? That the value returned by this method is a vector of MQTT messages or that the five 504 messages are not packed into a single MQTT message? The later can be implemented using the former method signature.

Copy link
Member

@rina23q rina23q Dec 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

504 changes an operation state to executing in c8y.

Let's say there are 3 operation (Op_A, Op_B, Op_C) are delivered at the same.
Ideally,

  1. Op_A start proceeding -> Publish 504 for Op_A
  2. Op_B start proceeding -> Publish 504 for Op_B
  3. Op_C start proceeding -> Publish 504 for Op_C

However, with the current design, when all of Op_A, Op_B, and Op_C start proceeding, mapper publishes three 504 messages for these 3 operations.

In short, what I don't like is the timing the 504 message is published by mapper. Not critical, but minor point.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Though this doesn't occur for operations being processed by a workflow right (as the workflow execution is controlled by the tedge-agent, and currently only processes one type of operation at a time of the same type).

}
Expand Down Expand Up @@ -872,7 +884,7 @@ impl CumulocityConverter {
message: &MqttMessage,
) -> Result<Vec<MqttMessage>, ConversionError> {
let mut output: Vec<MqttMessage> = Vec::new();
for smartrest_message in collect_smartrest_messages(message.payload_str()?) {
for smartrest_message in collect_c8y_messages(message.payload_str()?) {
let result = self.process_smartrest(smartrest_message.as_str()).await;
let mut msgs = self.handle_c8y_operation_result(&result, None);
output.append(&mut msgs)
Expand Down
78 changes: 78 additions & 0 deletions crates/extensions/c8y_mapper_ext/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2017,6 +2017,84 @@ async fn json_custom_operation_status_update_with_operation_id() {
assert_received_contains_str(&mut mqtt, [("c8y/s/us", "506,1234,\"do something\n\"")]).await;
}

#[tokio::test]
async fn json_custom_operation_status_multiple_operations_in_one_mqtt_message() {
let ttd = TempTedgeDir::new();
ttd.dir("operations")
.dir("c8y")
.file("c8y_Command")
.with_raw_content(
r#"[exec]
command = "echo ${.payload.c8y_Command.text}"
on_fragment = "c8y_Command"
"#,
);

let config = C8yMapperConfig {
smartrest_use_operation_id: true,
..test_mapper_config(&ttd)
};
let test_handle = spawn_c8y_mapper_actor_with_config(&ttd, config, true).await;
let TestHandle { mqtt, http, .. } = test_handle;
spawn_dummy_c8y_http_proxy(http);

let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS);

skip_init_messages(&mut mqtt).await;

// Simulate c8y_Command SmartREST request
let operation_1 = json!({
"status":"PENDING",
"id": "111",
"c8y_Command": {
"text": "do something 1"
},
"externalSource":{
"externalId":"test-device",
"type":"c8y_Serial"
}
})
.to_string();
let operation_2 = json!({
"status":"PENDING",
"id": "222",
"c8y_Command": {
"text": "do something 2"
},
"externalSource":{
"externalId":"test-device",
"type":"c8y_Serial"
}
})
.to_string();
let operation_3 = json!({
"status":"PENDING",
"id": "333",
"c8y_Command": {
"text": "do something 3"
},
"externalSource":{
"externalId":"test-device",
"type":"c8y_Serial"
}
})
.to_string();

let input_message = MqttMessage::new(
&Topic::new_unchecked("c8y/devicecontrol/notifications"),
[operation_1, operation_2, operation_3].join("\n"),
);
mqtt.send(input_message).await.expect("Send failed");

assert_received_contains_str(&mut mqtt, [("c8y/s/us", "504,111")]).await;
assert_received_contains_str(&mut mqtt, [("c8y/s/us", "504,222")]).await;
assert_received_contains_str(&mut mqtt, [("c8y/s/us", "504,333")]).await;

assert_received_contains_str(&mut mqtt, [("c8y/s/us", "506,111,\"do something 1\n\"")]).await;
assert_received_contains_str(&mut mqtt, [("c8y/s/us", "506,222,\"do something 2\n\"")]).await;
assert_received_contains_str(&mut mqtt, [("c8y/s/us", "506,333,\"do something 3\n\"")]).await;
}

#[tokio::test]
async fn json_custom_operation_status_update_with_operation_name() {
let ttd = TempTedgeDir::new();
Expand Down
Loading