Skip to content

Commit

Permalink
Remove ConfigManagerActor dependency on MQTT
Browse files Browse the repository at this point in the history
The builder still expect an MQTT connection.
This will be removed when connecting ConfigManagerActor to the workflow
system.

Signed-off-by: Didier Wenzek <[email protected]>
  • Loading branch information
didier-wenzek committed May 17, 2024
1 parent c8bd36b commit a7f4836
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 59 deletions.
44 changes: 11 additions & 33 deletions crates/extensions/tedge_config_manager/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@ pub type ConfigDownloadResult = (MqttTopic, DownloadResult);
pub type ConfigUploadRequest = (MqttTopic, UploadRequest);
pub type ConfigUploadResult = (MqttTopic, UploadResult);

fan_in_message_type!(ConfigInput[MqttMessage, FsWatchEvent, ConfigDownloadResult, ConfigUploadResult] : Debug);
fan_in_message_type!(ConfigInput[ConfigOperation, FsWatchEvent, ConfigDownloadResult, ConfigUploadResult] : Debug);

pub struct ConfigManagerActor {
config: ConfigManagerConfig,
plugin_config: PluginConfig,
pending_operations: HashMap<String, ConfigOperation>,
input_receiver: LoggingReceiver<ConfigInput>,
mqtt_publisher: LoggingSender<MqttMessage>,
output_sender: LoggingSender<ConfigOperationData>,
download_sender: DynSender<ConfigDownloadRequest>,
upload_sender: DynSender<ConfigUploadRequest>,
}
Expand All @@ -70,7 +70,9 @@ impl Actor for ConfigManagerActor {

while let Some(event) = self.input_receiver.recv().await {
let result = match event {
ConfigInput::MqttMessage(message) => self.process_mqtt_message(message).await,
ConfigInput::ConfigOperation(request) => {
self.process_operation_request(request).await
}
ConfigInput::FsWatchEvent(event) => self.process_file_watch_events(event).await,
ConfigInput::ConfigDownloadResult((topic, result)) => {
Ok(self.process_downloaded_config(&topic, result).await?)
Expand All @@ -94,7 +96,7 @@ impl ConfigManagerActor {
config: ConfigManagerConfig,
plugin_config: PluginConfig,
input_receiver: LoggingReceiver<ConfigInput>,
mqtt_publisher: LoggingSender<MqttMessage>,
output_sender: LoggingSender<ConfigOperationData>,
download_sender: DynSender<ConfigDownloadRequest>,
upload_sender: DynSender<ConfigUploadRequest>,
) -> Self {
Expand All @@ -103,29 +105,12 @@ impl ConfigManagerActor {
plugin_config,
pending_operations: HashMap::new(),
input_receiver,
mqtt_publisher,
output_sender,
download_sender,
upload_sender,
}
}

async fn process_mqtt_message(&mut self, message: MqttMessage) -> Result<(), ChannelError> {
match ConfigOperation::request_from_message(&self.config, &message) {
Ok(Some(request)) => self.process_operation_request(request).await?,
Ok(None) => {}
Err(ConfigManagementError::InvalidTopicError) => {
error!(
"Received unexpected message on topic: {}",
message.topic.name
);
}
Err(err) => {
error!("Incorrect log request payload: {}", err);
}
}
Ok(())
}

async fn process_operation_request(
&mut self,
request: ConfigOperation,
Expand Down Expand Up @@ -518,7 +503,7 @@ impl ConfigManagerActor {
topic: topic.clone(),
types: config_types.clone(),
};
self.mqtt_publisher.send(metadata.into()).await?;
self.output_sender.send(metadata).await?;
}
Ok(())
}
Expand All @@ -527,15 +512,8 @@ impl ConfigManagerActor {
&mut self,
operation: ConfigOperation,
) -> Result<(), ChannelError> {
self.publish_operation_data(ConfigOperationData::State(operation))
.await
}

async fn publish_operation_data(
&mut self,
data: ConfigOperationData,
) -> Result<(), ChannelError> {
self.mqtt_publisher.send(data.into()).await
let state = ConfigOperationData::State(operation);
self.output_sender.send(state).await
}
}

Expand All @@ -546,7 +524,7 @@ pub enum ConfigOperation {
}

impl ConfigOperation {
fn request_from_message(
pub(crate) fn request_from_message(
config: &ConfigManagerConfig,
message: &MqttMessage,
) -> Result<Option<Self>, ConfigManagementError> {
Expand Down
60 changes: 34 additions & 26 deletions crates/extensions/tedge_config_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,19 @@ mod tests;

use actor::*;
pub use config::*;
use log::error;
use std::path::PathBuf;
use tedge_actors::futures::channel::mpsc;
use tedge_actors::Builder;
use tedge_actors::CloneSender;
use tedge_actors::DynSender;
use tedge_actors::LinkError;
use tedge_actors::LoggingReceiver;
use tedge_actors::LoggingSender;
use tedge_actors::MessageSink;
use tedge_actors::MessageSource;
use tedge_actors::NoConfig;
use tedge_actors::RuntimeRequest;
use tedge_actors::RuntimeRequestSink;
use tedge_actors::Service;
use tedge_actors::SimpleMessageBoxBuilder;
use tedge_file_system_ext::FsWatchEvent;
use tedge_mqtt_ext::MqttMessage;
use tedge_mqtt_ext::TopicFilter;
Expand All @@ -36,11 +36,9 @@ use toml::toml;
pub struct ConfigManagerBuilder {
config: ConfigManagerConfig,
plugin_config: PluginConfig,
receiver: LoggingReceiver<ConfigInput>,
mqtt_publisher: DynSender<MqttMessage>,
box_builder: SimpleMessageBoxBuilder<ConfigInput, ConfigOperationData>,
download_sender: DynSender<ConfigDownloadRequest>,
upload_sender: DynSender<ConfigUploadRequest>,
signal_sender: mpsc::Sender<RuntimeRequest>,
}

impl ConfigManagerBuilder {
Expand All @@ -54,36 +52,31 @@ impl ConfigManagerBuilder {
Self::init(&config).await?;

let plugin_config = PluginConfig::new(config.plugin_config_path.as_path());
let mut box_builder = SimpleMessageBoxBuilder::new("Tedge-Config-Manager", 16);

let (events_sender, events_receiver) = mpsc::channel(10);
let (signal_sender, signal_receiver) = mpsc::channel(10);
let receiver = LoggingReceiver::new(
"Tedge-Config-Manager".into(),
events_receiver,
signal_receiver,
mqtt.connect_source(NoConfig, &mut box_builder);
box_builder.connect_mapped_source(
Self::subscriptions(&config),
mqtt,
Self::mqtt_message_parser(&config),
);
let events_sender: DynSender<ConfigInput> = events_sender.into();

mqtt.connect_sink(Self::subscriptions(&config), &events_sender);
let mqtt_publisher = mqtt.get_sender();
let download_sender =
downloader_actor.connect_client(box_builder.get_sender().sender_clone());

let download_sender = downloader_actor.connect_client(events_sender.sender_clone());

let upload_sender = uploader_actor.connect_client(events_sender.sender_clone());
let upload_sender = uploader_actor.connect_client(box_builder.get_sender().sender_clone());

fs_notify.connect_sink(
ConfigManagerBuilder::watched_directory(&config),
&events_sender,
&box_builder.get_sender(),
);

Ok(ConfigManagerBuilder {
config,
plugin_config,
receiver,
mqtt_publisher,
box_builder,
download_sender,
upload_sender,
signal_sender,
})
}

Expand Down Expand Up @@ -147,25 +140,40 @@ impl ConfigManagerBuilder {
fn watched_directory(config: &ConfigManagerConfig) -> PathBuf {
config.plugin_config_dir.clone()
}

/// Extract a config actor request from an MQTT message
fn mqtt_message_parser(
config: &ConfigManagerConfig,
) -> impl Fn(MqttMessage) -> Option<ConfigInput> {
let config = config.clone();
move |message| match ConfigOperation::request_from_message(&config, &message) {
Ok(Some(request)) => Some(request.into()),
Ok(None) => None,
Err(err) => {
error!("Received invalid config request: {err}");
None
}
}
}
}

impl RuntimeRequestSink for ConfigManagerBuilder {
fn get_signal_sender(&self) -> DynSender<RuntimeRequest> {
Box::new(self.signal_sender.clone())
self.box_builder.get_signal_sender()
}
}

impl Builder<ConfigManagerActor> for ConfigManagerBuilder {
type Error = LinkError;

fn try_build(self) -> Result<ConfigManagerActor, Self::Error> {
let mqtt_publisher = LoggingSender::new("Tedge-Config-Manager".into(), self.mqtt_publisher);
let (output_sender, input_receiver) = self.box_builder.build().into_split();

Ok(ConfigManagerActor::new(
self.config,
self.plugin_config,
self.receiver,
mqtt_publisher,
input_receiver,
output_sender,
self.download_sender,
self.upload_sender,
))
Expand Down

0 comments on commit a7f4836

Please sign in to comment.