Skip to content

Commit

Permalink
Introduce a new Scheduled state for operations
Browse files Browse the repository at this point in the history
The motivation is to be able to register a custom version of a built-in
workflow. The mappers still create commands in the Init state, but the
built-in operations only react on the Schedule state. If no custom
version of the workflow has been provided, the agent moves operation
requests from the Init to the Scheduled state. If a custom version is
provided, then this user-defined workflow can add extra checks and steps
before triggering the built-in operation steps (by putting the operation
in the Schedule state).

As of now, the log and config operation are unchanged (starting on Init
and ignoring the Schedule state).

Signed-off-by: Didier Wenzek <[email protected]>
  • Loading branch information
didier-wenzek committed Nov 22, 2023
1 parent 4bc4e92 commit 9b8a18f
Show file tree
Hide file tree
Showing 12 changed files with 89 additions and 33 deletions.
3 changes: 2 additions & 1 deletion crates/core/tedge_agent/src/restart_manager/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ impl Actor for RestartManagerActor {
}

while let Some(request) = self.message_box.recv().await {
if request.status() != CommandStatus::Init {
if request.status() != CommandStatus::Scheduled {
// Only handle commands in the scheduled state
continue;
}
let executing_response = self.update_state_repository(request.clone()).await;
Expand Down
2 changes: 1 addition & 1 deletion crates/core/tedge_agent/src/restart_manager/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ async fn test_new_restart_operation() -> Result<(), DynError> {
target: EntityTopicId::default_main_device(),
cmd_id: "1234".to_string(),
payload: RestartCommandPayload {
status: CommandStatus::Init,
status: CommandStatus::Scheduled,
},
})
.await?;
Expand Down
8 changes: 4 additions & 4 deletions crates/core/tedge_agent/src/software_manager/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,8 @@ impl SoftwareManagerActor {
plugins: &mut ExternalPlugins,
operation_logs: &OperationLogs,
) -> Result<(), SoftwareManagerError> {
if request.status() != CommandStatus::Init {
// Handle only the init state
if request.status() != CommandStatus::Scheduled {
// Only handle commands in the scheduled state
return Ok(());
}

Expand Down Expand Up @@ -249,8 +249,8 @@ impl SoftwareManagerActor {
plugins: &ExternalPlugins,
operation_logs: &OperationLogs,
) -> Result<(), SoftwareManagerError> {
if request.status() != CommandStatus::Init {
// Handle only the init state
if request.status() != CommandStatus::Scheduled {
// Only handle commands in the scheduled state
return Ok(());
}

Expand Down
5 changes: 3 additions & 2 deletions crates/core/tedge_agent/src/software_manager/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ async fn test_new_software_update_operation() -> Result<(), DynError> {
target: EntityTopicId::default_main_device(),
cmd_id: "random".to_string(),
payload: SoftwareUpdateCommandPayload {
status: Default::default(),
status: CommandStatus::Scheduled,
update_list: vec![debian_list],
failures: vec![],
},
Expand Down Expand Up @@ -122,7 +122,8 @@ async fn test_new_software_list_operation() -> Result<(), DynError> {
let mut converter_box = spawn_software_manager(&temp_dir).await?;

let command =
SoftwareListCommand::new(&EntityTopicId::default_main_device(), "1234".to_string());
SoftwareListCommand::new(&EntityTopicId::default_main_device(), "1234".to_string())
.with_status(CommandStatus::Scheduled);
converter_box.send(command.clone().into()).await?;

let executing_response = command.clone().with_status(CommandStatus::Executing);
Expand Down
13 changes: 7 additions & 6 deletions crates/core/tedge_agent/src/tedge_operation_converter/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ async fn convert_incoming_software_list_request() -> Result<(), DynError> {
// Simulate SoftwareList MQTT message received.
let mqtt_message = MqttMessage::new(
&Topic::new_unchecked("te/device/main///cmd/software_list/some-cmd-id"),
r#"{ "status": "init" }"#,
r#"{ "status": "scheduled" }"#,
);
mqtt_box.send(mqtt_message).await?;

Expand All @@ -47,7 +47,8 @@ async fn convert_incoming_software_list_request() -> Result<(), DynError> {
.assert_received([SoftwareListCommand::new(
&EntityTopicId::default_main_device(),
"some-cmd-id".to_string(),
)])
)
.with_status(CommandStatus::Scheduled)])
.await;
Ok(())
}
Expand All @@ -61,7 +62,7 @@ async fn convert_incoming_software_update_request() -> Result<(), DynError> {
// Simulate SoftwareUpdate MQTT message received.
let mqtt_message = MqttMessage::new(
&Topic::new_unchecked("te/device/child001///cmd/software_update/1234"),
r#"{"status":"init","updateList":[{"type":"debian","modules":[{"name":"debian1","version":"0.0.1","action":"install"}]}]}"#,
r#"{"status":"scheduled","updateList":[{"type":"debian","modules":[{"name":"debian1","version":"0.0.1","action":"install"}]}]}"#,
);
mqtt_box.send(mqtt_message).await?;

Expand All @@ -84,7 +85,7 @@ async fn convert_incoming_software_update_request() -> Result<(), DynError> {
target: EntityTopicId::default_child_device("child001").unwrap(),
cmd_id: "1234".to_string(),
payload: SoftwareUpdateCommandPayload {
status: CommandStatus::Init,
status: CommandStatus::Scheduled,
update_list: vec![debian_list],
failures: vec![],
},
Expand All @@ -105,7 +106,7 @@ async fn convert_incoming_restart_request() -> Result<(), DynError> {
// Simulate Restart MQTT message received.
let mqtt_message = MqttMessage::new(
&Topic::new_unchecked(&format!("te/{target_device}/cmd/restart/random")),
r#"{"status": "init"}"#,
r#"{"status": "scheduled"}"#,
);
mqtt_box.send(mqtt_message).await?;

Expand All @@ -115,7 +116,7 @@ async fn convert_incoming_restart_request() -> Result<(), DynError> {
target: target_device.parse()?,
cmd_id: "random".to_string(),
payload: RestartCommandPayload {
status: CommandStatus::Init,
status: CommandStatus::Scheduled,
},
}])
.await;
Expand Down
1 change: 1 addition & 0 deletions crates/core/tedge_api/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,7 @@ impl CommandPayload for RestartCommandPayload {
pub enum CommandStatus {
#[default]
Init,
Scheduled,
Executing,
Successful,
Failed {
Expand Down
51 changes: 37 additions & 14 deletions crates/core/tedge_api/src/workflow.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::mqtt_topics::EntityTopicId;
use crate::mqtt_topics::MqttSchema;
use crate::mqtt_topics::OperationType;
use log::info;
use mqtt_channel::Message;
use mqtt_channel::QoS;
use serde::Deserialize;
Expand All @@ -17,6 +18,10 @@ pub struct OperationWorkflow {
/// The operation to which this workflow applies
pub operation: OperationType,

/// Mark this workflow as built_in
#[serde(default, skip)]
pub built_in: bool,

/// The states of the state machine
#[serde(flatten)]
pub states: HashMap<StateName, OperationState>,
Expand Down Expand Up @@ -112,14 +117,23 @@ impl WorkflowSupervisor {
&mut self,
workflow: OperationWorkflow,
) -> Result<(), WorkflowRegistrationError> {
if self.workflows.contains_key(&workflow.operation) {
Err(WorkflowRegistrationError::DuplicatedWorkflow {
operation: workflow.operation.to_string(),
})
} else {
self.workflows.insert(workflow.operation.clone(), workflow);
Ok(())
if let Some(previous) = self.workflows.get(&workflow.operation) {
if previous.built_in == workflow.built_in {
return Err(WorkflowRegistrationError::DuplicatedWorkflow {
operation: workflow.operation.to_string(),
});
}

info!(
"The built-in {} operation has been customized",
workflow.operation
);
if workflow.built_in {
return Ok(());
}
}
self.workflows.insert(workflow.operation.clone(), workflow);
Ok(())
}

/// List the capabilities provided by the registered workflows
Expand Down Expand Up @@ -157,25 +171,34 @@ impl OperationWorkflow {
/// Create a built-in operation workflow
pub fn built_in(operation: OperationType) -> Self {
let states = [
("init", vec!["executing"]),
("executing", vec!["successful", "failed"]),
("successful", vec![]),
("failed", vec![]),
("init", false, vec!["scheduled"]),
("scheduled", true, vec!["executing"]),
("executing", true, vec!["successful", "failed"]),
("successful", false, vec![]),
("failed", false, vec![]),
]
.into_iter()
.map(|(step, next)| {
.map(|(step, delegate, next)| {
(
step.to_string(),
OperationState {
owner: None,
owner: if delegate {
Some("tedge".to_string())
} else {
None
},
script: None,
next: next.into_iter().map(|s| s.to_string()).collect(),
},
)
})
.collect();

OperationWorkflow { operation, states }
OperationWorkflow {
built_in: true,
operation,
states,
}
}

/// Return the MQTT message to register support for the operation described by this workflow
Expand Down
4 changes: 2 additions & 2 deletions crates/extensions/c8y_mapper_ext/src/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1453,7 +1453,7 @@ impl CumulocityConverter {
.ok_or_else(|| Error::UnknownEntity(target.to_string()))?;

match response.status() {
CommandStatus::Init => {
CommandStatus::Init | CommandStatus::Scheduled => {
// The command has not been processed yet
Ok(vec![])
}
Expand Down Expand Up @@ -1528,7 +1528,7 @@ impl CumulocityConverter {
Ok(vec![response.clearing_message(&self.mqtt_schema)])
}

CommandStatus::Init | CommandStatus::Executing => {
CommandStatus::Init | CommandStatus::Scheduled | CommandStatus::Executing => {
// C8Y doesn't expect any message to be published
Ok(Vec::new())
}
Expand Down
8 changes: 6 additions & 2 deletions crates/extensions/tedge_config_manager/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,9 @@ impl ConfigManagerActor {
self.handle_config_snapshot_request(&message.topic, request)
.await?;
}
CommandStatus::Successful | CommandStatus::Failed { .. } => {}
CommandStatus::Scheduled
| CommandStatus::Successful
| CommandStatus::Failed { .. } => {}
},
Ok(Some(ConfigOperation::Update(request))) => match request.status {
CommandStatus::Init => {
Expand All @@ -135,7 +137,9 @@ impl ConfigManagerActor {
self.handle_config_update_request(&message.topic, request)
.await?;
}
CommandStatus::Successful | CommandStatus::Failed { .. } => {}
CommandStatus::Scheduled
| CommandStatus::Successful
| CommandStatus::Failed { .. } => {}
},
Ok(None) => {}
Err(ConfigManagementError::InvalidTopicError) => {
Expand Down
4 changes: 3 additions & 1 deletion crates/extensions/tedge_log_manager/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,9 @@ impl LogManagerActor {
self.handle_logfile_request_operation(&message.topic, request)
.await?;
}
CommandStatus::Successful | CommandStatus::Failed { .. } => {}
CommandStatus::Scheduled
| CommandStatus::Successful
| CommandStatus::Failed { .. } => {}
},
Ok(None) => {}
Err(err) => {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/bin/sh

echo new software list request >>/tmp/operations.log
echo '{ "status":"scheduled" }'
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
operation = "software_list" # A built in operation can be overridden

[init]
script = "/etc/tedge/operations/init-software-list.sh" # The json output of the script is used for the next step
next = ["scheduled"]

[scheduled]
owner = "tedge" # Builtin behavior
next = ["executing"]

[executing]
owner = "tedge" # Builtin behavior
next = ["successful", "failed"]

[successful]
next = []

[failed]
next = []

0 comments on commit 9b8a18f

Please sign in to comment.