Skip to content

Commit

Permalink
Merge pull request #3302 from Bravo555/fix/3297/c8y-devicecontrol-spl…
Browse files Browse the repository at this point in the history
…it-on-newlines

fix: split c8y JSON over MQTT messages using newline delimiter
  • Loading branch information
reubenmiller authored Dec 17, 2024
2 parents db7b497 + 9eee7ce commit cdaf948
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 17 deletions.
10 changes: 5 additions & 5 deletions crates/core/c8y_api/src/smartrest/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,13 @@ pub fn get_failure_reason_for_smartrest(input: &[u8], max_size: usize) -> String
/// Split MQTT message payload to multiple SmartREST messages.
///
/// ```
/// use c8y_api::smartrest::message::collect_c8y_messages;
/// use c8y_api::smartrest::message::collect_smartrest_messages;
/// let data = "511,device,echo hello\n511,device,\"echo hello\necho world\"";
/// let messages = collect_c8y_messages(data);
/// let messages = collect_smartrest_messages(data);
/// assert_eq!(messages[0], "511,device,echo hello");
/// assert_eq!(messages[1], "511,device,\"echo hello\necho world\"");
/// ```
pub fn collect_c8y_messages(data: &str) -> Vec<String> {
pub fn collect_smartrest_messages(data: &str) -> Vec<String> {
let mut stack: Vec<char> = Vec::new();
let mut smartrest_messages: Vec<String> = Vec::new();
let mut is_inside = false; // Inside an outermost double quote block or not.
Expand Down Expand Up @@ -228,7 +228,7 @@ mod tests {
#[test]
fn split_single_smartrest_message() {
let data = r#"528,DeviceSerial,softwareA,1.0,url1,install,softwareB,2.0,url2,install"#;
let message = collect_c8y_messages(data);
let message = collect_smartrest_messages(data);
assert_eq!(
message[0],
r#"528,DeviceSerial,softwareA,1.0,url1,install,softwareB,2.0,url2,install"#
Expand All @@ -250,7 +250,7 @@ echo world"
524,DeviceSerial,"something",http://www.my.url,type
511,device,511,rina0005,echo \\\"#;

let messages = collect_c8y_messages(data);
let messages = collect_smartrest_messages(data);

assert_eq!(messages[0], r#"511,device,echo hello"#);
assert_eq!(
Expand Down
4 changes: 2 additions & 2 deletions crates/extensions/c8y_firmware_manager/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::worker::IdDownloadRequest;
use crate::worker::IdDownloadResult;
use crate::worker::OperationOutcome;
use async_trait::async_trait;
use c8y_api::smartrest::message::collect_c8y_messages;
use c8y_api::smartrest::message::collect_smartrest_messages;
use c8y_api::smartrest::message::get_smartrest_template_id;
use c8y_api::smartrest::message_ids::FIRMWARE;
use c8y_api::smartrest::smartrest_deserializer::SmartRestFirmwareRequest;
Expand Down Expand Up @@ -131,7 +131,7 @@ impl FirmwareManagerActor {
&mut self,
message: MqttMessage,
) -> Result<(), FirmwareManagementError> {
for smartrest_message in collect_c8y_messages(message.payload_str()?) {
for smartrest_message in collect_smartrest_messages(message.payload_str()?) {
let smartrest_template_id = get_smartrest_template_id(&smartrest_message);
let result = match smartrest_template_id.as_str().parse::<usize>() {
Ok(id) if id == FIRMWARE => {
Expand Down
8 changes: 4 additions & 4 deletions crates/extensions/c8y_mapper_ext/src/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use c8y_api::json_c8y_deserializer::C8ySoftwareUpdate;
use c8y_api::smartrest::error::SmartRestDeserializerError;
use c8y_api::smartrest::inventory::child_device_creation_message;
use c8y_api::smartrest::inventory::service_creation_message;
use c8y_api::smartrest::message::collect_c8y_messages;
use c8y_api::smartrest::message::collect_smartrest_messages;
use c8y_api::smartrest::message::get_failure_reason_for_smartrest;
use c8y_api::smartrest::message::get_smartrest_device_id;
use c8y_api::smartrest::message::get_smartrest_template_id;
Expand Down Expand Up @@ -634,11 +634,11 @@ impl CumulocityConverter {
) -> Result<Vec<MqttMessage>, ConversionError> {
// JSON over MQTT messages on c8y/devicecontrol/notifications can contain multiple operations in a single MQTT
// message, so split them
let operation_payloads = collect_c8y_messages(message.payload_str()?);
let operation_payloads = message.payload_str()?.lines();

let mut output = vec![];
for operation_payload in operation_payloads {
let operation = C8yOperation::from_json(operation_payload.as_str())?;
let operation = C8yOperation::from_json(operation_payload)?;
let device_xid = operation.external_source.external_id;
let cmd_id = self.command_id.new_id_with_str(&operation.op_id);

Expand Down Expand Up @@ -884,7 +884,7 @@ impl CumulocityConverter {
message: &MqttMessage,
) -> Result<Vec<MqttMessage>, ConversionError> {
let mut output: Vec<MqttMessage> = Vec::new();
for smartrest_message in collect_c8y_messages(message.payload_str()?) {
for smartrest_message in collect_smartrest_messages(message.payload_str()?) {
let result = self.process_smartrest(smartrest_message.as_str()).await;
let mut msgs = self.handle_c8y_operation_result(&result, None);
output.append(&mut msgs)
Expand Down
26 changes: 20 additions & 6 deletions crates/extensions/c8y_mapper_ext/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2047,7 +2047,7 @@ async fn json_custom_operation_status_multiple_operations_in_one_mqtt_message()
"status":"PENDING",
"id": "111",
"c8y_Command": {
"text": "do something 1"
"text": "do something \"1\""
},
"externalSource":{
"externalId":"test-device",
Expand All @@ -2059,7 +2059,7 @@ async fn json_custom_operation_status_multiple_operations_in_one_mqtt_message()
"status":"PENDING",
"id": "222",
"c8y_Command": {
"text": "do something 2"
"text": "do something \"2\""
},
"externalSource":{
"externalId":"test-device",
Expand All @@ -2071,7 +2071,7 @@ async fn json_custom_operation_status_multiple_operations_in_one_mqtt_message()
"status":"PENDING",
"id": "333",
"c8y_Command": {
"text": "do something 3"
"text": "do something \"3\""
},
"externalSource":{
"externalId":"test-device",
Expand All @@ -2090,9 +2090,23 @@ async fn json_custom_operation_status_multiple_operations_in_one_mqtt_message()
assert_received_contains_str(&mut mqtt, [("c8y/s/us", "504,222")]).await;
assert_received_contains_str(&mut mqtt, [("c8y/s/us", "504,333")]).await;

assert_received_contains_str(&mut mqtt, [("c8y/s/us", "506,111,\"do something 1\n\"")]).await;
assert_received_contains_str(&mut mqtt, [("c8y/s/us", "506,222,\"do something 2\n\"")]).await;
assert_received_contains_str(&mut mqtt, [("c8y/s/us", "506,333,\"do something 3\n\"")]).await;
// escapes: we input JSON over MQTT, but emit Smartrest, thus initial: `do something "1"` becomes `"do something
// ""1""\n"` (outer "" for the Smartrest record field, and then inside double quotes escape a single quote)
assert_received_contains_str(
&mut mqtt,
[("c8y/s/us", "506,111,\"do something \"\"1\"\"\n\"")],
)
.await;
assert_received_contains_str(
&mut mqtt,
[("c8y/s/us", "506,222,\"do something \"\"2\"\"\n\"")],
)
.await;
assert_received_contains_str(
&mut mqtt,
[("c8y/s/us", "506,333,\"do something \"\"3\"\"\n\"")],
)
.await;
}

#[tokio::test]
Expand Down

0 comments on commit cdaf948

Please sign in to comment.