Skip to content

Commit

Permalink
Merge pull request #2335 from PradeepKiruvale/service_measurement
Browse files Browse the repository at this point in the history
send measurement to main/child device services
  • Loading branch information
didier-wenzek authored Oct 23, 2023
2 parents 463903d + aee467a commit d3ed45f
Show file tree
Hide file tree
Showing 5 changed files with 1,020 additions and 7 deletions.
229 changes: 229 additions & 0 deletions crates/extensions/c8y_mapper_ext/src/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1624,6 +1624,235 @@ pub(crate) mod tests {
);
}

#[tokio::test]
async fn convert_measurement_with_nested_child_device() {
let tmp_dir = TempTedgeDir::new();
let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await;
let reg_message = Message::new(
&Topic::new_unchecked("te/device/immediate_child//"),
json!({
"@type":"child-device",
"@parent":"device/main//",
"@id":"immediate_child"
})
.to_string(),
);
let _ = converter.convert(&reg_message).await;

let reg_message = Message::new(
&Topic::new_unchecked("te/device/nested_child//"),
json!({
"@type":"child-device",
"@parent":"device/immediate_child//",
"@id":"nested_child"
})
.to_string(),
);
let _ = converter.convert(&reg_message).await;

let in_topic = "te/device/nested_child///m/";
let in_payload = r#"{"temp": 1, "time": "2021-11-16T17:45:40.571760714+01:00"}"#;
let in_message = Message::new(&Topic::new_unchecked(in_topic), in_payload);

let expected_c8y_json_message = Message::new(
&Topic::new_unchecked("c8y/measurement/measurements/create"),
json!({
"externalSource":{"externalId":"nested_child","type":"c8y_Serial"},
"temp":{"temp":{"value":1.0}},
"time":"2021-11-16T17:45:40.571760714+01:00",
"type":"ThinEdgeMeasurement"
})
.to_string(),
);

// Test the first output messages contains SmartREST and C8Y JSON.
let out_first_messages = converter.convert(&in_message).await;
assert_eq!(out_first_messages, vec![expected_c8y_json_message.clone()]);
}

#[tokio::test]
async fn convert_measurement_with_nested_child_service() {
let tmp_dir = TempTedgeDir::new();
let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await;
let reg_message = Message::new(
&Topic::new_unchecked("te/device/immediate_child//"),
json!({
"@type":"child-device",
"@parent":"device/main//",
"@id":"immediate_child"
})
.to_string(),
);
let _ = converter.convert(&reg_message).await;

let reg_message = Message::new(
&Topic::new_unchecked("te/device/nested_child//"),
json!({
"@type":"child-device",
"@parent":"device/immediate_child//",
"@id":"nested_child"
})
.to_string(),
);
let _ = converter.convert(&reg_message).await;

let reg_message = Message::new(
&Topic::new_unchecked("te/device/nested_child/service/nested_service"),
json!({
"@type":"service",
"@parent":"device/nested_child//",
"@id":"nested_service"
})
.to_string(),
);
let _ = converter.convert(&reg_message).await;

let in_topic = "te/device/nested_child/service/nested_service/m/";
let in_payload = r#"{"temp": 1, "time": "2021-11-16T17:45:40.571760714+01:00"}"#;
let in_message = Message::new(&Topic::new_unchecked(in_topic), in_payload);

let expected_c8y_json_message = Message::new(
&Topic::new_unchecked("c8y/measurement/measurements/create"),
json!({
"externalSource":{"externalId":"nested_service","type":"c8y_Serial"},
"temp":{"temp":{"value":1.0}},
"time":"2021-11-16T17:45:40.571760714+01:00",
"type":"ThinEdgeMeasurement"
})
.to_string(),
);

// Test the first output messages contains SmartREST and C8Y JSON.
let out_first_messages = converter.convert(&in_message).await;
assert_eq!(out_first_messages, vec![expected_c8y_json_message.clone()]);
}

