Skip to content

Commit

Permalink
Merge pull request #3012 from Bravo555/improve/operations-module-reorg
Browse files Browse the repository at this point in the history
refactor: Reorganise c8y_mapper_ext::operations module
  • Loading branch information
Bravo555 authored Jul 30, 2024
2 parents c531450 + e419058 commit 2a9eed4
Show file tree
Hide file tree
Showing 15 changed files with 1,182 additions and 1,204 deletions.
24 changes: 6 additions & 18 deletions crates/extensions/c8y_mapper_ext/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::operations::OperationHandler;
use crate::Capabilities;
use c8y_api::json_c8y_deserializer::C8yDeviceControlTopic;
use c8y_api::smartrest::error::OperationsError;
Expand Down Expand Up @@ -198,24 +199,11 @@ impl C8yMapperConfig {
topics.add_all(mqtt_schema.topics(AnyEntity, CommandMetadata(cmd)));
}

if capabilities.log_upload {
topics.add_all(crate::operations::log_upload::log_upload_topic_filter(
&mqtt_schema,
));
}
if capabilities.config_snapshot {
topics.add_all(crate::operations::config_snapshot::topic_filter(
&mqtt_schema,
));
}
if capabilities.config_update {
topics.add_all(crate::operations::config_update::topic_filter(&mqtt_schema));
}
if capabilities.firmware_update {
topics.add_all(
crate::operations::firmware_update::firmware_update_topic_filter(&mqtt_schema),
);
}
let operation_topics = OperationHandler::topic_filter(&capabilities)
.into_iter()
.map(|(e, c)| mqtt_schema.topics(e, c))
.collect();
topics.add_all(operation_topics);

