diff --git a/Cargo.lock b/Cargo.lock index ffa019d8f81..da6db733485 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3626,6 +3626,7 @@ dependencies = [ "anyhow", "clap", "tedge_actors", + "tedge_api", "tedge_config", "tedge_file_system_ext", "tedge_health_ext", diff --git a/crates/extensions/tedge_log_manager/src/actor.rs b/crates/extensions/tedge_log_manager/src/actor.rs index bae2552b5b0..6f2580837dd 100644 --- a/crates/extensions/tedge_log_manager/src/actor.rs +++ b/crates/extensions/tedge_log_manager/src/actor.rs @@ -88,9 +88,6 @@ impl LogManagerActor { Ok(Some(request)) => match request.status { CommandStatus::Init => { info!("Log request received: {request:?}"); - self.config - .current_operations - .insert(message.topic.name.clone()); self.start_executing_logfile_request(&message.topic, request) .await?; } @@ -99,9 +96,7 @@ impl LogManagerActor { self.handle_logfile_request_operation(&message.topic, request) .await?; } - CommandStatus::Successful | CommandStatus::Failed { .. } => { - self.config.current_operations.remove(&message.topic.name); - } + CommandStatus::Successful | CommandStatus::Failed { .. } => {} }, Ok(None) => {} Err(err) => { diff --git a/crates/extensions/tedge_log_manager/src/config.rs b/crates/extensions/tedge_log_manager/src/config.rs index 384800b4758..4b8ac862c79 100644 --- a/crates/extensions/tedge_log_manager/src/config.rs +++ b/crates/extensions/tedge_log_manager/src/config.rs @@ -1,6 +1,10 @@ -use std::collections::HashSet; use std::path::PathBuf; -use std::sync::Arc; +use tedge_api::mqtt_topics::Channel; +use tedge_api::mqtt_topics::ChannelFilter; +use tedge_api::mqtt_topics::EntityFilter; +use tedge_api::mqtt_topics::EntityTopicId; +use tedge_api::mqtt_topics::MqttSchema; +use tedge_api::mqtt_topics::OperationType; use tedge_config::ReadError; use tedge_mqtt_ext::Topic; use tedge_mqtt_ext::TopicFilter; @@ -12,56 +16,45 @@ pub const DEFAULT_PLUGIN_CONFIG_DIR_NAME: &str = "plugins/"; #[derive(Clone, Debug)] pub struct LogManagerConfig { pub config_dir: PathBuf, - pub mqtt_topic_root: String, - pub mqtt_device_topic_id: String, pub plugin_config_dir: PathBuf, pub plugin_config_path: PathBuf, pub logtype_reload_topic: Topic, pub logfile_request_topic: TopicFilter, - pub current_operations: HashSet, } pub struct LogManagerOptions { pub config_dir: PathBuf, - pub mqtt_topic_root: Arc, - pub mqtt_device_topic_id: Arc, + pub mqtt_schema: MqttSchema, + pub mqtt_device_topic_id: EntityTopicId, } impl LogManagerConfig { pub fn from_options(cliopts: LogManagerOptions) -> Result { let config_dir = cliopts.config_dir; - let mqtt_topic_root = cliopts.mqtt_topic_root; + let mqtt_schema = cliopts.mqtt_schema; let mqtt_device_topic_id = cliopts.mqtt_device_topic_id; let plugin_config_dir = config_dir.join(DEFAULT_PLUGIN_CONFIG_DIR_NAME); let plugin_config_path = plugin_config_dir.join(DEFAULT_PLUGIN_CONFIG_FILE_NAME); - // TODO: move topic parsing to tedge_api - let logtype_reload_topic = Topic::new_unchecked( - format!( - "{}/{}/cmd/log_upload", - mqtt_topic_root, mqtt_device_topic_id - ) - .as_str(), + let logtype_reload_topic = mqtt_schema.topic_for( + &mqtt_device_topic_id, + &Channel::CommandMetadata { + operation: OperationType::LogUpload, + }, ); - let logfile_request_topic = TopicFilter::new_unchecked( - format!( - "{}/{}/cmd/log_upload/+", - mqtt_topic_root, mqtt_device_topic_id - ) - .as_str(), + + let logfile_request_topic = mqtt_schema.topics( + EntityFilter::Entity(&mqtt_device_topic_id), + ChannelFilter::Command(OperationType::LogUpload), ); - let current_operations = HashSet::new(); Ok(Self { config_dir, - mqtt_topic_root: mqtt_topic_root.to_string(), - mqtt_device_topic_id: mqtt_device_topic_id.to_string(), plugin_config_dir, plugin_config_path, logtype_reload_topic, logfile_request_topic, - current_operations, }) } } diff --git a/crates/extensions/tedge_log_manager/src/tests.rs b/crates/extensions/tedge_log_manager/src/tests.rs index c85da6fe847..1e8a3c72b4f 100644 --- a/crates/extensions/tedge_log_manager/src/tests.rs +++ b/crates/extensions/tedge_log_manager/src/tests.rs @@ -3,7 +3,6 @@ use crate::LogManagerConfig; use crate::Topic; use filetime::set_file_mtime; use filetime::FileTime; -use std::collections::HashSet; use std::path::Path; use std::time::Duration; use tedge_actors::test_helpers::MessageReceiverExt; @@ -91,13 +90,10 @@ fn new_log_manager_builder( ) { let config = LogManagerConfig { config_dir: temp_dir.to_path_buf(), - mqtt_topic_root: "te".to_string(), - mqtt_device_topic_id: "device/main//".to_string(), plugin_config_dir: temp_dir.to_path_buf(), plugin_config_path: temp_dir.join("tedge-log-plugin.toml"), logtype_reload_topic: Topic::new_unchecked("te/device/main///cmd/log_upload"), logfile_request_topic: TopicFilter::new_unchecked("te/device/main///cmd/log_upload/+"), - current_operations: HashSet::new(), }; let mut mqtt_builder: SimpleMessageBoxBuilder = diff --git a/plugins/tedge_log_plugin/Cargo.toml b/plugins/tedge_log_plugin/Cargo.toml index 2b2d125c342..59eaae82b23 100644 --- a/plugins/tedge_log_plugin/Cargo.toml +++ b/plugins/tedge_log_plugin/Cargo.toml @@ -13,11 +13,12 @@ repository = { workspace = true } [dependencies] anyhow = { workspace = true } clap = { workspace = true } -tedge_actors = { workspace = true } -tedge_config = { workspace = true } +tedge_actors = { workspace = true } +tedge_api = { workspace = true } +tedge_config = { workspace = true } tedge_file_system_ext = { workspace = true } tedge_health_ext = { workspace = true } -tedge_http_ext = { workspace = true } +tedge_http_ext = { workspace = true } tedge_log_manager = { workspace = true } tedge_mqtt_ext = { workspace = true } tedge_signal_ext = { workspace = true } diff --git a/plugins/tedge_log_plugin/src/lib.rs b/plugins/tedge_log_plugin/src/lib.rs index 3066f9c9a10..c4638afbe4d 100644 --- a/plugins/tedge_log_plugin/src/lib.rs +++ b/plugins/tedge_log_plugin/src/lib.rs @@ -2,6 +2,7 @@ use clap::Parser; use std::path::PathBuf; use std::sync::Arc; use tedge_actors::Runtime; +use tedge_api::mqtt_topics::MqttSchema; use tedge_config::system_services::get_log_level; use tedge_config::system_services::set_log_level; use tedge_config::TEdgeConfig; @@ -99,8 +100,8 @@ async fn run_with( // Instantiate log manager actor let log_manager_config = LogManagerConfig::from_options(LogManagerOptions { config_dir: cliopts.config_dir, - mqtt_device_topic_id, - mqtt_topic_root, + mqtt_schema: MqttSchema::with_root(mqtt_topic_root.to_string()), + mqtt_device_topic_id: mqtt_device_topic_id.to_string().parse()?, })?; let log_actor = LogManagerBuilder::try_new( log_manager_config,