Skip to content

Commit

Permalink
Merge pull request #2280 from reubenmiller/feat-tedge-inventory-topic
Browse files Browse the repository at this point in the history
Cumulocity inventory updates using entity twin data
  • Loading branch information
albinsuresh authored Oct 19, 2023
2 parents f811fad + d5f35fc commit 8948451
Show file tree
Hide file tree
Showing 12 changed files with 974 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ define_tedge_config! {

/// Set of MQTT topics the Cumulocity mapper should subscribe to
#[tedge_config(example = "te/+/+/+/+/a/+,te/+/+/+/+/m/+,te/+/+/+/+/e/+")]
#[tedge_config(default(value = "te/+/+/+/+,te/+/+/+/+/m/+,te/+/+/+/+/e/+,te/+/+/+/+/a/+,te/+/+/+/+/status/health"))]
#[tedge_config(default(value = "te/+/+/+/+,te/+/+/+/+/twin/+,te/+/+/+/+/m/+,te/+/+/+/+/e/+,te/+/+/+/+/a/+,te/+/+/+/+/status/health"))]
topics: TemplatesSet,

enable: {
Expand Down
8 changes: 7 additions & 1 deletion crates/core/tedge_api/src/mqtt_topics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,9 @@ pub enum TopicIdError {
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Channel {
EntityMetadata,
EntityTwinData {
fragment_key: String,
},
Measurement {
measurement_type: String,
},
Expand Down Expand Up @@ -488,7 +491,9 @@ impl FromStr for Channel {
fn from_str(channel: &str) -> Result<Self, ChannelError> {
match channel.split('/').collect::<Vec<&str>>()[..] {
[""] => Ok(Channel::EntityMetadata),

["twin", fragment_key] => Ok(Channel::EntityTwinData {
fragment_key: fragment_key.to_string(),
}),
["m", measurement_type] => Ok(Channel::Measurement {
measurement_type: measurement_type.to_string(),
}),
Expand Down Expand Up @@ -528,6 +533,7 @@ impl Display for Channel {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Channel::EntityMetadata => Ok(()),
Channel::EntityTwinData { fragment_key } => write!(f, "twin/{fragment_key}"),

Channel::Measurement { measurement_type } => write!(f, "m/{measurement_type}"),
Channel::MeasurementMetadata { measurement_type } => {
Expand Down
12 changes: 12 additions & 0 deletions crates/extensions/c8y_mapper_ext/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ use camino::Utf8PathBuf;
use std::net::IpAddr;
use std::path::Path;
use std::path::PathBuf;
use std::str::FromStr;
use tedge_api::mqtt_topics::ChannelFilter::Command;
use tedge_api::mqtt_topics::ChannelFilter::CommandMetadata;
use tedge_api::mqtt_topics::EntityFilter::AnyEntity;
use tedge_api::mqtt_topics::EntityTopicId;
use tedge_api::mqtt_topics::MqttSchema;
use tedge_api::mqtt_topics::OperationType;
use tedge_api::mqtt_topics::TopicIdError;
use tedge_api::path::DataDir;
use tedge_api::topic::ResponseTopic;
use tedge_config::ConfigNotSet;
Expand All @@ -26,6 +29,7 @@ pub struct C8yMapperConfig {
pub logs_path: Utf8PathBuf,
pub data_dir: DataDir,
pub device_id: String,
pub device_topic_id: EntityTopicId,
pub device_type: String,
pub service_type: String,
pub ops_dir: PathBuf,
Expand All @@ -44,6 +48,7 @@ impl C8yMapperConfig {
logs_path: Utf8PathBuf,
data_dir: DataDir,
device_id: String,
device_topic_id: EntityTopicId,
device_type: String,
service_type: String,
c8y_host: String,
Expand All @@ -60,6 +65,7 @@ impl C8yMapperConfig {
logs_path,
data_dir,
device_id,
device_topic_id,
device_type,
service_type,
ops_dir,
Expand All @@ -82,6 +88,7 @@ impl C8yMapperConfig {
let data_dir: DataDir = tedge_config.data.path.clone().into();
let device_id = tedge_config.device.id.try_read(tedge_config)?.to_string();
let device_type = tedge_config.device.ty.clone();
let device_topic_id = EntityTopicId::from_str(&tedge_config.mqtt.device_topic_id)?;
let service_type = tedge_config.service.ty.clone();
let c8y_host = tedge_config.c8y.http.or_config_not_set()?.to_string();
let tedge_http_address = tedge_config.http.bind.address;
Expand Down Expand Up @@ -129,6 +136,7 @@ impl C8yMapperConfig {
logs_path,
data_dir,
device_id,
device_topic_id,
device_type,
service_type,
c8y_host,
Expand Down Expand Up @@ -166,6 +174,7 @@ impl C8yMapperConfig {
pub fn default_external_topic_filter() -> TopicFilter {
vec![
"te/+/+/+/+",
"te/+/+/+/+/twin/+",
"te/+/+/+/+/m/+",
"te/+/+/+/+/e/+",
"te/+/+/+/+/a/+",
Expand All @@ -186,6 +195,9 @@ pub enum C8yMapperConfigBuildError {

#[error(transparent)]
FromC8yMapperConfigError(#[from] C8yMapperConfigError),

#[error(transparent)]
FromTopicIdError(#[from] TopicIdError),
}

#[derive(thiserror::Error, Debug)]
Expand Down
118 changes: 28 additions & 90 deletions crates/extensions/c8y_mapper_ext/src/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ use super::alarm_converter::AlarmConverter;
use super::config::C8yMapperConfig;
use super::config::MQTT_MESSAGE_SIZE_THRESHOLD;
use super::error::CumulocityMapperError;
use super::fragments::C8yAgentFragment;
use super::fragments::C8yDeviceDataFragment;
use super::service_monitor;
use crate::actor::CmdId;
use crate::actor::IdDownloadRequest;
Expand Down Expand Up @@ -55,8 +53,6 @@ use service_monitor::convert_health_status_message;
use std::collections::HashMap;
use std::collections::HashSet;
use std::fs;
use std::fs::File;
use std::io::Read;
use std::path::Path;
use std::path::PathBuf;
use tedge_actors::LoggingSender;
Expand Down Expand Up @@ -95,14 +91,11 @@ use thiserror::Error;
use time::format_description::well_known::Rfc3339;
use tokio::time::Duration;
use tracing::debug;
use tracing::info;
use tracing::log::error;
use tracing::trace;

const C8Y_CLOUD: &str = "c8y";
const INVENTORY_FRAGMENTS_FILE_LOCATION: &str = "device/inventory.json";
const SUPPORTED_OPERATIONS_DIRECTORY: &str = "operations";
const INVENTORY_MANAGED_OBJECTS_TOPIC: &str = "c8y/inventory/managedObjects/update";
const INTERNAL_ALARMS_TOPIC: &str = "c8y-internal/alarms/";
const C8Y_JSON_MQTT_EVENTS_TOPIC: &str = "c8y/event/events/create";
const TEDGE_AGENT_LOG_DIR: &str = "tedge/agent";
Expand Down Expand Up @@ -171,7 +164,8 @@ pub struct CumulocityConverter {
pub config: C8yMapperConfig,
pub(crate) mapper_config: MapperConfig,
pub device_name: String,
device_type: String,
pub(crate) device_topic_id: EntityTopicId,
pub(crate) device_type: String,
alarm_converter: AlarmConverter,
pub operations: Operations,
operation_logs: OperationLogs,
Expand All @@ -187,6 +181,7 @@ pub struct CumulocityConverter {
pub auth_proxy: ProxyUrlGenerator,
pub downloader_sender: LoggingSender<IdDownloadRequest>,
pub pending_operations: HashMap<CmdId, SmartRestOperationVariant>,
pub inventory_model: Value, // Holds a live view of aggregated inventory, derived from various twin data
}

impl CumulocityConverter {
Expand All @@ -198,6 +193,7 @@ impl CumulocityConverter {
downloader_sender: LoggingSender<IdDownloadRequest>,
) -> Result<Self, CumulocityConverterBuildError> {
let device_id = config.device_id.clone();
let device_topic_id = config.device_topic_id.clone();
let device_type = config.device_type.clone();
let service_type = config.service_type.clone();
let c8y_host = config.c8y_host.clone();
Expand Down Expand Up @@ -230,11 +226,17 @@ impl CumulocityConverter {
)
.unwrap();

let inventory_model = json!({
"name": device_id.clone(),
"type": device_type.clone(),
});

Ok(CumulocityConverter {
size_threshold,
config,
mapper_config,
device_name: device_id,
device_topic_id,
device_type,
alarm_converter,
operations,
Expand All @@ -251,6 +253,7 @@ impl CumulocityConverter {
auth_proxy,
downloader_sender,
pending_operations: HashMap::new(),
inventory_model,
})
}

Expand Down Expand Up @@ -893,6 +896,10 @@ impl CumulocityConverter {
}

let mut messages = match &channel {
Channel::EntityTwinData { fragment_key } => {
self.try_convert_entity_twin_data(&source, message, fragment_key)?
}

Channel::Measurement { measurement_type } => {
self.try_convert_measurement(&source, message, measurement_type)?
}
Expand Down Expand Up @@ -1060,31 +1067,24 @@ impl CumulocityConverter {
}

fn try_init_messages(&mut self) -> Result<Vec<Message>, ConversionError> {
let inventory_fragments_message = self.wrap_error(create_inventory_fragments_message(
&self.device_name,
&self.cfg_dir,
));
let mut messages = self.parse_base_inventory_file()?;

let supported_operations_message = self.wrap_error(
self.create_supported_operations(&self.cfg_dir.join("operations").join("c8y")),
);
let supported_operations_message =
self.create_supported_operations(&self.cfg_dir.join("operations").join("c8y"))?;

let cloud_child_devices_message = create_request_for_cloud_child_devices();
let device_data_message = self.inventory_device_type_update_message()?;

let device_data_message = self.wrap_error(create_device_data_fragments(
&self.device_name,
&self.device_type,
));
let pending_operations_message = create_get_pending_operations_message()?;

let pending_operations_message = self.wrap_error(create_get_pending_operations_message());
let cloud_child_devices_message = create_request_for_cloud_child_devices();

Ok(vec![
inventory_fragments_message,
messages.append(&mut vec![
supported_operations_message,
device_data_message,
pending_operations_message,
cloud_child_devices_message,
])
]);
Ok(messages)
}

fn create_supported_operations(&self, path: &Path) -> Result<Message, ConversionError> {
Expand Down Expand Up @@ -1159,17 +1159,6 @@ fn get_child_id(dir_path: &PathBuf) -> Result<String, ConversionError> {
}
}

fn create_device_data_fragments(
device_name: &str,
device_type: &str,
) -> Result<Message, ConversionError> {
let device_data = C8yDeviceDataFragment::from_type(device_type)?;
let ops_msg = device_data.to_json()?;

let topic = Topic::new_unchecked(&format!("{INVENTORY_MANAGED_OBJECTS_TOPIC}/{device_name}",));
Ok(Message::new(&topic, ops_msg.to_string()))
}

fn create_get_software_list_message() -> Result<Message, ConversionError> {
let request = SoftwareListRequest::default();
let topic = Topic::new(RequestTopic::SoftwareListRequest.as_str())?;
Expand Down Expand Up @@ -1202,17 +1191,6 @@ fn create_request_for_cloud_child_devices() -> Message {
Message::new(&Topic::new_unchecked("c8y/s/us"), "105")
}

fn create_inventory_fragments_message(
device_name: &str,
cfg_dir: &Path,
) -> Result<Message, ConversionError> {
let inventory_file_path = format!("{}/{INVENTORY_FRAGMENTS_FILE_LOCATION}", cfg_dir.display());
let ops_msg = get_inventory_fragments(&inventory_file_path)?;

let topic = Topic::new_unchecked(&format!("{INVENTORY_MANAGED_OBJECTS_TOPIC}/{device_name}"));
Ok(Message::new(&topic, ops_msg.to_string()))
}

impl CumulocityConverter {
async fn register_restart_operation(
&self,
Expand Down Expand Up @@ -1395,48 +1373,6 @@ pub fn get_local_child_devices_list(
.collect::<std::collections::HashSet<String>>())
}

/// reads a json file to serde_json::Value
fn read_json_from_file(file_path: &str) -> Result<serde_json::Value, ConversionError> {
let mut file = File::open(Path::new(file_path))?;
let mut data = String::new();
file.read_to_string(&mut data)?;
let json: serde_json::Value = serde_json::from_str(&data)?;
info!("Read the fragments from {file_path} file");
Ok(json)
}

/// gets a serde_json::Value of inventory
fn get_inventory_fragments(
inventory_file_path: &str,
) -> Result<serde_json::Value, ConversionError> {
let agent_fragment = C8yAgentFragment::new()?;
let json_fragment = agent_fragment.to_json()?;

match read_json_from_file(inventory_file_path) {
Ok(mut json) => {
json.as_object_mut()
.ok_or(ConversionError::FromOptionError)?
.insert(
"c8y_Agent".to_string(),
json_fragment
.get("c8y_Agent")
.ok_or(ConversionError::FromOptionError)?
.to_owned(),
);
Ok(json)
}
Err(ConversionError::FromStdIo(_)) => {
info!("Could not read inventory fragments from file {inventory_file_path}");
Ok(json_fragment)
}
Err(ConversionError::FromSerdeJson(e)) => {
info!("Could not parse the {inventory_file_path} file due to: {e}");
Ok(json_fragment)
}
Err(_) => Ok(json_fragment),
}
}

async fn create_tedge_agent_supported_ops(ops_dir: &Path) -> Result<(), ConversionError> {
create_file_with_defaults(ops_dir.join("c8y_SoftwareUpdate"), None)?;

Expand All @@ -1451,7 +1387,7 @@ pub struct HealthStatus {
}

#[cfg(test)]
mod tests {
pub(crate) mod tests {
use super::CumulocityConverter;
use crate::actor::IdDownloadRequest;
use crate::actor::IdDownloadResult;
Expand Down Expand Up @@ -2458,7 +2394,7 @@ mod tests {
assert!(!second_registration_message_mapped);
}

async fn create_c8y_converter(
pub(crate) async fn create_c8y_converter(
tmp_dir: &TempTedgeDir,
) -> (
CumulocityConverter,
Expand All @@ -2468,6 +2404,7 @@ mod tests {
tmp_dir.dir("tedge").dir("agent");

let device_id = "test-device".into();
let device_topic_id = EntityTopicId::default_main_device();
let device_type = "test-device-type".into();
let service_type = "service".into();
let c8y_host = "test.c8y.io".into();
Expand All @@ -2485,6 +2422,7 @@ mod tests {
tmp_dir.utf8_path_buf(),
tmp_dir.utf8_path_buf().into(),
device_id,
device_topic_id,
device_type,
service_type,
c8y_host,
Expand Down
Loading

1 comment on commit 8948451

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Robot Results

✅ Passed ❌ Failed ⏭️ Skipped Total Pass % ⏱️ Duration
321 0 3 321 100 1h0m11.64s

Please sign in to comment.