Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MQTT twin/ topic support for Cumulocity inventory updates #2280

Merged
merged 4 commits into from
Oct 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ define_tedge_config! {

/// Set of MQTT topics the Cumulocity mapper should subscribe to
#[tedge_config(example = "te/+/+/+/+/a/+,te/+/+/+/+/m/+,te/+/+/+/+/e/+")]
#[tedge_config(default(value = "te/+/+/+/+,te/+/+/+/+/m/+,te/+/+/+/+/e/+,te/+/+/+/+/a/+,te/+/+/+/+/status/health"))]
#[tedge_config(default(value = "te/+/+/+/+,te/+/+/+/+/twin/+,te/+/+/+/+/m/+,te/+/+/+/+/e/+,te/+/+/+/+/a/+,te/+/+/+/+/status/health"))]
topics: TemplatesSet,

enable: {
Expand Down
8 changes: 7 additions & 1 deletion crates/core/tedge_api/src/mqtt_topics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,9 @@ pub enum TopicIdError {
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Channel {
EntityMetadata,
EntityTwinData {
fragment_key: String,
},
Measurement {
measurement_type: String,
},
Expand Down Expand Up @@ -488,7 +491,9 @@ impl FromStr for Channel {
fn from_str(channel: &str) -> Result<Self, ChannelError> {
match channel.split('/').collect::<Vec<&str>>()[..] {
[""] => Ok(Channel::EntityMetadata),

["twin", fragment_key] => Ok(Channel::EntityTwinData {
fragment_key: fragment_key.to_string(),
}),
["m", measurement_type] => Ok(Channel::Measurement {
measurement_type: measurement_type.to_string(),
}),
Expand Down Expand Up @@ -528,6 +533,7 @@ impl Display for Channel {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Channel::EntityMetadata => Ok(()),
Channel::EntityTwinData { fragment_key } => write!(f, "twin/{fragment_key}"),

Channel::Measurement { measurement_type } => write!(f, "m/{measurement_type}"),
Channel::MeasurementMetadata { measurement_type } => {
Expand Down
12 changes: 12 additions & 0 deletions crates/extensions/c8y_mapper_ext/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ use camino::Utf8PathBuf;
use std::net::IpAddr;
use std::path::Path;
use std::path::PathBuf;
use std::str::FromStr;
use tedge_api::mqtt_topics::ChannelFilter::Command;
use tedge_api::mqtt_topics::ChannelFilter::CommandMetadata;
use tedge_api::mqtt_topics::EntityFilter::AnyEntity;
use tedge_api::mqtt_topics::EntityTopicId;
use tedge_api::mqtt_topics::MqttSchema;
use tedge_api::mqtt_topics::OperationType;
use tedge_api::mqtt_topics::TopicIdError;
use tedge_api::path::DataDir;
use tedge_api::topic::ResponseTopic;
use tedge_config::ConfigNotSet;
Expand All @@ -26,6 +29,7 @@ pub struct C8yMapperConfig {
pub logs_path: Utf8PathBuf,
pub data_dir: DataDir,
pub device_id: String,
pub device_topic_id: EntityTopicId,
pub device_type: String,
pub service_type: String,
pub ops_dir: PathBuf,
Expand All @@ -44,6 +48,7 @@ impl C8yMapperConfig {
logs_path: Utf8PathBuf,
data_dir: DataDir,
device_id: String,
device_topic_id: EntityTopicId,
device_type: String,
service_type: String,
c8y_host: String,
Expand All @@ -60,6 +65,7 @@ impl C8yMapperConfig {
logs_path,
data_dir,
device_id,
device_topic_id,
device_type,
service_type,
ops_dir,
Expand All @@ -82,6 +88,7 @@ impl C8yMapperConfig {
let data_dir: DataDir = tedge_config.data.path.clone().into();
let device_id = tedge_config.device.id.try_read(tedge_config)?.to_string();
let device_type = tedge_config.device.ty.clone();
let device_topic_id = EntityTopicId::from_str(&tedge_config.mqtt.device_topic_id)?;
let service_type = tedge_config.service.ty.clone();
let c8y_host = tedge_config.c8y.http.or_config_not_set()?.to_string();
let tedge_http_address = tedge_config.http.bind.address;
Expand Down Expand Up @@ -129,6 +136,7 @@ impl C8yMapperConfig {
logs_path,
data_dir,
device_id,
device_topic_id,
device_type,
service_type,
c8y_host,
Expand Down Expand Up @@ -166,6 +174,7 @@ impl C8yMapperConfig {
pub fn default_external_topic_filter() -> TopicFilter {
vec![
"te/+/+/+/+",
"te/+/+/+/+/twin/+",
"te/+/+/+/+/m/+",
"te/+/+/+/+/e/+",
"te/+/+/+/+/a/+",
Expand All @@ -186,6 +195,9 @@ pub enum C8yMapperConfigBuildError {

#[error(transparent)]
FromC8yMapperConfigError(#[from] C8yMapperConfigError),

#[error(transparent)]
FromTopicIdError(#[from] TopicIdError),
}

#[derive(thiserror::Error, Debug)]
Expand Down
118 changes: 28 additions & 90 deletions crates/extensions/c8y_mapper_ext/src/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ use super::alarm_converter::AlarmConverter;
use super::config::C8yMapperConfig;
use super::config::MQTT_MESSAGE_SIZE_THRESHOLD;
use super::error::CumulocityMapperError;
use super::fragments::C8yAgentFragment;
use super::fragments::C8yDeviceDataFragment;
use super::service_monitor;
use crate::actor::CmdId;
use crate::actor::IdDownloadRequest;
Expand Down Expand Up @@ -55,8 +53,6 @@ use service_monitor::convert_health_status_message;
use std::collections::HashMap;
use std::collections::HashSet;
use std::fs;
use std::fs::File;
use std::io::Read;
use std::path::Path;
use std::path::PathBuf;
use tedge_actors::LoggingSender;
Expand Down Expand Up @@ -95,14 +91,11 @@ use thiserror::Error;
use time::format_description::well_known::Rfc3339;
use tokio::time::Duration;
use tracing::debug;
use tracing::info;
use tracing::log::error;
use tracing::trace;

const C8Y_CLOUD: &str = "c8y";
const INVENTORY_FRAGMENTS_FILE_LOCATION: &str = "device/inventory.json";
const SUPPORTED_OPERATIONS_DIRECTORY: &str = "operations";
const INVENTORY_MANAGED_OBJECTS_TOPIC: &str = "c8y/inventory/managedObjects/update";
const INTERNAL_ALARMS_TOPIC: &str = "c8y-internal/alarms/";
const C8Y_JSON_MQTT_EVENTS_TOPIC: &str = "c8y/event/events/create";
const TEDGE_AGENT_LOG_DIR: &str = "tedge/agent";
Expand Down Expand Up @@ -171,7 +164,8 @@ pub struct CumulocityConverter {
pub config: C8yMapperConfig,
pub(crate) mapper_config: MapperConfig,
pub device_name: String,
device_type: String,
pub(crate) device_topic_id: EntityTopicId,
pub(crate) device_type: String,
alarm_converter: AlarmConverter,
pub operations: Operations,
operation_logs: OperationLogs,
Expand All @@ -187,6 +181,7 @@ pub struct CumulocityConverter {
pub auth_proxy: ProxyUrlGenerator,
pub downloader_sender: LoggingSender<IdDownloadRequest>,
pub pending_operations: HashMap<CmdId, SmartRestOperationVariant>,
pub inventory_model: Value, // Holds a live view of aggregated inventory, derived from various twin data
}

impl CumulocityConverter {
Expand All @@ -198,6 +193,7 @@ impl CumulocityConverter {
downloader_sender: LoggingSender<IdDownloadRequest>,
) -> Result<Self, CumulocityConverterBuildError> {
let device_id = config.device_id.clone();
let device_topic_id = config.device_topic_id.clone();
let device_type = config.device_type.clone();
let service_type = config.service_type.clone();
let c8y_host = config.c8y_host.clone();
Expand Down Expand Up @@ -230,11 +226,17 @@ impl CumulocityConverter {
)
.unwrap();

let inventory_model = json!({
"name": device_id.clone(),
"type": device_type.clone(),
});

Ok(CumulocityConverter {
size_threshold,
config,
mapper_config,
device_name: device_id,
device_topic_id,
device_type,
alarm_converter,
operations,
Expand All @@ -251,6 +253,7 @@ impl CumulocityConverter {
auth_proxy,
downloader_sender,
pending_operations: HashMap::new(),
inventory_model,
})
}

Expand Down Expand Up @@ -893,6 +896,10 @@ impl CumulocityConverter {
}

let mut messages = match &channel {
Channel::EntityTwinData { fragment_key } => {
self.try_convert_entity_twin_data(&source, message, fragment_key)?
}

Channel::Measurement { measurement_type } => {
self.try_convert_measurement(&source, message, measurement_type)?
}
Expand Down Expand Up @@ -1060,31 +1067,24 @@ impl CumulocityConverter {
}

fn try_init_messages(&mut self) -> Result<Vec<Message>, ConversionError> {
let inventory_fragments_message = self.wrap_error(create_inventory_fragments_message(
&self.device_name,
&self.cfg_dir,
));
let mut messages = self.parse_base_inventory_file()?;

let supported_operations_message = self.wrap_error(
self.create_supported_operations(&self.cfg_dir.join("operations").join("c8y")),
);
let supported_operations_message =
self.create_supported_operations(&self.cfg_dir.join("operations").join("c8y"))?;

let cloud_child_devices_message = create_request_for_cloud_child_devices();
let device_data_message = self.inventory_device_type_update_message()?;

let device_data_message = self.wrap_error(create_device_data_fragments(
&self.device_name,
&self.device_type,
));
let pending_operations_message = create_get_pending_operations_message()?;

let pending_operations_message = self.wrap_error(create_get_pending_operations_message());
let cloud_child_devices_message = create_request_for_cloud_child_devices();

Ok(vec![
inventory_fragments_message,
messages.append(&mut vec![
supported_operations_message,
device_data_message,
pending_operations_message,
cloud_child_devices_message,
])
]);
Ok(messages)
}

fn create_supported_operations(&self, path: &Path) -> Result<Message, ConversionError> {
Expand Down Expand Up @@ -1159,17 +1159,6 @@ fn get_child_id(dir_path: &PathBuf) -> Result<String, ConversionError> {
}
}

fn create_device_data_fragments(
device_name: &str,
device_type: &str,
) -> Result<Message, ConversionError> {
let device_data = C8yDeviceDataFragment::from_type(device_type)?;
let ops_msg = device_data.to_json()?;

let topic = Topic::new_unchecked(&format!("{INVENTORY_MANAGED_OBJECTS_TOPIC}/{device_name}",));
Ok(Message::new(&topic, ops_msg.to_string()))
}

fn create_get_software_list_message() -> Result<Message, ConversionError> {
let request = SoftwareListRequest::default();
let topic = Topic::new(RequestTopic::SoftwareListRequest.as_str())?;
Expand Down Expand Up @@ -1202,17 +1191,6 @@ fn create_request_for_cloud_child_devices() -> Message {
Message::new(&Topic::new_unchecked("c8y/s/us"), "105")
}

fn create_inventory_fragments_message(
device_name: &str,
cfg_dir: &Path,
) -> Result<Message, ConversionError> {
let inventory_file_path = format!("{}/{INVENTORY_FRAGMENTS_FILE_LOCATION}", cfg_dir.display());
let ops_msg = get_inventory_fragments(&inventory_file_path)?;

let topic = Topic::new_unchecked(&format!("{INVENTORY_MANAGED_OBJECTS_TOPIC}/{device_name}"));
Ok(Message::new(&topic, ops_msg.to_string()))
}

impl CumulocityConverter {
async fn register_restart_operation(
&self,
Expand Down Expand Up @@ -1395,48 +1373,6 @@ pub fn get_local_child_devices_list(
.collect::<std::collections::HashSet<String>>())
}

/// reads a json file to serde_json::Value
fn read_json_from_file(file_path: &str) -> Result<serde_json::Value, ConversionError> {
let mut file = File::open(Path::new(file_path))?;
let mut data = String::new();
file.read_to_string(&mut data)?;
let json: serde_json::Value = serde_json::from_str(&data)?;
info!("Read the fragments from {file_path} file");
Ok(json)
}

/// gets a serde_json::Value of inventory
fn get_inventory_fragments(
inventory_file_path: &str,
) -> Result<serde_json::Value, ConversionError> {
let agent_fragment = C8yAgentFragment::new()?;
let json_fragment = agent_fragment.to_json()?;

match read_json_from_file(inventory_file_path) {
Ok(mut json) => {
json.as_object_mut()
.ok_or(ConversionError::FromOptionError)?
.insert(
"c8y_Agent".to_string(),
json_fragment
.get("c8y_Agent")
.ok_or(ConversionError::FromOptionError)?
.to_owned(),
);
Ok(json)
}
Err(ConversionError::FromStdIo(_)) => {
info!("Could not read inventory fragments from file {inventory_file_path}");
Ok(json_fragment)
}
Err(ConversionError::FromSerdeJson(e)) => {
info!("Could not parse the {inventory_file_path} file due to: {e}");
Ok(json_fragment)
}
Err(_) => Ok(json_fragment),
}
}

async fn create_tedge_agent_supported_ops(ops_dir: &Path) -> Result<(), ConversionError> {
create_file_with_defaults(ops_dir.join("c8y_SoftwareUpdate"), None)?;

Expand All @@ -1451,7 +1387,7 @@ pub struct HealthStatus {
}

#[cfg(test)]
mod tests {
pub(crate) mod tests {
use super::CumulocityConverter;
use crate::actor::IdDownloadRequest;
use crate::actor::IdDownloadResult;
Expand Down Expand Up @@ -2458,7 +2394,7 @@ mod tests {
assert!(!second_registration_message_mapped);
}

async fn create_c8y_converter(
pub(crate) async fn create_c8y_converter(
tmp_dir: &TempTedgeDir,
) -> (
CumulocityConverter,
Expand All @@ -2468,6 +2404,7 @@ mod tests {
tmp_dir.dir("tedge").dir("agent");

let device_id = "test-device".into();
let device_topic_id = EntityTopicId::default_main_device();
let device_type = "test-device-type".into();
let service_type = "service".into();
let c8y_host = "test.c8y.io".into();
Expand All @@ -2485,6 +2422,7 @@ mod tests {
tmp_dir.utf8_path_buf(),
tmp_dir.utf8_path_buf().into(),
device_id,
device_topic_id,
device_type,
service_type,
c8y_host,
Expand Down
Loading