Skip to content

Commit

Permalink
Merge pull request #2340 from rina23q/improve/2303/expand-external-id…
Browse files Browse the repository at this point in the history
…-api

Add try_get() and try_get_by_external_id() to entity_store
  • Loading branch information
rina23q authored Oct 12, 2023
2 parents 1dffbb6 + 35057d9 commit 2724cc9
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 93 deletions.
19 changes: 18 additions & 1 deletion crates/core/tedge_api/src/entity_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,12 +164,29 @@ impl EntityStore {
self.entities.get(entity_topic_id)
}

/// Returns information for an entity under a given device/service id .
/// Tries to get information about an entity using its `EntityTopicId`,
/// returning an error if the entity is not registered.
pub fn try_get(&self, entity_topic_id: &EntityTopicId) -> Result<&EntityMetadata, Error> {
self.get(entity_topic_id)
.ok_or_else(|| Error::UnknownEntity(entity_topic_id.to_string()))
}

/// Returns information for an entity under a given device/service id.
pub fn get_by_external_id(&self, external_id: &EntityExternalId) -> Option<&EntityMetadata> {
let topic_id = self.entity_id_index.get(external_id)?;
self.get(topic_id)
}

/// Tries to get information about an entity using its `EntityExternalId`,
/// returning an error if the entity is not registered.
pub fn try_get_by_external_id(
&self,
external_id: &EntityExternalId,
) -> Result<&EntityMetadata, Error> {
self.get_by_external_id(external_id)
.ok_or_else(|| Error::UnknownEntity(external_id.into()))
}

