Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

send measurement to main/child device services #2335

Merged
merged 1 commit into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
229 changes: 229 additions & 0 deletions crates/extensions/c8y_mapper_ext/src/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1624,6 +1624,235 @@ pub(crate) mod tests {
);
}

#[tokio::test]
async fn convert_measurement_with_nested_child_device() {
let tmp_dir = TempTedgeDir::new();
let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await;
let reg_message = Message::new(
&Topic::new_unchecked("te/device/immediate_child//"),
json!({
"@type":"child-device",
"@parent":"device/main//",
"@id":"immediate_child"
})
.to_string(),
);
let _ = converter.convert(&reg_message).await;

let reg_message = Message::new(
&Topic::new_unchecked("te/device/nested_child//"),
json!({
"@type":"child-device",
"@parent":"device/immediate_child//",
"@id":"nested_child"
})
.to_string(),
);
let _ = converter.convert(&reg_message).await;

let in_topic = "te/device/nested_child///m/";
let in_payload = r#"{"temp": 1, "time": "2021-11-16T17:45:40.571760714+01:00"}"#;
let in_message = Message::new(&Topic::new_unchecked(in_topic), in_payload);

let expected_c8y_json_message = Message::new(
&Topic::new_unchecked("c8y/measurement/measurements/create"),
json!({
"externalSource":{"externalId":"nested_child","type":"c8y_Serial"},
"temp":{"temp":{"value":1.0}},
"time":"2021-11-16T17:45:40.571760714+01:00",
"type":"ThinEdgeMeasurement"
})
.to_string(),
);

// Test the first output messages contains SmartREST and C8Y JSON.
let out_first_messages = converter.convert(&in_message).await;
assert_eq!(out_first_messages, vec![expected_c8y_json_message.clone()]);
}

#[tokio::test]
async fn convert_measurement_with_nested_child_service() {
let tmp_dir = TempTedgeDir::new();
let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await;
let reg_message = Message::new(
&Topic::new_unchecked("te/device/immediate_child//"),
json!({
"@type":"child-device",
"@parent":"device/main//",
"@id":"immediate_child"
})
.to_string(),
);
let _ = converter.convert(&reg_message).await;

let reg_message = Message::new(
&Topic::new_unchecked("te/device/nested_child//"),
json!({
"@type":"child-device",
"@parent":"device/immediate_child//",
"@id":"nested_child"
})
.to_string(),
);
let _ = converter.convert(&reg_message).await;

let reg_message = Message::new(
&Topic::new_unchecked("te/device/nested_child/service/nested_service"),
json!({
"@type":"service",
"@parent":"device/nested_child//",
"@id":"nested_service"
})
.to_string(),
);
let _ = converter.convert(&reg_message).await;

let in_topic = "te/device/nested_child/service/nested_service/m/";
let in_payload = r#"{"temp": 1, "time": "2021-11-16T17:45:40.571760714+01:00"}"#;
let in_message = Message::new(&Topic::new_unchecked(in_topic), in_payload);

let expected_c8y_json_message = Message::new(
&Topic::new_unchecked("c8y/measurement/measurements/create"),
json!({
"externalSource":{"externalId":"nested_service","type":"c8y_Serial"},
"temp":{"temp":{"value":1.0}},
"time":"2021-11-16T17:45:40.571760714+01:00",
"type":"ThinEdgeMeasurement"
})
.to_string(),
);

// Test the first output messages contains SmartREST and C8Y JSON.
let out_first_messages = converter.convert(&in_message).await;
assert_eq!(out_first_messages, vec![expected_c8y_json_message.clone()]);
}

