Skip to content

Commit

Permalink
Merge pull request #3105 from didier-wenzek/feat/improve-workflow-bui…
Browse files Browse the repository at this point in the history
…ltin-actions-v2

Feat: improve workflow builtin actions
  • Loading branch information
didier-wenzek authored Sep 11, 2024
2 parents ea6adf6 + 7ad7db4 commit a7e4d9f
Show file tree
Hide file tree
Showing 17 changed files with 510 additions and 133 deletions.
80 changes: 62 additions & 18 deletions crates/core/tedge_agent/src/operation_workflows/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub struct WorkflowActor {
pub(crate) state_repository: AgentStateRepository<CommandBoard>,
pub(crate) log_dir: Utf8PathBuf,
pub(crate) input_receiver: UnboundedLoggingReceiver<AgentInput>,
pub(crate) command_dispatcher: CommandDispatcher,
pub(crate) builtin_command_dispatcher: CommandDispatcher,
pub(crate) command_sender: DynSender<InternalCommandState>,
pub(crate) mqtt_publisher: LoggingSender<MqttMessage>,
pub(crate) script_runner: ClientMessageBox<Execute, std::io::Result<Output>>,
Expand All @@ -73,16 +73,15 @@ impl Actor for WorkflowActor {
self.process_mqtt_message(message).await?;
}
AgentInput::InternalCommandState(InternalCommandState(command_state)) => {
self.process_internal_state_update(command_state).await?;
self.process_command_update(command_state).await?;
}
AgentInput::GenericCommandData(GenericCommandData::State(new_state)) => {
self.process_command_state_update(new_state).await?;
self.process_builtin_command_update(new_state).await?;
}
AgentInput::GenericCommandData(GenericCommandData::Metadata(
GenericCommandMetadata { operation, payload },
)) => {
self.publish_operation_capability(operation, payload)
.await?;
self.publish_builtin_capability(operation, payload).await?;
}
}
}
Expand All @@ -101,7 +100,7 @@ impl WorkflowActor {
Ok(())
}