/// Returns the MQTT identifier of the main device.
///
/// The main device is an entity with `@type: "device"`.
Expand Down
75 changes: 25 additions & 50 deletions crates/extensions/c8y_mapper_ext/src/config_operations.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::converter::CumulocityConverter;
use crate::error::ConversionError;
use crate::error::CumulocityMapperError;
use crate::error::CumulocityMapperError::UnknownDevice;
use c8y_api::smartrest::smartrest_deserializer::SmartRestConfigDownloadRequest;
use c8y_api::smartrest::smartrest_deserializer::SmartRestConfigUploadRequest;
use c8y_api::smartrest::smartrest_deserializer::SmartRestOperationVariant;
Expand Down Expand Up @@ -77,24 +76,20 @@ impl CumulocityConverter {
let snapshot_request = SmartRestConfigUploadRequest::from_smartrest(smartrest)?;
let target = self
.entity_store
.get_by_external_id(&snapshot_request.device.clone().into())
.ok_or_else(|| UnknownDevice {
device_id: snapshot_request.device.to_string(),
})?;
.try_get_by_external_id(&snapshot_request.device.clone().into())?;

let cmd_id = nanoid!();
let channel = Channel::Command {
operation: OperationType::ConfigSnapshot,
cmd_id: cmd_id.clone(),
};
let topic = self.mqtt_schema.topic_for(&target.topic_id, &channel);
let external_id: String = target.external_id.clone().into();

// Replace '/' with ':' to avoid creating unexpected directories in file transfer repo
let tedge_url = format!(
"http://{}/tedge/file-transfer/{}/config_snapshot/{}-{}",
&self.config.tedge_http_host,
external_id,
target.external_id.as_ref(),
snapshot_request.config_type.replace('/', ":"),
cmd_id
);
Expand Down Expand Up @@ -127,13 +122,7 @@ impl CumulocityConverter {
return Ok(vec![]);
}

// get the device metadata from its id
let device = self.entity_store.get(topic_id).ok_or_else(|| {
CumulocityMapperError::UnregisteredDevice {
topic_id: topic_id.to_string(),
}
})?;
let external_id = &device.external_id;
let external_id = self.entity_store.try_get(topic_id)?.external_id.as_ref();
let smartrest_topic = self.smartrest_publish_topic_for_entity(topic_id)?;
let payload = message.payload_str()?;
let response = &ConfigSnapshotCmdPayload::from_json(payload)?;
Expand All @@ -151,7 +140,7 @@ impl CumulocityConverter {
.config
.data_dir
.file_transfer_dir()
.join(device.external_id.as_ref())
.join(external_id)
.join("config_snapshot")
.join(format!(
"{}-{}",
Expand All @@ -164,7 +153,7 @@ impl CumulocityConverter {
.upload_file(
uploaded_file_path.as_std_path(),
&response.config_type,
external_id.as_ref().to_string(),
external_id.to_string(),
)
.await; // We need to get rid of this await, otherwise it blocks

Expand Down Expand Up @@ -239,12 +228,9 @@ impl CumulocityConverter {
}

let smartrest = SmartRestConfigDownloadRequest::from_smartrest(smartrest)?;
let entity = self
let target = self
.entity_store
.get_by_external_id(&smartrest.device.clone().into())
.ok_or_else(|| UnknownDevice {
device_id: smartrest.device.clone(),
})?;
.try_get_by_external_id(&smartrest.device.clone().into())?;

let cmd_id = nanoid!();
let remote_url = smartrest.url.as_str();
Expand All @@ -255,13 +241,13 @@ impl CumulocityConverter {
// No download. Create a symlink and config_update command.
info!("Hit the file cache={file_cache_path}. Create a symlink to the file");
self.create_symlink_for_config_update(
entity,
target,
&smartrest.config_type,
&cmd_id,
file_cache_path,
)?;

let message = self.create_config_update_cmd(cmd_id.into(), &smartrest, entity);
let message = self.create_config_update_cmd(cmd_id.into(), &smartrest, target);
Ok(message)
} else {
// Require file download
Expand Down Expand Up @@ -300,26 +286,23 @@ impl CumulocityConverter {
smartrest: &SmartRestConfigDownloadRequest,
download_result: DownloadResult,
) -> Result<Vec<Message>, ConversionError> {
let device = self
let target = self
.entity_store
.get_by_external_id(&smartrest.device.clone().into())
.ok_or_else(|| UnknownDevice {
device_id: smartrest.device.to_string(),
})?;
.try_get_by_external_id(&smartrest.device.clone().into())?;

match download_result {
Ok(download_response) => {
self.create_symlink_for_config_update(
device,
target,
&smartrest.config_type,
&cmd_id,
download_response.file_path,
)?;
let message = self.create_config_update_cmd(cmd_id, smartrest, device);
let message = self.create_config_update_cmd(cmd_id, smartrest, target);
Ok(message)
}
Err(download_err) => {
let sm_topic = self.smartrest_publish_topic_for_entity(&device.topic_id)?;
let sm_topic = self.smartrest_publish_topic_for_entity(&target.topic_id)?;
let smartrest_executing = SmartRestSetOperationToExecuting::new(
CumulocitySupportedOperations::C8yDownloadConfigFile,
)
Expand Down Expand Up @@ -357,11 +340,7 @@ impl CumulocityConverter {
return Ok(vec![]);
}

let device = self.entity_store.get(topic_id).ok_or_else(|| {
CumulocityMapperError::UnregisteredDevice {
topic_id: topic_id.to_string(),
}
})?;
let target = self.entity_store.try_get(topic_id)?;
let sm_topic = self.smartrest_publish_topic_for_entity(topic_id)?;
let payload = message.payload_str()?;
let response = &ConfigUpdateCmdPayload::from_json(payload)?;
Expand All @@ -384,7 +363,7 @@ impl CumulocityConverter {
.with_retain()
.with_qos(QoS::AtLeastOnce);

self.delete_symlink_for_config_update(device, &response.config_type, cmd_id)?;
self.delete_symlink_for_config_update(target, &response.config_type, cmd_id)?;

vec![c8y_notification, clear_local_cmd]
}
Expand All @@ -399,7 +378,7 @@ impl CumulocityConverter {
.with_retain()
.with_qos(QoS::AtLeastOnce);

self.delete_symlink_for_config_update(device, &response.config_type, cmd_id)?;
self.delete_symlink_for_config_update(target, &response.config_type, cmd_id)?;

vec![c8y_notification, clear_local_cmd]
}
Expand Down Expand Up @@ -435,20 +414,16 @@ impl CumulocityConverter {
let metadata = ConfigMetadata::from_json(message.payload_str()?)?;

// get the device metadata from its id
let device = self.entity_store.get(topic_id).ok_or_else(|| {
CumulocityMapperError::UnregisteredDevice {
topic_id: topic_id.to_string(),
}
})?;
let target = self.entity_store.try_get(topic_id)?;

// Create a c8y operation file
let dir_path = match device.r#type {
let dir_path = match target.r#type {
EntityType::MainDevice => self.ops_dir.clone(),
EntityType::ChildDevice => {
match &device.parent {
match &target.parent {
Some(parent) if parent.is_default_main_device() => {
// Support only first level child devices due to the limitation of our file system supported operations scheme.
self.ops_dir.join(device.external_id.as_ref())
self.ops_dir.join(target.external_id.as_ref())
}
_ => {
warn!("config_snapshot and config_update features for nested child devices are currently unsupported");
Expand Down Expand Up @@ -510,7 +485,7 @@ impl CumulocityConverter {

fn create_symlink_for_config_update(
&self,
entity: &EntityMetadata,
target: &EntityMetadata,
config_type: &str,
cmd_id: &str,
original: impl AsRef<Path>,
Expand All @@ -519,7 +494,7 @@ impl CumulocityConverter {
.config
.data_dir
.file_transfer_dir()
.join(entity.external_id.as_ref())
.join(target.external_id.as_ref())
.join("config_update");
let symlink_path =
symlink_dir_path.join(format!("{}-{cmd_id}", config_type.replace('/', ":")));
Expand All @@ -534,15 +509,15 @@ impl CumulocityConverter {

fn delete_symlink_for_config_update(
&self,
entity: &EntityMetadata,
target: &EntityMetadata,
config_type: &str,
cmd_id: &str,
) -> Result<(), io::Error> {
let symlink_dir_path = self
.config
.data_dir
.file_transfer_dir()
.join(entity.external_id.as_ref())
.join(target.external_id.as_ref())
.join("config_update");
let symlink_path =
symlink_dir_path.join(format!("{}-{cmd_id}", config_type.replace('/', ":")));
Expand Down
17 changes: 3 additions & 14 deletions crates/extensions/c8y_mapper_ext/src/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,11 +310,7 @@ impl CumulocityConverter {
&self,
entity_topic_id: &EntityTopicId,
) -> Result<Topic, ConversionError> {
let entity = self.entity_store.get(entity_topic_id).ok_or_else(|| {
CumulocityMapperError::UnregisteredDevice {
topic_id: entity_topic_id.to_string(),
}
})?;
let entity = self.entity_store.try_get(entity_topic_id)?;

let mut ancestors_external_ids =
self.entity_store.ancestors_external_ids(entity_topic_id)?;
Expand Down Expand Up @@ -633,12 +629,7 @@ impl CumulocityConverter {
) -> Result<Vec<Message>, CumulocityMapperError> {
let request = SmartRestRestartRequest::from_smartrest(smartrest)?;
let device_id = &request.device.into();
let target = self
.entity_store
.get_by_external_id(device_id)
.ok_or_else(|| CumulocityMapperError::UnknownDevice {
device_id: device_id.as_ref().to_string(),
})?;
let target = self.entity_store.try_get_by_external_id(device_id)?;
let command = RestartCommand::new(target.topic_id.clone());
let message = command.command_message(&self.mqtt_schema);
Ok(vec![message])
Expand Down Expand Up @@ -1295,9 +1286,7 @@ impl CumulocityConverter {
.entity_store
.get(target)
.and_then(C8yTopic::smartrest_response_topic)
.ok_or_else(|| CumulocityMapperError::UnregisteredDevice {
topic_id: target.to_string(),
})?;
.ok_or_else(|| Error::UnknownEntity(target.to_string()))?;

match command.status() {
CommandStatus::Executing => {
Expand Down
7 changes: 2 additions & 5 deletions crates/extensions/c8y_mapper_ext/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,8 @@ pub enum ConversionError {
#[derive(thiserror::Error, Debug)]
#[allow(clippy::enum_variant_names)]
pub enum CumulocityMapperError {
#[error("Unknown device id: '{device_id}'")]
UnknownDevice { device_id: String },

#[error("Unregistered device topic: '{topic_id}'")]
UnregisteredDevice { topic_id: String },
#[error(transparent)]
FromEntityStore(#[from] tedge_api::entity_store::Error),

#[error(transparent)]
InvalidTopicError(#[from] tedge_api::TopicError),
Expand Down
30 changes: 7 additions & 23 deletions crates/extensions/c8y_mapper_ext/src/log_upload.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::converter::CumulocityConverter;
use crate::error::ConversionError;
use crate::error::CumulocityMapperError;
use crate::error::CumulocityMapperError::UnknownDevice;
use c8y_api::smartrest::smartrest_deserializer::SmartRestLogRequest;
use c8y_api::smartrest::smartrest_deserializer::SmartRestRequestGeneric;
use c8y_api::smartrest::smartrest_serializer::CumulocitySupportedOperations;
Expand Down Expand Up @@ -53,10 +52,7 @@ impl CumulocityConverter {
let device_external_id = log_request.device.into();
let target = self
.entity_store
.get_by_external_id(&device_external_id)
.ok_or_else(|| UnknownDevice {
device_id: device_external_id.into(),
})?;
.try_get_by_external_id(&device_external_id)?;

let cmd_id = nanoid!();
let channel = Channel::Command {
Expand Down Expand Up @@ -102,16 +98,8 @@ impl CumulocityConverter {
return Ok(vec![]);
}

// get the device metadata from its id
let device = self.entity_store.get(topic_id).ok_or_else(|| {
CumulocityMapperError::UnregisteredDevice {
topic_id: topic_id.to_string(),
}
})?;
let external_id = &device.external_id;

let external_id = self.entity_store.try_get(topic_id)?.external_id.as_ref();
let smartrest_topic = self.smartrest_publish_topic_for_entity(topic_id)?;

let payload = message.payload_str()?;
let response = &LogUploadCmdPayload::from_json(payload)?;

Expand All @@ -128,15 +116,15 @@ impl CumulocityConverter {
.config
.data_dir
.file_transfer_dir()
.join(device.external_id.as_ref())
.join(external_id)
.join("log_upload")
.join(format!("{}-{}", response.log_type, cmd_id));
let result = self
.http_proxy
.upload_file(
uploaded_file_path.as_std_path(),
&response.log_type,
external_id.as_ref().to_string(),
external_id.to_string(),
)
.await; // We need to get rid of this await, otherwise it blocks

Expand Down Expand Up @@ -195,17 +183,13 @@ impl CumulocityConverter {
let metadata = LogMetadata::from_json(message.payload_str()?)?;

// get the device metadata from its id
let device = self.entity_store.get(topic_id).ok_or_else(|| {
CumulocityMapperError::UnregisteredDevice {
topic_id: topic_id.to_string(),
}
})?;
let target = self.entity_store.try_get(topic_id)?;

// Create a c8y_LogfileRequest operation file
let dir_path = match device.r#type {
let dir_path = match target.r#type {
EntityType::MainDevice => self.ops_dir.clone(),
EntityType::ChildDevice => {
let child_dir_name = device.external_id.as_ref();
let child_dir_name = target.external_id.as_ref();
self.ops_dir.clone().join(child_dir_name)
}
EntityType::Service => {
Expand Down

0 comments on commit 2724cc9

Please sign in to comment.