Skip to content

Commit

Permalink
Merge pull request #2305 from Ruadhri17/log-plugin-mqtt-schema
Browse files Browse the repository at this point in the history
Parse log plugin topics using new mqtt schema
  • Loading branch information
Ruadhri17 authored Oct 6, 2023
2 parents 5d57ed1 + faac8d9 commit c6e0126
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 40 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 1 addition & 6 deletions crates/extensions/tedge_log_manager/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,6 @@ impl LogManagerActor {
Ok(Some(request)) => match request.status {
CommandStatus::Init => {
info!("Log request received: {request:?}");
self.config
.current_operations
.insert(message.topic.name.clone());
self.start_executing_logfile_request(&message.topic, request)
.await?;
}
Expand All @@ -99,9 +96,7 @@ impl LogManagerActor {
self.handle_logfile_request_operation(&message.topic, request)
.await?;
}
CommandStatus::Successful | CommandStatus::Failed { .. } => {
self.config.current_operations.remove(&message.topic.name);
}
CommandStatus::Successful | CommandStatus::Failed { .. } => {}
},
Ok(None) => {}
Err(err) => {
Expand Down
43 changes: 18 additions & 25 deletions crates/extensions/tedge_log_manager/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::Arc;
use tedge_api::mqtt_topics::Channel;
use tedge_api::mqtt_topics::ChannelFilter;
use tedge_api::mqtt_topics::EntityFilter;
use tedge_api::mqtt_topics::EntityTopicId;
use tedge_api::mqtt_topics::MqttSchema;
use tedge_api::mqtt_topics::OperationType;
use tedge_config::ReadError;
use tedge_mqtt_ext::Topic;
use tedge_mqtt_ext::TopicFilter;
Expand All @@ -12,56 +16,45 @@ pub const DEFAULT_PLUGIN_CONFIG_DIR_NAME: &str = "plugins/";
#[derive(Clone, Debug)]
pub struct LogManagerConfig {
pub config_dir: PathBuf,
pub mqtt_topic_root: String,
pub mqtt_device_topic_id: String,
pub plugin_config_dir: PathBuf,
pub plugin_config_path: PathBuf,
pub logtype_reload_topic: Topic,
pub logfile_request_topic: TopicFilter,
pub current_operations: HashSet<String>,
}

pub struct LogManagerOptions {
pub config_dir: PathBuf,
pub mqtt_topic_root: Arc<str>,
pub mqtt_device_topic_id: Arc<str>,
pub mqtt_schema: MqttSchema,
pub mqtt_device_topic_id: EntityTopicId,
}

impl LogManagerConfig {
pub fn from_options(cliopts: LogManagerOptions) -> Result<Self, ReadError> {
let config_dir = cliopts.config_dir;
let mqtt_topic_root = cliopts.mqtt_topic_root;
let mqtt_schema = cliopts.mqtt_schema;
let mqtt_device_topic_id = cliopts.mqtt_device_topic_id;

let plugin_config_dir = config_dir.join(DEFAULT_PLUGIN_CONFIG_DIR_NAME);
let plugin_config_path = plugin_config_dir.join(DEFAULT_PLUGIN_CONFIG_FILE_NAME);

// TODO: move topic parsing to tedge_api
let logtype_reload_topic = Topic::new_unchecked(
format!(
"{}/{}/cmd/log_upload",
mqtt_topic_root, mqtt_device_topic_id
)
.as_str(),
let logtype_reload_topic = mqtt_schema.topic_for(
&mqtt_device_topic_id,
&Channel::CommandMetadata {
operation: OperationType::LogUpload,
},
);
let logfile_request_topic = TopicFilter::new_unchecked(
format!(
"{}/{}/cmd/log_upload/+",
mqtt_topic_root, mqtt_device_topic_id
)
.as_str(),

let logfile_request_topic = mqtt_schema.topics(
EntityFilter::Entity(&mqtt_device_topic_id),
ChannelFilter::Command(OperationType::LogUpload),
);
let current_operations = HashSet::new();

Ok(Self {
config_dir,
mqtt_topic_root: mqtt_topic_root.to_string(),
mqtt_device_topic_id: mqtt_device_topic_id.to_string(),
plugin_config_dir,
plugin_config_path,
logtype_reload_topic,
logfile_request_topic,
current_operations,
})
}
}
4 changes: 0 additions & 4 deletions crates/extensions/tedge_log_manager/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use crate::LogManagerConfig;
use crate::Topic;
use filetime::set_file_mtime;
use filetime::FileTime;
use std::collections::HashSet;
use std::path::Path;
use std::time::Duration;
use tedge_actors::test_helpers::MessageReceiverExt;
Expand Down Expand Up @@ -91,13 +90,10 @@ fn new_log_manager_builder(
) {
let config = LogManagerConfig {
config_dir: temp_dir.to_path_buf(),
mqtt_topic_root: "te".to_string(),
mqtt_device_topic_id: "device/main//".to_string(),
plugin_config_dir: temp_dir.to_path_buf(),
plugin_config_path: temp_dir.join("tedge-log-plugin.toml"),
logtype_reload_topic: Topic::new_unchecked("te/device/main///cmd/log_upload"),
logfile_request_topic: TopicFilter::new_unchecked("te/device/main///cmd/log_upload/+"),
current_operations: HashSet::new(),
};

let mut mqtt_builder: SimpleMessageBoxBuilder<MqttMessage, MqttMessage> =
Expand Down
7 changes: 4 additions & 3 deletions plugins/tedge_log_plugin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ repository = { workspace = true }
[dependencies]
anyhow = { workspace = true }
clap = { workspace = true }
tedge_actors = { workspace = true }
tedge_config = { workspace = true }
tedge_actors = { workspace = true }
tedge_api = { workspace = true }
tedge_config = { workspace = true }
tedge_file_system_ext = { workspace = true }
tedge_health_ext = { workspace = true }
tedge_http_ext = { workspace = true }
tedge_http_ext = { workspace = true }
tedge_log_manager = { workspace = true }
tedge_mqtt_ext = { workspace = true }
tedge_signal_ext = { workspace = true }
Expand Down
5 changes: 3 additions & 2 deletions plugins/tedge_log_plugin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use clap::Parser;
use std::path::PathBuf;
use std::sync::Arc;
use tedge_actors::Runtime;
use tedge_api::mqtt_topics::MqttSchema;
use tedge_config::system_services::get_log_level;
use tedge_config::system_services::set_log_level;
use tedge_config::TEdgeConfig;
Expand Down Expand Up @@ -99,8 +100,8 @@ async fn run_with(
// Instantiate log manager actor
let log_manager_config = LogManagerConfig::from_options(LogManagerOptions {
config_dir: cliopts.config_dir,
mqtt_device_topic_id,
mqtt_topic_root,
mqtt_schema: MqttSchema::with_root(mqtt_topic_root.to_string()),
mqtt_device_topic_id: mqtt_device_topic_id.to_string().parse()?,
})?;
let log_actor = LogManagerBuilder::try_new(
log_manager_config,
Expand Down

0 comments on commit c6e0126

Please sign in to comment.