Skip to content

Commit

Permalink
Dynamically register config snapshot and upload operations
Browse files Browse the repository at this point in the history
These operations can now be triggered as a sub-workflow.

Signed-off-by: Didier Wenzek <[email protected]>
  • Loading branch information
didier-wenzek committed May 17, 2024
1 parent a7f4836 commit cf99f0d
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 26 deletions.
17 changes: 8 additions & 9 deletions crates/core/tedge_agent/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,16 +297,15 @@ impl Agent {
is_sudo_enabled: self.config.is_sudo_enabled,
config_update_enabled: self.config.capabilities.config_update,
})?;
Some(
ConfigManagerBuilder::try_new(
manager_config,
&mut mqtt_actor_builder,
&mut fs_watch_actor_builder,
&mut downloader_actor_builder,
&mut uploader_actor_builder,
)
.await?,
let mut config_manager = ConfigManagerBuilder::try_new(
manager_config,
&mut fs_watch_actor_builder,
&mut downloader_actor_builder,
&mut uploader_actor_builder,
)
.await?;
converter_actor_builder.register_builtin_operation(&mut config_manager);
Some(config_manager)
} else if self.config.capabilities.config_update {
warn!("Config_snapshot operation must be enabled to run config_update!");
None
Expand Down
10 changes: 4 additions & 6 deletions crates/extensions/tedge_config_manager/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ impl ConfigManagerActor {
) -> Result<(), ChannelError> {
match request {
ConfigOperation::Snapshot(topic, request) => match request.status {
CommandStatus::Init => {
CommandStatus::Init | CommandStatus::Scheduled => {
info!("Config Snapshot received: {request:?}");
self.start_executing_config_request(ConfigOperation::Snapshot(topic, request))
.await?;
Expand All @@ -126,13 +126,12 @@ impl ConfigManagerActor {
debug!("Executing log request: {request:?}");
self.handle_config_snapshot_request(topic, request).await?;
}
CommandStatus::Scheduled
| CommandStatus::Unknown
CommandStatus::Unknown
| CommandStatus::Successful
| CommandStatus::Failed { .. } => {}
},
ConfigOperation::Update(topic, request) => match request.status {
CommandStatus::Init => {
CommandStatus::Init | CommandStatus::Scheduled => {
info!("Config Update received: {request:?}");
self.start_executing_config_request(ConfigOperation::Update(topic, request))
.await?;
Expand All @@ -141,8 +140,7 @@ impl ConfigManagerActor {
debug!("Executing log request: {request:?}");
self.handle_config_update_request(topic, request).await?;
}
CommandStatus::Scheduled
| CommandStatus::Unknown
CommandStatus::Unknown
| CommandStatus::Successful
| CommandStatus::Failed { .. } => {}
},
Expand Down
102 changes: 93 additions & 9 deletions crates/extensions/tedge_config_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,29 @@ mod tests;
use actor::*;
pub use config::*;
use log::error;
use serde_json::json;
use std::path::PathBuf;
use tedge_actors::Builder;
use tedge_actors::CloneSender;
use tedge_actors::DynSender;
use tedge_actors::LinkError;
use tedge_actors::MappingSender;
use tedge_actors::MessageSink;
use tedge_actors::MessageSource;
use tedge_actors::NoConfig;
use tedge_actors::RuntimeRequest;
use tedge_actors::RuntimeRequestSink;
use tedge_actors::Service;
use tedge_actors::SimpleMessageBoxBuilder;
use tedge_api::commands::ConfigSnapshotCmd;
use tedge_api::commands::ConfigUpdateCmd;
use tedge_api::mqtt_topics::MqttSchema;
use tedge_api::mqtt_topics::OperationType;
use tedge_api::workflow::GenericCommandData;
use tedge_api::workflow::GenericCommandMetadata;
use tedge_api::workflow::GenericCommandState;
use tedge_api::workflow::OperationName;
use tedge_api::Jsonify;
use tedge_file_system_ext::FsWatchEvent;
use tedge_mqtt_ext::MqttMessage;
use tedge_mqtt_ext::TopicFilter;
Expand All @@ -44,22 +55,14 @@ pub struct ConfigManagerBuilder {
impl ConfigManagerBuilder {
pub async fn try_new(
config: ConfigManagerConfig,
mqtt: &mut (impl MessageSource<MqttMessage, TopicFilter> + MessageSink<MqttMessage>),
fs_notify: &mut impl MessageSource<FsWatchEvent, PathBuf>,
downloader_actor: &mut impl Service<ConfigDownloadRequest, ConfigDownloadResult>,
uploader_actor: &mut impl Service<ConfigUploadRequest, ConfigUploadResult>,
) -> Result<Self, FileError> {
Self::init(&config).await?;

let plugin_config = PluginConfig::new(config.plugin_config_path.as_path());
let mut box_builder = SimpleMessageBoxBuilder::new("Tedge-Config-Manager", 16);

mqtt.connect_source(NoConfig, &mut box_builder);
box_builder.connect_mapped_source(
Self::subscriptions(&config),
mqtt,
Self::mqtt_message_parser(&config),
);
let box_builder = SimpleMessageBoxBuilder::new("Tedge-Config-Manager", 16);

let download_sender =
downloader_actor.connect_client(box_builder.get_sender().sender_clone());
Expand All @@ -80,6 +83,18 @@ impl ConfigManagerBuilder {
})
}

pub fn connect_mqtt(
&mut self,
mqtt: &mut (impl MessageSource<MqttMessage, TopicFilter> + MessageSink<MqttMessage>),
) {
mqtt.connect_source(NoConfig, &mut self.box_builder);
self.box_builder.connect_mapped_source(
Self::subscriptions(&self.config),
mqtt,
Self::mqtt_message_parser(&self.config),
);
}

pub async fn init(config: &ConfigManagerConfig) -> Result<(), FileError> {
if config.plugin_config_path.exists() {
return Ok(());
Expand Down Expand Up @@ -179,3 +194,72 @@ impl Builder<ConfigManagerActor> for ConfigManagerBuilder {
))
}
}

impl MessageSource<GenericCommandData, NoConfig> for ConfigManagerBuilder {
fn connect_sink(&mut self, config: NoConfig, peer: &impl MessageSink<GenericCommandData>) {
self.box_builder.connect_mapped_sink(
config,
&peer.get_sender(),
|data: ConfigOperationData| match data {
ConfigOperationData::State(ConfigOperation::Snapshot(topic, payload)) => Some(
GenericCommandState::new(topic, payload.status.to_string(), payload.to_value())
.into(),
),
ConfigOperationData::State(ConfigOperation::Update(topic, payload)) => Some(
GenericCommandState::new(topic, payload.status.to_string(), payload.to_value())
.into(),
),
ConfigOperationData::Metadata { topic, types } => {
let Some(operation) = MqttSchema::get_operation_name(topic.as_ref()) else {
return None;
};
Some(GenericCommandData::Metadata(GenericCommandMetadata {
operation,
payload: json!( {
"types": types
}),
}))
}
},
)
}
}

impl IntoIterator for &ConfigManagerBuilder {
type Item = (OperationName, DynSender<GenericCommandState>);
type IntoIter = std::vec::IntoIter<Self::Item>;

fn into_iter(self) -> Self::IntoIter {
vec![
(
OperationType::ConfigSnapshot.to_string(),
MappingSender::new(
self.box_builder.get_sender(),
generic_command_into_snapshot_request,
)
.into(),
),
(
OperationType::ConfigUpdate.to_string(),
MappingSender::new(
self.box_builder.get_sender(),
generic_command_into_update_request,
)
.into(),
),
]
.into_iter()
}
}

fn generic_command_into_snapshot_request(cmd: GenericCommandState) -> Option<ConfigInput> {
let topic = cmd.topic.clone();
let cmd = ConfigSnapshotCmd::try_from(cmd).ok()?;
Some(ConfigOperation::Snapshot(topic, cmd.payload).into())
}

fn generic_command_into_update_request(cmd: GenericCommandState) -> Option<ConfigInput> {
let topic = cmd.topic.clone();
let cmd = ConfigUpdateCmd::try_from(cmd).ok()?;
Some(ConfigOperation::Update(topic, cmd.payload).into())
}
5 changes: 3 additions & 2 deletions crates/extensions/tedge_config_manager/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,16 +105,17 @@ async fn new_config_manager_builder(
let mut uploader_builder: FakeServerBoxBuilder<ConfigUploadRequest, ConfigUploadResult> =
FakeServerBoxBuilder::default();

let config_builder = ConfigManagerBuilder::try_new(
let mut config_builder = ConfigManagerBuilder::try_new(
config,
&mut mqtt_builder,
&mut fs_watcher_builder,
&mut downloader_builder,
&mut uploader_builder,
)
.await
.unwrap();

config_builder.connect_mqtt(&mut mqtt_builder);

(
config_builder,
mqtt_builder.build().with_timeout(TEST_TIMEOUT_MS),
Expand Down

0 comments on commit cf99f0d

Please sign in to comment.