Skip to content

Commit

Permalink
C8y inventory updates via twin/ topic channel
Browse files Browse the repository at this point in the history
  • Loading branch information
albinsuresh committed Oct 13, 2023
1 parent 12819ca commit 5a14baa
Show file tree
Hide file tree
Showing 8 changed files with 213 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ define_tedge_config! {

/// Set of MQTT topics the Cumulocity mapper should subscribe to
#[tedge_config(example = "te/+/+/+/+/a/+,te/+/+/+/+/m/+,te/+/+/+/+/e/+")]
#[tedge_config(default(value = "te/+/+/+/+,te/+/+/+/+/m/+,te/+/+/+/+/e/+,te/+/+/+/+/a/+,tedge/health/+,tedge/health/+/+"))]
#[tedge_config(default(value = "te/+/+/+/+,te/+/+/+/+/twin/+,te/+/+/+/+/m/+,te/+/+/+/+/e/+,te/+/+/+/+/a/+,tedge/health/+,tedge/health/+/+"))]
topics: TemplatesSet,

enable: {
Expand Down
8 changes: 7 additions & 1 deletion crates/core/tedge_api/src/mqtt_topics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,9 @@ pub enum TopicIdError {
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Channel {
EntityMetadata,
EntityTwinData {
fragment_key: String,
},
Measurement {
measurement_type: String,
},
Expand Down Expand Up @@ -372,7 +375,9 @@ impl FromStr for Channel {
fn from_str(channel: &str) -> Result<Self, ChannelError> {
match channel.split('/').collect::<Vec<&str>>()[..] {
[""] => Ok(Channel::EntityMetadata),

["twin", fragment_key] => Ok(Channel::EntityTwinData {
fragment_key: fragment_key.to_string(),
}),
["m", measurement_type] => Ok(Channel::Measurement {
measurement_type: measurement_type.to_string(),
}),
Expand Down Expand Up @@ -411,6 +416,7 @@ impl Display for Channel {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Channel::EntityMetadata => Ok(()),
Channel::EntityTwinData { fragment_key } => write!(f, "twin/{fragment_key}"),

Channel::Measurement { measurement_type } => write!(f, "m/{measurement_type}"),
Channel::MeasurementMetadata { measurement_type } => {
Expand Down
1 change: 1 addition & 0 deletions crates/extensions/c8y_mapper_ext/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ impl C8yMapperConfig {
pub fn default_external_topic_filter() -> TopicFilter {
vec![
"te/+/+/+/+",
"te/+/+/+/+/twin/+",
"te/+/+/+/+/m/+",
"te/+/+/+/+/e/+",
"te/+/+/+/+/a/+",
Expand Down
10 changes: 7 additions & 3 deletions crates/extensions/c8y_mapper_ext/src/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ use tracing::log::error;
const C8Y_CLOUD: &str = "c8y";
const INVENTORY_FRAGMENTS_FILE_LOCATION: &str = "device/inventory.json";
const SUPPORTED_OPERATIONS_DIRECTORY: &str = "operations";
const INVENTORY_MANAGED_OBJECTS_TOPIC: &str = "c8y/inventory/managedObjects/update";
pub const INVENTORY_MANAGED_OBJECTS_TOPIC: &str = "c8y/inventory/managedObjects/update";
const INTERNAL_ALARMS_TOPIC: &str = "c8y-internal/alarms/";
const C8Y_JSON_MQTT_EVENTS_TOPIC: &str = "c8y/event/events/create";
const TEDGE_AGENT_LOG_DIR: &str = "tedge/agent";
Expand Down Expand Up @@ -899,6 +899,10 @@ impl CumulocityConverter {
}

let mut messages = match &channel {
Channel::EntityTwinData { fragment_key } => {
self.try_convert_entity_twin_data(&source, message, fragment_key)?
}

Channel::Measurement { measurement_type } => {
self.try_convert_measurement(&source, message, measurement_type)?
}
Expand Down Expand Up @@ -1464,7 +1468,7 @@ pub fn check_tedge_agent_status(message: &Message) -> Result<bool, ConversionErr
}

#[cfg(test)]
mod tests {
pub(crate) mod tests {
use crate::actor::IdDownloadRequest;
use crate::actor::IdDownloadResult;
use crate::Capabilities;
Expand Down Expand Up @@ -2419,7 +2423,7 @@ mod tests {
assert!(!second_registration_message_mapped);
}

async fn create_c8y_converter(
pub(crate) async fn create_c8y_converter(
tmp_dir: &TempTedgeDir,
) -> (
CumulocityConverter,
Expand Down
150 changes: 150 additions & 0 deletions crates/extensions/c8y_mapper_ext/src/inventory.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
use crate::converter::INVENTORY_MANAGED_OBJECTS_TOPIC;
use serde_json::json;
use serde_json::Value as JsonValue;
use tedge_api::mqtt_topics::EntityTopicId;
use tedge_mqtt_ext::Message;
use tedge_mqtt_ext::Topic;
use tracing::warn;

use crate::{converter::CumulocityConverter, error::ConversionError};

impl CumulocityConverter {
pub fn try_convert_entity_twin_data(
&mut self,
source: &EntityTopicId,
message: &Message,
fragment_key: &str,
) -> Result<Vec<Message>, ConversionError> {
let target_entity = self.entity_store.try_get(source)?;
let entity_external_id = target_entity.external_id.as_ref();
let payload = serde_json::from_slice::<JsonValue>(message.payload_bytes())?;

let sanitized_payload = if let JsonValue::Object(mut properties) = payload {
if properties.contains_key("name") {
warn!(
"Updating the entity `name` field via the twin/ topic channel is not supported"
);
properties.remove("name");
}
if properties.contains_key("type") {
warn!(
"Updating the entity `type` field via the twin/ topic channel is not supported"
);
properties.remove("name");
}
JsonValue::Object(properties)
} else {
payload
};

let mapped_json = json!({ fragment_key: sanitized_payload });

let topic = Topic::new_unchecked(&format!(
"{INVENTORY_MANAGED_OBJECTS_TOPIC}/{entity_external_id}"
));
Ok(vec![Message::new(&topic, mapped_json.to_string())])
}
}

#[cfg(test)]
mod tests {
use crate::converter::tests::create_c8y_converter;
use serde_json::json;
use tedge_mqtt_ext::test_helpers::assert_messages_includes_json;
use tedge_mqtt_ext::{Message, Topic};
use tedge_test_utils::fs::TempTedgeDir;

#[tokio::test]
async fn convert_entity_twin_data_json_object() {
let tmp_dir = TempTedgeDir::new();
let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await;

let twin_topic = "te/device/main///twin/device_os";
let twin_payload = json!({
"family": "Debian",
"version": "11"
});
let twin_message =
Message::new(&Topic::new_unchecked(twin_topic), twin_payload.to_string());
let inventory_messages = converter.convert(&twin_message).await;

assert_messages_includes_json(
inventory_messages,
[(
"c8y/inventory/managedObjects/update/test-device",
json!({
"device_os": {
"family": "Debian",
"version": "11"
}
}),
)],
);
}

#[tokio::test]
async fn convert_entity_twin_data_string_value() {
let tmp_dir = TempTedgeDir::new();
let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await;

let twin_message = Message::new(
&Topic::new_unchecked("te/device/main///twin/foo"),
r#""bar""#,
);
let inventory_messages = converter.convert(&twin_message).await;

assert_messages_includes_json(
inventory_messages,
[(
"c8y/inventory/managedObjects/update/test-device",
json!({
"foo": "bar"
}),
)],
);
}

#[tokio::test]
async fn convert_entity_twin_data_numeric_value() {
let tmp_dir = TempTedgeDir::new();
let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await;

let twin_message = Message::new(
&Topic::new_unchecked("te/device/main///twin/foo"),
r#"5.6789"#,
);
let inventory_messages = converter.convert(&twin_message).await;

assert_messages_includes_json(
inventory_messages,
[(
"c8y/inventory/managedObjects/update/test-device",
json!({
"foo": 5.6789
}),
)],
);
}

#[tokio::test]
async fn convert_entity_twin_data_boolean_value() {
let tmp_dir = TempTedgeDir::new();
let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await;

let twin_message = Message::new(
&Topic::new_unchecked("te/device/main///twin/enabled"),
r#"false"#,
);
let inventory_messages = converter.convert(&twin_message).await;

assert_messages_includes_json(
inventory_messages,
[(
"c8y/inventory/managedObjects/update/test-device",
json!({
"enabled": false
}),
)],
);
}
}
1 change: 1 addition & 0 deletions crates/extensions/c8y_mapper_ext/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub mod converter;
pub mod dynamic_discovery;
pub mod error;
mod fragments;
mod inventory;
pub mod json;
mod log_upload;
mod serializer;
Expand Down
44 changes: 44 additions & 0 deletions crates/extensions/tedge_mqtt_ext/src/test_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,47 @@ pub async fn assert_received_includes_json<I, S>(
);
}
}

pub fn assert_messages_contains_str<M, I, S>(messages: M, expected: I)
where
M: IntoIterator<Item = MqttMessage>,
I: IntoIterator<Item = (S, S)>,
S: AsRef<str> + Debug,
{
for (message, expected_msg) in messages.into_iter().zip(expected) {
let expected_topic = expected_msg.0.as_ref();
let expected_payload = expected_msg.1.as_ref();
assert_eq!(
message.topic,
Topic::new_unchecked(expected_topic),
"\nReceived unexpected message: {:?}",
message
);
let payload = message.payload_str().expect("non UTF-8 payload");
assert!(
payload.contains(expected_payload),
"Payload assertion failed.\n Actual: {} \n Expected: {}",
payload,
expected_payload
)
}
}

pub fn assert_messages_includes_json<M, I, S>(messages: M, expected: I)
where
M: IntoIterator<Item = MqttMessage>,
I: IntoIterator<Item = (S, serde_json::Value)>,
S: AsRef<str>,
{
for (message, expected_msg) in messages.into_iter().zip(expected) {
assert_eq!(message.topic, Topic::new_unchecked(expected_msg.0.as_ref()));
let payload = serde_json::from_str::<serde_json::Value>(
message.payload_str().expect("non UTF-8 payload"),
)
.expect("non JSON payload");
assert_json_include!(
actual: payload,
expected: expected_msg.1
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,8 @@ Thin-edge device support sending inventory data via tedge topic


Thin-edge device supports sending inventory data via tedge topic to root fragments
Execute Command tedge mqtt pub "te/device/main///twin/subtype" "LinuxDeviceA"
Execute Command tedge mqtt pub "te/device/main///twin/type" "ShouldBeIgnored"
Execute Command tedge mqtt pub "te/device/main///twin/subtype" "\"LinuxDeviceA\""
Execute Command tedge mqtt pub "te/device/main///twin/type" "\"ShouldBeIgnored\""
Cumulocity.Set Device ${DEVICE_SN}
${mo}= Device Should Have Fragments subtype
Should Be Equal ${mo["subtype"]} LinuxDeviceA
Expand Down

0 comments on commit 5a14baa

Please sign in to comment.