Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Operation Workflow Specification & Implementation #2071

Merged
merged 14 commits into from
Nov 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ tedge-log-plugin = { path = "plugins/tedge_log_plugin" }
tedge-mapper = { path = "crates/core/tedge_mapper" }
tedge_mqtt_ext = { path = "crates/extensions/tedge_mqtt_ext" }
tedge_uploader_ext = { path = "crates/extensions/tedge_uploader_ext" }
tedge_script_ext = { path = "crates/extensions/tedge_script_ext" }
tedge_signal_ext = { path = "crates/extensions/tedge_signal_ext" }
tedge_test_utils = { path = "crates/tests/tedge_test_utils" }
tedge_timer_ext = { path = "crates/extensions/tedge_timer_ext" }
Expand Down
1 change: 1 addition & 0 deletions crates/core/tedge_agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ tedge_file_system_ext = { workspace = true }
tedge_health_ext = { workspace = true }
tedge_log_manager = { workspace = true }
tedge_mqtt_ext = { workspace = true }
tedge_script_ext = { workspace = true }
tedge_signal_ext = { workspace = true }
tedge_uploader_ext = { workspace = true }
tedge_utils = { workspace = true }
Expand Down
56 changes: 56 additions & 0 deletions crates/core/tedge_agent/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,26 @@ use camino::Utf8PathBuf;
use flockfile::check_another_instance_is_not_running;
use flockfile::Flockfile;
use flockfile::FlockfileError;
use log::error;
use reqwest::Identity;
use std::ffi::OsStr;
use std::fmt::Debug;
use std::path::Path;
use std::sync::Arc;
use tedge_actors::Concurrent;
use tedge_actors::ConvertingActor;
use tedge_actors::ConvertingActorBuilder;
use tedge_actors::MessageSink;
use tedge_actors::MessageSource;
use tedge_actors::Runtime;
use tedge_actors::ServerActorBuilder;
use tedge_api::mqtt_topics::DeviceTopicId;
use tedge_api::mqtt_topics::EntityTopicId;
use tedge_api::mqtt_topics::MqttSchema;
use tedge_api::mqtt_topics::Service;
use tedge_api::path::DataDir;
use tedge_api::workflow::OperationWorkflow;
use tedge_api::workflow::WorkflowSupervisor;
use tedge_config_manager::ConfigManagerBuilder;
use tedge_config_manager::ConfigManagerConfig;
use tedge_config_manager::ConfigManagerOptions;
Expand All @@ -37,6 +44,7 @@ use tedge_log_manager::LogManagerOptions;
use tedge_mqtt_ext::MqttActorBuilder;
use tedge_mqtt_ext::MqttConfig;
use tedge_mqtt_ext::TopicFilter;
use tedge_script_ext::ScriptActor;
use tedge_signal_ext::SignalActor;
use tedge_uploader_ext::UploaderActor;
use tedge_utils::file::create_directory_with_defaults;
Expand All @@ -57,6 +65,7 @@ pub struct AgentConfig {
pub use_lock: bool,
pub log_dir: Utf8PathBuf,
pub data_dir: DataDir,
pub operations_dir: Utf8PathBuf,
pub mqtt_device_topic_id: EntityTopicId,
pub mqtt_topic_root: Arc<str>,
pub service_type: String,
Expand Down Expand Up @@ -115,6 +124,7 @@ impl AgentConfig {

// For agent specific
let log_dir = tedge_config.logs.path.join("tedge").join("agent");
let operations_dir = config_dir.join("operations");

let identity = tedge_config.http.client.auth.identity()?;

Expand All @@ -129,6 +139,7 @@ impl AgentConfig {
use_lock,
data_dir,
log_dir,
operations_dir,
mqtt_topic_root,
mqtt_device_topic_id,
service_type: tedge_config.service.ty.clone(),
Expand Down Expand Up @@ -165,6 +176,7 @@ impl Agent {
create_directory_with_defaults(&self.config.data_dir)?;
create_directory_with_defaults(&self.config.http_config.file_transfer_dir)?;
create_directory_with_defaults(self.config.data_dir.cache_dir())?;
create_directory_with_defaults(self.config.operations_dir.clone())?;

Ok(())
}
Expand Down Expand Up @@ -193,13 +205,19 @@ impl Agent {
let mut software_update_builder =
SoftwareManagerBuilder::new(self.config.sw_update_config.clone());

// Operation workflows
let workflows = self.load_operation_workflows().await?;
let mut script_runner: ServerActorBuilder<ScriptActor, Concurrent> = ScriptActor::builder();

// Converter actor
let converter_actor_builder = TedgeOperationConverterBuilder::new(
self.config.mqtt_topic_root.as_ref(),
self.config.mqtt_device_topic_id.clone(),
workflows,
&mut software_update_builder,
&mut restart_actor_builder,
&mut mqtt_actor_builder,
&mut script_runner,
);

// Shutdown on SIGINT
Expand Down Expand Up @@ -273,6 +291,7 @@ impl Agent {
runtime.spawn(mqtt_actor_builder).await?;
runtime.spawn(restart_actor_builder).await?;
runtime.spawn(software_update_builder).await?;
runtime.spawn(script_runner).await?;
runtime.spawn(converter_actor_builder).await?;
runtime.spawn(health_actor).await?;

Expand All @@ -293,6 +312,43 @@ impl Agent {

Ok(())
}

async fn load_operation_workflows(&self) -> Result<WorkflowSupervisor, anyhow::Error> {
let dir_path = &self.config.operations_dir;
let mut workflows = WorkflowSupervisor::default();
for entry in std::fs::read_dir(dir_path)?.flatten() {
let file = entry.path();
if file.extension() == Some(OsStr::new("toml")) {
match read_operation_workflow(&file)
.await
.and_then(|workflow| load_operation_workflow(&mut workflows, workflow))
{
Ok(cmd) => {
info!("Using operation workflow definition from {file:?} for '{cmd}' operation");
}
Err(err) => {
error!("Ignoring operation workflow definition from {file:?}: {err}")
}
};
}
}
Ok(workflows)
}
}

async fn read_operation_workflow(path: &Path) -> Result<OperationWorkflow, anyhow::Error> {
Ok(toml::from_str(std::str::from_utf8(
&tokio::fs::read(path).await?,
)?)?)
}

fn load_operation_workflow(
workflows: &mut WorkflowSupervisor,
workflow: OperationWorkflow,
) -> Result<String, anyhow::Error> {
let name = workflow.operation.to_string();
workflows.register_custom_workflow(workflow)?;
Ok(name)
}

pub fn create_tedge_to_te_converter(
Expand Down
3 changes: 2 additions & 1 deletion crates/core/tedge_agent/src/restart_manager/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ impl Actor for RestartManagerActor {
}

while let Some(request) = self.message_box.recv().await {
if request.status() != CommandStatus::Init {
if request.status() != CommandStatus::Scheduled {
// Only handle commands in the scheduled state
continue;
}
let executing_response = self.update_state_repository(request.clone()).await;
Expand Down
2 changes: 1 addition & 1 deletion crates/core/tedge_agent/src/restart_manager/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ async fn test_new_restart_operation() -> Result<(), DynError> {
target: EntityTopicId::default_main_device(),
cmd_id: "1234".to_string(),
payload: RestartCommandPayload {
status: CommandStatus::Init,
status: CommandStatus::Scheduled,
},
})
.await?;
Expand Down
8 changes: 4 additions & 4 deletions crates/core/tedge_agent/src/software_manager/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,8 @@ impl SoftwareManagerActor {
plugins: &mut ExternalPlugins,
operation_logs: &OperationLogs,
) -> Result<(), SoftwareManagerError> {
if request.status() != CommandStatus::Init {
// Handle only the init state
if request.status() != CommandStatus::Scheduled {
// Only handle commands in the scheduled state
return Ok(());
}

Expand Down Expand Up @@ -249,8 +249,8 @@ impl SoftwareManagerActor {
plugins: &ExternalPlugins,
operation_logs: &OperationLogs,
) -> Result<(), SoftwareManagerError> {
if request.status() != CommandStatus::Init {
// Handle only the init state
if request.status() != CommandStatus::Scheduled {
// Only handle commands in the scheduled state
return Ok(());
}

Expand Down
5 changes: 3 additions & 2 deletions crates/core/tedge_agent/src/software_manager/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ async fn test_new_software_update_operation() -> Result<(), DynError> {
target: EntityTopicId::default_main_device(),
cmd_id: "random".to_string(),
payload: SoftwareUpdateCommandPayload {
status: Default::default(),
status: CommandStatus::Scheduled,
update_list: vec![debian_list],
failures: vec![],
},
Expand Down Expand Up @@ -122,7 +122,8 @@ async fn test_new_software_list_operation() -> Result<(), DynError> {
let mut converter_box = spawn_software_manager(&temp_dir).await?;

let command =
SoftwareListCommand::new(&EntityTopicId::default_main_device(), "1234".to_string());
SoftwareListCommand::new(&EntityTopicId::default_main_device(), "1234".to_string())
.with_status(CommandStatus::Scheduled);
converter_box.send(command.clone().into()).await?;

let executing_response = command.clone().with_status(CommandStatus::Executing);
Expand Down
Loading