Skip to content

Commit

Permalink
Register inventory.json contents as twin data
Browse files Browse the repository at this point in the history
  • Loading branch information
albinsuresh committed Oct 18, 2023
1 parent cb1863d commit f0c96fc
Show file tree
Hide file tree
Showing 3 changed files with 246 additions and 107 deletions.
111 changes: 22 additions & 89 deletions crates/extensions/c8y_mapper_ext/src/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ use super::alarm_converter::AlarmConverter;
use super::config::C8yMapperConfig;
use super::config::MQTT_MESSAGE_SIZE_THRESHOLD;
use super::error::CumulocityMapperError;
use super::fragments::C8yAgentFragment;
use super::fragments::C8yDeviceDataFragment;
use super::service_monitor;
use crate::actor::CmdId;
use crate::actor::IdDownloadRequest;
Expand Down Expand Up @@ -56,8 +54,6 @@ use service_monitor::convert_health_status_message;
use std::collections::HashMap;
use std::collections::HashSet;
use std::fs;
use std::fs::File;
use std::io::Read;
use std::path::Path;
use std::path::PathBuf;
use tedge_actors::LoggingSender;
Expand Down Expand Up @@ -96,13 +92,10 @@ use thiserror::Error;
use time::format_description::well_known::Rfc3339;
use tokio::time::Duration;
use tracing::debug;
use tracing::info;
use tracing::log::error;

