Skip to content

Commit

Permalink
ensure operation handler handles all errors
Browse files Browse the repository at this point in the history
More precisely, `to_response` function takes the operation result and
converts it to a cumulocity response. The handlers of different
operations now don't have to concern themselves with creating a
cumulocity response, they can just return the error directly.

Signed-off-by: Marcel Guzik <[email protected]>
  • Loading branch information
Bravo555 committed Jul 16, 2024
1 parent 1d7e99e commit f37739f
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ impl OperationContext {

return Ok(OperationResult::Finished {
messages: vec![c8y_notification],
command: command.into_generic_command(&self.mqtt_schema),

});
}
Ok(download) => download,
Expand Down Expand Up @@ -197,7 +197,6 @@ impl OperationContext {

Ok(OperationResult::Finished {
messages: vec![c8y_notification],
command: command.into_generic_command(&self.mqtt_schema),
})
}
CommandStatus::Failed { reason } => {
Expand All @@ -208,7 +207,7 @@ impl OperationContext {

Ok(OperationResult::Finished {
messages: vec![c8y_notification],
command: command.into_generic_command(&self.mqtt_schema),

})
}
_ => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ impl OperationContext {

Ok(OperationResult::Finished {
messages: vec![c8y_notification],
command: command.into_generic_command(&self.mqtt_schema),
})
}
CommandStatus::Failed { reason } => {
Expand All @@ -93,7 +92,6 @@ impl OperationContext {

Ok(OperationResult::Finished {
messages: vec![c8y_notification],
command: command.into_generic_command(&self.mqtt_schema),
})
}
_ => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ impl OperationContext {

Ok(OperationResult::Finished {
messages: vec![c8y_notification, twin_metadata],
command: command.into_generic_command(&self.mqtt_schema),
})
}
CommandStatus::Failed { reason } => {
Expand All @@ -146,7 +145,6 @@ impl OperationContext {

Ok(OperationResult::Finished {
messages: vec![c8y_notification],
command: command.into_generic_command(&self.mqtt_schema),
})
}
_ => Ok(OperationResult::Ignored),
Expand Down
5 changes: 2 additions & 3 deletions crates/extensions/c8y_mapper_ext/src/operations/log_upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ impl OperationContext {
let c8y_notification = MqttMessage::new(smartrest_topic, smartrest_error);
return Ok(OperationResult::Finished {
messages: vec![c8y_notification],
command: command.into_generic_command(&self.mqtt_schema),

});
}
Ok(download) => download,
Expand Down Expand Up @@ -183,7 +183,6 @@ impl OperationContext {

Ok(OperationResult::Finished {
messages: vec![c8y_notification],
command: command.into_generic_command(&self.mqtt_schema),
})
}
CommandStatus::Failed { reason } => {
Expand All @@ -193,7 +192,7 @@ impl OperationContext {
MqttMessage::new(smartrest_topic, smartrest_operation_status);
Ok(OperationResult::Finished {
messages: vec![c8y_notification],
command: command.into_generic_command(&self.mqtt_schema),

})
}
_ => {
Expand Down
160 changes: 97 additions & 63 deletions crates/extensions/c8y_mapper_ext/src/operations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::converter::CumulocityConverter;
use crate::error::ConversionError;
use crate::Capabilities;
use c8y_api::http_proxy::C8yEndPoint;
use c8y_api::smartrest::smartrest_serializer::fail_operation;
use c8y_api::smartrest::smartrest_serializer::set_operation_executing;
use c8y_api::smartrest::smartrest_serializer::CumulocitySupportedOperations;
use c8y_auth_proxy::url::ProxyUrlGenerator;
Expand All @@ -48,6 +49,7 @@ use tedge_config::SoftwareManagementApiFlag;
use tedge_mqtt_ext::MqttMessage;
use tedge_mqtt_ext::QoS;
use tedge_mqtt_ext::Topic;
use tracing::debug;
use tracing::error;

pub mod config_snapshot;
Expand Down Expand Up @@ -202,6 +204,10 @@ impl RunningOperation {
let handle = tokio::spawn(async move {
while let Some(message) = rx.recv().await {
if message.cmd_id != cmd_id {
debug!(
msg_cmd_id = %message.cmd_id,
%cmd_id, "operation-related message was routed incorrectly"
);
continue;
}

Expand All @@ -213,91 +219,100 @@ impl RunningOperation {
} = message;
let external_id = entity.external_id.clone();

let operation_result = match operation {
OperationType::Health | OperationType::Custom(_) => {
Ok(OperationResult::Ignored)
let command = match GenericCommandState::from_command_message(&message) {
Ok(command) => command,
Err(err) => {
error!(%err, ?message, "could not parse command payload");
return;
}
};

let Some(c8y_operation) = to_c8y_operation(&operation) else {
debug!(
topic = message.topic.name,
?operation,
"ignoring local-only operation"
);
return;
};

OperationType::Restart => {
let operation_result = match c8y_operation {
CumulocitySupportedOperations::C8yRestartRequest => {
context
.publish_restart_operation_status(&entity, &cmd_id, message)
.await
}
OperationType::SoftwareList => {
CumulocitySupportedOperations::C8ySoftwareList => {
context
.publish_software_list(&entity, &cmd_id, &message)
.await
}
OperationType::SoftwareUpdate => {
CumulocitySupportedOperations::C8ySoftwareUpdate => {
context
.publish_software_update_status(&entity, &cmd_id, &message)
.await
}
OperationType::LogUpload => {
CumulocitySupportedOperations::C8yLogFileRequest => {
context
.handle_log_upload_state_change(&entity, &cmd_id, &message)
.await
}
OperationType::ConfigSnapshot => {
CumulocitySupportedOperations::C8yUploadConfigFile => {
context
.handle_config_snapshot_state_change(&entity, &cmd_id, &message)
.await
}
OperationType::ConfigUpdate => {
CumulocitySupportedOperations::C8yDownloadConfigFile => {
context
.handle_config_update_state_change(&entity, &cmd_id, &message)
.await
}
OperationType::FirmwareUpdate => {
CumulocitySupportedOperations::C8yFirmware => {
context
.handle_firmware_update_state_change(&entity, &cmd_id, &message)
.await
}
};

let mut mqtt_publisher = context.mqtt_publisher.clone();
match operation_result {
Ok(result) => match result {
OperationResult::Ignored => {}
OperationResult::Executing => {
if let Some(c8y_state_executing_payload) =
c8y_state_message_executing(operation)
{
let c8y_state_executing_message = MqttMessage::new(
&entity.smartrest_publish_topic,
c8y_state_executing_payload,
);
mqtt_publisher
.send(c8y_state_executing_message)
.await
.unwrap();
}
match to_response(
operation_result,
c8y_operation,
&entity.smartrest_publish_topic,
) {
OperationResult::Ignored => {}
OperationResult::Executing => {
let c8y_state_executing_payload = set_operation_executing(c8y_operation);
let c8y_state_executing_message = MqttMessage::new(
&entity.smartrest_publish_topic,
c8y_state_executing_payload,
);
mqtt_publisher
.send(c8y_state_executing_message)
.await
.unwrap();
}
OperationResult::Finished { messages } => {
if let Err(e) = context
.upload_operation_log(&external_id, &cmd_id, &operation, &command)
.await
{
error!("failed to upload operation logs: {e}");
}
OperationResult::Finished { messages, command } => {
if let Err(e) = context
.upload_operation_log(&external_id, &cmd_id, &operation, &command)
.await
{
error!("failed to upload operation logs: {e}");
}

for message in messages {
mqtt_publisher.send(message).await.unwrap();
}

// clear command topic
let state = command.clear();
let clearing_message = state.into_message();
assert!(clearing_message.payload_bytes().is_empty());
assert!(clearing_message.retain);
assert_eq!(clearing_message.qos, QoS::AtLeastOnce);
mqtt_publisher.send(clearing_message).await.unwrap();

rx.close();

for message in messages {
mqtt_publisher.send(message).await.unwrap();
}
},
Err(err) => {
unimplemented!()

// clear command topic
let command = command.clear();
let clearing_message = command.into_message();
assert!(clearing_message.payload_bytes().is_empty());
assert!(clearing_message.retain);
assert_eq!(clearing_message.qos, QoS::AtLeastOnce);
mqtt_publisher.send(clearing_message).await.unwrap();

rx.close();
}
}
}
Expand Down Expand Up @@ -325,31 +340,50 @@ enum OperationResult {
///
/// Operation state is either `SUCCESSFUL` or `FAILED`. Report state to C8y, send operation log,
/// clean local MQTT topic.
Finished {
messages: Vec<MqttMessage>,
command: GenericCommandState,
},
Finished { messages: Vec<MqttMessage> },
}

/// For a given `OperationType`, obtain a C8y Smartrest Set operation to EXECUTING message.
fn c8y_state_message_executing(operation_type: OperationType) -> Option<String> {
// convert local operation to c8y operation
let c8y_operation = match operation_type {
/// Converts operation result to valid C8y response.
fn to_response(
result: Result<OperationResult, ConversionError>,
operation_type: CumulocitySupportedOperations,
smartrest_publish_topic: &Topic,
) -> OperationResult {
let err = match result {
Ok(res) => {
return res;
}
Err(err) => err,
};

// assuming `high level error: low level error: root cause error` error display impl
let set_operation_to_failed_payload = fail_operation(operation_type, &err.to_string());

let set_operation_to_failed_message =
MqttMessage::new(smartrest_publish_topic, set_operation_to_failed_payload);

let messages = vec![set_operation_to_failed_message];

OperationResult::Finished { messages }
}

/// For a given `OperationType`, obtain a matching `C8ySupportedOperations`.
///
/// For `OperationType`s that don't have C8y operation equivalent, `None` is returned.
fn to_c8y_operation(operation_type: &OperationType) -> Option<CumulocitySupportedOperations> {
match operation_type {
OperationType::LogUpload => Some(CumulocitySupportedOperations::C8yLogFileRequest),
OperationType::Restart => Some(CumulocitySupportedOperations::C8yRestartRequest),
OperationType::ConfigSnapshot => Some(CumulocitySupportedOperations::C8yUploadConfigFile),
OperationType::ConfigUpdate => Some(CumulocitySupportedOperations::C8yDownloadConfigFile),
OperationType::FirmwareUpdate => Some(CumulocitySupportedOperations::C8yFirmware),
OperationType::SoftwareUpdate => Some(CumulocitySupportedOperations::C8ySoftwareUpdate),
// SoftwareList is handled by HTTP proxy, not smartrest
OperationType::SoftwareList => None,
OperationType::SoftwareList => Some(CumulocitySupportedOperations::C8ySoftwareList),
// local-only operation, not always invoked by c8y, handled in other codepath
OperationType::Health => None,
// other custom operations, no c8y equivalent
OperationType::Custom(_) => None,
};

c8y_operation.map(set_operation_executing)
}
}

/// State required by the operation handlers.
Expand Down
2 changes: 0 additions & 2 deletions crates/extensions/c8y_mapper_ext/src/operations/restart.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ impl OperationContext {

Ok(OperationResult::Finished {
messages: vec![MqttMessage::new(topic, smartrest_set_operation)],
command: command.into_generic_command(&self.mqtt_schema),
})
}
CommandStatus::Failed { ref reason } => {
Expand All @@ -51,7 +50,6 @@ impl OperationContext {

Ok(OperationResult::Finished {
messages: vec![MqttMessage::new(topic, smartrest_set_operation)],
command: command.into_generic_command(&self.mqtt_schema),
})
}
_ => {
Expand Down
15 changes: 3 additions & 12 deletions crates/extensions/c8y_mapper_ext/src/operations/software_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,7 @@ impl OperationContext {
target.external_id.as_ref().to_string(),
)
.await?;
return Ok(OperationResult::Finished {
messages: vec![],
command: command.into_generic_command(&self.mqtt_schema),
});
return Ok(OperationResult::Finished { messages: vec![] });
}

// Send a list via SmartREST, "advanced software list" feature c8y >= 10.14
Expand All @@ -63,18 +60,12 @@ impl OperationContext {
messages.push(MqttMessage::new(&topic, payload))
}

Ok(OperationResult::Finished {
messages,
command: command.into_generic_command(&self.mqtt_schema),
})
Ok(OperationResult::Finished { messages })
}

CommandStatus::Failed { reason } => {
error!("Fail to list installed software packages: {reason}");
Ok(OperationResult::Finished {
messages: vec![],
command: command.into_generic_command(&self.mqtt_schema),
})
Ok(OperationResult::Finished { messages: vec![] })
}

CommandStatus::Init
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ impl OperationContext {
MqttMessage::new(topic, smartrest_set_operation),
self.request_software_list(&target.topic_id),
],
command: command.into_generic_command(&self.mqtt_schema),
})
}
CommandStatus::Failed { reason } => {
Expand All @@ -63,7 +62,6 @@ impl OperationContext {
MqttMessage::new(topic, smartrest_set_operation),
self.request_software_list(&target.topic_id),
],
command: command.into_generic_command(&self.mqtt_schema),
})
}
}
Expand Down

0 comments on commit f37739f

Please sign in to comment.