From 9a0ead2ec476e2ea3f9da3ad0017066666eed084 Mon Sep 17 00:00:00 2001 From: Albin Suresh Date: Fri, 26 May 2023 11:44:43 +0000 Subject: [PATCH] Extensible config management --- Cargo.lock | 10 + .../extensions/c8y_config_manager/Cargo.toml | 1 + .../c8y_config_manager/src/download.rs | 173 ++++++++++++------ .../c8y_config_manager/src/error.rs | 4 + .../c8y_config_manager/src/plugin_config.rs | 88 +++++++-- .../c8y_config_manager/src/upload.rs | 101 +++++++--- .../c8y-configuration-management.md | 50 +++++ 7 files changed, 322 insertions(+), 105 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 37ba77fa090..bcfd1ec4653 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -634,6 +634,7 @@ dependencies = [ "thiserror", "tokio", "toml 0.5.11", + "uuid", ] [[package]] @@ -4581,6 +4582,15 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" +[[package]] +name = "uuid" +version = "1.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "345444e32442451b267fc254ae85a209c64be56d2890e601a0c37ff0c3c5ecd2" +dependencies = [ + "getrandom", +] + [[package]] name = "valuable" version = "0.1.0" diff --git a/crates/extensions/c8y_config_manager/Cargo.toml b/crates/extensions/c8y_config_manager/Cargo.toml index 56129622032..a68fc1d154e 100644 --- a/crates/extensions/c8y_config_manager/Cargo.toml +++ b/crates/extensions/c8y_config_manager/Cargo.toml @@ -26,6 +26,7 @@ tedge_utils = { path = "../../common/tedge_utils" } thiserror = "1.0" tokio = { version = "1.23", features = ["macros"] } toml = "0.5" +uuid = { version = "1.3", features = ["v4"] } [dev-dependencies] tedge_actors = { path = "../../core/tedge_actors", features = ["test-helpers"] } diff --git a/crates/extensions/c8y_config_manager/src/download.rs b/crates/extensions/c8y_config_manager/src/download.rs index c8b688b4913..04ec32bf4a5 100644 --- a/crates/extensions/c8y_config_manager/src/download.rs +++ b/crates/extensions/c8y_config_manager/src/download.rs @@ -1,5 +1,7 @@ use crate::child_device::ChildConfigOperationKey; use crate::child_device::DEFAULT_OPERATION_TIMEOUT; +use crate::plugin_config::ConfigEntry; +use crate::plugin_config::ExtEntry; use crate::plugin_config::InvalidConfigTypeError; use super::actor::ActiveOperationState; @@ -32,12 +34,15 @@ use serde_json::json; use std::collections::HashMap; use std::path::Path; use std::path::PathBuf; +use std::process::Command; +use std::process::Stdio; use tedge_actors::Sender; use tedge_api::OperationStatus; use tedge_mqtt_ext::MqttMessage; use tedge_mqtt_ext::Topic; use tedge_timer_ext::SetTimeout; use tedge_utils::file::PermissionEntry; +use uuid::Uuid; pub const CONFIG_CHANGE_TOPIC: &str = "tedge/configuration_change"; @@ -86,17 +91,23 @@ impl ConfigDownloadManager { let mut target_file_entry = FileEntry::default(); let plugin_config = PluginConfig::new(&self.config.plugin_config_path); - let download_result = match plugin_config.get_file_entry_from_type(&target_config_type) { - Ok(file_entry) => { - target_file_entry = file_entry; - self.download_config_file( - smartrest_request.url.as_str(), - PathBuf::from(&target_file_entry.path), - target_file_entry.file_permissions, - message_box, - ) - .await - } + let download_result = match plugin_config.get_config_entry_from_type(&target_config_type) { + Ok(config_entry) => match config_entry { + ConfigEntry::FileEntry(file_entry) => { + target_file_entry = file_entry; + self.download_config_file( + smartrest_request.url.as_str(), + PathBuf::from(&target_file_entry.path), + target_file_entry.file_permissions, + message_box, + ) + .await + } + ConfigEntry::ExtEntry(ext_entry) => { + self.run_extension(ext_entry, &smartrest_request.url, message_box) + .await + } + }, Err(err) => Err(err.into()), }; @@ -142,6 +153,43 @@ impl ConfigDownloadManager { Ok(()) } + async fn run_extension( + &mut self, + ext_entry: ExtEntry, + download_url: &str, + message_box: &mut ConfigManagerMessageBox, + ) -> Result<(), ConfigManagementError> { + let tmp_file_path = self.config.tmp_dir.join(Uuid::new_v4().to_string()); + self.download_config_file( + download_url, + tmp_file_path.clone(), + PermissionEntry::default(), + message_box, + ) + .await?; + + let config_type = ext_entry.config_type.clone(); + let output = Command::new(ext_entry.exec) + .args([ + "set".into(), + config_type, + tmp_file_path.to_string_lossy().into(), + ]) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .output()?; + + if output.status.success() { + Ok(()) + } else { + Err(ConfigManagementError::ExecError( + output.status, + String::from_utf8(output.stderr) + .unwrap_or_else(|_| "non UTF-8 chars in stderr".into()), + )) + } + } + /// Map the c8y_DownloadConfigFile request into a tedge/commands/req/config_update command for the child device. /// The config file update is shared with the child device via the file transfer service. /// The configuration update is downloaded from Cumulocity and is uploaded to the file transfer service, @@ -167,57 +215,62 @@ impl ConfigDownloadManager { self.config.config_dir.display(), ))); - match plugin_config.get_file_entry_from_type(&config_type) { - Ok(file_entry) => { - let config_management = ConfigOperationRequest::Update { - child_id: child_id.clone(), - file_entry, - }; - - if let Err(err) = self - .download_config_file( - smartrest_request.url.as_str(), - config_management.file_transfer_repository_full_path( - self.config.file_transfer_dir.clone(), - ), - PermissionEntry::new(None, None, None), //no need to change ownership of downloaded file - message_box, - ) - .await - { - // Fail the operation in the cloud if the file download itself fails - // by sending EXECUTING and FAILED responses back to back - - let failure_reason = format!( - "Downloading the config file update from {} failed with {}", - smartrest_request.url, err - ); - ConfigManagerActor::fail_config_operation_in_c8y( - ConfigOperation::Update, - Some(child_id), - ActiveOperationState::Pending, - failure_reason, - message_box, - ) - .await?; - } else { - let config_update_req_msg = MqttMessage::new( - &config_management.operation_request_topic(), - config_management - .operation_request_payload(&self.config.tedge_http_host)?, - ); - message_box.send(config_update_req_msg.into()).await?; - info!("Config update request for config type: {config_type} sent to child device: {child_id}"); - - self.active_child_ops - .insert(operation_key.clone(), ActiveOperationState::Pending); - - // Start the timer for operation timeout - message_box - .send(SetTimeout::new(DEFAULT_OPERATION_TIMEOUT, operation_key).into()) + match plugin_config.get_config_entry_from_type(&config_type) { + Ok(config_entry) => match config_entry { + ConfigEntry::FileEntry(file_entry) => { + let config_management = ConfigOperationRequest::Update { + child_id: child_id.clone(), + file_entry, + }; + + if let Err(err) = self + .download_config_file( + smartrest_request.url.as_str(), + config_management.file_transfer_repository_full_path( + self.config.file_transfer_dir.clone(), + ), + PermissionEntry::new(None, None, None), //no need to change ownership of downloaded file + message_box, + ) + .await + { + // Fail the operation in the cloud if the file download itself fails + // by sending EXECUTING and FAILED responses back to back + + let failure_reason = format!( + "Downloading the config file update from {} failed with {}", + smartrest_request.url, err + ); + ConfigManagerActor::fail_config_operation_in_c8y( + ConfigOperation::Update, + Some(child_id), + ActiveOperationState::Pending, + failure_reason, + message_box, + ) .await?; + } else { + let config_update_req_msg = MqttMessage::new( + &config_management.operation_request_topic(), + config_management + .operation_request_payload(&self.config.tedge_http_host)?, + ); + message_box.send(config_update_req_msg.into()).await?; + info!("Config update request for config type: {config_type} sent to child device: {child_id}"); + + self.active_child_ops + .insert(operation_key.clone(), ActiveOperationState::Pending); + + // Start the timer for operation timeout + message_box + .send(SetTimeout::new(DEFAULT_OPERATION_TIMEOUT, operation_key).into()) + .await?; + } } - } + ConfigEntry::ExtEntry(_) => { + warn!("Exec entries not supported for child devices"); + } + }, Err(InvalidConfigTypeError { config_type }) => { warn!( "Ignoring the config operation request for unknown config type: {config_type}" diff --git a/crates/extensions/c8y_config_manager/src/error.rs b/crates/extensions/c8y_config_manager/src/error.rs index 15cd0afb881..9ac9d41268c 100644 --- a/crates/extensions/c8y_config_manager/src/error.rs +++ b/crates/extensions/c8y_config_manager/src/error.rs @@ -1,4 +1,5 @@ use std::io; +use std::process::ExitStatus; use tedge_actors::RuntimeError; use tedge_mqtt_ext::Topic; use tedge_utils::file::FileError; @@ -44,6 +45,9 @@ pub enum ConfigManagementError { #[error(transparent)] FromPathsError(#[from] PathsError), + + #[error("Command execution failed with exit code: {0:?}, reason: {1:?}")] + ExecError(ExitStatus, String), } impl From for RuntimeError { diff --git a/crates/extensions/c8y_config_manager/src/plugin_config.rs b/crates/extensions/c8y_config_manager/src/plugin_config.rs index 1f8797072e9..ae1c52c2c79 100644 --- a/crates/extensions/c8y_config_manager/src/plugin_config.rs +++ b/crates/extensions/c8y_config_manager/src/plugin_config.rs @@ -4,7 +4,7 @@ use log::error; use log::info; use serde::Deserialize; use std::borrow::Borrow; -use std::collections::HashSet; +use std::collections::HashMap; use std::fs; use std::hash::Hash; use std::hash::Hasher; @@ -18,6 +18,7 @@ use tedge_utils::file::PermissionEntry; #[serde(deny_unknown_fields)] struct RawPluginConfig { pub files: Vec, + pub exts: Option>, } #[derive(Deserialize, Debug, Default, Eq, PartialEq)] @@ -31,9 +32,23 @@ pub struct RawFileEntry { mode: Option, } -#[derive(Debug, Eq, PartialEq, Default, Clone)] +#[derive(Deserialize, Debug, Default, Eq, PartialEq)] +#[serde(deny_unknown_fields)] +pub struct RawExtEntry { + #[serde(rename = "type")] + config_type: String, + exec: String, +} + +#[derive(Debug, Clone)] pub struct PluginConfig { - pub files: HashSet, + pub configs: HashMap, +} + +#[derive(Debug, Clone)] +pub enum ConfigEntry { + FileEntry(FileEntry), + ExtEntry(ExtEntry), } #[derive(Debug, Eq, Default, Clone)] @@ -43,6 +58,12 @@ pub struct FileEntry { pub file_permissions: PermissionEntry, } +#[derive(Debug, Eq, Default, Clone)] +pub struct ExtEntry { + pub exec: String, + pub config_type: String, +} + impl Hash for FileEntry { fn hash(&self, state: &mut H) { self.config_type.hash(state); @@ -77,6 +98,18 @@ impl FileEntry { } } +impl Hash for ExtEntry { + fn hash(&self, state: &mut H) { + self.config_type.hash(state); + } +} + +impl PartialEq for ExtEntry { + fn eq(&self, other: &Self) -> bool { + self.config_type == other.config_type + } +} + impl RawPluginConfig { fn new(config_file_path: &Path) -> Self { let path_str = config_file_path.display().to_string(); @@ -112,8 +145,12 @@ impl PluginConfig { None, None, ); + Self { - files: HashSet::from([c8y_configuration_plugin]), + configs: HashMap::from([( + DEFAULT_PLUGIN_CONFIG_TYPE.into(), + ConfigEntry::FileEntry(c8y_configuration_plugin), + )]), } } @@ -139,11 +176,32 @@ impl PluginConfig { raw_entry.group, raw_entry.mode, ); - if !self.files.insert(entry) { + if self + .configs + .insert(config_type.clone(), ConfigEntry::FileEntry(entry)) + .is_some() + { error!("The config file has the duplicated type '{}'.", config_type); return original_plugin_config; } } + for raw_entry in raw_config.exts.into_iter().flatten() { + let config_type = raw_entry.config_type; + let entry = ExtEntry { + config_type: config_type.clone(), + exec: raw_entry.exec, + }; + + if self + .configs + .insert(config_type.clone(), ConfigEntry::ExtEntry(entry)) + .is_some() + { + error!("The config file has the duplicated type '{}'.", config_type); + return original_plugin_config; + } + } + self } @@ -161,30 +219,30 @@ impl PluginConfig { Ok(MqttMessage::new(&topic, self.to_smartrest_payload())) } - pub fn get_all_file_types(&self) -> Vec { - self.files - .iter() - .map(|x| x.config_type.to_string()) + pub fn get_all_config_types(&self) -> Vec { + self.configs + .keys() + .map(|x| x.to_owned()) .collect::>() } - pub fn get_file_entry_from_type( + pub fn get_config_entry_from_type( &self, config_type: &str, - ) -> Result { - let file_entry = self - .files + ) -> Result { + let config_entry = self + .configs .get(&config_type.to_string()) .ok_or(InvalidConfigTypeError { config_type: config_type.to_owned(), })? .to_owned(); - Ok(file_entry) + Ok(config_entry) } // 119,typeA,typeB,... fn to_smartrest_payload(&self) -> String { - let mut config_types = self.get_all_file_types(); + let mut config_types = self.get_all_config_types(); // Sort because hashset doesn't guarantee the order config_types.sort(); let supported_config_types = config_types.join(","); diff --git a/crates/extensions/c8y_config_manager/src/upload.rs b/crates/extensions/c8y_config_manager/src/upload.rs index 6bc7844066b..faa577b683b 100644 --- a/crates/extensions/c8y_config_manager/src/upload.rs +++ b/crates/extensions/c8y_config_manager/src/upload.rs @@ -2,6 +2,8 @@ use crate::child_device::try_cleanup_config_file_from_file_transfer_repositoy; use crate::child_device::ChildConfigOperationKey; use crate::child_device::ConfigOperationMessage; use crate::child_device::DEFAULT_OPERATION_TIMEOUT; +use crate::plugin_config::ConfigEntry; +use crate::plugin_config::ExtEntry; use crate::plugin_config::InvalidConfigTypeError; use super::actor::ActiveOperationState; @@ -27,7 +29,12 @@ use log::error; use log::info; use log::warn; use std::collections::HashMap; +use std::fs::File; +use std::io::Write; use std::path::Path; +use std::path::PathBuf; +use std::process::Command; +use std::process::Stdio; use tedge_actors::Sender; use tedge_api::OperationStatus; use tedge_mqtt_ext::MqttMessage; @@ -37,6 +44,7 @@ use tedge_utils::file::create_directory_with_user_group; use tedge_utils::file::create_file_with_user_group; use tedge_utils::file::move_file; use tedge_utils::file::PermissionEntry; +use uuid::Uuid; pub struct ConfigUploadManager { config: ConfigManagerConfig, @@ -82,12 +90,18 @@ impl ConfigUploadManager { let plugin_config = PluginConfig::new(&self.config.plugin_config_path); + let target_config_type = &config_upload_request.config_type; + let upload_result = - match plugin_config.get_file_entry_from_type(&config_upload_request.config_type) { - Ok(file_entry) => { - let config_file_path = file_entry.path; + match plugin_config.get_config_entry_from_type(&config_upload_request.config_type) { + Ok(config_entry) => { + let config_file_path = match config_entry { + ConfigEntry::FileEntry(file_entry) => PathBuf::from(file_entry.path), + ConfigEntry::ExtEntry(ext_entry) => self.run_extension(ext_entry).await?, + }; + self.upload_config_file( - Path::new(config_file_path.as_str()), + config_file_path, &config_upload_request.config_type, None, message_box, @@ -97,8 +111,6 @@ impl ConfigUploadManager { Err(err) => Err(err.into()), }; - let target_config_type = &config_upload_request.config_type; - match upload_result { Ok(upload_event_url) => { info!("The configuration upload for '{target_config_type}' is successful."); @@ -118,6 +130,27 @@ impl ConfigUploadManager { Ok(()) } + async fn run_extension(&self, ext_entry: ExtEntry) -> Result { + let config_type = ext_entry.config_type.clone(); + let output = Command::new(ext_entry.exec) + .args(["get".into(), config_type]) + .stdout(Stdio::piped()) + .output()?; + + if output.status.success() { + let out_file_path = self.config.tmp_dir.join(Uuid::new_v4().to_string()); + let mut out_file = File::create(&out_file_path)?; + out_file.write_all(&output.stdout)?; + Ok(out_file_path) + } else { + Err(ConfigManagementError::ExecError( + output.status, + String::from_utf8(output.stderr) + .unwrap_or_else(|_| "non UTF-8 chars in stderr".into()), + )) + } + } + /// Map the c8y_UploadConfigFile request into a tedge/commands/req/config_snapshot command for the child device. /// The child device is expected to upload the config fie snapshot to the file transfer service. /// A unique URL path for this config file, from the file transfer service, is shared with the child device in the command. @@ -140,27 +173,35 @@ impl ConfigUploadManager { self.config.config_dir.display() ))); - match plugin_config.get_file_entry_from_type(&config_type) { - Ok(file_entry) => { - let config_management = ConfigOperationRequest::Snapshot { - child_id: child_id.clone(), - file_entry: file_entry.clone(), - }; - - let msg = MqttMessage::new( - &config_management.operation_request_topic(), - config_management.operation_request_payload(&self.config.tedge_http_host)?, - ); - message_box.send(msg.into()).await?; - info!("Config snapshot request for config type: {config_type} sent to child device: {child_id}"); - - self.active_child_ops - .insert(operation_key.clone(), ActiveOperationState::Pending); - - // Start the timer for operation timeout - message_box - .send(SetTimeout::new(DEFAULT_OPERATION_TIMEOUT, operation_key).into()) - .await?; + match plugin_config.get_config_entry_from_type(&config_type) { + Ok(config_entry) => { + match config_entry { + ConfigEntry::FileEntry(file_entry) => { + let config_management = ConfigOperationRequest::Snapshot { + child_id: child_id.clone(), + file_entry: file_entry.clone(), + }; + + let msg = MqttMessage::new( + &config_management.operation_request_topic(), + config_management + .operation_request_payload(&self.config.tedge_http_host)?, + ); + message_box.send(msg.into()).await?; + info!("Config snapshot request for config type: {config_type} sent to child device: {child_id}"); + + self.active_child_ops + .insert(operation_key.clone(), ActiveOperationState::Pending); + + // Start the timer for operation timeout + message_box + .send(SetTimeout::new(DEFAULT_OPERATION_TIMEOUT, operation_key).into()) + .await?; + } + ConfigEntry::ExtEntry(_) => { + warn!("Exec entries not supported for child devices"); + } + } } Err(InvalidConfigTypeError { config_type }) => { warn!( @@ -350,16 +391,16 @@ impl ConfigUploadManager { Ok(message) } - pub async fn upload_config_file( + pub async fn upload_config_file>( &mut self, - config_file_path: &Path, + config_file_path: P, config_type: &str, child_device_id: Option, message_box: &mut ConfigManagerMessageBox, ) -> Result { let url = message_box .c8y_http_proxy - .upload_config_file(config_file_path, config_type, child_device_id) + .upload_config_file(config_file_path.as_ref(), config_type, child_device_id) .await?; Ok(url) } diff --git a/docs/src/references/c8y-configuration-management.md b/docs/src/references/c8y-configuration-management.md index cd3b72c0250..72662d9c789 100644 --- a/docs/src/references/c8y-configuration-management.md +++ b/docs/src/references/c8y-configuration-management.md @@ -174,6 +174,47 @@ files = [ ] ``` +Config files need not always be just plain files that can be read from or written on to the file system. +Some applications might have non-persistent configurations which can be read/updated using some commands. +Even for file based configurations, while updating them, just replacing the old config file with a new one may not be enough. +You might wanna do some pre-processing steps like validating the downloaded file before applying it and +post-processing steps like reloading the service after updating the config and so on. + +The `c8y-configuration-plugin` supports `extensions` to support such complex config management routines. +Extensions can be any executable file that conforms to the following requirements: + +1. Support a `get` subcommand that accepts the config type as an argument and write the config for that type to `stdout`. + The `c8y-configuration-plugin` will capture this output to a file and upload that to the cloud. + In case of command failure, the exit code and `stderr` contents are sent to the cloud as failure reason. +2. Support a `set` subcommand that accepts the config type and the downloaded config file path as arguments. + In case of command failure, the exit code and `stderr` contents are sent to the cloud as failure reason. + The `stdout` contents are ignored irrespective of success or failure (TODO: Log them as operation logs). + +Such extensions can be added to the `c8y-configuration-plugin.toml` file as `exts` as follows: + +```toml +exts = [ + { exec = '/etc/tedge/config-plugins/camera-agent.sh', type = 'traffic-cam-1' }, +] + +files = [ + ... +] +``` + +With the above configuration, when you fetch the configuration snapshot for the `hallway-cam-1` type, +the plugin will execute the executable provided in `exec` as follows: + +```shell +/etc/tedge/config-plugins/camera-agent.sh get traffic-cam-1 +``` + +Similarly, when an update is pushed for the `traffic-cam-1`, the following command will be executed: + +```shell +/etc/tedge/config-plugins/camera-agent.sh set traffic-cam-1 /tmp/path/to/downloaded/config/file +``` + Along this `c8y-configuration-plugin` configuration for the main device, the configuration plugin expects a configuration file per child device that needs to be configured from the cloud. @@ -262,6 +303,15 @@ by the configuration of thin-edge: * `tedge config get tmp.path`: the directory where the files are updated before being copied atomically to their targets. +### Plugin Script Specification + +The config-plugin script provided in the `plugin` key must satisfy the following requirements: + +1. Support a `get` and `set` subcommands +2. The `get` command takes the config type as the argument and must return the config for that type +3. The `set` command takes the config type as well as the path to the updated config downloaded by the c8y-config-plugin as the argument and must return the config for that type + + ## Usage ```shell