async fn publish_operation_capability(
async fn publish_builtin_capability(
&mut self,
operation: OperationName,
payload: serde_json::Value,
Expand All @@ -117,6 +116,11 @@ impl WorkflowActor {
Ok(())
}

/// Process a command update received from MQTT
///
/// Beware, these updates are coming from external components (the mapper inits and clears commands),
/// but also from *this* actor as all its state transitions are published over MQTT.
/// Only the former will be actually processed with [Self::process_command_update].
async fn process_mqtt_message(&mut self, message: MqttMessage) -> Result<(), RuntimeError> {
let Ok((operation, cmd_id)) = self.extract_command_identifiers(&message.topic.name) else {
log::error!("Unknown command channel: {}", &message.topic.name);
Expand All @@ -136,7 +140,7 @@ impl WorkflowActor {
Ok(Some(new_state)) => {
self.persist_command_board().await?;
if new_state.is_init() {
self.process_internal_state_update(new_state.set_log_path(&log_file.path))
self.process_command_update(new_state.set_log_path(&log_file.path))
.await?;
}
}
Expand All @@ -152,7 +156,13 @@ impl WorkflowActor {
Ok(())
}

async fn process_internal_state_update(
/// Process a command state update taking any action as defined by the workflow
///
/// A new state can be received:
/// - from MQTT as for init and clear messages
/// - from the engine itself when a progress is made
/// - from one of the builtin operation actors
async fn process_command_update(
&mut self,
state: GenericCommandState,
) -> Result<(), RuntimeError> {
Expand Down Expand Up @@ -205,10 +215,25 @@ impl WorkflowActor {
let new_state = state.move_to(next_step);
self.publish_command_state(new_state, &mut log_file).await
}
OperationAction::BuiltIn => {
OperationAction::BuiltIn(_, _) => {
let step = &state.status;
info!("Processing {operation} operation {step} step");
Ok(self.command_dispatcher.send(state).await?)

Ok(self.builtin_command_dispatcher.send(state).await?)
}
OperationAction::BuiltInOperation(ref builtin_op, ref handlers) => {
let step = &state.status;
info!("Executing builtin:{builtin_op} operation {step} step");

// Fork a builtin state
let builtin_state = action.adapt_builtin_request(state.clone());

// Move to the next state to await the builtin operation outcome
let new_state = state.update(handlers.on_exec.clone());
self.publish_command_state(new_state, &mut log_file).await?;

// Forward the command to the builtin operation actor
Ok(self.builtin_command_dispatcher.send(builtin_state).await?)
}
OperationAction::AwaitingAgentRestart(handlers) => {
let step = &state.status;
Expand Down Expand Up @@ -307,7 +332,7 @@ impl WorkflowActor {
}
OperationAction::AwaitOperationCompletion(handlers, output_excerpt) => {
let step = &state.status;
info!("{operation} operation {step} waiting for sub-operation completion");
info!("{operation} operation {step}: waiting for sub-operation completion");

// Get the sub-operation state and resume this command when the sub-operation is in a terminal state
if let Some(sub_state) = self
Expand Down Expand Up @@ -349,8 +374,6 @@ impl WorkflowActor {
))
.await;
}
} else {
log_file.log_info("=> sub-operation not yet launched").await;
};

Ok(())
Expand All @@ -376,18 +399,39 @@ impl WorkflowActor {
}
}

async fn process_command_state_update(
/// Pre-process an update received from a builtin operation actor
///
/// The actual work will be done by [Self::process_command_update].
async fn process_builtin_command_update(
&mut self,
new_state: GenericCommandState,
) -> Result<(), RuntimeError> {
if let Err(err) = self.workflows.apply_internal_update(new_state.clone()) {
if new_state.is_finished() {
self.finalize_builtin_command_update(new_state).await
} else {
// As not finalized, the builtin state is sent back
// to the builtin operation actor for further processing.
let builtin_state = new_state.clone();
Ok(self.builtin_command_dispatcher.send(builtin_state).await?)
}
}

/// Finalize a builtin operation
///
/// Moving to the next step calling [Self::process_command_update].
async fn finalize_builtin_command_update(
&mut self,
new_state: GenericCommandState,
) -> Result<(), RuntimeError> {
let adapted_state = self.workflows.adapt_builtin_response(new_state);
if let Err(err) = self.workflows.apply_internal_update(adapted_state.clone()) {
error!("Fail to persist workflow operation state: {err}");
}
self.persist_command_board().await?;
self.mqtt_publisher
.send(new_state.clone().into_message())
.send(adapted_state.clone().into_message())
.await?;
self.process_internal_state_update(new_state).await
self.process_command_update(adapted_state).await
}

fn open_command_log(
Expand Down Expand Up @@ -440,7 +484,7 @@ impl WorkflowActor {
match self.state_repository.load().await {
Ok(Some(pending_commands)) => {
for command in self.workflows.load_pending_commands(pending_commands) {
self.process_internal_state_update(command.clone()).await?;
self.process_command_update(command.clone()).await?;
}
}
Ok(None) => {}
Expand Down
2 changes: 1 addition & 1 deletion crates/core/tedge_agent/src/operation_workflows/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ impl Builder<WorkflowActor> for WorkflowActorBuilder {
state_repository: repository,
log_dir: self.config.log_dir,
input_receiver: self.input_receiver,
command_dispatcher: self.command_dispatcher,
builtin_command_dispatcher: self.command_dispatcher,
mqtt_publisher: self.mqtt_publisher,
command_sender: self.command_sender,
script_runner: self.script_runner,
Expand Down
12 changes: 6 additions & 6 deletions crates/core/tedge_agent/src/operation_workflows/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,13 +204,13 @@ async fn convert_outgoing_software_list_response() -> Result<(), DynError> {
SoftwareListCommand::new(&EntityTopicId::default_main_device(), "1234".to_string());
let software_list_response = software_list_request
.clone()
.with_status(CommandStatus::Executing);
.with_status(CommandStatus::Successful);
software_box.send(software_list_response.into()).await?;

mqtt_box
.assert_received([MqttMessage::new(
&Topic::new_unchecked("te/device/main///cmd/software_list/1234"),
r#"{"status":"executing"}"#,
r#"{"status":"successful"}"#,
)
.with_retain()])
.await;
Expand Down Expand Up @@ -286,13 +286,13 @@ async fn convert_outgoing_software_update_response() -> Result<(), DynError> {
// Simulate SoftwareUpdate response message received.
let software_update_request =
SoftwareUpdateCommand::new(&EntityTopicId::default_main_device(), "1234".to_string());
let software_update_response = software_update_request.with_status(CommandStatus::Executing);
let software_update_response = software_update_request.with_status(CommandStatus::Successful);
software_box.send(software_update_response.into()).await?;

mqtt_box
.assert_received([MqttMessage::new(
&Topic::new_unchecked("te/device/main///cmd/software_update/1234"),
r#"{"status":"executing"}"#,
r#"{"status":"successful"}"#,
)
.with_retain()])
.await;
Expand Down Expand Up @@ -325,7 +325,7 @@ async fn convert_outgoing_restart_response() -> Result<(), DynError> {
let executing_response = RestartCommand {
target: EntityTopicId::default_main_device(),
cmd_id: "abc".to_string(),
payload: RestartCommandPayload::new(CommandStatus::Executing),
payload: RestartCommandPayload::new(CommandStatus::Successful),
};
restart_box.send(executing_response).await?;

Expand All @@ -335,7 +335,7 @@ async fn convert_outgoing_restart_response() -> Result<(), DynError> {
.map(|msg| (msg.topic, msg.payload))
.expect("MqttMessage");
assert_eq!(topic.name, "te/device/main///cmd/restart/abc");
assert!(format!("{:?}", payload).contains(r#"status":"executing"#));
assert!(format!("{:?}", payload).contains(r#"status":"successful"#));

Ok(())
}
Expand Down
17 changes: 0 additions & 17 deletions crates/core/tedge_api/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -731,23 +731,6 @@ pub enum CommandStatus {
Unknown,
}

impl CommandStatus {
pub fn is_terminal_status(&self) -> bool {
matches!(
self,
CommandStatus::Successful | CommandStatus::Failed { reason: _ }
)
}

pub fn is_successful(&self) -> bool {
*self == CommandStatus::Successful
}

pub fn is_failed(&self) -> bool {
matches!(self, CommandStatus::Failed { reason: _ })
}
}

fn default_failure_reason() -> String {
"Unknown reason".to_string()
}
Expand Down
6 changes: 6 additions & 0 deletions crates/core/tedge_api/src/workflow/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ pub enum WorkflowDefinitionError {

#[error("The provided target {0} is not a valid path expression")]
InvalidPathExpression(String),

#[error("The `builtin:{builtin_operation}` cannot be invoked from `{main_operation}`, but only from `{builtin_operation}`")]
InvalidBuiltinOperation {
main_operation: String,
builtin_operation: String,
},
}

/// Error related to a script definition
Expand Down
Loading

0 comments on commit a7e4d9f

Please sign in to comment.