const C8Y_CLOUD: &str = "c8y";
const INVENTORY_FRAGMENTS_FILE_LOCATION: &str = "device/inventory.json";
const SUPPORTED_OPERATIONS_DIRECTORY: &str = "operations";
pub const INVENTORY_MANAGED_OBJECTS_TOPIC: &str = "c8y/inventory/managedObjects/update";
const INTERNAL_ALARMS_TOPIC: &str = "c8y-internal/alarms/";
const C8Y_JSON_MQTT_EVENTS_TOPIC: &str = "c8y/event/events/create";
const TEDGE_AGENT_LOG_DIR: &str = "tedge/agent";
Expand Down Expand Up @@ -172,7 +165,8 @@ pub struct CumulocityConverter {
pub config: C8yMapperConfig,
pub(crate) mapper_config: MapperConfig,
pub device_name: String,
device_type: String,
pub(crate) device_type: String,
pub(crate) device_topic_id: EntityTopicId,
alarm_converter: AlarmConverter,
pub operations: Operations,
operation_logs: OperationLogs,
Expand All @@ -188,6 +182,7 @@ pub struct CumulocityConverter {
pub auth_proxy: ProxyUrlGenerator,
pub downloader_sender: LoggingSender<IdDownloadRequest>,
pub pending_operations: HashMap<CmdId, SmartRestOperationVariant>,
pub inventory_model: Value,
}

impl CumulocityConverter {
Expand Down Expand Up @@ -230,12 +225,18 @@ impl CumulocityConverter {
)
.unwrap();

let inventory_model = json!({
"name": device_id.clone(),
"type": device_type.clone(),
});

Ok(CumulocityConverter {
size_threshold,
config,
mapper_config,
device_name: device_id,
device_type,
device_topic_id: EntityTopicId::default_main_device(),
alarm_converter,
operations,
operation_logs,
Expand All @@ -251,6 +252,7 @@ impl CumulocityConverter {
auth_proxy,
downloader_sender,
pending_operations: HashMap::new(),
inventory_model,
})
}

Expand Down Expand Up @@ -1072,31 +1074,26 @@ impl CumulocityConverter {
}

fn try_init_messages(&mut self) -> Result<Vec<Message>, ConversionError> {
let inventory_fragments_message = self.wrap_error(create_inventory_fragments_message(
&self.device_name,
&self.cfg_dir,
));
let mut messages = vec![];

let mut inventory_fragments_message = self.create_base_inventory_message()?;
messages.append(&mut inventory_fragments_message);

let supported_operations_message = self.wrap_error(
self.create_supported_operations(&self.cfg_dir.join("operations").join("c8y")),
);
messages.push(supported_operations_message);

let cloud_child_devices_message = create_request_for_cloud_child_devices();
let device_data_message = self.inventory_device_type_update_message()?;
messages.push(device_data_message);

let device_data_message = self.wrap_error(create_device_data_fragments(
&self.device_name,
&self.device_type,
));
let pending_operations_message = create_get_pending_operations_message()?;
messages.push(pending_operations_message);

let pending_operations_message = self.wrap_error(create_get_pending_operations_message());
let cloud_child_devices_message = create_request_for_cloud_child_devices();
messages.push(cloud_child_devices_message);

Ok(vec![
inventory_fragments_message,
supported_operations_message,
device_data_message,
pending_operations_message,
cloud_child_devices_message,
])
Ok(messages)
}

fn create_supported_operations(&self, path: &Path) -> Result<Message, ConversionError> {
Expand Down Expand Up @@ -1171,17 +1168,6 @@ fn get_child_id(dir_path: &PathBuf) -> Result<String, ConversionError> {
}
}

fn create_device_data_fragments(
device_name: &str,
device_type: &str,
) -> Result<Message, ConversionError> {
let device_data = C8yDeviceDataFragment::from_type(device_type)?;
let ops_msg = device_data.to_json()?;

let topic = Topic::new_unchecked(&format!("{INVENTORY_MANAGED_OBJECTS_TOPIC}/{device_name}",));
Ok(Message::new(&topic, ops_msg.to_string()))
}

fn create_get_software_list_message() -> Result<Message, ConversionError> {
let request = SoftwareListRequest::default();
let topic = Topic::new(RequestTopic::SoftwareListRequest.as_str())?;
Expand Down Expand Up @@ -1227,17 +1213,6 @@ fn add_external_device_registration_message(
false
}

fn create_inventory_fragments_message(
device_name: &str,
cfg_dir: &Path,
) -> Result<Message, ConversionError> {
let inventory_file_path = format!("{}/{INVENTORY_FRAGMENTS_FILE_LOCATION}", cfg_dir.display());
let ops_msg = get_inventory_fragments(&inventory_file_path)?;

let topic = Topic::new_unchecked(&format!("{INVENTORY_MANAGED_OBJECTS_TOPIC}/{device_name}"));
Ok(Message::new(&topic, ops_msg.to_string()))
}

impl CumulocityConverter {
async fn register_restart_operation(
&self,
Expand Down Expand Up @@ -1404,48 +1379,6 @@ pub fn get_local_child_devices_list(
.collect::<std::collections::HashSet<String>>())
}

/// reads a json file to serde_json::Value
fn read_json_from_file(file_path: &str) -> Result<serde_json::Value, ConversionError> {
let mut file = File::open(Path::new(file_path))?;
let mut data = String::new();
file.read_to_string(&mut data)?;
let json: serde_json::Value = serde_json::from_str(&data)?;
info!("Read the fragments from {file_path} file");
Ok(json)
}

/// gets a serde_json::Value of inventory
fn get_inventory_fragments(
inventory_file_path: &str,
) -> Result<serde_json::Value, ConversionError> {
let agent_fragment = C8yAgentFragment::new()?;
let json_fragment = agent_fragment.to_json()?;

match read_json_from_file(inventory_file_path) {
Ok(mut json) => {
json.as_object_mut()
.ok_or(ConversionError::FromOptionError)?
.insert(
"c8y_Agent".to_string(),
json_fragment
.get("c8y_Agent")
.ok_or(ConversionError::FromOptionError)?
.to_owned(),
);
Ok(json)
}
Err(ConversionError::FromStdIo(_)) => {
info!("Could not read inventory fragments from file {inventory_file_path}");
Ok(json_fragment)
}
Err(ConversionError::FromSerdeJson(e)) => {
info!("Could not parse the {inventory_file_path} file due to: {e}");
Ok(json_fragment)
}
Err(_) => Ok(json_fragment),
}
}

async fn create_tedge_agent_supported_ops(ops_dir: PathBuf) -> Result<(), ConversionError> {
create_file_with_defaults(ops_dir.join("c8y_SoftwareUpdate"), None)?;

Expand Down
Loading

0 comments on commit f0c96fc

Please sign in to comment.