// Add user configurable external topic filters
for topic in tedge_config.c8y.topics.0.clone() {
Expand Down
8 changes: 4 additions & 4 deletions crates/extensions/c8y_mapper_ext/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ mod tests;

#[derive(Debug, Clone, Copy, serde::Deserialize)]
pub struct Capabilities {
log_upload: bool,
config_snapshot: bool,
config_update: bool,
firmware_update: bool,
pub log_upload: bool,
pub config_snapshot: bool,
pub config_update: bool,
pub firmware_update: bool,
}

#[cfg(test)]
Expand Down
312 changes: 312 additions & 0 deletions crates/extensions/c8y_mapper_ext/src/operations/convert.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,312 @@
//! Converting Cumulocity Smartrest operation messages into local thin-edge operation messages.
use std::sync::Arc;

use c8y_api::json_c8y_deserializer::C8yDownloadConfigFile;
use c8y_api::json_c8y_deserializer::C8yFirmware;
use c8y_api::json_c8y_deserializer::C8yLogfileRequest;
use c8y_api::json_c8y_deserializer::C8yUploadConfigFile;
use tedge_api::commands::CommandStatus;
use tedge_api::commands::ConfigMetadata;
use tedge_api::commands::ConfigSnapshotCmdPayload;
use tedge_api::commands::ConfigUpdateCmdPayload;
use tedge_api::commands::FirmwareUpdateCmdPayload;
use tedge_api::commands::LogMetadata;
use tedge_api::commands::LogUploadCmdPayload;
use tedge_api::entity_store::EntityExternalId;
use tedge_api::entity_store::EntityMetadata;
use tedge_api::mqtt_topics::Channel;
use tedge_api::mqtt_topics::EntityTopicId;
use tedge_api::mqtt_topics::OperationType;
use tedge_api::Jsonify;
use tedge_mqtt_ext::MqttMessage;
use tracing::error;
use tracing::warn;

use crate::converter::CumulocityConverter;
use crate::error::ConversionError;
use crate::error::CumulocityMapperError;

impl CumulocityConverter {
/// Converts a config_snapshot metadata message to
/// - supported operation "c8y_UploadConfigFile"
/// - supported config types
pub fn convert_config_snapshot_metadata(
&mut self,
topic_id: &EntityTopicId,
message: &MqttMessage,
) -> Result<Vec<MqttMessage>, ConversionError> {
if !self.config.capabilities.config_snapshot {
warn!(
"Received config_snapshot metadata, however, config_snapshot feature is disabled"
);
}
self.convert_config_metadata(topic_id, message, "c8y_UploadConfigFile")
}

/// Converts a config_update metadata message to
/// - supported operation "c8y_DownloadConfigFile"
/// - supported config types
pub fn convert_config_update_metadata(
&mut self,
topic_id: &EntityTopicId,
message: &MqttMessage,
) -> Result<Vec<MqttMessage>, ConversionError> {
if !self.config.capabilities.config_update {
warn!("Received config_update metadata, however, config_update feature is disabled");
return Ok(vec![]);
}
self.convert_config_metadata(topic_id, message, "c8y_DownloadConfigFile")
}

/// Convert c8y_UploadConfigFile JSON over MQTT operation to ThinEdge config_snapshot command
pub fn convert_config_snapshot_request(
&self,
device_xid: String,
cmd_id: String,
config_upload_request: C8yUploadConfigFile,
) -> Result<Vec<MqttMessage>, CumulocityMapperError> {
let target = self
.entity_store
.try_get_by_external_id(&device_xid.into())?;

let channel = Channel::Command {
operation: OperationType::ConfigSnapshot,
cmd_id: cmd_id.clone(),
};
let topic = self.mqtt_schema.topic_for(&target.topic_id, &channel);

// Replace '/' with ':' to avoid creating unexpected directories in file transfer repo
let tedge_url = format!(
"http://{}/tedge/file-transfer/{}/config_snapshot/{}-{}",
&self.config.tedge_http_host,
target.external_id.as_ref(),
config_upload_request.config_type.replace('/', ":"),
cmd_id
);

let request = ConfigSnapshotCmdPayload {
status: CommandStatus::Init,
tedge_url: Some(tedge_url),
config_type: config_upload_request.config_type,
path: None,
log_path: None,
};

// Command messages must be retained
Ok(vec![
MqttMessage::new(&topic, request.to_json()).with_retain()
])
}

/// Convert c8y_LogfileRequest operation to a ThinEdge log_upload command
pub fn convert_log_upload_request(
&self,
device_xid: String,
cmd_id: String,
log_request: C8yLogfileRequest,
) -> Result<Vec<MqttMessage>, CumulocityMapperError> {
let target = self
.entity_store
.try_get_by_external_id(&device_xid.into())?;

let channel = Channel::Command {
operation: OperationType::LogUpload,
cmd_id: cmd_id.clone(),
};
let topic = self.mqtt_schema.topic_for(&target.topic_id, &channel);

let tedge_url = format!(
"http://{}/tedge/file-transfer/{}/log_upload/{}-{}",
&self.config.tedge_http_host,
target.external_id.as_ref(),
log_request.log_file,
cmd_id
);

let request = LogUploadCmdPayload {
status: CommandStatus::Init,
tedge_url,
log_type: log_request.log_file,
date_from: log_request.date_from,
date_to: log_request.date_to,
search_text: Some(log_request.search_text).filter(|s| !s.is_empty()),
lines: log_request.maximum_lines,
log_path: None,
};

// Command messages must be retained
Ok(vec![
MqttMessage::new(&topic, request.to_json()).with_retain()
])
}

/// Converts a log_upload metadata message to
/// - supported operation "c8y_LogfileRequest"
/// - supported log types
pub fn convert_log_metadata(
&mut self,
topic_id: &EntityTopicId,
message: &MqttMessage,
) -> Result<Vec<MqttMessage>, ConversionError> {
if !self.config.capabilities.log_upload {
warn!("Received log_upload metadata, however, log_upload feature is disabled");
return Ok(vec![]);
}

let mut messages = match self.register_operation(topic_id, "c8y_LogfileRequest") {
Err(err) => {
error!(
"Failed to register `c8y_LogfileRequest` operation for {topic_id} due to: {err}"
);
return Ok(vec![]);
}
Ok(messages) => messages,
};

// To SmartREST supported log types
let metadata = LogMetadata::from_json(message.payload_str()?)?;
let mut types = metadata.types;
types.sort();
let supported_log_types = types.join(",");
let payload = format!("118,{supported_log_types}");
let c8y_topic = self.smartrest_publish_topic_for_entity(topic_id)?;
messages.push(MqttMessage::new(&c8y_topic, payload));

Ok(messages)
}

/// Convert c8y_Firmware JSON over MQTT operation to ThinEdge firmware_update command.
pub fn convert_firmware_update_request(
&self,
device_xid: String,
cmd_id: String,
firmware_request: C8yFirmware,
) -> Result<Vec<MqttMessage>, CumulocityMapperError> {
let entity_xid: EntityExternalId = device_xid.into();

let target = self.entity_store.try_get_by_external_id(&entity_xid)?;

let channel = Channel::Command {
operation: OperationType::FirmwareUpdate,
cmd_id,
};
let topic = self.mqtt_schema.topic_for(&target.topic_id, &channel);

let tedge_url =
if let Some(c8y_url) = self.c8y_endpoint.maybe_tenant_url(&firmware_request.url) {
self.auth_proxy.proxy_url(c8y_url).to_string()
} else {
firmware_request.url.clone()
};

let request = FirmwareUpdateCmdPayload {
status: CommandStatus::Init,
tedge_url: Some(tedge_url),
remote_url: firmware_request.url,
name: firmware_request.name,
version: firmware_request.version,
log_path: None,
};

// Command messages must be retained
Ok(vec![
MqttMessage::new(&topic, request.to_json()).with_retain()
])
}

pub fn register_firmware_update_operation(
&mut self,
topic_id: &EntityTopicId,
) -> Result<Vec<MqttMessage>, ConversionError> {
if !self.config.capabilities.firmware_update {
warn!(
"Received firmware_update metadata, however, firmware_update feature is disabled"
);
return Ok(vec![]);
}

match self.register_operation(topic_id, "c8y_Firmware") {
Err(err) => {
error!("Failed to register `c8y_Firmware` operation for {topic_id} due to: {err}");
Ok(vec![])
}
Ok(messages) => Ok(messages),
}
}

/// Upon receiving a SmartREST c8y_DownloadConfigFile request, convert it to a message on the
/// command channel.
pub async fn convert_config_update_request(
&self,
device_xid: String,
cmd_id: String,
config_download_request: C8yDownloadConfigFile,
) -> Result<Vec<MqttMessage>, CumulocityMapperError> {
let entity_xid: EntityExternalId = device_xid.into();
let target = self.entity_store.try_get_by_external_id(&entity_xid)?;

let message =
self.create_config_update_cmd(cmd_id.into(), &config_download_request, target);
Ok(message)
}

fn create_config_update_cmd(
&self,
cmd_id: Arc<str>,
config_download_request: &C8yDownloadConfigFile,
target: &EntityMetadata,
) -> Vec<MqttMessage> {
let channel = Channel::Command {
operation: OperationType::ConfigUpdate,
cmd_id: cmd_id.to_string(),
};
let topic = self.mqtt_schema.topic_for(&target.topic_id, &channel);

let proxy_url = self
.c8y_endpoint
.maybe_tenant_url(&config_download_request.url)
.map(|cumulocity_url| self.auth_proxy.proxy_url(cumulocity_url).into());

let remote_url = proxy_url.unwrap_or(config_download_request.url.to_string());

let request = ConfigUpdateCmdPayload {
status: CommandStatus::Init,
tedge_url: None,
remote_url,
config_type: config_download_request.config_type.clone(),
path: None,
log_path: None,
};

// Command messages must be retained
vec![MqttMessage::new(&topic, request.to_json()).with_retain()]
}

fn convert_config_metadata(
&mut self,
topic_id: &EntityTopicId,
message: &MqttMessage,
c8y_op_name: &str,
) -> Result<Vec<MqttMessage>, ConversionError> {
let metadata = ConfigMetadata::from_json(message.payload_str()?)?;

let mut messages = match self.register_operation(topic_id, c8y_op_name) {
Err(err) => {
error!("Failed to register {c8y_op_name} operation for {topic_id} due to: {err}");
return Ok(vec![]);
}
Ok(messages) => messages,
};

// To SmartREST supported config types
let mut types = metadata.types;
types.sort();
let supported_config_types = types.join(",");
let payload = format!("119,{supported_config_types}");
let sm_topic = self.smartrest_publish_topic_for_entity(topic_id)?;
messages.push(MqttMessage::new(&sm_topic, payload));

Ok(messages)
}
}
Loading

0 comments on commit 2a9eed4

Please sign in to comment.