#[tokio::test]
async fn convert_measurement_for_child_device_service() {
let tmp_dir = TempTedgeDir::new();
let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await;

let in_topic = "te/device/child1/service/app1/m/m_type";
let in_payload = r#"{"temp": 1, "time": "2021-11-16T17:45:40.571760714+01:00"}"#;
let in_message = Message::new(&Topic::new_unchecked(in_topic), in_payload);

let expected_child_create_msg = Message::new(
&Topic::new_unchecked("te/device/child1//"),
json!({
"@id":"test-device:device:child1",
"@type":"child-device",
"name":"child1",
})
.to_string(),
)
.with_retain();

let expected_smart_rest_message_child = Message::new(
&Topic::new_unchecked("c8y/s/us"),
"101,test-device:device:child1,child1,thin-edge.io-child",
);
let expected_service_create_msg = Message::new(
&Topic::new_unchecked("te/device/child1/service/app1"),
json!({
"@id":"test-device:device:child1:service:app1",
"@parent":"device/child1//",
"@type":"service",
"name":"app1",
"type":"service"
})
.to_string(),
)
.with_retain();

let expected_smart_rest_message_service = Message::new(
&Topic::new_unchecked("c8y/s/us/test-device:device:child1"),
"102,test-device:device:child1:service:app1,service,app1,up",
);
let expected_c8y_json_message = Message::new(
&Topic::new_unchecked("c8y/measurement/measurements/create"),
json!({
"externalSource":{
"externalId":"test-device:device:child1:service:app1",
"type":"c8y_Serial"
},
"temp":{"temp":{"value":1.0}},
"time":"2021-11-16T17:45:40.571760714+01:00",
"type":"m_type"})
.to_string(),
);

// Test the first output messages contains SmartREST and C8Y JSON.
let out_first_messages = converter.convert(&in_message).await;
assert_eq!(
out_first_messages,
vec![
expected_child_create_msg,
expected_smart_rest_message_child,
expected_service_create_msg,
expected_smart_rest_message_service,
expected_c8y_json_message.clone()
]
);

// Test the second output messages doesn't contain SmartREST child device creation.
let out_second_messages = converter.convert(&in_message).await;
assert_eq!(out_second_messages, vec![expected_c8y_json_message]);
}

albinsuresh marked this conversation as resolved.
Show resolved Hide resolved
#[tokio::test]
async fn convert_measurement_for_main_device_service() {
let tmp_dir = TempTedgeDir::new();
let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await;

let in_topic = "te/device/main/service/appm/m/m_type";
let in_payload = r#"{"temp": 1, "time": "2021-11-16T17:45:40.571760714+01:00"}"#;
let in_message = Message::new(&Topic::new_unchecked(in_topic), in_payload);

let expected_create_service_msg = Message::new(
&Topic::new_unchecked("te/device/main/service/appm"),
json!({
"@id":"test-device:device:main:service:appm",
"@parent":"device/main//",
"@type":"service",
"name":"appm",
"type":"service"})
.to_string(),
)
.with_retain();

let expected_c8y_json_message = Message::new(
&Topic::new_unchecked("c8y/measurement/measurements/create"),
json!({
"externalSource":{
"externalId":"test-device:device:main:service:appm",
"type":"c8y_Serial"
},
"temp":{"temp":{"value":1.0}},
"time":"2021-11-16T17:45:40.571760714+01:00",
"type":"m_type"})
.to_string(),
);

let expected_smart_rest_message_service = Message::new(
&Topic::new_unchecked("c8y/s/us"),
"102,test-device:device:main:service:appm,service,appm,up",
);

// Test the first output messages contains SmartREST and C8Y JSON.
let out_first_messages = converter.convert(&in_message).await;
assert_eq!(
out_first_messages,
vec![
expected_create_service_msg,
expected_smart_rest_message_service,
expected_c8y_json_message.clone()
]
);

let out_second_messages = converter.convert(&in_message).await;
assert_eq!(out_second_messages, vec![expected_c8y_json_message]);
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add some tests for events and alarms of services as well? They were working magically already, but we never had test coverage. The existing child device tests for the same could be copied and modified.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added in tests.rs

#[tokio::test]
#[ignore = "FIXME: the registration is currently done even if the message is ill-formed"]
async fn convert_first_measurement_invalid_then_valid_with_child_id() {
Expand Down
10 changes: 5 additions & 5 deletions crates/extensions/c8y_mapper_ext/src/serializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,16 @@ impl C8yJsonSerializer {

json.write_open_obj();

if entity.r#type == EntityType::ChildDevice {
let child_id = &entity.external_id;
// In case the measurement is addressed to a child-device use fragment
// "externalSource" to tell c8Y identity API to use child-device
if entity.r#type == EntityType::ChildDevice || entity.r#type == EntityType::Service {
let entity_id = &entity.external_id;
// In case the measurement is addressed to a child-device or a service, use fragment
// "externalSource" to tell c8Y identity API to use child-device or for service
// object referenced by "externalId", instead of root device object
// referenced by MQTT client's Device ID.
let _ = json.write_key("externalSource");
json.write_open_obj();
let _ = json.write_key("externalId");
let _ = json.write_str(child_id.as_ref());
let _ = json.write_str(entity_id.as_ref());
let _ = json.write_key("type");
let _ = json.write_str("c8y_Serial");
json.write_close_obj();
Expand Down
Loading