#[tokio::test]
async fn convert_measurement_for_child_device_service() {
let tmp_dir = TempTedgeDir::new();
let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await;

let in_topic = "te/device/child1/service/app1/m/m_type";
let in_payload = r#"{"temp": 1, "time": "2021-11-16T17:45:40.571760714+01:00"}"#;
let in_message = Message::new(&Topic::new_unchecked(in_topic), in_payload);

let expected_child_create_msg = Message::new(
&Topic::new_unchecked("te/device/child1//"),
json!({
"@id":"test-device:device:child1",
"@type":"child-device",
"name":"child1",
})
.to_string(),
)
.with_retain();

let expected_smart_rest_message_child = Message::new(
&Topic::new_unchecked("c8y/s/us"),
"101,test-device:device:child1,child1,thin-edge.io-child",
);
let expected_service_create_msg = Message::new(
&Topic::new_unchecked("te/device/child1/service/app1"),
json!({
"@id":"test-device:device:child1:service:app1",
"@parent":"device/child1//",
"@type":"service",
"name":"app1",
"type":"service"
})
.to_string(),
)
.with_retain();

let expected_smart_rest_message_service = Message::new(
&Topic::new_unchecked("c8y/s/us/test-device:device:child1"),
"102,test-device:device:child1:service:app1,service,app1,up",
);
let expected_c8y_json_message = Message::new(
&Topic::new_unchecked("c8y/measurement/measurements/create"),
json!({
"externalSource":{
"externalId":"test-device:device:child1:service:app1",
"type":"c8y_Serial"
},
"temp":{"temp":{"value":1.0}},
"time":"2021-11-16T17:45:40.571760714+01:00",
"type":"m_type"})
.to_string(),
);

// Test the first output messages contains SmartREST and C8Y JSON.
let out_first_messages = converter.convert(&in_message).await;
assert_eq!(
out_first_messages,
vec![
expected_child_create_msg,
expected_smart_rest_message_child,
expected_service_create_msg,
expected_smart_rest_message_service,
expected_c8y_json_message.clone()
]
);

// Test the second output messages doesn't contain SmartREST child device creation.
let out_second_messages = converter.convert(&in_message).await;
assert_eq!(out_second_messages, vec![expected_c8y_json_message]);
}

#[tokio::test]
async fn convert_measurement_for_main_device_service() {
let tmp_dir = TempTedgeDir::new();
let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await;

let in_topic = "te/device/main/service/appm/m/m_type";
let in_payload = r#"{"temp": 1, "time": "2021-11-16T17:45:40.571760714+01:00"}"#;
let in_message = Message::new(&Topic::new_unchecked(in_topic), in_payload);

let expected_create_service_msg = Message::new(
&Topic::new_unchecked("te/device/main/service/appm"),
json!({
"@id":"test-device:device:main:service:appm",
"@parent":"device/main//",
"@type":"service",
"name":"appm",
"type":"service"})
.to_string(),
)
.with_retain();

let expected_c8y_json_message = Message::new(
&Topic::new_unchecked("c8y/measurement/measurements/create"),
json!({
"externalSource":{
"externalId":"test-device:device:main:service:appm",
"type":"c8y_Serial"
},
"temp":{"temp":{"value":1.0}},
"time":"2021-11-16T17:45:40.571760714+01:00",
"type":"m_type"})
.to_string(),
);

let expected_smart_rest_message_service = Message::new(
&Topic::new_unchecked("c8y/s/us"),
"102,test-device:device:main:service:appm,service,appm,up",
);

// Test the first output messages contains SmartREST and C8Y JSON.
let out_first_messages = converter.convert(&in_message).await;
assert_eq!(
out_first_messages,
vec![
expected_create_service_msg,
expected_smart_rest_message_service,
expected_c8y_json_message.clone()
]
);

let out_second_messages = converter.convert(&in_message).await;
assert_eq!(out_second_messages, vec![expected_c8y_json_message]);
}

#[tokio::test]
#[ignore = "FIXME: the registration is currently done even if the message is ill-formed"]
async fn convert_first_measurement_invalid_then_valid_with_child_id() {
Expand Down
10 changes: 5 additions & 5 deletions crates/extensions/c8y_mapper_ext/src/serializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,16 @@ impl C8yJsonSerializer {

json.write_open_obj();

if entity.r#type == EntityType::ChildDevice {
let child_id = &entity.external_id;
// In case the measurement is addressed to a child-device use fragment
// "externalSource" to tell c8Y identity API to use child-device
if entity.r#type == EntityType::ChildDevice || entity.r#type == EntityType::Service {
let entity_id = &entity.external_id;
// In case the measurement is addressed to a child-device or a service, use fragment
// "externalSource" to tell c8Y identity API to use child-device or for service
// object referenced by "externalId", instead of root device object
// referenced by MQTT client's Device ID.
let _ = json.write_key("externalSource");
json.write_open_obj();
let _ = json.write_key("externalId");
let _ = json.write_str(child_id.as_ref());
let _ = json.write_str(entity_id.as_ref());
let _ = json.write_key("type");
let _ = json.write_str("c8y_Serial");
json.write_close_obj();
Expand Down
Loading

1 comment on commit d3ed45f

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Robot Results

✅ Passed ❌ Failed ⏭️ Skipped Total Pass % ⏱️ Duration
341 0 3 341 100 50m56.389999999s

Please sign in to comment.