Skip to content

Commit

Permalink
Merge pull request #2844 from didier-wenzek/feat/config-and-log-manag…
Browse files Browse the repository at this point in the history
…ement-workflows

feat: Config and log management workflows
  • Loading branch information
didier-wenzek authored May 31, 2024
2 parents 3a236f0 + 15b7e26 commit 08577c9
Show file tree
Hide file tree
Showing 40 changed files with 1,199 additions and 480 deletions.
4 changes: 2 additions & 2 deletions crates/common/mqtt_channel/src/topics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ impl Topic {
/// An MQTT topic filter
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct TopicFilter {
pub patterns: Vec<String>,
pub qos: QoS,
patterns: Vec<String>,
qos: QoS,
}

impl Default for TopicFilter {
Expand Down
6 changes: 3 additions & 3 deletions crates/core/c8y_api/src/json_c8y.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ pub struct C8ySoftwareModuleItem {
pub url: Option<DownloadInfo>,
}

impl<'a> Jsonify<'a> for C8ySoftwareModuleItem {}
impl Jsonify for C8ySoftwareModuleItem {}

impl From<SoftwareModule> for C8ySoftwareModuleItem {
fn from(module: SoftwareModule) -> Self {
Expand All @@ -108,7 +108,7 @@ pub struct C8yUpdateSoftwareListResponse {
c8y_software_list: Option<Vec<C8ySoftwareModuleItem>>,
}

impl<'a> Jsonify<'a> for C8yUpdateSoftwareListResponse {}
impl Jsonify for C8yUpdateSoftwareListResponse {}

impl From<&SoftwareListCommand> for C8yUpdateSoftwareListResponse {
fn from(list: &SoftwareListCommand) -> Self {
Expand Down Expand Up @@ -163,7 +163,7 @@ impl From<ThinEdgeEvent> for C8yCreateEvent {
}
}

impl<'a> Jsonify<'a> for C8yCreateEvent {}
impl Jsonify for C8yCreateEvent {}

fn update_the_external_source_event(extras: &mut HashMap<String, Value>, source: &str) {
let mut value = serde_json::Map::new();
Expand Down
23 changes: 22 additions & 1 deletion crates/core/tedge_actors/src/builders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,8 @@ pub trait MessageSink<M: Message> {
cast: MessageMapper,
) where
N: Message,
MS: Iterator<Item = M> + Send,
MS: IntoIterator<Item = M> + Send,
MS::IntoIter: Send,
MessageMapper: Fn(N) -> MS,
MessageMapper: 'static + Send + Sync,
{
Expand Down Expand Up @@ -193,6 +194,26 @@ pub trait MessageSource<M: Message, Config> {
/// A peer can subscribe to a subset of the messages produced by this source.
/// This subset of messages expected by the peer is defined by the `config` parameter.
fn connect_sink(&mut self, config: Config, peer: &impl MessageSink<M>);

/// Connect a peer actor that will consume transformed messages produced by this actor.
///
/// The transformation function will be applied to the messages sent by the source,
/// to convert them in a sequence, possibly empty, of messages forwarded to the sink.
fn connect_mapped_sink<N, NS, MessageMapper>(
&mut self,
config: Config,
peer: &impl MessageSink<N>,
cast: MessageMapper,
) where
N: Message,
NS: IntoIterator<Item = N> + Send,
NS::IntoIter: Send,
MessageMapper: Fn(M) -> NS,
MessageMapper: 'static + Send + Sync,
{
let sender: DynSender<M> = MappingSender::new(peer.get_sender(), cast).into();
self.connect_sink(config, &sender)
}
}

/// The [Builder] of an [Actor](crate::Actor) must implement this trait
Expand Down
3 changes: 2 additions & 1 deletion crates/core/tedge_actors/src/channels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ impl<M, N, NS, F> Sender<M> for MappingSender<F, N>
where
M: Message,
N: Message,
NS: Iterator<Item = N> + Send,
NS: IntoIterator<Item = N> + Send,
NS::IntoIter: Send,
F: Fn(M) -> NS,
F: 'static + Sync + Send,
{
Expand Down
38 changes: 18 additions & 20 deletions crates/core/tedge_agent/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,14 +249,14 @@ impl Agent {
let mut software_update_builder = SoftwareManagerBuilder::new(self.config.sw_update_config);

// Converter actor
let converter_actor_builder = TedgeOperationConverterBuilder::new(
let mut converter_actor_builder = TedgeOperationConverterBuilder::new(
self.config.operation_config,
workflows,
&mut software_update_builder,
&mut restart_actor_builder,
&mut mqtt_actor_builder,
&mut script_runner,
);
converter_actor_builder.register_builtin_operation(&mut restart_actor_builder);
converter_actor_builder.register_builtin_operation(&mut software_update_builder);

// Shutdown on SIGINT
let signal_actor_builder = SignalActor::builder(&runtime.get_handle());
Expand Down Expand Up @@ -297,16 +297,15 @@ impl Agent {
is_sudo_enabled: self.config.is_sudo_enabled,
config_update_enabled: self.config.capabilities.config_update,
})?;
Some(
ConfigManagerBuilder::try_new(
manager_config,
&mut mqtt_actor_builder,
&mut fs_watch_actor_builder,
&mut downloader_actor_builder,
&mut uploader_actor_builder,
)
.await?,
let mut config_manager = ConfigManagerBuilder::try_new(
manager_config,
&mut fs_watch_actor_builder,
&mut downloader_actor_builder,
&mut uploader_actor_builder,
)
.await?;
converter_actor_builder.register_builtin_operation(&mut config_manager);
Some(config_manager)
} else if self.config.capabilities.config_update {
warn!("Config_snapshot operation must be enabled to run config_update!");
None
Expand All @@ -323,15 +322,14 @@ impl Agent {
mqtt_schema: mqtt_schema.clone(),
mqtt_device_topic_id: self.config.mqtt_device_topic_id.clone(),
})?;
Some(
LogManagerBuilder::try_new(
log_manager_config,
&mut mqtt_actor_builder,
&mut fs_watch_actor_builder,
&mut uploader_actor_builder,
)
.await?,
let mut log_actor = LogManagerBuilder::try_new(
log_manager_config,
&mut fs_watch_actor_builder,
&mut uploader_actor_builder,
)
.await?;
converter_actor_builder.register_builtin_operation(&mut log_actor);
Some(log_actor)
} else {
None
};
Expand Down
24 changes: 24 additions & 0 deletions crates/core/tedge_agent/src/restart_manager/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,17 @@ use crate::restart_manager::config::RestartManagerConfig;
use tedge_actors::Builder;
use tedge_actors::DynSender;
use tedge_actors::LinkError;
use tedge_actors::MappingSender;
use tedge_actors::MessageSink;
use tedge_actors::MessageSource;
use tedge_actors::NoConfig;
use tedge_actors::RuntimeRequest;
use tedge_actors::RuntimeRequestSink;
use tedge_actors::SimpleMessageBoxBuilder;
use tedge_api::mqtt_topics::OperationType;
use tedge_api::workflow::GenericCommandData;
use tedge_api::workflow::GenericCommandState;
use tedge_api::workflow::OperationName;
use tedge_api::RestartCommand;

pub struct RestartManagerBuilder {
Expand Down Expand Up @@ -39,6 +44,25 @@ impl MessageSource<RestartCommand, NoConfig> for RestartManagerBuilder {
}
}

impl MessageSource<GenericCommandData, NoConfig> for RestartManagerBuilder {
fn connect_sink(&mut self, config: NoConfig, peer: &impl MessageSink<GenericCommandData>) {
self.message_box.connect_sink(config, &peer.get_sender())
}
}

impl IntoIterator for &RestartManagerBuilder {
type Item = (OperationName, DynSender<GenericCommandState>);
type IntoIter = std::vec::IntoIter<Self::Item>;

fn into_iter(self) -> Self::IntoIter {
let sender =
MappingSender::new(self.message_box.get_sender(), |msg: GenericCommandState| {
msg.try_into().ok()
});
vec![(OperationType::Restart.to_string(), sender.into())].into_iter()
}
}

impl RuntimeRequestSink for RestartManagerBuilder {
fn get_signal_sender(&self) -> DynSender<RuntimeRequest> {
self.message_box.get_signal_sender()
Expand Down
32 changes: 32 additions & 0 deletions crates/core/tedge_agent/src/software_manager/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ use tedge_api::commands::CommandStatus;
use tedge_api::commands::SoftwareCommandMetadata;
use tedge_api::commands::SoftwareListCommand;
use tedge_api::commands::SoftwareUpdateCommand;
use tedge_api::mqtt_topics::OperationType;
use tedge_api::workflow::GenericCommandData;
use tedge_api::workflow::GenericCommandMetadata;
use tedge_api::workflow::GenericCommandState;
use tedge_api::Jsonify;
use tedge_api::SoftwareType;
use tedge_config::TEdgeConfigError;
use tracing::error;
Expand All @@ -35,6 +40,33 @@ use tracing::warn;

fan_in_message_type!(SoftwareCommand[SoftwareUpdateCommand, SoftwareListCommand, SoftwareCommandMetadata] : Debug, Eq, PartialEq, Deserialize, Serialize);

impl SoftwareCommand {
pub fn into_generic_commands(self) -> Vec<GenericCommandData> {
match self {
SoftwareCommand::SoftwareUpdateCommand(cmd) => {
vec![GenericCommandState::from(cmd).into()]
}
SoftwareCommand::SoftwareListCommand(cmd) => {
vec![GenericCommandState::from(cmd).into()]
}
SoftwareCommand::SoftwareCommandMetadata(metadata) => {
vec![
GenericCommandMetadata {
operation: OperationType::SoftwareList.to_string(),
payload: metadata.to_value(),
}
.into(),
GenericCommandMetadata {
operation: OperationType::SoftwareUpdate.to_string(),
payload: metadata.to_value(),
}
.into(),
]
}
}
}
}

/// Actor which performs software operations.
///
/// This actor takes as input [`SoftwareRequest`]s, and responds with
Expand Down
48 changes: 48 additions & 0 deletions crates/core/tedge_agent/src/software_manager/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,19 @@ use crate::software_manager::config::SoftwareManagerConfig;
use tedge_actors::Builder;
use tedge_actors::DynSender;
use tedge_actors::LinkError;
use tedge_actors::MappingSender;
use tedge_actors::MessageSink;
use tedge_actors::MessageSource;
use tedge_actors::NoConfig;
use tedge_actors::RuntimeRequest;
use tedge_actors::RuntimeRequestSink;
use tedge_actors::SimpleMessageBoxBuilder;
use tedge_api::mqtt_topics::OperationType;
use tedge_api::workflow::GenericCommandData;
use tedge_api::workflow::GenericCommandState;
use tedge_api::workflow::OperationName;
use tedge_api::SoftwareListCommand;
use tedge_api::SoftwareUpdateCommand;

pub struct SoftwareManagerBuilder {
config: SoftwareManagerConfig,
Expand Down Expand Up @@ -45,6 +52,47 @@ impl RuntimeRequestSink for SoftwareManagerBuilder {
}
}

impl MessageSource<GenericCommandData, NoConfig> for SoftwareManagerBuilder {
fn connect_sink(&mut self, config: NoConfig, peer: &impl MessageSink<GenericCommandData>) {
self.message_box
.connect_mapped_sink(config, &peer.get_sender(), |msg: SoftwareCommand| {
msg.into_generic_commands()
})
}
}

impl IntoIterator for &SoftwareManagerBuilder {
type Item = (OperationName, DynSender<GenericCommandState>);
type IntoIter = std::vec::IntoIter<Self::Item>;

fn into_iter(self) -> Self::IntoIter {
let software_list_sender =
MappingSender::new(self.message_box.get_sender(), |msg: GenericCommandState| {
SoftwareListCommand::try_from(msg)
.map(SoftwareCommand::SoftwareListCommand)
.ok()
});
let software_update_sender =
MappingSender::new(self.message_box.get_sender(), |msg: GenericCommandState| {
SoftwareUpdateCommand::try_from(msg)
.map(SoftwareCommand::SoftwareUpdateCommand)
.ok()
})
.into();
vec![
(
OperationType::SoftwareList.to_string(),
software_list_sender.into(),
),
(
OperationType::SoftwareUpdate.to_string(),
software_update_sender,
),
]
.into_iter()
}
}

impl Builder<SoftwareManagerActor> for SoftwareManagerBuilder {
type Error = LinkError;

Expand Down
Loading

0 comments on commit 08577c9

Please sign in to comment.