diff --git a/crates/extensions/c8y_mapper_ext/src/config.rs b/crates/extensions/c8y_mapper_ext/src/config.rs index b8031ace64a..393de82eb3a 100644 --- a/crates/extensions/c8y_mapper_ext/src/config.rs +++ b/crates/extensions/c8y_mapper_ext/src/config.rs @@ -1,3 +1,4 @@ +use crate::operations::OperationHandler; use crate::Capabilities; use c8y_api::json_c8y_deserializer::C8yDeviceControlTopic; use c8y_api::smartrest::error::OperationsError; @@ -198,24 +199,11 @@ impl C8yMapperConfig { topics.add_all(mqtt_schema.topics(AnyEntity, CommandMetadata(cmd))); } - if capabilities.log_upload { - topics.add_all(crate::operations::log_upload::log_upload_topic_filter( - &mqtt_schema, - )); - } - if capabilities.config_snapshot { - topics.add_all(crate::operations::config_snapshot::topic_filter( - &mqtt_schema, - )); - } - if capabilities.config_update { - topics.add_all(crate::operations::config_update::topic_filter(&mqtt_schema)); - } - if capabilities.firmware_update { - topics.add_all( - crate::operations::firmware_update::firmware_update_topic_filter(&mqtt_schema), - ); - } + let operation_topics = OperationHandler::topic_filter(&capabilities) + .into_iter() + .map(|(e, c)| mqtt_schema.topics(e, c)) + .collect(); + topics.add_all(operation_topics); // Add user configurable external topic filters for topic in tedge_config.c8y.topics.0.clone() { diff --git a/crates/extensions/c8y_mapper_ext/src/lib.rs b/crates/extensions/c8y_mapper_ext/src/lib.rs index 1697ff488cc..26d8c6bf723 100644 --- a/crates/extensions/c8y_mapper_ext/src/lib.rs +++ b/crates/extensions/c8y_mapper_ext/src/lib.rs @@ -17,10 +17,10 @@ mod tests; #[derive(Debug, Clone, Copy, serde::Deserialize)] pub struct Capabilities { - log_upload: bool, - config_snapshot: bool, - config_update: bool, - firmware_update: bool, + pub log_upload: bool, + pub config_snapshot: bool, + pub config_update: bool, + pub firmware_update: bool, } #[cfg(test)] diff --git a/crates/extensions/c8y_mapper_ext/src/operations/convert.rs b/crates/extensions/c8y_mapper_ext/src/operations/convert.rs new file mode 100644 index 00000000000..7d95e737172 --- /dev/null +++ b/crates/extensions/c8y_mapper_ext/src/operations/convert.rs @@ -0,0 +1,312 @@ +//! Converting Cumulocity Smartrest operation messages into local thin-edge operation messages. + +use std::sync::Arc; + +use c8y_api::json_c8y_deserializer::C8yDownloadConfigFile; +use c8y_api::json_c8y_deserializer::C8yFirmware; +use c8y_api::json_c8y_deserializer::C8yLogfileRequest; +use c8y_api::json_c8y_deserializer::C8yUploadConfigFile; +use tedge_api::commands::CommandStatus; +use tedge_api::commands::ConfigMetadata; +use tedge_api::commands::ConfigSnapshotCmdPayload; +use tedge_api::commands::ConfigUpdateCmdPayload; +use tedge_api::commands::FirmwareUpdateCmdPayload; +use tedge_api::commands::LogMetadata; +use tedge_api::commands::LogUploadCmdPayload; +use tedge_api::entity_store::EntityExternalId; +use tedge_api::entity_store::EntityMetadata; +use tedge_api::mqtt_topics::Channel; +use tedge_api::mqtt_topics::EntityTopicId; +use tedge_api::mqtt_topics::OperationType; +use tedge_api::Jsonify; +use tedge_mqtt_ext::MqttMessage; +use tracing::error; +use tracing::warn; + +use crate::converter::CumulocityConverter; +use crate::error::ConversionError; +use crate::error::CumulocityMapperError; + +impl CumulocityConverter { + /// Converts a config_snapshot metadata message to + /// - supported operation "c8y_UploadConfigFile" + /// - supported config types + pub fn convert_config_snapshot_metadata( + &mut self, + topic_id: &EntityTopicId, + message: &MqttMessage, + ) -> Result, ConversionError> { + if !self.config.capabilities.config_snapshot { + warn!( + "Received config_snapshot metadata, however, config_snapshot feature is disabled" + ); + } + self.convert_config_metadata(topic_id, message, "c8y_UploadConfigFile") + } + + /// Converts a config_update metadata message to + /// - supported operation "c8y_DownloadConfigFile" + /// - supported config types + pub fn convert_config_update_metadata( + &mut self, + topic_id: &EntityTopicId, + message: &MqttMessage, + ) -> Result, ConversionError> { + if !self.config.capabilities.config_update { + warn!("Received config_update metadata, however, config_update feature is disabled"); + return Ok(vec![]); + } + self.convert_config_metadata(topic_id, message, "c8y_DownloadConfigFile") + } + + /// Convert c8y_UploadConfigFile JSON over MQTT operation to ThinEdge config_snapshot command + pub fn convert_config_snapshot_request( + &self, + device_xid: String, + cmd_id: String, + config_upload_request: C8yUploadConfigFile, + ) -> Result, CumulocityMapperError> { + let target = self + .entity_store + .try_get_by_external_id(&device_xid.into())?; + + let channel = Channel::Command { + operation: OperationType::ConfigSnapshot, + cmd_id: cmd_id.clone(), + }; + let topic = self.mqtt_schema.topic_for(&target.topic_id, &channel); + + // Replace '/' with ':' to avoid creating unexpected directories in file transfer repo + let tedge_url = format!( + "http://{}/tedge/file-transfer/{}/config_snapshot/{}-{}", + &self.config.tedge_http_host, + target.external_id.as_ref(), + config_upload_request.config_type.replace('/', ":"), + cmd_id + ); + + let request = ConfigSnapshotCmdPayload { + status: CommandStatus::Init, + tedge_url: Some(tedge_url), + config_type: config_upload_request.config_type, + path: None, + log_path: None, + }; + + // Command messages must be retained + Ok(vec![ + MqttMessage::new(&topic, request.to_json()).with_retain() + ]) + } + + /// Convert c8y_LogfileRequest operation to a ThinEdge log_upload command + pub fn convert_log_upload_request( + &self, + device_xid: String, + cmd_id: String, + log_request: C8yLogfileRequest, + ) -> Result, CumulocityMapperError> { + let target = self + .entity_store + .try_get_by_external_id(&device_xid.into())?; + + let channel = Channel::Command { + operation: OperationType::LogUpload, + cmd_id: cmd_id.clone(), + }; + let topic = self.mqtt_schema.topic_for(&target.topic_id, &channel); + + let tedge_url = format!( + "http://{}/tedge/file-transfer/{}/log_upload/{}-{}", + &self.config.tedge_http_host, + target.external_id.as_ref(), + log_request.log_file, + cmd_id + ); + + let request = LogUploadCmdPayload { + status: CommandStatus::Init, + tedge_url, + log_type: log_request.log_file, + date_from: log_request.date_from, + date_to: log_request.date_to, + search_text: Some(log_request.search_text).filter(|s| !s.is_empty()), + lines: log_request.maximum_lines, + log_path: None, + }; + + // Command messages must be retained + Ok(vec![ + MqttMessage::new(&topic, request.to_json()).with_retain() + ]) + } + + /// Converts a log_upload metadata message to + /// - supported operation "c8y_LogfileRequest" + /// - supported log types + pub fn convert_log_metadata( + &mut self, + topic_id: &EntityTopicId, + message: &MqttMessage, + ) -> Result, ConversionError> { + if !self.config.capabilities.log_upload { + warn!("Received log_upload metadata, however, log_upload feature is disabled"); + return Ok(vec![]); + } + + let mut messages = match self.register_operation(topic_id, "c8y_LogfileRequest") { + Err(err) => { + error!( + "Failed to register `c8y_LogfileRequest` operation for {topic_id} due to: {err}" + ); + return Ok(vec![]); + } + Ok(messages) => messages, + }; + + // To SmartREST supported log types + let metadata = LogMetadata::from_json(message.payload_str()?)?; + let mut types = metadata.types; + types.sort(); + let supported_log_types = types.join(","); + let payload = format!("118,{supported_log_types}"); + let c8y_topic = self.smartrest_publish_topic_for_entity(topic_id)?; + messages.push(MqttMessage::new(&c8y_topic, payload)); + + Ok(messages) + } + + /// Convert c8y_Firmware JSON over MQTT operation to ThinEdge firmware_update command. + pub fn convert_firmware_update_request( + &self, + device_xid: String, + cmd_id: String, + firmware_request: C8yFirmware, + ) -> Result, CumulocityMapperError> { + let entity_xid: EntityExternalId = device_xid.into(); + + let target = self.entity_store.try_get_by_external_id(&entity_xid)?; + + let channel = Channel::Command { + operation: OperationType::FirmwareUpdate, + cmd_id, + }; + let topic = self.mqtt_schema.topic_for(&target.topic_id, &channel); + + let tedge_url = + if let Some(c8y_url) = self.c8y_endpoint.maybe_tenant_url(&firmware_request.url) { + self.auth_proxy.proxy_url(c8y_url).to_string() + } else { + firmware_request.url.clone() + }; + + let request = FirmwareUpdateCmdPayload { + status: CommandStatus::Init, + tedge_url: Some(tedge_url), + remote_url: firmware_request.url, + name: firmware_request.name, + version: firmware_request.version, + log_path: None, + }; + + // Command messages must be retained + Ok(vec![ + MqttMessage::new(&topic, request.to_json()).with_retain() + ]) + } + + pub fn register_firmware_update_operation( + &mut self, + topic_id: &EntityTopicId, + ) -> Result, ConversionError> { + if !self.config.capabilities.firmware_update { + warn!( + "Received firmware_update metadata, however, firmware_update feature is disabled" + ); + return Ok(vec![]); + } + + match self.register_operation(topic_id, "c8y_Firmware") { + Err(err) => { + error!("Failed to register `c8y_Firmware` operation for {topic_id} due to: {err}"); + Ok(vec![]) + } + Ok(messages) => Ok(messages), + } + } + + /// Upon receiving a SmartREST c8y_DownloadConfigFile request, convert it to a message on the + /// command channel. + pub async fn convert_config_update_request( + &self, + device_xid: String, + cmd_id: String, + config_download_request: C8yDownloadConfigFile, + ) -> Result, CumulocityMapperError> { + let entity_xid: EntityExternalId = device_xid.into(); + let target = self.entity_store.try_get_by_external_id(&entity_xid)?; + + let message = + self.create_config_update_cmd(cmd_id.into(), &config_download_request, target); + Ok(message) + } + + fn create_config_update_cmd( + &self, + cmd_id: Arc, + config_download_request: &C8yDownloadConfigFile, + target: &EntityMetadata, + ) -> Vec { + let channel = Channel::Command { + operation: OperationType::ConfigUpdate, + cmd_id: cmd_id.to_string(), + }; + let topic = self.mqtt_schema.topic_for(&target.topic_id, &channel); + + let proxy_url = self + .c8y_endpoint + .maybe_tenant_url(&config_download_request.url) + .map(|cumulocity_url| self.auth_proxy.proxy_url(cumulocity_url).into()); + + let remote_url = proxy_url.unwrap_or(config_download_request.url.to_string()); + + let request = ConfigUpdateCmdPayload { + status: CommandStatus::Init, + tedge_url: None, + remote_url, + config_type: config_download_request.config_type.clone(), + path: None, + log_path: None, + }; + + // Command messages must be retained + vec![MqttMessage::new(&topic, request.to_json()).with_retain()] + } + + fn convert_config_metadata( + &mut self, + topic_id: &EntityTopicId, + message: &MqttMessage, + c8y_op_name: &str, + ) -> Result, ConversionError> { + let metadata = ConfigMetadata::from_json(message.payload_str()?)?; + + let mut messages = match self.register_operation(topic_id, c8y_op_name) { + Err(err) => { + error!("Failed to register {c8y_op_name} operation for {topic_id} due to: {err}"); + return Ok(vec![]); + } + Ok(messages) => messages, + }; + + // To SmartREST supported config types + let mut types = metadata.types; + types.sort(); + let supported_config_types = types.join(","); + let payload = format!("119,{supported_config_types}"); + let sm_topic = self.smartrest_publish_topic_for_entity(topic_id)?; + messages.push(MqttMessage::new(&sm_topic, payload)); + + Ok(messages) + } +} diff --git a/crates/extensions/c8y_mapper_ext/src/operations/handler.rs b/crates/extensions/c8y_mapper_ext/src/operations/handler.rs new file mode 100644 index 00000000000..b930d3b8d4a --- /dev/null +++ b/crates/extensions/c8y_mapper_ext/src/operations/handler.rs @@ -0,0 +1,525 @@ +use super::handlers::EntityTarget; +use super::handlers::OperationContext; +use super::handlers::OperationMessage; +use super::handlers::UpdateStatus; +use crate::actor::IdDownloadRequest; +use crate::actor::IdDownloadResult; +use crate::actor::IdUploadRequest; +use crate::actor::IdUploadResult; +use crate::config::C8yMapperConfig; +use crate::Capabilities; +use c8y_api::http_proxy::C8yEndPoint; +use c8y_auth_proxy::url::ProxyUrlGenerator; +use c8y_http_proxy::handle::C8YHttpProxy; +use std::collections::HashMap; +use std::sync::Arc; +use tedge_actors::ClientMessageBox; +use tedge_actors::LoggingSender; +use tedge_api::mqtt_topics::Channel; +use tedge_api::mqtt_topics::ChannelFilter; +use tedge_api::mqtt_topics::EntityFilter; +use tedge_api::mqtt_topics::IdGenerator; +use tedge_mqtt_ext::MqttMessage; +use tokio::task::JoinError; + +/// Handles operations. +/// +/// Handling an operation usually consists of 3 steps: +/// +/// 1. Receive a smartrest message which is an operation request, convert it to thin-edge message, +/// and publish on local MQTT (done by the converter). +/// 2. Various local thin-edge components/services (e.g. tedge-agent) execute the operation, and +/// when they're done, they publish an MQTT message with 'status: successful/failed' +/// 3. The cumulocity mapper needs to do some additional steps, like downloading/uploading files via +/// HTTP, or talking to C8y via HTTP proxy, before it can send operation response via the bridge +/// and then clear the local MQTT operation topic. +/// +/// This struct concerns itself with performing step 3. +/// +/// Incoming operation-related MQTT messages need to be passed to the [`Self::handle`] method, which +/// performs an operation in the background in separate tasks. The operation tasks themselves handle +/// reporting their success/failure. +pub struct OperationHandler { + context: Arc, + running_operations: HashMap, RunningOperation>, +} + +impl OperationHandler { + pub fn new( + c8y_mapper_config: &C8yMapperConfig, + + downloader: ClientMessageBox, + uploader: ClientMessageBox, + mqtt_publisher: LoggingSender, + + http_proxy: C8YHttpProxy, + auth_proxy: ProxyUrlGenerator, + ) -> Self { + Self { + context: Arc::new(OperationContext { + capabilities: c8y_mapper_config.capabilities, + auto_log_upload: c8y_mapper_config.auto_log_upload, + tedge_http_host: c8y_mapper_config.tedge_http_host.clone(), + tmp_dir: c8y_mapper_config.tmp_dir.clone(), + mqtt_schema: c8y_mapper_config.mqtt_schema.clone(), + mqtt_publisher: mqtt_publisher.clone(), + software_management_api: c8y_mapper_config.software_management_api, + + // TODO(marcel): would be good not to generate new ids from running operations, see if + // we can remove it somehow + command_id: IdGenerator::new(crate::converter::REQUESTER_NAME), + + downloader, + uploader, + + c8y_endpoint: C8yEndPoint::new( + &c8y_mapper_config.c8y_host, + &c8y_mapper_config.c8y_mqtt, + &c8y_mapper_config.device_id, + ), + http_proxy: http_proxy.clone(), + auth_proxy: auth_proxy.clone(), + }), + + running_operations: Default::default(), + } + } + + /// Handles an MQTT command id message. + /// + /// All MQTT messages with a topic that contains an operation id, e.g. + /// `te/device/child001///cmd/software_list/c8y-2023-09-25T14:34:00` need to be passed here for + /// operations to be processed. Messages not related to operations are ignored. + /// + /// `entity` needs to be the same entity as in `message`. + /// + /// When a message with a new id is handled, a task will be spawned that processes this + /// operation. For the operation to be completed, subsequent messages with the same command id + /// need to be handled as well. + /// + /// When an operation terminates (successfully or unsuccessfully), an MQTT operation clearing + /// message will be published to the broker by the running operation task, but this message also + /// needs to be handled when an MQTT broker echoes it back to us, so that `OperationHandler` can + /// free the data associated with the operation. + /// + /// # Panics + /// + /// Will panic if a task that runs the operation has panicked. The task can panic if e.g. MQTT + /// send returns an error or the task encountered any other unexpected error that makes it + /// impossible to finish handling the operation (i.e. send MQTT clearing message and report + /// operation status to c8y). + /// + /// The panic in the operation task has to happen first, and then another message with the same + /// command id has to be handled for the call to `.handle()` to panic. + + // but there's a problem: in practice, when a panic in a child task happens, .handle() will + // never get called for that operation again. Operation task itself sends the messages, so if + // they can't be sent over MQTT because of a panic, they won't be handled, won't be joined, so + // we will not see that an exception has occurred. + // FIXME(marcel): ensure panics are always propagated without the caller having to ask for them + pub async fn handle(&mut self, entity: EntityTarget, message: MqttMessage) { + let Ok((_, channel)) = self.context.mqtt_schema.entity_channel_of(&message.topic) else { + return; + }; + + let Channel::Command { operation, cmd_id } = channel else { + return; + }; + + let message = OperationMessage { + operation, + entity, + cmd_id: cmd_id.into(), + message, + }; + + let topic: Arc = message.message.topic.name.clone().into(); + + let running_operation = self.running_operations.remove(&topic); + + let running_operation = + running_operation.unwrap_or(RunningOperation::spawn(Arc::clone(&self.context))); + + let operation_status = running_operation + .update(message) + .await + .expect("operation task should not panic"); + + match operation_status { + OperationStatus::Ongoing(operation) => { + self.running_operations.insert(topic, operation); + } + OperationStatus::Terminated => {} + } + } + + /// A topic filter for operation types this object can handle. + /// + /// The MQTT client should subscribe to topics with this filter to receive MQTT messages that it + /// should then pass to the [`Self::handle`] method. Depending on the tedge configuration, some + /// operations may be disabled and therefore absent in the filter. + pub fn topic_filter(capabilities: &Capabilities) -> Vec<(EntityFilter, ChannelFilter)> { + use tedge_api::mqtt_topics::ChannelFilter::Command; + use tedge_api::mqtt_topics::ChannelFilter::CommandMetadata; + use tedge_api::mqtt_topics::EntityFilter::AnyEntity; + use tedge_api::mqtt_topics::OperationType; + + let mut topics = vec![]; + + if capabilities.log_upload { + topics.extend([ + (AnyEntity, Command(OperationType::LogUpload)), + (AnyEntity, CommandMetadata(OperationType::LogUpload)), + ]); + } + if capabilities.config_snapshot { + topics.extend([ + (AnyEntity, Command(OperationType::ConfigSnapshot)), + (AnyEntity, CommandMetadata(OperationType::ConfigSnapshot)), + ]); + } + if capabilities.config_update { + topics.extend([ + (AnyEntity, Command(OperationType::ConfigUpdate)), + (AnyEntity, CommandMetadata(OperationType::ConfigUpdate)), + ]); + } + if capabilities.firmware_update { + topics.extend([ + (AnyEntity, Command(OperationType::FirmwareUpdate)), + (AnyEntity, CommandMetadata(OperationType::FirmwareUpdate)), + ]); + } + + topics + } +} + +struct RunningOperation { + handle: tokio::task::JoinHandle<()>, + tx: tokio::sync::mpsc::UnboundedSender, +} + +impl RunningOperation { + /// Spawns a task that handles the operation. + /// + /// The task handles a single operation with a given command id, and via a channel it receives + /// operation state changes (if any) to drive an operation to completion. + fn spawn(operation: Arc) -> Self { + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + + let handle = tokio::spawn(async move { + while let Some(message) = rx.recv().await { + if let UpdateStatus::Terminated = operation.update(message).await { + break; + } + } + }); + + Self { handle, tx } + } + + /// Updates the operation with new state. + /// + /// Can keep an operation running, or terminate it. Returns an error if operation panicked. + async fn update(self, message: OperationMessage) -> Result { + let send_result = self.tx.send(message); + + if send_result.is_err() { + self.handle.await?; + Ok(OperationStatus::Terminated) + } else { + Ok(OperationStatus::Ongoing(self)) + } + } +} + +enum OperationStatus { + Ongoing(RunningOperation), + Terminated, +} + +#[cfg(test)] +mod tests { + use super::*; + + use std::time::Duration; + + use c8y_auth_proxy::url::Protocol; + use c8y_http_proxy::messages::C8YRestRequest; + use c8y_http_proxy::messages::C8YRestResult; + use tedge_actors::test_helpers::FakeServerBox; + use tedge_actors::test_helpers::FakeServerBoxBuilder; + use tedge_actors::test_helpers::MessageReceiverExt; + use tedge_actors::Builder; + use tedge_actors::MessageReceiver; + use tedge_actors::MessageSink; + use tedge_actors::Sender; + use tedge_actors::SimpleMessageBox; + use tedge_actors::SimpleMessageBoxBuilder; + use tedge_api::commands::ConfigSnapshotCmd; + use tedge_api::commands::ConfigSnapshotCmdPayload; + use tedge_api::entity_store::EntityExternalId; + use tedge_api::mqtt_topics::EntityTopicId; + use tedge_api::mqtt_topics::OperationType; + use tedge_api::CommandStatus; + use tedge_downloader_ext::DownloadResponse; + use tedge_mqtt_ext::Topic; + use tedge_test_utils::fs::TempTedgeDir; + use tedge_uploader_ext::UploadResponse; + + use crate::tests::test_mapper_config; + + const TEST_TIMEOUT_MS: Duration = Duration::from_millis(3000); + + #[tokio::test] + async fn handle_ignores_messages_that_are_not_operations() { + // system under test + let mut sut = setup_operation_handler().operation_handler; + let mqtt_schema = sut.context.mqtt_schema.clone(); + + let entity_topic_id = EntityTopicId::default_main_device(); + let entity_target = EntityTarget { + topic_id: entity_topic_id.clone(), + external_id: EntityExternalId::from("anything"), + smartrest_publish_topic: Topic::new("anything").unwrap(), + }; + + let message_wrong_entity = MqttMessage::new(&Topic::new("asdf").unwrap(), []); + sut.handle(entity_target.clone(), message_wrong_entity) + .await; + + assert_eq!(sut.running_operations.len(), 0); + + let topic = mqtt_schema.topic_for( + &entity_topic_id, + &Channel::CommandMetadata { + operation: OperationType::Restart, + }, + ); + let message_wrong_channel = MqttMessage::new(&topic, []); + sut.handle(entity_target, message_wrong_channel).await; + + assert_eq!(sut.running_operations.len(), 0); + } + + #[tokio::test] + async fn handle_joins_terminated_operations() { + let TestHandle { + operation_handler: mut sut, + downloader: dl, + uploader: ul, + mqtt, + c8y_proxy, + ttd: _ttd, + .. + } = setup_operation_handler(); + + let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); + let mut dl = dl.with_timeout(TEST_TIMEOUT_MS); + let mut ul = ul.with_timeout(TEST_TIMEOUT_MS); + let mut c8y_proxy = c8y_proxy.with_timeout(TEST_TIMEOUT_MS); + + let mqtt_schema = sut.context.mqtt_schema.clone(); + + let entity_topic_id = EntityTopicId::default_main_device(); + let entity_target = EntityTarget { + topic_id: entity_topic_id.clone(), + external_id: EntityExternalId::from("anything"), + smartrest_publish_topic: Topic::new("anything").unwrap(), + }; + + // spawn an operation to see if it's successfully joined when it's completed. + // particular operation used is not important, because we want to test only the handler. + // it would be even better if we could define some inline operation so test could be shorter + // TODO(marcel): don't assume operation implementations when testing the handler + let config_snapshot_operation = ConfigSnapshotCmd { + target: entity_topic_id, + cmd_id: "config-snapshot-1".to_string(), + payload: ConfigSnapshotCmdPayload { + status: CommandStatus::Successful, + tedge_url: Some("asdf".to_string()), + config_type: "typeA".to_string(), + path: None, + log_path: None, + }, + }; + + sut.handle( + entity_target.clone(), + config_snapshot_operation.command_message(&mqtt_schema), + ) + .await; + assert_eq!(sut.running_operations.len(), 1); + + dl.recv() + .await + .expect("downloader should receive DownloadRequest"); + + dl.send(( + "config-snapshot-1".to_string(), + Ok(DownloadResponse { + url: "asdf".to_string(), + file_path: "asdf".into(), + }), + )) + .await + .unwrap(); + + c8y_proxy + .recv() + .await + .expect("C8yProxy should receive CreateEvent"); + + c8y_proxy + .send(Ok(c8y_http_proxy::messages::C8YRestResponse::EventId( + "asdf".to_string(), + ))) + .await + .unwrap(); + + ul.recv() + .await + .expect("uploader should receive UploadRequest"); + + ul.send(( + "config-snapshot-1".to_string(), + Ok(UploadResponse { + url: "asdf".to_string(), + file_path: "asdf".into(), + }), + )) + .await + .unwrap(); + + assert_eq!(sut.running_operations.len(), 1); + + // skip 503 smartrest + mqtt.skip(1).await; + + let clearing_message = mqtt.recv().await.expect("MQTT should receive message"); + assert_eq!( + clearing_message, + config_snapshot_operation.clearing_message(&mqtt_schema) + ); + + assert_eq!(sut.running_operations.len(), 1); + + // finally, check that after handling clearing message, operation was joined + sut.handle(entity_target, clearing_message).await; + + assert_eq!(sut.running_operations.len(), 0); + } + + #[tokio::test] + #[should_panic] + async fn handle_should_panic_when_background_task_panics() { + // we're immediately dropping test's temporary directory, so we'll get an error that a + // directory for the operation could not be created + let TestHandle { + operation_handler: mut sut, + .. + } = setup_operation_handler(); + + let mqtt_schema = sut.context.mqtt_schema.clone(); + + let entity_topic_id = EntityTopicId::default_main_device(); + let entity_target = EntityTarget { + topic_id: entity_topic_id.clone(), + external_id: EntityExternalId::from("anything"), + smartrest_publish_topic: Topic::new("anything").unwrap(), + }; + + // spawn an operation to see if it's successfully joined when it's completed. + // particular operation used is not important, because we want to test only the handler. + // it would be even better if we could define some inline operation so test could be shorter + // TODO(marcel): don't assume operation implementations when testing the handler + let config_snapshot_operation = ConfigSnapshotCmd { + target: entity_topic_id, + cmd_id: "config-snapshot-1".to_string(), + payload: ConfigSnapshotCmdPayload { + status: CommandStatus::Successful, + tedge_url: Some("asdf".to_string()), + config_type: "typeA".to_string(), + path: None, + log_path: None, + }, + }; + + sut.handle( + entity_target.clone(), + config_snapshot_operation.command_message(&mqtt_schema), + ) + .await; + assert_eq!(sut.running_operations.len(), 1); + + // give OperationHandler time to handle message + // TODO(marcel): remove sleeps + tokio::time::sleep(Duration::from_millis(50)).await; + + // normally clearing message would be sent by operation task. + // Using it here just as a dummy, to call `handle` with the same cmd-id, so that it panics + sut.handle( + entity_target.clone(), + config_snapshot_operation.clearing_message(&mqtt_schema), + ) + .await; + } + + fn setup_operation_handler() -> TestHandle { + let ttd = TempTedgeDir::new(); + let c8y_mapper_config = test_mapper_config(&ttd); + + let mqtt_builder: SimpleMessageBoxBuilder = + SimpleMessageBoxBuilder::new("MQTT", 10); + let mqtt_publisher = LoggingSender::new("MQTT".to_string(), mqtt_builder.get_sender()); + + let mut c8y_proxy_builder: FakeServerBoxBuilder = + FakeServerBoxBuilder::default(); + let c8y_proxy = C8YHttpProxy::new(&mut c8y_proxy_builder); + + let mut uploader_builder: FakeServerBoxBuilder = + FakeServerBoxBuilder::default(); + let uploader = ClientMessageBox::new(&mut uploader_builder); + + let mut downloader_builder: FakeServerBoxBuilder = + FakeServerBoxBuilder::default(); + let downloader = ClientMessageBox::new(&mut downloader_builder); + + let auth_proxy_addr = c8y_mapper_config.auth_proxy_addr.clone(); + let auth_proxy_port = c8y_mapper_config.auth_proxy_port; + let auth_proxy = ProxyUrlGenerator::new(auth_proxy_addr, auth_proxy_port, Protocol::Http); + + let operation_handler = OperationHandler::new( + &c8y_mapper_config, + downloader, + uploader, + mqtt_publisher, + c8y_proxy, + auth_proxy, + ); + + let mqtt = mqtt_builder.build(); + let downloader = downloader_builder.build(); + let uploader = uploader_builder.build(); + let c8y_proxy = c8y_proxy_builder.build(); + + TestHandle { + mqtt, + downloader, + uploader, + c8y_proxy, + operation_handler, + ttd, + } + } + + struct TestHandle { + operation_handler: OperationHandler, + mqtt: SimpleMessageBox, + c8y_proxy: FakeServerBox, + uploader: FakeServerBox, + downloader: FakeServerBox, + ttd: TempTedgeDir, + } +} diff --git a/crates/extensions/c8y_mapper_ext/src/operations/config_snapshot.rs b/crates/extensions/c8y_mapper_ext/src/operations/handlers/config_snapshot.rs similarity index 88% rename from crates/extensions/c8y_mapper_ext/src/operations/config_snapshot.rs rename to crates/extensions/c8y_mapper_ext/src/operations/handlers/config_snapshot.rs index c0a88cf12b8..d396cc85f99 100644 --- a/crates/extensions/c8y_mapper_ext/src/operations/config_snapshot.rs +++ b/crates/extensions/c8y_mapper_ext/src/operations/handlers/config_snapshot.rs @@ -2,86 +2,16 @@ use super::error::OperationError; use super::EntityTarget; use super::OperationContext; use super::OperationOutcome; -use crate::converter::CumulocityConverter; -use crate::error::ConversionError; -use crate::error::CumulocityMapperError; use anyhow::Context; -use c8y_api::json_c8y_deserializer::C8yUploadConfigFile; use c8y_api::smartrest::smartrest_serializer::CumulocitySupportedOperations; use camino::Utf8PathBuf; use std::borrow::Cow; use tedge_api::commands::CommandStatus; use tedge_api::commands::ConfigSnapshotCmd; -use tedge_api::commands::ConfigSnapshotCmdPayload; -use tedge_api::mqtt_topics::Channel; -use tedge_api::mqtt_topics::ChannelFilter; -use tedge_api::mqtt_topics::EntityFilter; -use tedge_api::mqtt_topics::EntityTopicId; -use tedge_api::mqtt_topics::MqttSchema; -use tedge_api::mqtt_topics::OperationType; -use tedge_api::Jsonify; use tedge_downloader_ext::DownloadRequest; use tedge_mqtt_ext::MqttMessage; -use tedge_mqtt_ext::TopicFilter; use tracing::log::warn; -pub fn topic_filter(mqtt_schema: &MqttSchema) -> TopicFilter { - [ - mqtt_schema.topics( - EntityFilter::AnyEntity, - ChannelFilter::Command(OperationType::ConfigSnapshot), - ), - mqtt_schema.topics( - EntityFilter::AnyEntity, - ChannelFilter::CommandMetadata(OperationType::ConfigSnapshot), - ), - ] - .into_iter() - .collect() -} - -impl CumulocityConverter { - /// Convert c8y_UploadConfigFile JSON over MQTT operation to ThinEdge config_snapshot command - pub fn convert_config_snapshot_request( - &self, - device_xid: String, - cmd_id: String, - config_upload_request: C8yUploadConfigFile, - ) -> Result, CumulocityMapperError> { - let target = self - .entity_store - .try_get_by_external_id(&device_xid.into())?; - - let channel = Channel::Command { - operation: OperationType::ConfigSnapshot, - cmd_id: cmd_id.clone(), - }; - let topic = self.mqtt_schema.topic_for(&target.topic_id, &channel); - - // Replace '/' with ':' to avoid creating unexpected directories in file transfer repo - let tedge_url = format!( - "http://{}/tedge/file-transfer/{}/config_snapshot/{}-{}", - &self.config.tedge_http_host, - target.external_id.as_ref(), - config_upload_request.config_type.replace('/', ":"), - cmd_id - ); - - let request = ConfigSnapshotCmdPayload { - status: CommandStatus::Init, - tedge_url: Some(tedge_url), - config_type: config_upload_request.config_type, - path: None, - log_path: None, - }; - - // Command messages must be retained - Ok(vec![ - MqttMessage::new(&topic, request.to_json()).with_retain() - ]) - } -} - impl OperationContext { /// Address received ThinEdge config_snapshot command. If its status is /// - "executing", it converts the message to SmartREST "Executing". @@ -197,24 +127,6 @@ impl OperationContext { } } -impl CumulocityConverter { - /// Converts a config_snapshot metadata message to - /// - supported operation "c8y_UploadConfigFile" - /// - supported config types - pub fn convert_config_snapshot_metadata( - &mut self, - topic_id: &EntityTopicId, - message: &MqttMessage, - ) -> Result, ConversionError> { - if !self.config.capabilities.config_snapshot { - warn!( - "Received config_snapshot metadata, however, config_snapshot feature is disabled" - ); - } - self.convert_config_metadata(topic_id, message, "c8y_UploadConfigFile") - } -} - #[cfg(test)] mod tests { use crate::config::C8yMapperConfig; diff --git a/crates/extensions/c8y_mapper_ext/src/operations/config_update.rs b/crates/extensions/c8y_mapper_ext/src/operations/handlers/config_update.rs similarity index 79% rename from crates/extensions/c8y_mapper_ext/src/operations/config_update.rs rename to crates/extensions/c8y_mapper_ext/src/operations/handlers/config_update.rs index 926fe7f12b4..14fd149e06d 100644 --- a/crates/extensions/c8y_mapper_ext/src/operations/config_update.rs +++ b/crates/extensions/c8y_mapper_ext/src/operations/handlers/config_update.rs @@ -1,25 +1,9 @@ -use crate::converter::CumulocityConverter; -use crate::error::ConversionError; -use crate::error::CumulocityMapperError; use anyhow::Context; -use c8y_api::json_c8y_deserializer::C8yDownloadConfigFile; use c8y_api::smartrest::smartrest_serializer::succeed_operation_no_payload; use c8y_api::smartrest::smartrest_serializer::CumulocitySupportedOperations; -use std::sync::Arc; use tedge_api::commands::CommandStatus; use tedge_api::commands::ConfigUpdateCmd; -use tedge_api::commands::ConfigUpdateCmdPayload; -use tedge_api::entity_store::EntityExternalId; -use tedge_api::entity_store::EntityMetadata; -use tedge_api::mqtt_topics::Channel; -use tedge_api::mqtt_topics::ChannelFilter; -use tedge_api::mqtt_topics::EntityFilter; -use tedge_api::mqtt_topics::EntityTopicId; -use tedge_api::mqtt_topics::MqttSchema; -use tedge_api::mqtt_topics::OperationType; -use tedge_api::Jsonify; use tedge_mqtt_ext::MqttMessage; -use tedge_mqtt_ext::TopicFilter; use tracing::log::warn; use super::error::OperationError; @@ -27,21 +11,6 @@ use super::EntityTarget; use super::OperationContext; use super::OperationOutcome; -pub fn topic_filter(mqtt_schema: &MqttSchema) -> TopicFilter { - [ - mqtt_schema.topics( - EntityFilter::AnyEntity, - ChannelFilter::Command(OperationType::ConfigUpdate), - ), - mqtt_schema.topics( - EntityFilter::AnyEntity, - ChannelFilter::CommandMetadata(OperationType::ConfigUpdate), - ), - ] - .into_iter() - .collect() -} - impl OperationContext { /// Address a received ThinEdge config_update command. If its status is /// - "executing", it converts the message to SmartREST "Executing". @@ -94,71 +63,6 @@ impl OperationContext { } } -impl CumulocityConverter { - /// Upon receiving a SmartREST c8y_DownloadConfigFile request, convert it to a message on the - /// command channel. - pub async fn convert_config_update_request( - &self, - device_xid: String, - cmd_id: String, - config_download_request: C8yDownloadConfigFile, - ) -> Result, CumulocityMapperError> { - let entity_xid: EntityExternalId = device_xid.into(); - let target = self.entity_store.try_get_by_external_id(&entity_xid)?; - - let message = - self.create_config_update_cmd(cmd_id.into(), &config_download_request, target); - Ok(message) - } - - /// Converts a config_update metadata message to - /// - supported operation "c8y_DownloadConfigFile" - /// - supported config types - pub fn convert_config_update_metadata( - &mut self, - topic_id: &EntityTopicId, - message: &MqttMessage, - ) -> Result, ConversionError> { - if !self.config.capabilities.config_update { - warn!("Received config_update metadata, however, config_update feature is disabled"); - return Ok(vec![]); - } - self.convert_config_metadata(topic_id, message, "c8y_DownloadConfigFile") - } - - fn create_config_update_cmd( - &self, - cmd_id: Arc, - config_download_request: &C8yDownloadConfigFile, - target: &EntityMetadata, - ) -> Vec { - let channel = Channel::Command { - operation: OperationType::ConfigUpdate, - cmd_id: cmd_id.to_string(), - }; - let topic = self.mqtt_schema.topic_for(&target.topic_id, &channel); - - let proxy_url = self - .c8y_endpoint - .maybe_tenant_url(&config_download_request.url) - .map(|cumulocity_url| self.auth_proxy.proxy_url(cumulocity_url).into()); - - let remote_url = proxy_url.unwrap_or(config_download_request.url.to_string()); - - let request = ConfigUpdateCmdPayload { - status: CommandStatus::Init, - tedge_url: None, - remote_url, - config_type: config_download_request.config_type.clone(), - path: None, - log_path: None, - }; - - // Command messages must be retained - vec![MqttMessage::new(&topic, request.to_json()).with_retain()] - } -} - #[cfg(test)] mod tests { use crate::tests::skip_init_messages; diff --git a/crates/extensions/c8y_mapper_ext/src/operations/firmware_update.rs b/crates/extensions/c8y_mapper_ext/src/operations/handlers/firmware_update.rs similarity index 85% rename from crates/extensions/c8y_mapper_ext/src/operations/firmware_update.rs rename to crates/extensions/c8y_mapper_ext/src/operations/handlers/firmware_update.rs index 0b304fb51b6..cb1b938cf60 100644 --- a/crates/extensions/c8y_mapper_ext/src/operations/firmware_update.rs +++ b/crates/extensions/c8y_mapper_ext/src/operations/handlers/firmware_update.rs @@ -2,82 +2,18 @@ use super::error::OperationError; use super::EntityTarget; use super::OperationContext; use super::OperationOutcome; -use crate::converter::CumulocityConverter; -use crate::error::ConversionError; -use crate::error::CumulocityMapperError; use anyhow::Context; -use c8y_api::json_c8y_deserializer::C8yFirmware; use c8y_api::smartrest::smartrest_serializer::succeed_operation_no_payload; use c8y_api::smartrest::smartrest_serializer::CumulocitySupportedOperations; use tedge_api::commands::FirmwareInfo; use tedge_api::commands::FirmwareUpdateCmd; -use tedge_api::commands::FirmwareUpdateCmdPayload; -use tedge_api::entity_store::EntityExternalId; use tedge_api::mqtt_topics::Channel; -use tedge_api::mqtt_topics::ChannelFilter::Command; -use tedge_api::mqtt_topics::ChannelFilter::CommandMetadata; -use tedge_api::mqtt_topics::EntityFilter::AnyEntity; -use tedge_api::mqtt_topics::EntityTopicId; -use tedge_api::mqtt_topics::MqttSchema; -use tedge_api::mqtt_topics::OperationType; use tedge_api::CommandStatus; use tedge_api::Jsonify; use tedge_mqtt_ext::MqttMessage; use tedge_mqtt_ext::QoS; -use tedge_mqtt_ext::TopicFilter; -use tracing::error; use tracing::warn; -pub fn firmware_update_topic_filter(mqtt_schema: &MqttSchema) -> TopicFilter { - [ - mqtt_schema.topics(AnyEntity, Command(OperationType::FirmwareUpdate)), - mqtt_schema.topics(AnyEntity, CommandMetadata(OperationType::FirmwareUpdate)), - ] - .into_iter() - .collect() -} - -impl CumulocityConverter { - /// Convert c8y_Firmware JSON over MQTT operation to ThinEdge firmware_update command. - pub fn convert_firmware_update_request( - &self, - device_xid: String, - cmd_id: String, - firmware_request: C8yFirmware, - ) -> Result, CumulocityMapperError> { - let entity_xid: EntityExternalId = device_xid.into(); - - let target = self.entity_store.try_get_by_external_id(&entity_xid)?; - - let channel = Channel::Command { - operation: OperationType::FirmwareUpdate, - cmd_id, - }; - let topic = self.mqtt_schema.topic_for(&target.topic_id, &channel); - - let tedge_url = - if let Some(c8y_url) = self.c8y_endpoint.maybe_tenant_url(&firmware_request.url) { - self.auth_proxy.proxy_url(c8y_url).to_string() - } else { - firmware_request.url.clone() - }; - - let request = FirmwareUpdateCmdPayload { - status: CommandStatus::Init, - tedge_url: Some(tedge_url), - remote_url: firmware_request.url, - name: firmware_request.name, - version: firmware_request.version, - log_path: None, - }; - - // Command messages must be retained - Ok(vec![ - MqttMessage::new(&topic, request.to_json()).with_retain() - ]) - } -} - impl OperationContext { /// Address a received ThinEdge firmware_update command. If its status is /// - "executing", it converts the message to SmartREST "Executing". @@ -147,28 +83,6 @@ impl OperationContext { } } -impl CumulocityConverter { - pub fn register_firmware_update_operation( - &mut self, - topic_id: &EntityTopicId, - ) -> Result, ConversionError> { - if !self.config.capabilities.firmware_update { - warn!( - "Received firmware_update metadata, however, firmware_update feature is disabled" - ); - return Ok(vec![]); - } - - match self.register_operation(topic_id, "c8y_Firmware") { - Err(err) => { - error!("Failed to register `c8y_Firmware` operation for {topic_id} due to: {err}"); - Ok(vec![]) - } - Ok(messages) => Ok(messages), - } - } -} - #[cfg(test)] mod tests { use crate::tests::*; diff --git a/crates/extensions/c8y_mapper_ext/src/operations/log_upload.rs b/crates/extensions/c8y_mapper_ext/src/operations/handlers/log_upload.rs similarity index 86% rename from crates/extensions/c8y_mapper_ext/src/operations/log_upload.rs rename to crates/extensions/c8y_mapper_ext/src/operations/handlers/log_upload.rs index bcb51bd43b3..9dadf147b76 100644 --- a/crates/extensions/c8y_mapper_ext/src/operations/log_upload.rs +++ b/crates/extensions/c8y_mapper_ext/src/operations/handlers/log_upload.rs @@ -1,85 +1,17 @@ use super::error::OperationError; use super::EntityTarget; use super::OperationContext; -use crate::converter::CumulocityConverter; -use crate::error::ConversionError; -use crate::error::CumulocityMapperError; -use crate::operations::OperationOutcome; +use super::OperationOutcome; use anyhow::Context; -use c8y_api::json_c8y_deserializer::C8yLogfileRequest; use c8y_api::smartrest::smartrest_serializer::CumulocitySupportedOperations; use camino::Utf8PathBuf; use tedge_api::commands::CommandStatus; -use tedge_api::commands::LogMetadata; use tedge_api::commands::LogUploadCmd; -use tedge_api::commands::LogUploadCmdPayload; -use tedge_api::mqtt_topics::Channel; -use tedge_api::mqtt_topics::ChannelFilter::Command; -use tedge_api::mqtt_topics::ChannelFilter::CommandMetadata; -use tedge_api::mqtt_topics::EntityFilter::AnyEntity; -use tedge_api::mqtt_topics::EntityTopicId; -use tedge_api::mqtt_topics::MqttSchema; use tedge_api::mqtt_topics::OperationType; -use tedge_api::Jsonify; use tedge_downloader_ext::DownloadRequest; use tedge_mqtt_ext::MqttMessage; -use tedge_mqtt_ext::TopicFilter; -use tracing::log::error; use tracing::log::warn; -pub fn log_upload_topic_filter(mqtt_schema: &MqttSchema) -> TopicFilter { - [ - mqtt_schema.topics(AnyEntity, Command(OperationType::LogUpload)), - mqtt_schema.topics(AnyEntity, CommandMetadata(OperationType::LogUpload)), - ] - .into_iter() - .collect() -} - -impl CumulocityConverter { - /// Convert c8y_LogfileRequest operation to a ThinEdge log_upload command - pub fn convert_log_upload_request( - &self, - device_xid: String, - cmd_id: String, - log_request: C8yLogfileRequest, - ) -> Result, CumulocityMapperError> { - let target = self - .entity_store - .try_get_by_external_id(&device_xid.into())?; - - let channel = Channel::Command { - operation: OperationType::LogUpload, - cmd_id: cmd_id.clone(), - }; - let topic = self.mqtt_schema.topic_for(&target.topic_id, &channel); - - let tedge_url = format!( - "http://{}/tedge/file-transfer/{}/log_upload/{}-{}", - &self.config.tedge_http_host, - target.external_id.as_ref(), - log_request.log_file, - cmd_id - ); - - let request = LogUploadCmdPayload { - status: CommandStatus::Init, - tedge_url, - log_type: log_request.log_file, - date_from: log_request.date_from, - date_to: log_request.date_to, - search_text: Some(log_request.search_text).filter(|s| !s.is_empty()), - lines: log_request.maximum_lines, - log_path: None, - }; - - // Command messages must be retained - Ok(vec![ - MqttMessage::new(&topic, request.to_json()).with_retain() - ]) - } -} - impl OperationContext { /// Address a received log_upload command. If its status is /// - "executing", it converts the message to SmartREST "Executing". @@ -185,43 +117,6 @@ impl OperationContext { } } -impl CumulocityConverter { - /// Converts a log_upload metadata message to - /// - supported operation "c8y_LogfileRequest" - /// - supported log types - pub fn convert_log_metadata( - &mut self, - topic_id: &EntityTopicId, - message: &MqttMessage, - ) -> Result, ConversionError> { - if !self.config.capabilities.log_upload { - warn!("Received log_upload metadata, however, log_upload feature is disabled"); - return Ok(vec![]); - } - - let mut messages = match self.register_operation(topic_id, "c8y_LogfileRequest") { - Err(err) => { - error!( - "Failed to register `c8y_LogfileRequest` operation for {topic_id} due to: {err}" - ); - return Ok(vec![]); - } - Ok(messages) => messages, - }; - - // To SmartREST supported log types - let metadata = LogMetadata::from_json(message.payload_str()?)?; - let mut types = metadata.types; - types.sort(); - let supported_log_types = types.join(","); - let payload = format!("118,{supported_log_types}"); - let c8y_topic = self.smartrest_publish_topic_for_entity(topic_id)?; - messages.push(MqttMessage::new(&c8y_topic, payload)); - - Ok(messages) - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/crates/extensions/c8y_mapper_ext/src/operations/handlers/mod.rs b/crates/extensions/c8y_mapper_ext/src/operations/handlers/mod.rs new file mode 100644 index 00000000000..3b13475b422 --- /dev/null +++ b/crates/extensions/c8y_mapper_ext/src/operations/handlers/mod.rs @@ -0,0 +1,302 @@ +//! Handling of different types of thin-edge.io operations. + +mod config_snapshot; +mod config_update; +mod firmware_update; +mod log_upload; +mod restart; +mod software_list; +mod software_update; + +use super::error; +use super::error::OperationError; +use crate::actor::IdDownloadRequest; +use crate::actor::IdDownloadResult; +use crate::actor::IdUploadRequest; +use crate::actor::IdUploadResult; +use crate::Capabilities; +use c8y_api::http_proxy::C8yEndPoint; +use c8y_api::smartrest::smartrest_serializer::fail_operation; +use c8y_api::smartrest::smartrest_serializer::set_operation_executing; +use c8y_api::smartrest::smartrest_serializer::CumulocitySupportedOperations; +use c8y_auth_proxy::url::ProxyUrlGenerator; +use c8y_http_proxy::handle::C8YHttpProxy; +use camino::Utf8Path; +use std::sync::Arc; +use tedge_actors::ClientMessageBox; +use tedge_actors::LoggingSender; +use tedge_actors::Sender; +use tedge_api::entity_store::EntityExternalId; +use tedge_api::mqtt_topics::EntityTopicId; +use tedge_api::mqtt_topics::IdGenerator; +use tedge_api::mqtt_topics::MqttSchema; +use tedge_api::mqtt_topics::OperationType; +use tedge_api::workflow::GenericCommandState; +use tedge_config::AutoLogUpload; +use tedge_config::SoftwareManagementApiFlag; +use tedge_mqtt_ext::MqttMessage; +use tedge_mqtt_ext::QoS; +use tedge_mqtt_ext::Topic; +use tracing::debug; +use tracing::error; + +/// State required by the operation handlers. +pub(super) struct OperationContext { + pub(super) capabilities: Capabilities, + pub(super) auto_log_upload: AutoLogUpload, + pub(super) tedge_http_host: Arc, + pub(super) tmp_dir: Arc, + pub(super) mqtt_schema: MqttSchema, + pub(super) software_management_api: SoftwareManagementApiFlag, + pub(super) command_id: IdGenerator, + + pub(super) http_proxy: C8YHttpProxy, + pub(super) c8y_endpoint: C8yEndPoint, + pub(super) auth_proxy: ProxyUrlGenerator, + + pub(super) downloader: ClientMessageBox, + pub(super) uploader: ClientMessageBox, + pub(super) mqtt_publisher: LoggingSender, +} + +impl OperationContext { + pub async fn update(&self, message: OperationMessage) -> UpdateStatus { + let OperationMessage { + entity, + cmd_id, + message, + operation, + } = message; + let external_id = entity.external_id.clone(); + + let command = match GenericCommandState::from_command_message(&message) { + Ok(command) => command, + Err(err) => { + error!(%err, ?message, "could not parse command payload"); + return UpdateStatus::Terminated; + } + }; + + let operation_result = match operation { + OperationType::Health | OperationType::Custom(_) => { + debug!( + topic = message.topic.name, + ?operation, + "ignoring local-only operation" + ); + Ok(OperationOutcome::Ignored) + } + + OperationType::Restart => { + self.publish_restart_operation_status(&entity, &cmd_id, &message) + .await + } + // SoftwareList is not a regular operation: it doesn't update its status and doesn't report any + // failures; it just maps local software list to c8y software list payloads and sends it via MQTT + // Smartrest 2.0/HTTP + OperationType::SoftwareList => { + let result = self.publish_software_list(&entity, &cmd_id, &message).await; + + let mut mqtt_publisher = self.mqtt_publisher.clone(); + match result { + Err(err) => { + error!("Fail to list installed software packages: {err}"); + } + Ok(OperationOutcome::Finished { messages }) => { + for message in messages { + mqtt_publisher.send(message).await.unwrap(); + } + } + // command is not yet finished, avoid clearing the command topic + Ok(_) => return UpdateStatus::Ongoing, + } + + clear_command_topic(command, &mut mqtt_publisher).await; + return UpdateStatus::Terminated; + } + OperationType::SoftwareUpdate => { + self.publish_software_update_status(&entity, &cmd_id, &message) + .await + } + OperationType::LogUpload => { + self.handle_log_upload_state_change(&entity, &cmd_id, &message) + .await + } + OperationType::ConfigSnapshot => { + self.handle_config_snapshot_state_change(&entity, &cmd_id, &message) + .await + } + OperationType::ConfigUpdate => { + self.handle_config_update_state_change(&entity, &cmd_id, &message) + .await + } + OperationType::FirmwareUpdate => { + self.handle_firmware_update_state_change(&entity, &cmd_id, &message) + .await + } + }; + + let mut mqtt_publisher = self.mqtt_publisher.clone(); + + // unwrap is safe: at this point all local operations that are not regular c8y + // operations should be handled above + let c8y_operation = to_c8y_operation(&operation).unwrap(); + + match to_response( + operation_result, + c8y_operation, + &entity.smartrest_publish_topic, + ) { + OperationOutcome::Ignored => UpdateStatus::Ongoing, + OperationOutcome::Executing => { + let c8y_state_executing_payload = set_operation_executing(c8y_operation); + let c8y_state_executing_message = + MqttMessage::new(&entity.smartrest_publish_topic, c8y_state_executing_payload); + mqtt_publisher + .send(c8y_state_executing_message) + .await + .unwrap(); + + UpdateStatus::Ongoing + } + OperationOutcome::Finished { messages } => { + if let Err(e) = self + .upload_operation_log(&external_id, &cmd_id, &operation, &command) + .await + { + error!("failed to upload operation logs: {e}"); + } + + for message in messages { + mqtt_publisher.send(message).await.unwrap(); + } + + clear_command_topic(command, &mut mqtt_publisher).await; + + UpdateStatus::Terminated + } + } + } +} + +/// Whether or not this operation requires more messages to be handled or is it terminated. +pub enum UpdateStatus { + Ongoing, + Terminated, +} + +async fn clear_command_topic( + command: GenericCommandState, + mqtt_publisher: &mut LoggingSender, +) { + let command = command.clear(); + let clearing_message = command.into_message(); + assert!(clearing_message.payload_bytes().is_empty()); + assert!(clearing_message.retain); + assert_eq!(clearing_message.qos, QoS::AtLeastOnce); + mqtt_publisher.send(clearing_message).await.unwrap(); +} + +/// Result of an update of operation's state. +/// +/// When a new MQTT message is received with an updated state of the operation, the mapper needs to +/// do something in response. Depending on if it cares about the operation, it can ignore it, send +/// some MQTT messages to notify C8y about the state change, or terminate the operation. +pub(super) enum OperationOutcome { + /// Do nothing in response. + /// + /// Used for states that don't have an equivalent on C8y so we don't have to notify. + Ignored, + + /// Update C8y operation state to `EXECUTING`. + Executing, + + /// Operation is terminated. + /// + /// Operation state is either `SUCCESSFUL` or `FAILED`. Report state to C8y, send operation log, + /// clean local MQTT topic. + Finished { messages: Vec }, +} + +/// Converts operation result to valid C8y response. +fn to_response( + result: Result, + operation_type: CumulocitySupportedOperations, + smartrest_publish_topic: &Topic, +) -> OperationOutcome { + let err = match result { + Ok(res) => { + return res; + } + Err(err) => err, + }; + + // assuming `high level error: low level error: root cause error` error display impl + let set_operation_to_failed_payload = fail_operation(operation_type, &err.to_string()); + + let set_operation_to_failed_message = + MqttMessage::new(smartrest_publish_topic, set_operation_to_failed_payload); + + let messages = vec![set_operation_to_failed_message]; + + OperationOutcome::Finished { messages } +} + +/// For a given `OperationType`, obtain a matching `C8ySupportedOperations`. +/// +/// For `OperationType`s that don't have C8y operation equivalent, `None` is returned. +fn to_c8y_operation(operation_type: &OperationType) -> Option { + match operation_type { + OperationType::LogUpload => Some(CumulocitySupportedOperations::C8yLogFileRequest), + OperationType::Restart => Some(CumulocitySupportedOperations::C8yRestartRequest), + OperationType::ConfigSnapshot => Some(CumulocitySupportedOperations::C8yUploadConfigFile), + OperationType::ConfigUpdate => Some(CumulocitySupportedOperations::C8yDownloadConfigFile), + OperationType::FirmwareUpdate => Some(CumulocitySupportedOperations::C8yFirmware), + OperationType::SoftwareUpdate => Some(CumulocitySupportedOperations::C8ySoftwareUpdate), + // software list is not an c8y, only a fragment, but is a local operation that is spawned as + // part of C8y_SoftwareUpdate operation + OperationType::SoftwareList => None, + // local-only operation, not always invoked by c8y, handled in other codepath + OperationType::Health => None, + // other custom operations, no c8y equivalent + OperationType::Custom(_) => None, + } +} +/// An MQTT message that contains an operation payload. +/// +/// These are MQTT messages that contain operation payloads. These messages need to be passed to +/// tasks that handle a given operation to advance the operation and eventually complete it. +pub(super) struct OperationMessage { + pub(super) operation: OperationType, + pub(super) entity: EntityTarget, + pub(super) cmd_id: Arc, + pub(super) message: MqttMessage, +} + +/// A subset of entity-related information necessary to handle an operation. +/// +/// Because the operation may take time and other operations may run concurrently, we don't want to +/// query the entity store. +#[derive(Clone, Debug)] +pub struct EntityTarget { + pub topic_id: EntityTopicId, + pub external_id: EntityExternalId, + pub smartrest_publish_topic: Topic, +} + +pub fn get_smartrest_response_for_upload_result( + upload_result: tedge_uploader_ext::UploadResult, + binary_url: &str, + operation: c8y_api::smartrest::smartrest_serializer::CumulocitySupportedOperations, +) -> c8y_api::smartrest::smartrest_serializer::SmartRest { + match upload_result { + Ok(_) => c8y_api::smartrest::smartrest_serializer::succeed_static_operation( + operation, + Some(binary_url), + ), + Err(err) => c8y_api::smartrest::smartrest_serializer::fail_operation( + operation, + &format!("Upload failed with {err}"), + ), + } +} diff --git a/crates/extensions/c8y_mapper_ext/src/operations/restart.rs b/crates/extensions/c8y_mapper_ext/src/operations/handlers/restart.rs similarity index 100% rename from crates/extensions/c8y_mapper_ext/src/operations/restart.rs rename to crates/extensions/c8y_mapper_ext/src/operations/handlers/restart.rs diff --git a/crates/extensions/c8y_mapper_ext/src/operations/software_list.rs b/crates/extensions/c8y_mapper_ext/src/operations/handlers/software_list.rs similarity index 100% rename from crates/extensions/c8y_mapper_ext/src/operations/software_list.rs rename to crates/extensions/c8y_mapper_ext/src/operations/handlers/software_list.rs diff --git a/crates/extensions/c8y_mapper_ext/src/operations/software_update.rs b/crates/extensions/c8y_mapper_ext/src/operations/handlers/software_update.rs similarity index 100% rename from crates/extensions/c8y_mapper_ext/src/operations/software_update.rs rename to crates/extensions/c8y_mapper_ext/src/operations/handlers/software_update.rs diff --git a/crates/extensions/c8y_mapper_ext/src/operations/mod.rs b/crates/extensions/c8y_mapper_ext/src/operations/mod.rs index 9879e321119..9c59f1d56b0 100644 --- a/crates/extensions/c8y_mapper_ext/src/operations/mod.rs +++ b/crates/extensions/c8y_mapper_ext/src/operations/mod.rs @@ -1,809 +1,28 @@ //! Utilities for executing Cumulocity operations. //! -//! C8y operations need some special handling by the C8y mapper, which needs to use the C8y HTTP -//! proxy to report on their progress. Additionally, while executing operations we often need to -//! send messages to different actors and wait for their results before continuing. +//! C8y operations need some special handling by the C8y mapper, which needs to use Smartrest via +//! MQTT or C8y HTTP proxy to report on their progress. Additionally, while executing operations we +//! often need to send messages to different actors and wait for their results before continuing. //! //! The operations are always triggered remotely by Cumulocity, and a triggered operation must //! always terminate in a success or failure. This status needs to be reported to Cumulocity. //! //! This module contains: -//! - data definitions of various states which are necessary to maintain in the mapper -//! - status and error handing utilities for reporting operation success/failure in different ways -//! (MQTT, Smartrest) -//! - implementations of operations +//! - operation handler, which handles thin-edge operation MQTT messages by spawning tasks that +//! handle different operations ([`handler`]) +//! - conversion from C8y operation messages into thin-edge operation messages ([`convert`]) +//! - implementations of operations ([`handlers`]) //! -//! thin-edge.io operations reference: https://thin-edge.github.io/thin-edge.io/operate/c8y/supported-operations/ +//! thin-edge.io operations reference: +//! https://thin-edge.github.io/thin-edge.io/operate/c8y/supported-operations/ -use crate::actor::IdDownloadRequest; -use crate::actor::IdDownloadResult; -use crate::actor::IdUploadRequest; -use crate::actor::IdUploadResult; -use crate::config::C8yMapperConfig; -use crate::converter::CumulocityConverter; -use crate::error::ConversionError; -use crate::Capabilities; -use c8y_api::http_proxy::C8yEndPoint; -use c8y_api::smartrest::smartrest_serializer::fail_operation; -use c8y_api::smartrest::smartrest_serializer::set_operation_executing; -use c8y_api::smartrest::smartrest_serializer::CumulocitySupportedOperations; -use c8y_auth_proxy::url::ProxyUrlGenerator; -use c8y_http_proxy::handle::C8YHttpProxy; -use camino::Utf8Path; -use error::OperationError; -use std::collections::HashMap; -use std::sync::Arc; -use tedge_actors::ClientMessageBox; -use tedge_actors::LoggingSender; -use tedge_actors::Sender; -use tedge_api::commands::ConfigMetadata; -use tedge_api::entity_store::EntityExternalId; -use tedge_api::mqtt_topics::Channel; -use tedge_api::mqtt_topics::EntityTopicId; -use tedge_api::mqtt_topics::IdGenerator; -use tedge_api::mqtt_topics::MqttSchema; -use tedge_api::mqtt_topics::OperationType; -use tedge_api::workflow::GenericCommandState; -use tedge_api::Jsonify; -use tedge_config::AutoLogUpload; -use tedge_config::SoftwareManagementApiFlag; -use tedge_mqtt_ext::MqttMessage; -use tedge_mqtt_ext::QoS; -use tedge_mqtt_ext::Topic; -use tracing::debug; -use tracing::error; - -pub mod config_snapshot; -pub mod config_update; +mod convert; mod error; -pub mod firmware_update; -pub mod log_upload; -mod restart; -mod software_list; -mod software_update; -mod upload; - -/// Handles operations. -/// -/// Handling an operation usually consists of 3 steps: -/// -/// 1. Receive a smartrest message which is an operation request, convert it to thin-edge message, -/// and publish on local MQTT (done by the converter). -/// 2. Various local thin-edge components/services (e.g. tedge-agent) execute the operation, and -/// when they're done, they publish an MQTT message with 'status: successful/failed' -/// 3. The cumulocity mapper needs to do some additional steps, like downloading/uploading files via -/// HTTP, or talking to C8y via HTTP proxy, before it can send operation response via the bridge -/// and then clear the local MQTT operation topic. -/// -/// This struct concerns itself with performing step 3. -/// -/// Incoming operation-related MQTT messages need to be passed to the [`Self::handle`] method, which -/// performs an operation in the background in separate tasks. The operation tasks themselves handle -/// reporting their success/failure. -pub struct OperationHandler { - context: Arc, - running_operations: HashMap, RunningOperation>, -} - -impl OperationHandler { - pub fn new( - c8y_mapper_config: &C8yMapperConfig, - - downloader: ClientMessageBox, - uploader: ClientMessageBox, - mqtt_publisher: LoggingSender, - - http_proxy: C8YHttpProxy, - auth_proxy: ProxyUrlGenerator, - ) -> Self { - Self { - context: Arc::new(OperationContext { - capabilities: c8y_mapper_config.capabilities, - auto_log_upload: c8y_mapper_config.auto_log_upload, - tedge_http_host: c8y_mapper_config.tedge_http_host.clone(), - tmp_dir: c8y_mapper_config.tmp_dir.clone(), - mqtt_schema: c8y_mapper_config.mqtt_schema.clone(), - mqtt_publisher: mqtt_publisher.clone(), - software_management_api: c8y_mapper_config.software_management_api, - - // TODO(marcel): would be good not to generate new ids from running operations, see if - // we can remove it somehow - command_id: IdGenerator::new(crate::converter::REQUESTER_NAME), - - downloader, - uploader, - - c8y_endpoint: C8yEndPoint::new( - &c8y_mapper_config.c8y_host, - &c8y_mapper_config.c8y_mqtt, - &c8y_mapper_config.device_id, - ), - http_proxy: http_proxy.clone(), - auth_proxy: auth_proxy.clone(), - }), - - running_operations: Default::default(), - } - } - - /// Handles an MQTT command id message. - /// - /// All MQTT messages with a topic that contains an operation id, e.g. - /// `te/device/child001///cmd/software_list/c8y-2023-09-25T14:34:00` need to be passed here for - /// operations to be processed. Messages not related to operations are ignored. - /// - /// `entity` needs to be the same entity as in `message`. - /// - /// When a message with a new id is handled, a task will be spawned that processes this - /// operation. For the operation to be completed, subsequent messages with the same command id - /// need to be handled as well. - /// - /// When an operation terminates (successfully or unsuccessfully), an MQTT operation clearing - /// message will be published to the broker by the running operation task, but this message also - /// needs to be handled when an MQTT broker echoes it back to us, so that `OperationHandler` can - /// free the data associated with the operation. - /// - /// # Panics - /// - /// Will panic if a task that runs the operation has panicked. The task can panic if e.g. MQTT - /// send returns an error or the task encountered any other unexpected error that makes it - /// impossible to finish handling the operation (i.e. send MQTT clearing message and report - /// operation status to c8y). - /// - /// The panic in the operation task has to happen first, and then another message with the same - /// command id has to be handled for the call to `.handle()` to panic. - - // but there's a problem: in practice, when a panic in a child task happens, .handle() will - // never get called for that operation again. Operation task itself sends the messages, so if - // they can't be sent over MQTT because of a panic, they won't be handled, won't be joined, so - // we will not see that an exception has occurred. - // FIXME(marcel): ensure panics are always propagated without the caller having to ask for them - pub async fn handle(&mut self, entity: EntityTarget, message: MqttMessage) { - let Ok((_, channel)) = self.context.mqtt_schema.entity_channel_of(&message.topic) else { - return; - }; - - let Channel::Command { operation, cmd_id } = channel else { - return; - }; - - let message = OperationMessage { - operation, - entity, - cmd_id: cmd_id.into(), - message, - }; - - let topic: Arc = message.message.topic.name.clone().into(); - let terminated_operation = { - let op = self.running_operations.get(&topic); - - if let Some(running_operation) = op { - // task already terminated - if running_operation.tx.send(message).is_err() { - let running_operation = self.running_operations.remove(&topic).unwrap(); - Some(running_operation) - } else { - None - } - } else { - let running_operation = RunningOperation::spawn(message, Arc::clone(&self.context)); - - self.running_operations - .insert(topic.clone(), running_operation); - None - } - }; - - if let Some(terminated_operation) = terminated_operation { - terminated_operation - .handle - .await - .expect("operation task should not panic"); - } - } -} - -pub struct RunningOperation { - handle: tokio::task::JoinHandle<()>, - tx: tokio::sync::mpsc::UnboundedSender, -} - -impl RunningOperation { - /// Spawns a task that handles the operation. - /// - /// The task handles a single operation with a given command id, and via a channel it receives - /// operation state changes (if any) to drive an operation to completion. - fn spawn(message: OperationMessage, context: Arc) -> Self { - let cmd_id = message.cmd_id.clone(); - - let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); - tx.send(message).unwrap(); - - let handle = tokio::spawn(async move { - while let Some(message) = rx.recv().await { - if message.cmd_id != cmd_id { - debug!( - msg_cmd_id = %message.cmd_id, - %cmd_id, "operation-related message was routed incorrectly" - ); - continue; - } - - let OperationMessage { - entity, - cmd_id, - message, - operation, - } = message; - let external_id = entity.external_id.clone(); - - let command = match GenericCommandState::from_command_message(&message) { - Ok(command) => command, - Err(err) => { - error!(%err, ?message, "could not parse command payload"); - return; - } - }; - - let operation_result = match operation { - OperationType::Health | OperationType::Custom(_) => { - debug!( - topic = message.topic.name, - ?operation, - "ignoring local-only operation" - ); - Ok(OperationOutcome::Ignored) - } - - OperationType::Restart => { - context - .publish_restart_operation_status(&entity, &cmd_id, &message) - .await - } - // SoftwareList is not a regular operation: it doesn't update its status and doesn't report any - // failures; it just maps local software list to c8y software list payloads and sends it via MQTT - // Smartrest 2.0/HTTP - OperationType::SoftwareList => { - let result = context - .publish_software_list(&entity, &cmd_id, &message) - .await; - - let mut mqtt_publisher = context.mqtt_publisher.clone(); - match result { - Err(err) => { - error!("Fail to list installed software packages: {err}"); - } - Ok(OperationOutcome::Finished { messages }) => { - for message in messages { - mqtt_publisher.send(message).await.unwrap(); - } - } - // command is not yet finished, avoid clearing the command topic - Ok(_) => { - continue; - } - } - - clear_command_topic(command, &mut mqtt_publisher).await; - rx.close(); - continue; - } - OperationType::SoftwareUpdate => { - context - .publish_software_update_status(&entity, &cmd_id, &message) - .await - } - OperationType::LogUpload => { - context - .handle_log_upload_state_change(&entity, &cmd_id, &message) - .await - } - OperationType::ConfigSnapshot => { - context - .handle_config_snapshot_state_change(&entity, &cmd_id, &message) - .await - } - OperationType::ConfigUpdate => { - context - .handle_config_update_state_change(&entity, &cmd_id, &message) - .await - } - OperationType::FirmwareUpdate => { - context - .handle_firmware_update_state_change(&entity, &cmd_id, &message) - .await - } - }; - - let mut mqtt_publisher = context.mqtt_publisher.clone(); - - // unwrap is safe: at this point all local operations that are not regular c8y - // operations should be handled above - let c8y_operation = to_c8y_operation(&operation).unwrap(); - - match to_response( - operation_result, - c8y_operation, - &entity.smartrest_publish_topic, - ) { - OperationOutcome::Ignored => {} - OperationOutcome::Executing => { - let c8y_state_executing_payload = set_operation_executing(c8y_operation); - let c8y_state_executing_message = MqttMessage::new( - &entity.smartrest_publish_topic, - c8y_state_executing_payload, - ); - mqtt_publisher - .send(c8y_state_executing_message) - .await - .unwrap(); - } - OperationOutcome::Finished { messages } => { - if let Err(e) = context - .upload_operation_log(&external_id, &cmd_id, &operation, &command) - .await - { - error!("failed to upload operation logs: {e}"); - } - - for message in messages { - mqtt_publisher.send(message).await.unwrap(); - } - - clear_command_topic(command, &mut mqtt_publisher).await; - - rx.close(); - } - } - } - }); - - Self { handle, tx } - } -} - -async fn clear_command_topic( - command: GenericCommandState, - mqtt_publisher: &mut LoggingSender, -) { - let command = command.clear(); - let clearing_message = command.into_message(); - assert!(clearing_message.payload_bytes().is_empty()); - assert!(clearing_message.retain); - assert_eq!(clearing_message.qos, QoS::AtLeastOnce); - mqtt_publisher.send(clearing_message).await.unwrap(); -} - -/// Result of an update of operation's state. -/// -/// When a new MQTT message is received with an updated state of the operation, the mapper needs to -/// do something in response. Depending on if it cares about the operation, it can ignore it, send -/// some MQTT messages to notify C8y about the state change, or terminate the operation. -enum OperationOutcome { - /// Do nothing in response. - /// - /// Used for states that don't have an equivalent on C8y so we don't have to notify. - Ignored, - - /// Update C8y operation state to `EXECUTING`. - Executing, - - /// Operation is terminated. - /// - /// Operation state is either `SUCCESSFUL` or `FAILED`. Report state to C8y, send operation log, - /// clean local MQTT topic. - Finished { messages: Vec }, -} - -/// Converts operation result to valid C8y response. -fn to_response( - result: Result, - operation_type: CumulocitySupportedOperations, - smartrest_publish_topic: &Topic, -) -> OperationOutcome { - let err = match result { - Ok(res) => { - return res; - } - Err(err) => err, - }; - - // assuming `high level error: low level error: root cause error` error display impl - let set_operation_to_failed_payload = fail_operation(operation_type, &err.to_string()); - - let set_operation_to_failed_message = - MqttMessage::new(smartrest_publish_topic, set_operation_to_failed_payload); - - let messages = vec![set_operation_to_failed_message]; - - OperationOutcome::Finished { messages } -} - -/// For a given `OperationType`, obtain a matching `C8ySupportedOperations`. -/// -/// For `OperationType`s that don't have C8y operation equivalent, `None` is returned. -fn to_c8y_operation(operation_type: &OperationType) -> Option { - match operation_type { - OperationType::LogUpload => Some(CumulocitySupportedOperations::C8yLogFileRequest), - OperationType::Restart => Some(CumulocitySupportedOperations::C8yRestartRequest), - OperationType::ConfigSnapshot => Some(CumulocitySupportedOperations::C8yUploadConfigFile), - OperationType::ConfigUpdate => Some(CumulocitySupportedOperations::C8yDownloadConfigFile), - OperationType::FirmwareUpdate => Some(CumulocitySupportedOperations::C8yFirmware), - OperationType::SoftwareUpdate => Some(CumulocitySupportedOperations::C8ySoftwareUpdate), - // software list is not an c8y, only a fragment, but is a local operation that is spawned as - // part of C8y_SoftwareUpdate operation - OperationType::SoftwareList => None, - // local-only operation, not always invoked by c8y, handled in other codepath - OperationType::Health => None, - // other custom operations, no c8y equivalent - OperationType::Custom(_) => None, - } -} - -/// State required by the operation handlers. -struct OperationContext { - capabilities: Capabilities, - auto_log_upload: AutoLogUpload, - tedge_http_host: Arc, - tmp_dir: Arc, - mqtt_schema: MqttSchema, - software_management_api: SoftwareManagementApiFlag, - command_id: IdGenerator, - - http_proxy: C8YHttpProxy, - c8y_endpoint: C8yEndPoint, - auth_proxy: ProxyUrlGenerator, - - downloader: ClientMessageBox, - uploader: ClientMessageBox, - mqtt_publisher: LoggingSender, -} - -/// An MQTT message that contains an operation payload. -/// -/// These are MQTT messages that contain operation payloads. These messages need to be passed to -/// tasks that handle a given operation to advance the operation and eventually complete it. -struct OperationMessage { - operation: OperationType, - entity: EntityTarget, - cmd_id: Arc, - message: MqttMessage, -} - -/// A subset of entity-related information necessary to handle an operation. -/// -/// Because the operation may take time and other operations may run concurrently, we don't want to -/// query the entity store. -#[derive(Clone, Debug)] -pub struct EntityTarget { - pub topic_id: EntityTopicId, - pub external_id: EntityExternalId, - pub smartrest_publish_topic: Topic, -} -impl CumulocityConverter { - fn convert_config_metadata( - &mut self, - topic_id: &EntityTopicId, - message: &MqttMessage, - c8y_op_name: &str, - ) -> Result, ConversionError> { - let metadata = ConfigMetadata::from_json(message.payload_str()?)?; +mod handler; +pub use handler::OperationHandler; - let mut messages = match self.register_operation(topic_id, c8y_op_name) { - Err(err) => { - error!("Failed to register {c8y_op_name} operation for {topic_id} due to: {err}"); - return Ok(vec![]); - } - Ok(messages) => messages, - }; +mod handlers; +pub use handlers::EntityTarget; - // To SmartREST supported config types - let mut types = metadata.types; - types.sort(); - let supported_config_types = types.join(","); - let payload = format!("119,{supported_config_types}"); - let sm_topic = self.smartrest_publish_topic_for_entity(topic_id)?; - messages.push(MqttMessage::new(&sm_topic, payload)); - - Ok(messages) - } -} - -fn get_smartrest_response_for_upload_result( - upload_result: tedge_uploader_ext::UploadResult, - binary_url: &str, - operation: c8y_api::smartrest::smartrest_serializer::CumulocitySupportedOperations, -) -> c8y_api::smartrest::smartrest_serializer::SmartRest { - match upload_result { - Ok(_) => c8y_api::smartrest::smartrest_serializer::succeed_static_operation( - operation, - Some(binary_url), - ), - Err(err) => c8y_api::smartrest::smartrest_serializer::fail_operation( - operation, - &format!("Upload failed with {err}"), - ), - } -} - -#[cfg(test)] -mod tests { - use std::time::Duration; - - use c8y_auth_proxy::url::Protocol; - use c8y_http_proxy::messages::C8YRestRequest; - use c8y_http_proxy::messages::C8YRestResult; - use tedge_actors::test_helpers::FakeServerBox; - use tedge_actors::test_helpers::FakeServerBoxBuilder; - use tedge_actors::test_helpers::MessageReceiverExt; - use tedge_actors::Builder; - use tedge_actors::MessageReceiver; - use tedge_actors::MessageSink; - use tedge_actors::SimpleMessageBox; - use tedge_actors::SimpleMessageBoxBuilder; - use tedge_api::commands::ConfigSnapshotCmd; - use tedge_api::commands::ConfigSnapshotCmdPayload; - use tedge_api::CommandStatus; - use tedge_downloader_ext::DownloadResponse; - use tedge_test_utils::fs::TempTedgeDir; - use tedge_uploader_ext::UploadResponse; - - use crate::tests::test_mapper_config; - - use super::*; - - const TEST_TIMEOUT_MS: Duration = Duration::from_millis(3000); - - #[tokio::test] - async fn handle_ignores_messages_that_are_not_operations() { - // system under test - let mut sut = setup_operation_handler().operation_handler; - let mqtt_schema = sut.context.mqtt_schema.clone(); - - let entity_topic_id = EntityTopicId::default_main_device(); - let entity_target = EntityTarget { - topic_id: entity_topic_id.clone(), - external_id: EntityExternalId::from("anything"), - smartrest_publish_topic: Topic::new("anything").unwrap(), - }; - - let message_wrong_entity = MqttMessage::new(&Topic::new("asdf").unwrap(), []); - sut.handle(entity_target.clone(), message_wrong_entity) - .await; - - assert_eq!(sut.running_operations.len(), 0); - - let topic = mqtt_schema.topic_for( - &entity_topic_id, - &Channel::CommandMetadata { - operation: OperationType::Restart, - }, - ); - let message_wrong_channel = MqttMessage::new(&topic, []); - sut.handle(entity_target, message_wrong_channel).await; - - assert_eq!(sut.running_operations.len(), 0); - } - - #[tokio::test] - async fn handle_joins_terminated_operations() { - let TestHandle { - operation_handler: mut sut, - downloader: dl, - uploader: ul, - mqtt, - c8y_proxy, - ttd: _ttd, - .. - } = setup_operation_handler(); - - let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); - let mut dl = dl.with_timeout(TEST_TIMEOUT_MS); - let mut ul = ul.with_timeout(TEST_TIMEOUT_MS); - let mut c8y_proxy = c8y_proxy.with_timeout(TEST_TIMEOUT_MS); - - let mqtt_schema = sut.context.mqtt_schema.clone(); - - let entity_topic_id = EntityTopicId::default_main_device(); - let entity_target = EntityTarget { - topic_id: entity_topic_id.clone(), - external_id: EntityExternalId::from("anything"), - smartrest_publish_topic: Topic::new("anything").unwrap(), - }; - - // spawn an operation to see if it's successfully joined when it's completed. - // particular operation used is not important, because we want to test only the handler. - // it would be even better if we could define some inline operation so test could be shorter - // TODO(marcel): don't assume operation implementations when testing the handler - let config_snapshot_operation = ConfigSnapshotCmd { - target: entity_topic_id, - cmd_id: "config-snapshot-1".to_string(), - payload: ConfigSnapshotCmdPayload { - status: CommandStatus::Successful, - tedge_url: Some("asdf".to_string()), - config_type: "typeA".to_string(), - path: None, - log_path: None, - }, - }; - - sut.handle( - entity_target.clone(), - config_snapshot_operation.command_message(&mqtt_schema), - ) - .await; - assert_eq!(sut.running_operations.len(), 1); - - dl.recv() - .await - .expect("downloader should receive DownloadRequest"); - - dl.send(( - "config-snapshot-1".to_string(), - Ok(DownloadResponse { - url: "asdf".to_string(), - file_path: "asdf".into(), - }), - )) - .await - .unwrap(); - - c8y_proxy - .recv() - .await - .expect("C8yProxy should receive CreateEvent"); - - c8y_proxy - .send(Ok(c8y_http_proxy::messages::C8YRestResponse::EventId( - "asdf".to_string(), - ))) - .await - .unwrap(); - - ul.recv() - .await - .expect("uploader should receive UploadRequest"); - - ul.send(( - "config-snapshot-1".to_string(), - Ok(UploadResponse { - url: "asdf".to_string(), - file_path: "asdf".into(), - }), - )) - .await - .unwrap(); - - assert_eq!(sut.running_operations.len(), 1); - - // skip 503 smartrest - mqtt.skip(1).await; - - let clearing_message = mqtt.recv().await.expect("MQTT should receive message"); - assert_eq!( - clearing_message, - config_snapshot_operation.clearing_message(&mqtt_schema) - ); - - assert_eq!(sut.running_operations.len(), 1); - - // finally, check that after handling clearing message, operation was joined - sut.handle(entity_target, clearing_message).await; - - assert_eq!(sut.running_operations.len(), 0); - } - - #[tokio::test] - #[should_panic] - async fn handle_should_panic_when_background_task_panics() { - // we're immediately dropping test's temporary directory, so we'll get an error that a - // directory for the operation could not be created - let TestHandle { - operation_handler: mut sut, - .. - } = setup_operation_handler(); - - let mqtt_schema = sut.context.mqtt_schema.clone(); - - let entity_topic_id = EntityTopicId::default_main_device(); - let entity_target = EntityTarget { - topic_id: entity_topic_id.clone(), - external_id: EntityExternalId::from("anything"), - smartrest_publish_topic: Topic::new("anything").unwrap(), - }; - - // spawn an operation to see if it's successfully joined when it's completed. - // particular operation used is not important, because we want to test only the handler. - // it would be even better if we could define some inline operation so test could be shorter - // TODO(marcel): don't assume operation implementations when testing the handler - let config_snapshot_operation = ConfigSnapshotCmd { - target: entity_topic_id, - cmd_id: "config-snapshot-1".to_string(), - payload: ConfigSnapshotCmdPayload { - status: CommandStatus::Successful, - tedge_url: Some("asdf".to_string()), - config_type: "typeA".to_string(), - path: None, - log_path: None, - }, - }; - - sut.handle( - entity_target.clone(), - config_snapshot_operation.command_message(&mqtt_schema), - ) - .await; - assert_eq!(sut.running_operations.len(), 1); - - // give OperationHandler time to handle message - // TODO(marcel): remove sleeps - tokio::time::sleep(Duration::from_millis(50)).await; - - // normally clearing message would be sent by operation task. - // Using it here just as a dummy, to call `handle` with the same cmd-id, so that it panics - sut.handle( - entity_target.clone(), - config_snapshot_operation.clearing_message(&mqtt_schema), - ) - .await; - } - - fn setup_operation_handler() -> TestHandle { - let ttd = TempTedgeDir::new(); - let c8y_mapper_config = test_mapper_config(&ttd); - - let mqtt_builder: SimpleMessageBoxBuilder = - SimpleMessageBoxBuilder::new("MQTT", 10); - let mqtt_publisher = LoggingSender::new("MQTT".to_string(), mqtt_builder.get_sender()); - - let mut c8y_proxy_builder: FakeServerBoxBuilder = - FakeServerBoxBuilder::default(); - let c8y_proxy = C8YHttpProxy::new(&mut c8y_proxy_builder); - - let mut uploader_builder: FakeServerBoxBuilder = - FakeServerBoxBuilder::default(); - let uploader = ClientMessageBox::new(&mut uploader_builder); - - let mut downloader_builder: FakeServerBoxBuilder = - FakeServerBoxBuilder::default(); - let downloader = ClientMessageBox::new(&mut downloader_builder); - - let auth_proxy_addr = c8y_mapper_config.auth_proxy_addr.clone(); - let auth_proxy_port = c8y_mapper_config.auth_proxy_port; - let auth_proxy = ProxyUrlGenerator::new(auth_proxy_addr, auth_proxy_port, Protocol::Http); - - let operation_handler = OperationHandler::new( - &c8y_mapper_config, - downloader, - uploader, - mqtt_publisher, - c8y_proxy, - auth_proxy, - ); - - let mqtt = mqtt_builder.build(); - let downloader = downloader_builder.build(); - let uploader = uploader_builder.build(); - let c8y_proxy = c8y_proxy_builder.build(); - - TestHandle { - mqtt, - downloader, - uploader, - c8y_proxy, - operation_handler, - ttd, - } - } - - struct TestHandle { - operation_handler: OperationHandler, - mqtt: SimpleMessageBox, - c8y_proxy: FakeServerBox, - uploader: FakeServerBox, - downloader: FakeServerBox, - ttd: TempTedgeDir, - } -} +mod upload; diff --git a/crates/extensions/c8y_mapper_ext/src/operations/upload.rs b/crates/extensions/c8y_mapper_ext/src/operations/upload.rs index 4da608f3ab2..ce586177424 100644 --- a/crates/extensions/c8y_mapper_ext/src/operations/upload.rs +++ b/crates/extensions/c8y_mapper_ext/src/operations/upload.rs @@ -13,10 +13,9 @@ use tedge_uploader_ext::UploadResult; use time::OffsetDateTime; use url::Url; +use super::handlers::OperationContext; use crate::error::ConversionError; -use super::OperationContext; - impl OperationContext { #[allow(clippy::too_many_arguments)] pub(crate) async fn upload_file( diff --git a/crates/extensions/c8y_mapper_ext/src/tests.rs b/crates/extensions/c8y_mapper_ext/src/tests.rs index bee21579af3..daef3881056 100644 --- a/crates/extensions/c8y_mapper_ext/src/tests.rs +++ b/crates/extensions/c8y_mapper_ext/src/tests.rs @@ -8,6 +8,7 @@ use crate::actor::IdUploadRequest; use crate::actor::IdUploadResult; use crate::actor::PublishMessage; use crate::availability::AvailabilityBuilder; +use crate::operations::OperationHandler; use crate::Capabilities; use assert_json_diff::assert_json_include; use c8y_api::json_c8y_deserializer::C8yDeviceControlTopic; @@ -2779,13 +2780,20 @@ pub(crate) fn test_mapper_config(tmp_dir: &TempTedgeDir) -> C8yMapperConfig { let mut topics = C8yMapperConfig::default_internal_topic_filter(tmp_dir.path(), &"c8y".try_into().unwrap()) .unwrap(); - topics.add_all(crate::operations::log_upload::log_upload_topic_filter( - &mqtt_schema, - )); - topics.add_all(crate::operations::config_snapshot::topic_filter( - &mqtt_schema, - )); - topics.add_all(crate::operations::config_update::topic_filter(&mqtt_schema)); + + let capabilities = Capabilities { + log_upload: true, + config_snapshot: true, + config_update: true, + firmware_update: true, + }; + + let operation_topics = OperationHandler::topic_filter(&capabilities) + .into_iter() + .map(|(e, c)| mqtt_schema.topics(e, c)) + .collect(); + topics.add_all(operation_topics); + topics.add_all(C8yMapperConfig::default_external_topic_filter()); C8yMapperConfig::new( @@ -2801,7 +2809,7 @@ pub(crate) fn test_mapper_config(tmp_dir: &TempTedgeDir) -> C8yMapperConfig { c8y_host, tedge_http_host, topics, - Capabilities::default(), + capabilities, auth_proxy_addr, auth_proxy_port, Protocol::Http,