diff --git a/Cargo.lock b/Cargo.lock index e7f606981a8..9b681fcda79 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -780,7 +780,6 @@ dependencies = [ "clock", "json-writer", "mime", - "mockito", "plugin_sm", "proptest", "rand", diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs index c6a89cb69fb..c4f1c945e8a 100644 --- a/crates/core/tedge_mapper/src/c8y/mapper.rs +++ b/crates/core/tedge_mapper/src/c8y/mapper.rs @@ -212,17 +212,24 @@ impl TEdgeComponent for CumulocityMapper { )?); let mut c8y_mapper_actor = C8yMapperBuilder::try_new( - c8y_mapper_config, + c8y_mapper_config.clone(), &mut mqtt_actor, &mut c8y_http_proxy_actor, &mut timer_actor, - &mut uploader_actor, - &mut downloader_actor, &mut fs_watch_actor, &mut service_monitor_actor, )?; let c8y_prefix = &tedge_config.c8y.bridge.topic_prefix; + + let operation_handler_actor = c8y_mapper_ext::operations::OperationHandlerBuilder::new( + c8y_mapper_config.to_operation_handler_config(), + &mut c8y_mapper_actor, + &mut uploader_actor, + &mut downloader_actor, + &mut c8y_http_proxy_actor, + ); + // Adaptor translating commands sent on te/device/main///cmd/+/+ into requests on tedge/commands/req/+/+ // and translating the responses received on tedge/commands/res/+/+ to te/device/main///cmd/+/+ let old_to_new_agent_adapter = OldAgentAdapter::builder(c8y_prefix, &mut mqtt_actor); @@ -252,6 +259,7 @@ impl TEdgeComponent for CumulocityMapper { if let Some(availability_actor) = availability_actor { runtime.spawn(availability_actor).await?; } + runtime.spawn(operation_handler_actor).await?; runtime.run_to_completion().await?; Ok(()) diff --git a/crates/extensions/c8y_mapper_ext/Cargo.toml b/crates/extensions/c8y_mapper_ext/Cargo.toml index f75dacd39dc..72683a6a21b 100644 --- a/crates/extensions/c8y_mapper_ext/Cargo.toml +++ b/crates/extensions/c8y_mapper_ext/Cargo.toml @@ -49,7 +49,6 @@ url = { workspace = true } [dev-dependencies] assert-json-diff = { workspace = true } assert_matches = { workspace = true } -mockito = { workspace = true } proptest = { workspace = true } rand = { workspace = true } tedge_actors = { workspace = true, features = ["test-helpers"] } diff --git a/crates/extensions/c8y_mapper_ext/src/actor.rs b/crates/extensions/c8y_mapper_ext/src/actor.rs index b7b5b1409f8..270008cdb54 100644 --- a/crates/extensions/c8y_mapper_ext/src/actor.rs +++ b/crates/extensions/c8y_mapper_ext/src/actor.rs @@ -13,7 +13,6 @@ use std::time::Duration; use tedge_actors::fan_in_message_type; use tedge_actors::Actor; use tedge_actors::Builder; -use tedge_actors::ClientMessageBox; use tedge_actors::CloneSender; use tedge_actors::DynSender; use tedge_actors::LoggingSender; @@ -27,6 +26,7 @@ use tedge_actors::Sender; use tedge_actors::Service; use tedge_actors::SimpleMessageBox; use tedge_actors::SimpleMessageBoxBuilder; +use tedge_api::entity_store::EntityMetadata; use tedge_api::entity_store::EntityRegistrationMessage; use tedge_api::mqtt_topics::Channel; use tedge_api::mqtt_topics::ChannelFilter; @@ -80,6 +80,9 @@ pub struct C8yMapperActor { timer_sender: LoggingSender, bridge_status_messages: SimpleMessageBox, message_handlers: HashMap>>, + // these handlers require entity metadata, so the entity already has to be registered + registered_message_handlers: + HashMap>>, } #[async_trait] @@ -140,6 +143,10 @@ impl C8yMapperActor { timer_sender: LoggingSender, bridge_status_messages: SimpleMessageBox, message_handlers: HashMap>>, + registered_message_handlers: HashMap< + ChannelFilter, + Vec>, + >, ) -> Self { Self { converter, @@ -148,6 +155,7 @@ impl C8yMapperActor { timer_sender, bridge_status_messages, message_handlers, + registered_message_handlers, } } @@ -267,6 +275,25 @@ impl C8yMapperActor { sender.send(message.clone()).await?; } } + + if let Some(message_handler) = self.registered_message_handlers.get_mut(&channel.into()) { + let (entity, _) = self + .converter + .mqtt_schema + .entity_channel_of(&message.topic) + .expect("message should've been confirmed to be using MQTT topic scheme v1"); + + let entity = self + .converter + .entity_store + .get(&entity) + .expect("entity should've already been registered"); + + for sender in message_handler { + sender.send((message.clone(), entity.clone())).await?; + } + } + Ok(()) } @@ -331,11 +358,11 @@ pub struct C8yMapperBuilder { mqtt_publisher: DynSender, http_proxy: C8YHttpProxy, timer_sender: DynSender, - downloader: ClientMessageBox, - uploader: ClientMessageBox, auth_proxy: ProxyUrlGenerator, bridge_monitor_builder: SimpleMessageBoxBuilder, message_handlers: HashMap>>, + registered_message_handlers: + HashMap>>, } impl C8yMapperBuilder { @@ -345,8 +372,6 @@ impl C8yMapperBuilder { mqtt: &mut (impl MessageSource + MessageSink), http: &mut impl Service, timer: &mut impl Service, - uploader: &mut impl Service, - downloader: &mut impl Service, fs_watcher: &mut impl MessageSource, service_monitor: &mut (impl MessageSource + MessageSink), ) -> Result { @@ -360,9 +385,6 @@ impl C8yMapperBuilder { let http_proxy = C8YHttpProxy::new(http); let timer_sender = timer.connect_client(box_builder.get_sender().sender_clone()); - let downloader = ClientMessageBox::new(downloader); - let uploader = ClientMessageBox::new(uploader); - fs_watcher.connect_sink( config.ops_dir.as_std_path().to_path_buf(), &box_builder.get_sender(), @@ -382,6 +404,7 @@ impl C8yMapperBuilder { ); let message_handlers = HashMap::new(); + let registered_message_handlers = HashMap::new(); Ok(Self { config, @@ -389,11 +412,10 @@ impl C8yMapperBuilder { mqtt_publisher, http_proxy, timer_sender, - uploader, - downloader, auth_proxy, bridge_monitor_builder, message_handlers, + registered_message_handlers, }) } @@ -426,6 +448,22 @@ impl MessageSource> for C8yMapperBuilder { } } +impl MessageSource<(MqttMessage, EntityMetadata), Vec> for C8yMapperBuilder { + fn connect_sink( + &mut self, + config: Vec, + peer: &impl MessageSink<(MqttMessage, EntityMetadata)>, + ) { + let sender = LoggingSender::new("Mapper MQTT".into(), peer.get_sender()); + for channel in config { + self.registered_message_handlers + .entry(channel) + .or_default() + .push(sender.clone()); + } + } +} + impl MessageSink for C8yMapperBuilder { fn get_sender(&self) -> DynSender { self.box_builder.get_sender().sender_clone() @@ -444,8 +482,6 @@ impl Builder for C8yMapperBuilder { mqtt_publisher.clone(), self.http_proxy, self.auth_proxy, - self.uploader, - self.downloader, ) .map_err(|err| RuntimeError::ActorError(Box::new(err)))?; @@ -459,6 +495,7 @@ impl Builder for C8yMapperBuilder { timer_sender, bridge_monitor_box, self.message_handlers, + self.registered_message_handlers, )) } } diff --git a/crates/extensions/c8y_mapper_ext/src/config.rs b/crates/extensions/c8y_mapper_ext/src/config.rs index f77765e9423..2b46d550509 100644 --- a/crates/extensions/c8y_mapper_ext/src/config.rs +++ b/crates/extensions/c8y_mapper_ext/src/config.rs @@ -35,6 +35,7 @@ const STATE_DIR_NAME: &str = ".tedge-mapper-c8y"; const C8Y_CLOUD: &str = "c8y"; const SUPPORTED_OPERATIONS_DIRECTORY: &str = "operations"; +#[derive(Clone)] pub struct C8yMapperConfig { pub device_id: String, pub device_topic_id: EntityTopicId, diff --git a/crates/extensions/c8y_mapper_ext/src/converter.rs b/crates/extensions/c8y_mapper_ext/src/converter.rs index cac47e432ae..b81cbf72d16 100644 --- a/crates/extensions/c8y_mapper_ext/src/converter.rs +++ b/crates/extensions/c8y_mapper_ext/src/converter.rs @@ -4,13 +4,9 @@ use super::config::MQTT_MESSAGE_SIZE_THRESHOLD; use super::error::CumulocityMapperError; use super::service_monitor; use crate::actor::CmdId; -use crate::actor::IdDownloadRequest; -use crate::actor::IdDownloadResult; use crate::dynamic_discovery::DiscoverOp; use crate::error::ConversionError; use crate::json; -use crate::operations; -use crate::operations::OperationHandler; use anyhow::anyhow; use anyhow::Context; use c8y_api::http_proxy::C8yEndPoint; @@ -58,7 +54,6 @@ use std::fs; use std::path::Path; use std::path::PathBuf; use std::sync::Arc; -use tedge_actors::ClientMessageBox; use tedge_actors::LoggingSender; use tedge_actors::Sender; use tedge_api::commands::RestartCommand; @@ -86,8 +81,6 @@ use tedge_config::TEdgeConfigError; use tedge_config::TopicPrefix; use tedge_mqtt_ext::MqttMessage; use tedge_mqtt_ext::Topic; -use tedge_uploader_ext::UploadRequest; -use tedge_uploader_ext::UploadResult; use tedge_utils::file::create_directory_with_defaults; use tedge_utils::file::create_file_with_defaults; use tedge_utils::file::FileError; @@ -186,8 +179,6 @@ pub struct CumulocityConverter { pub command_id: IdGenerator, // Keep active command IDs to avoid creation of multiple commands for an operation pub active_commands: HashSet, - - pub operation_handler: OperationHandler, } impl CumulocityConverter { @@ -196,8 +187,6 @@ impl CumulocityConverter { mqtt_publisher: LoggingSender, http_proxy: C8YHttpProxy, auth_proxy: ProxyUrlGenerator, - uploader: ClientMessageBox<(String, UploadRequest), (String, UploadResult)>, - downloader: ClientMessageBox, ) -> Result { let device_id = config.device_id.clone(); let device_topic_id = config.device_topic_id.clone(); @@ -247,15 +236,6 @@ impl CumulocityConverter { let command_id = config.id_generator(); - let operation_handler = OperationHandler::new( - &config, - downloader, - uploader, - mqtt_publisher.clone(), - http_proxy.clone(), - auth_proxy.clone(), - ); - Ok(CumulocityConverter { size_threshold, config: Arc::new(config), @@ -276,7 +256,6 @@ impl CumulocityConverter { auth_proxy, command_id, active_commands: HashSet::new(), - operation_handler, }) } @@ -1186,16 +1165,6 @@ impl CumulocityConverter { Channel::Command { cmd_id, .. } if self.command_id.is_generator_of(cmd_id) => { self.active_commands.insert(cmd_id.clone()); - let entity = self.entity_store.try_get(&source)?; - let external_id = entity.external_id.clone(); - let entity = operations::EntityTarget { - topic_id: entity.topic_id.clone(), - external_id: external_id.clone(), - smartrest_publish_topic: self - .smartrest_publish_topic_for_entity(&entity.topic_id)?, - }; - - self.operation_handler.handle(entity, message.clone()).await; Ok(vec![]) } @@ -1514,10 +1483,6 @@ pub fn get_local_child_devices_list(path: &Path) -> Result, Cumu #[cfg(test)] pub(crate) mod tests { use super::CumulocityConverter; - use crate::actor::IdDownloadRequest; - use crate::actor::IdDownloadResult; - use crate::actor::IdUploadRequest; - use crate::actor::IdUploadResult; use crate::config::C8yMapperConfig; use crate::Capabilities; use anyhow::Result; @@ -1538,7 +1503,6 @@ pub(crate) mod tests { use tedge_actors::test_helpers::FakeServerBox; use tedge_actors::test_helpers::FakeServerBoxBuilder; use tedge_actors::Builder; - use tedge_actors::ClientMessageBox; use tedge_actors::CloneSender; use tedge_actors::LoggingSender; use tedge_actors::MessageReceiver; @@ -3145,23 +3109,8 @@ pub(crate) mod tests { let auth_proxy_port = config.auth_proxy_port; let auth_proxy = ProxyUrlGenerator::new(auth_proxy_addr, auth_proxy_port, Protocol::Http); - let mut uploader_builder: FakeServerBoxBuilder = - FakeServerBox::builder(); - let uploader = ClientMessageBox::new(&mut uploader_builder); - - let mut downloader_builder: FakeServerBoxBuilder = - FakeServerBox::builder(); - let downloader = ClientMessageBox::new(&mut downloader_builder); - - let converter = CumulocityConverter::new( - config, - mqtt_publisher, - http_proxy, - auth_proxy, - uploader, - downloader, - ) - .unwrap(); + let converter = + CumulocityConverter::new(config, mqtt_publisher, http_proxy, auth_proxy).unwrap(); (converter, c8y_proxy_builder.build()) } diff --git a/crates/extensions/c8y_mapper_ext/src/lib.rs b/crates/extensions/c8y_mapper_ext/src/lib.rs index 91e3881b971..e5fc286986f 100644 --- a/crates/extensions/c8y_mapper_ext/src/lib.rs +++ b/crates/extensions/c8y_mapper_ext/src/lib.rs @@ -1,3 +1,5 @@ +use c8y_api::json_c8y_deserializer::C8yDeviceControlOperation; + pub mod actor; pub mod alarm_converter; pub mod availability; @@ -9,7 +11,7 @@ pub mod error; mod fragments; mod inventory; pub mod json; -mod operations; +pub mod operations; mod serializer; pub mod service_monitor; #[cfg(test)] @@ -24,6 +26,19 @@ pub struct Capabilities { pub device_profile: bool, } +impl Capabilities { + pub fn is_enabled(&self, operation: &C8yDeviceControlOperation) -> bool { + match operation { + C8yDeviceControlOperation::LogfileRequest(_) => self.log_upload, + C8yDeviceControlOperation::UploadConfigFile(_) => self.config_snapshot, + C8yDeviceControlOperation::DownloadConfigFile(_) => self.config_update, + C8yDeviceControlOperation::Firmware(_) => self.firmware_update, + C8yDeviceControlOperation::DeviceProfile(_) => self.device_profile, + _ => true, + } + } +} + #[cfg(test)] impl Default for Capabilities { fn default() -> Self { diff --git a/crates/extensions/c8y_mapper_ext/src/operations/actor.rs b/crates/extensions/c8y_mapper_ext/src/operations/actor.rs new file mode 100644 index 00000000000..b584b98df1e --- /dev/null +++ b/crates/extensions/c8y_mapper_ext/src/operations/actor.rs @@ -0,0 +1,337 @@ +//! Actor handles c8y operations. +//! +//! First, Cumulocity starts an operation like `c8y_SoftwareUpdate` or `c8y_UploadConfigFile`. This +//! is converted by the mapper into a local thin-edge.io command, that is executed by tedge-agent. +//! As the agent executes a command that corresponds to the operation we need to report on that +//! operation progress by sending smartrest messages like `Set operation to EXECUTING`. +//! +//! The handler ignores clearing messages that it receives, as it alone should send clearing +//! messages. + +use std::collections::HashMap; +use std::future::Future; +use std::sync::Arc; + +use super::handler::is_operation_status_transition_valid; +use super::handler::RunningOperation; +use super::handlers::OperationContext; +use super::handlers::OperationMessage; +use super::handlers::OperationOutcome; +use super::OperationHandler; +use crate::actor::PublishMessage; +use async_trait::async_trait; +use tedge_actors::Actor; +use tedge_actors::CloneSender; +use tedge_actors::MessageReceiver; +use tedge_actors::RuntimeError; +use tedge_actors::Sender; +use tedge_actors::SimpleMessageBox; +use tedge_api::mqtt_topics::Channel; +use tedge_api::workflow::GenericCommandState; +use tedge_mqtt_ext::MqttMessage; +use tokio::sync::Mutex; +use tracing::debug; +use tracing::error; +use tracing::warn; + +pub struct OperationHandlerActor { + pub(super) messages: SimpleMessageBox, + pub(super) operation_handler: OperationHandler, + pub(super) running_operations: RunningOperations, +} + +#[async_trait] +impl Actor for OperationHandlerActor { + fn name(&self) -> &str { + "OperationHandler" + } + + async fn run(mut self) -> Result<(), RuntimeError> { + while let Some(input_message) = self.messages.recv().await { + self.handle_operation_message(input_message).await; + } + + Ok(()) + } +} + +impl OperationHandlerActor { + async fn handle_operation_message(&mut self, message: OperationMessage) { + let context = self.operation_handler.context.clone(); + + // input validation + let Ok((_, channel)) = context + .mqtt_schema + .entity_channel_of(&message.message.topic) + else { + return; + }; + + let Channel::Command { cmd_id, .. } = channel else { + return; + }; + + // don't process sub-workflow calls + if cmd_id.starts_with("sub:") { + return; + } + + if !context.command_id.is_generator_of(cmd_id.as_str()) { + return; + } + + let topic = message.message.topic.clone(); + + let mut message_box = self.messages.sender_clone(); + self.running_operations + .report(message, |outcome| async move { + match outcome { + OperationOutcome::Ignored => {} + OperationOutcome::Executing { extra_messages } => { + for m in extra_messages { + message_box.send(PublishMessage(m)).await.unwrap(); + } + } + OperationOutcome::Finished { messages } => { + for m in messages { + message_box.send(PublishMessage(m)).await.unwrap(); + } + + let clearing_message = MqttMessage::new(&topic, []).with_retain(); + message_box + .send(PublishMessage(clearing_message)) + .await + .unwrap(); + } + } + }) + .await; + } +} + +pub(super) struct RunningOperations { + pub(super) current_statuses: Arc, RunningOperation>>>, + pub(super) context: Arc, +} + +impl RunningOperations { + // If operation status transition hasn't been handled yet, spawn a task that will handle it. + async fn report(&mut self, message: OperationMessage, f: F) + where + F: FnOnce(OperationOutcome) -> Fut + Send + 'static, + Fut: Future + Send, + { + let topic = message.message.topic.name.as_str(); + let status = match GenericCommandState::from_command_message(&message.message) { + // clearing message was either echoed back to us by MQTT broker, or was published by + // some other MQTT client; the latter shouldn't really happen, but the former is + // expected + Ok(command) if command.is_cleared() => { + debug!(topic = %topic, "unexpected clearing message"); + return; + } + Err(err) => { + error!(%err, ?message, "could not parse command payload"); + return; + } + Ok(command) => command.status, + }; + + let context = self.context.clone(); + let mut current_statuses = self.current_statuses.lock().await; + let current_operation = current_statuses.get(topic); + + match current_operation { + None => { + let topic: Arc = topic.into(); + let handle = tokio::spawn(async move { + let outcome = context.report(message).await; + f(outcome).await; + }); + current_statuses.insert(topic, RunningOperation { status, handle }); + } + + // if we have task running, check if new status is allowed and then spawn a new task + // that also waits for old transition to complete + Some(current_operation) => { + let previous_status = ¤t_operation.status; + if status == current_operation.status.as_str() { + debug!( + "already handling operation message with this topic and status, ignoring" + ); + return; + } + + // we got a new status, check if it's not invalid and then await previous one and + // handle the new one + if !is_operation_status_transition_valid(previous_status, &status) { + warn!( + topic = %topic, + previous = previous_status, + next = status, + "attempted invalid status transition, ignoring" + ); + return; + } + + // remove currently running operation task from the hashmap and spawn a new one that + // also waits on the old one + let topic: Arc = topic.into(); + + let _current_statuses = self.current_statuses.clone(); + let _topic = topic.clone(); + + let handle = tokio::spawn(async move { + let outcome = context.report(message).await; + if let OperationOutcome::Finished { .. } = outcome { + _current_statuses.lock().await.remove(&*_topic); + } + f(outcome).await; + }); + + current_statuses.insert(topic, RunningOperation { handle, status }); + } + } + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use crate::actor::IdDownloadRequest; + use crate::actor::IdDownloadResult; + use crate::actor::IdUploadRequest; + use crate::actor::IdUploadResult; + use crate::actor::PublishMessage; + use crate::operations::builder::OperationHandlerBuilder; + use crate::operations::handler::OperationHandlerConfig; + use crate::Capabilities; + use c8y_api::http_proxy::C8yEndPoint; + use c8y_auth_proxy::url::Protocol; + use c8y_auth_proxy::url::ProxyUrlGenerator; + use c8y_http_proxy::messages::C8YRestRequest; + use c8y_http_proxy::messages::C8YRestResult; + use tedge_actors::test_helpers::FakeServerBox; + use tedge_actors::test_helpers::FakeServerBoxBuilder; + use tedge_actors::Actor; + use tedge_actors::Builder; + use tedge_actors::Sender; + use tedge_actors::SimpleMessageBox; + use tedge_actors::SimpleMessageBoxBuilder; + use tedge_api::commands::ConfigSnapshotCmd; + use tedge_api::commands::ConfigSnapshotCmdPayload; + use tedge_api::entity_store::EntityMetadata; + use tedge_api::mqtt_topics::EntityTopicId; + use tedge_api::mqtt_topics::IdGenerator; + use tedge_api::mqtt_topics::MqttSchema; + use tedge_api::CommandStatus; + use tedge_config::AutoLogUpload; + use tedge_config::TopicPrefix; + use tedge_mqtt_ext::MqttMessage; + use tedge_test_utils::fs::TempTedgeDir; + use tokio::task::JoinHandle; + use tracing::Level; + + #[tokio::test] + // #[should_panic] + async fn panics_when_task_panics() { + tedge_config::system_services::set_log_level(Level::DEBUG); + let TestHandle { + mut mqtt, + handle: actor_handle, + .. + } = spawn_operation_actor().await; + + let mqtt_schema = MqttSchema::new(); + let entity_topic_id = EntityTopicId::default_main_device(); + let entity_metadata = EntityMetadata::main_device("anything".to_string()); + + // spawn an operation to see if it's successfully joined when it's completed. + // particular operation used is not important, because we want to test only the handler. + // it would be even better if we could define some inline operation so test could be shorter + // TODO(marcel): don't assume operation implementations when testing the handler + let command = ConfigSnapshotCmd { + target: entity_topic_id, + cmd_id: "c8y-mapper-1".to_string(), + payload: ConfigSnapshotCmdPayload { + status: CommandStatus::Successful, + tedge_url: Some("asdf".to_string()), + config_type: "typeA".to_string(), + path: None, + log_path: None, + }, + }; + let message = command.command_message(&mqtt_schema); + + mqtt.send((message, entity_metadata)).await.unwrap(); + drop(mqtt); + + actor_handle.await.unwrap(); + } + + struct TestHandle { + handle: JoinHandle<()>, + mqtt: SimpleMessageBox, + _dl: FakeServerBox, + _ul: FakeServerBox, + _c8y_proxy: FakeServerBox, + _ttd: TempTedgeDir, + } + + async fn spawn_operation_actor() -> TestHandle { + let auth_proxy_addr = "127.0.0.1".into(); + let auth_proxy_port = 8001; + let auth_proxy_protocol = Protocol::Http; + + let ttd = TempTedgeDir::new(); + let config = OperationHandlerConfig { + capabilities: Capabilities::default(), + auto_log_upload: AutoLogUpload::OnFailure, + tedge_http_host: Arc::from("127.0.0.1:8000"), + tmp_dir: ttd.utf8_path().into(), + software_management_api: tedge_config::SoftwareManagementApiFlag::Legacy, + mqtt_schema: MqttSchema::with_root("te".to_string()), + c8y_endpoint: C8yEndPoint::new("c8y.url", "c8y.url", "device_id"), + c8y_prefix: TopicPrefix::try_from("c8y").unwrap(), + auth_proxy: ProxyUrlGenerator::new( + auth_proxy_addr, + auth_proxy_port, + auth_proxy_protocol, + ), + id_generator: IdGenerator::new("c8y"), + smartrest_use_operation_id: true, + }; + + let mut mqtt_builder: SimpleMessageBoxBuilder< + PublishMessage, + (MqttMessage, EntityMetadata), + > = SimpleMessageBoxBuilder::new("MQTT", 10); + let mut c8y_proxy_builder: FakeServerBoxBuilder = + FakeServerBoxBuilder::default(); + let mut uploader_builder: FakeServerBoxBuilder = + FakeServerBoxBuilder::default(); + let mut downloader_builder: FakeServerBoxBuilder = + FakeServerBoxBuilder::default(); + + let operation_handler_builder = OperationHandlerBuilder::new( + config, + &mut mqtt_builder, + &mut uploader_builder, + &mut downloader_builder, + &mut c8y_proxy_builder, + ); + + let actor = operation_handler_builder.build(); + let handle = tokio::spawn(async move { actor.run().await.unwrap() }); + + TestHandle { + handle, + mqtt: mqtt_builder.build(), + _dl: downloader_builder.build(), + _ul: uploader_builder.build(), + _c8y_proxy: c8y_proxy_builder.build(), + _ttd: ttd, + } + } +} diff --git a/crates/extensions/c8y_mapper_ext/src/operations/builder.rs b/crates/extensions/c8y_mapper_ext/src/operations/builder.rs new file mode 100644 index 00000000000..98373b0b151 --- /dev/null +++ b/crates/extensions/c8y_mapper_ext/src/operations/builder.rs @@ -0,0 +1,135 @@ +use c8y_api::smartrest::topic::C8yTopic; +use c8y_http_proxy::handle::C8YHttpProxy; +use c8y_http_proxy::messages::C8YRestRequest; +use c8y_http_proxy::messages::C8YRestResult; +use tedge_actors::futures::channel::mpsc; +use tedge_actors::Builder; +use tedge_actors::ClientMessageBox; +use tedge_actors::CloneSender; +use tedge_actors::MessageSink; +use tedge_actors::MessageSource; +use tedge_actors::NoConfig; +use tedge_actors::RuntimeRequestSink; +use tedge_actors::Service; +use tedge_actors::SimpleMessageBoxBuilder; +use tedge_api::entity_store::EntityMetadata; +use tedge_api::mqtt_topics::Channel; +use tedge_api::mqtt_topics::ChannelFilter; +use tedge_mqtt_ext::MqttMessage; + +use crate::actor::IdDownloadRequest; +use crate::actor::IdDownloadResult; +use crate::actor::IdUploadRequest; +use crate::actor::IdUploadResult; +use crate::actor::PublishMessage; + +use super::actor::OperationHandlerActor; +use super::actor::RunningOperations; +use super::handler::OperationHandlerConfig; +use super::handlers::OperationMessage; +use super::EntityTarget; +use super::OperationHandler; + +pub struct OperationHandlerBuilder { + operation_handler: OperationHandler, + box_builder: SimpleMessageBoxBuilder, +} + +impl OperationHandlerBuilder { + pub fn new( + config: OperationHandlerConfig, + + mqtt: &mut (impl MessageSource<(MqttMessage, EntityMetadata), Vec> + + MessageSink), + uploader: &mut impl Service, + downloader: &mut impl Service, + + http: &mut impl Service, + ) -> Self { + // if there are any outgoing MQTT messages, send them immediately + let (operation_handler_sender, _) = mpsc::channel::(10); + + let uploader = ClientMessageBox::new(uploader); + let downloader = ClientMessageBox::new(downloader); + + let c8y_http_proxy = C8YHttpProxy::new(http); + + // TODO(marcel): discarding EntityFilter portion because C8yMapperActor doesn't support it, perhaps it should + let channel_filter = OperationHandler::topic_filter(&config.capabilities) + .into_iter() + .map(|f| f.1) + .collect::>(); + + let mut box_builder: SimpleMessageBoxBuilder = + SimpleMessageBoxBuilder::new("OperationHandlerActor", 10); + + box_builder.connect_mapped_source(channel_filter, mqtt, Self::mqtt_message_parser(&config)); + + mqtt.connect_source(NoConfig, &mut box_builder); + + let operation_handler = OperationHandler::new( + config, + downloader, + uploader, + operation_handler_sender.sender_clone(), + c8y_http_proxy, + ); + + Self { + operation_handler, + box_builder, + } + } + + fn mqtt_message_parser( + config: &OperationHandlerConfig, + ) -> impl Fn((MqttMessage, EntityMetadata)) -> Option { + let mqtt_schema = config.mqtt_schema.clone(); + let prefix = config.c8y_prefix.clone(); + + move |(message, metadata)| { + let (_, channel) = mqtt_schema.entity_channel_of(&message.topic).unwrap(); + + // if not Command, then CommandMetadata + let Channel::Command { operation, cmd_id } = channel else { + return None; + }; + + let smartrest_publish_topic = C8yTopic::smartrest_response_topic(&metadata, &prefix) + .expect("should create a valid topic"); + + Some(OperationMessage { + message, + operation, + cmd_id: cmd_id.into(), + entity: EntityTarget { + topic_id: metadata.topic_id, + external_id: metadata.external_id, + smartrest_publish_topic, + }, + }) + } + } +} + +impl Builder for OperationHandlerBuilder { + type Error = std::convert::Infallible; + + fn try_build(self) -> Result { + let context = self.operation_handler.context.clone(); + Ok(OperationHandlerActor { + operation_handler: self.operation_handler, + messages: self.box_builder.build(), + running_operations: RunningOperations { + current_statuses: Default::default(), + context, + }, + }) + } +} + +impl RuntimeRequestSink for OperationHandlerBuilder { + fn get_signal_sender(&self) -> tedge_actors::DynSender { + self.box_builder.get_signal_sender() + } +} diff --git a/crates/extensions/c8y_mapper_ext/src/operations/c8y_operations.rs b/crates/extensions/c8y_mapper_ext/src/operations/c8y_operations.rs new file mode 100644 index 00000000000..8dd233e9973 --- /dev/null +++ b/crates/extensions/c8y_mapper_ext/src/operations/c8y_operations.rs @@ -0,0 +1,406 @@ +//! Keeps track of c8y operations received from the cloud and responds to them. + +use std::{collections::HashMap, sync::Arc}; + +use c8y_api::{ + http_proxy::C8yEndPoint, + json_c8y_deserializer::{ + C8yDeviceControlOperation, C8yDeviceProfile, C8yDownloadConfigFile, C8yFirmware, + C8yLogfileRequest, C8yOperation, C8yRestart, C8ySoftwareUpdate, C8yUploadConfigFile, + }, +}; +use c8y_auth_proxy::url::ProxyUrlGenerator; +use tedge_api::{ + commands::{ + ConfigSnapshotCmdPayload, ConfigUpdateCmdPayload, FirmwareUpdateCmdPayload, + LogUploadCmdPayload, + }, + device_profile::DeviceProfileCmdPayload, + entity_store::EntityMetadata, + mqtt_topics::{Channel, EntityTopicId, IdGenerator, MqttSchema, OperationType}, + CommandStatus, DownloadInfo, Jsonify, RestartCommand, +}; +use tedge_mqtt_ext::MqttMessage; +use tracing::{error, warn}; + +use crate::{ + error::{ConversionError, CumulocityMapperError}, + Capabilities, +}; + +type OperationId = Arc; + +#[derive(Debug, Clone)] +struct C8yOperations { + active_c8y_operations: HashMap, + capabilities: Capabilities, + xid_to_metadata: HashMap, EntityMetadata>, + mqtt_schema: MqttSchema, + command_id: IdGenerator, + c8y_endpoint: C8yEndPoint, + auth_proxy: ProxyUrlGenerator, + tedge_http_host: Arc, +} + +impl C8yOperations { + pub fn new(capabilities: Capabilities) -> Self { + Self { + active_c8y_operations: HashMap::new(), + capabilities, + xid_to_metadata: HashMap::new(), + mqtt_schema: MqttSchema::new(), + command_id: IdGenerator::new("peniz"), + c8y_endpoint: C8yEndPoint::new("peniz", "peniz", "peniz"), + auth_proxy: ProxyUrlGenerator::new( + "peniz".into(), + 2137, + c8y_auth_proxy::url::Protocol::Http, + ), + tedge_http_host: Arc::from("peniz"), + } + } + + pub fn register(&mut self, operation: C8yOperation) { + let entity_xid = &operation.external_source.external_id; + + let c8y_device_control_operation = + C8yDeviceControlOperation::from_json_object(&operation.extras).unwrap(); + + if !self.capabilities.is_enabled(&c8y_device_control_operation) { + warn!("Received an operation which is disabled in configuration"); + return; + } + + let entity_metadata = self.xid_to_metadata.get(entity_xid.as_str()).unwrap(); + let cmd_id = self.command_id.new_id(); + + let msgs = match c8y_device_control_operation { + C8yDeviceControlOperation::Restart(request) => self + .forward_restart_request(entity_metadata, cmd_id) + .unwrap(), + C8yDeviceControlOperation::SoftwareUpdate(request) => self + .forward_software_request(entity_metadata, cmd_id, request) + .unwrap(), + C8yDeviceControlOperation::LogfileRequest(request) => self + .convert_log_upload_request(entity_metadata, cmd_id, request) + .unwrap(), + C8yDeviceControlOperation::UploadConfigFile(request) => self + .convert_config_snapshot_request(entity_metadata, cmd_id, request) + .unwrap(), + C8yDeviceControlOperation::DownloadConfigFile(request) => self + .convert_config_update_request(entity_metadata, cmd_id, request) + .unwrap(), + C8yDeviceControlOperation::Firmware(request) => self + .convert_firmware_update_request(entity_metadata, cmd_id, request) + .unwrap(), + C8yDeviceControlOperation::DeviceProfile(request) => { + if let Some(profile_name) = operation.extras.get("profileName") { + self.convert_device_profile_request( + entity_metadata, + cmd_id, + request, + serde_json::from_value(profile_name.clone()).unwrap(), + ) + .unwrap() + } else { + error!("Received a c8y_DeviceProfile without a profile name"); + vec![] + } + } + C8yDeviceControlOperation::Custom => { + // Ignores custom and static template operations unsupported by thin-edge + // However, these operations can be addressed by SmartREST that is published together with JSON over MQTT + vec![] + } + }; + + let id = Arc::from(operation.op_id.as_str()); + self.active_c8y_operations.insert(id, operation); + + // send local MQTT command + } + + fn forward_restart_request( + &self, + entity: &EntityMetadata, + cmd_id: String, + ) -> Result, CumulocityMapperError> { + let command = RestartCommand::new(&entity.topic_id, cmd_id); + let message = command.command_message(&self.mqtt_schema); + Ok(vec![message]) + } + + fn forward_software_request( + &self, + entity: &EntityMetadata, + cmd_id: String, + software_update_request: C8ySoftwareUpdate, + ) -> Result, CumulocityMapperError> { + let mut command = + software_update_request.into_software_update_command(&entity.topic_id, cmd_id)?; + + command.payload.update_list.iter_mut().for_each(|modules| { + modules.modules.iter_mut().for_each(|module| { + if let Some(url) = &mut module.url { + *url = if let Some(cumulocity_url) = + self.c8y_endpoint.maybe_tenant_url(url.url()) + { + DownloadInfo::new(self.auth_proxy.proxy_url(cumulocity_url).as_ref()) + } else { + DownloadInfo::new(url.url()) + }; + } + }); + }); + + let message = command.command_message(&self.mqtt_schema); + Ok(vec![message]) + } + + /// Convert c8y_UploadConfigFile JSON over MQTT operation to ThinEdge config_snapshot command + pub fn convert_config_snapshot_request( + &self, + entity: &EntityMetadata, + cmd_id: String, + config_upload_request: C8yUploadConfigFile, + ) -> Result, CumulocityMapperError> { + let channel = Channel::Command { + operation: OperationType::ConfigSnapshot, + cmd_id: cmd_id.clone(), + }; + let topic = self.mqtt_schema.topic_for(&entity.topic_id, &channel); + + // Replace '/' with ':' to avoid creating unexpected directories in file transfer repo + let tedge_url = format!( + "http://{}/tedge/file-transfer/{}/config_snapshot/{}-{}", + &self.tedge_http_host, + entity.external_id.as_ref(), + config_upload_request.config_type.replace('/', ":"), + cmd_id + ); + + let request = ConfigSnapshotCmdPayload { + status: CommandStatus::Init, + tedge_url: Some(tedge_url), + config_type: config_upload_request.config_type, + path: None, + log_path: None, + }; + + // Command messages must be retained + Ok(vec![ + MqttMessage::new(&topic, request.to_json()).with_retain() + ]) + } + + /// Convert c8y_LogfileRequest operation to a ThinEdge log_upload command + pub fn convert_log_upload_request( + &self, + entity: &EntityMetadata, + cmd_id: String, + log_request: C8yLogfileRequest, + ) -> Result, CumulocityMapperError> { + let channel = Channel::Command { + operation: OperationType::LogUpload, + cmd_id: cmd_id.clone(), + }; + let topic = self.mqtt_schema.topic_for(&entity.topic_id, &channel); + + let tedge_url = format!( + "http://{}/tedge/file-transfer/{}/log_upload/{}-{}", + &self.tedge_http_host, + entity.external_id.as_ref(), + log_request.log_file, + cmd_id + ); + + let request = LogUploadCmdPayload { + status: CommandStatus::Init, + tedge_url, + log_type: log_request.log_file, + date_from: log_request.date_from, + date_to: log_request.date_to, + search_text: Some(log_request.search_text).filter(|s| !s.is_empty()), + lines: log_request.maximum_lines, + log_path: None, + }; + + // Command messages must be retained + Ok(vec![ + MqttMessage::new(&topic, request.to_json()).with_retain() + ]) + } + + /// Convert c8y_Firmware JSON over MQTT operation to ThinEdge firmware_update command. + pub fn convert_firmware_update_request( + &self, + entity: &EntityMetadata, + cmd_id: String, + firmware_request: C8yFirmware, + ) -> Result, CumulocityMapperError> { + let channel = Channel::Command { + operation: OperationType::FirmwareUpdate, + cmd_id, + }; + let topic = self.mqtt_schema.topic_for(&entity.topic_id, &channel); + + let tedge_url = + if let Some(c8y_url) = self.c8y_endpoint.maybe_tenant_url(&firmware_request.url) { + self.auth_proxy.proxy_url(c8y_url).to_string() + } else { + firmware_request.url.clone() + }; + + let request = FirmwareUpdateCmdPayload { + status: CommandStatus::Init, + tedge_url: Some(tedge_url), + remote_url: firmware_request.url, + name: firmware_request.name, + version: firmware_request.version, + log_path: None, + }; + + // Command messages must be retained + Ok(vec![ + MqttMessage::new(&topic, request.to_json()).with_retain() + ]) + } + + /// Upon receiving a SmartREST c8y_DownloadConfigFile request, convert it to a message on the + /// command channel. + pub fn convert_config_update_request( + &self, + entity: &EntityMetadata, + cmd_id: String, + config_download_request: C8yDownloadConfigFile, + ) -> Result, CumulocityMapperError> { + let config_download_request: &C8yDownloadConfigFile = &config_download_request; + let channel = Channel::Command { + operation: OperationType::ConfigUpdate, + cmd_id: cmd_id.to_string(), + }; + let topic = self.mqtt_schema.topic_for(&entity.topic_id, &channel); + + let proxy_url = self + .c8y_endpoint + .maybe_tenant_url(&config_download_request.url) + .map(|cumulocity_url| self.auth_proxy.proxy_url(cumulocity_url).into()); + + let remote_url = proxy_url.unwrap_or(config_download_request.url.to_string()); + + let request = ConfigUpdateCmdPayload { + status: CommandStatus::Init, + tedge_url: None, + remote_url, + config_type: config_download_request.config_type.clone(), + path: None, + log_path: None, + }; + + // Command messages must be retained + let messages = vec![MqttMessage::new(&topic, request.to_json()).with_retain()]; + Ok(messages) + } + + /// Convert c8y_DeviceProfile JSON over MQTT operation to ThinEdge device_profile command. + pub fn convert_device_profile_request( + &self, + entity: &EntityMetadata, + cmd_id: String, + device_profile_request: C8yDeviceProfile, + profile_name: String, + ) -> Result, CumulocityMapperError> { + let channel = Channel::Command { + operation: OperationType::DeviceProfile, + cmd_id, + }; + let topic = self.mqtt_schema.topic_for(&entity.topic_id, &channel); + + let mut request = DeviceProfileCmdPayload { + status: CommandStatus::Init, + name: profile_name, + operations: Vec::new(), + }; + + if let Some(mut firmware) = device_profile_request.firmware { + if let Some(cumulocity_url) = self.c8y_endpoint.maybe_tenant_url(&firmware.url) { + firmware.url = self.auth_proxy.proxy_url(cumulocity_url).into(); + } + request.add_firmware(firmware.into()); + } + + if let Some(mut software) = device_profile_request.software { + software.lists.iter_mut().for_each(|module| { + if let Some(url) = &mut module.url { + if let Some(cumulocity_url) = self.c8y_endpoint.maybe_tenant_url(url) { + *url = self.auth_proxy.proxy_url(cumulocity_url).into(); + } + } + }); + request.add_software(software.try_into()?); + } + + for mut config in device_profile_request.configuration { + if let Some(cumulocity_url) = self.c8y_endpoint.maybe_tenant_url(&config.url) { + config.url = self.auth_proxy.proxy_url(cumulocity_url).into(); + } + request.add_config(config.into()); + } + + // Command messages must be retained + Ok(vec![ + MqttMessage::new(&topic, request.to_json()).with_retain() + ]) + } +} + +/// Converts C8y operations into local MQTT commands +trait IntoCommand { + fn into_command( + self, + topic_id: &EntityTopicId, + cmd_id: String, + mqtt_schema: &MqttSchema, + ) -> Result, anyhow::Error>; +} + +impl IntoCommand for C8yRestart { + fn into_command( + self, + topic_id: &EntityTopicId, + cmd_id: String, + mqtt_schema: &MqttSchema, + ) -> Result, anyhow::Error> { + let command = RestartCommand::new(topic_id, cmd_id); + let message = command.command_message(mqtt_schema); + Ok(vec![message]) + } +} + +// impl IntoCommand for C8ySoftwareUpdate { +// fn into_command( +// self, +// topic_id: &EntityTopicId, +// cmd_id: String, +// mqtt_schema: &MqttSchema, +// ) -> Result, anyhow::Error> { +// let mut command = self.into_software_update_command(topic_id, cmd_id)?; + +// command.payload.update_list.iter_mut().for_each(|modules| { +// modules.modules.iter_mut().for_each(|module| { +// if let Some(url) = &mut module.url { +// *url = if let Some(cumulocity_url) = +// self.c8y_endpoint.maybe_tenant_url(url.url()) +// { +// DownloadInfo::new(self.auth_proxy.proxy_url(cumulocity_url).as_ref()) +// } else { +// DownloadInfo::new(url.url()) +// }; +// } +// }); +// }); + +// let message = command.command_message(&self.mqtt_schema); +// Ok(vec![message]) +// } +// } diff --git a/crates/extensions/c8y_mapper_ext/src/operations/handler.rs b/crates/extensions/c8y_mapper_ext/src/operations/handler.rs index 388fb2bf4fe..f22ad656232 100644 --- a/crates/extensions/c8y_mapper_ext/src/operations/handler.rs +++ b/crates/extensions/c8y_mapper_ext/src/operations/handler.rs @@ -8,18 +8,26 @@ use crate::actor::IdUploadResult; use crate::config::C8yMapperConfig; use crate::Capabilities; use c8y_api::http_proxy::C8yEndPoint; +use c8y_auth_proxy::url::Protocol; use c8y_auth_proxy::url::ProxyUrlGenerator; use c8y_http_proxy::handle::C8YHttpProxy; +use camino::Utf8Path; use std::collections::hash_map::Entry; use std::collections::HashMap; use std::sync::Arc; use tedge_actors::ClientMessageBox; -use tedge_actors::LoggingSender; +use tedge_actors::DynSender; use tedge_api::mqtt_topics::Channel; use tedge_api::mqtt_topics::ChannelFilter; use tedge_api::mqtt_topics::EntityFilter; +use tedge_api::mqtt_topics::IdGenerator; +use tedge_api::mqtt_topics::MqttSchema; use tedge_api::workflow::GenericCommandState; +use tedge_config::AutoLogUpload; +use tedge_config::SoftwareManagementApiFlag; +use tedge_config::TopicPrefix; use tedge_mqtt_ext::MqttMessage; +use tokio::task::JoinHandle; use tracing::debug; use tracing::error; use tracing::warn; @@ -42,46 +50,41 @@ use tracing::warn; /// performs an operation in the background in separate tasks. The operation tasks themselves handle /// reporting their success/failure. pub struct OperationHandler { - context: Arc, - running_operations: HashMap, RunningOperation>, + pub(super) context: Arc, + pub(super) running_operations: HashMap, RunningOperation>, } impl OperationHandler { pub fn new( - c8y_mapper_config: &C8yMapperConfig, + config: OperationHandlerConfig, downloader: ClientMessageBox, uploader: ClientMessageBox, - mqtt_publisher: LoggingSender, + mqtt_publisher: DynSender, http_proxy: C8YHttpProxy, - auth_proxy: ProxyUrlGenerator, ) -> Self { Self { context: Arc::new(OperationContext { - capabilities: c8y_mapper_config.capabilities, - auto_log_upload: c8y_mapper_config.auto_log_upload, - tedge_http_host: c8y_mapper_config.tedge_http_host.clone(), - tmp_dir: c8y_mapper_config.tmp_dir.clone(), - mqtt_schema: c8y_mapper_config.mqtt_schema.clone(), - mqtt_publisher: mqtt_publisher.clone(), - software_management_api: c8y_mapper_config.software_management_api, - smart_rest_use_operation_id: c8y_mapper_config.smartrest_use_operation_id, + capabilities: config.capabilities, + auto_log_upload: config.auto_log_upload, + tedge_http_host: config.tedge_http_host, + tmp_dir: config.tmp_dir, + mqtt_schema: config.mqtt_schema, + software_management_api: config.software_management_api, + c8y_endpoint: config.c8y_endpoint, + auth_proxy: config.auth_proxy.clone(), + mqtt_publisher: mqtt_publisher.sender_clone(), + smart_rest_use_operation_id: config.smartrest_use_operation_id, // TODO(marcel): would be good not to generate new ids from running operations, see if // we can remove it somehow - command_id: c8y_mapper_config.id_generator(), + command_id: config.id_generator, downloader, uploader, - c8y_endpoint: C8yEndPoint::new( - &c8y_mapper_config.c8y_host, - &c8y_mapper_config.c8y_mqtt, - &c8y_mapper_config.device_id, - ), http_proxy: http_proxy.clone(), - auth_proxy: auth_proxy.clone(), }), running_operations: Default::default(), @@ -216,6 +219,33 @@ impl OperationHandler { } } + pub fn handle_spawn(&mut self, entity: EntityTarget, message: MqttMessage) -> JoinHandle<()> { + let context = self.context.clone(); + tokio::spawn(async move { + let Ok((_, channel)) = context.mqtt_schema.entity_channel_of(&message.topic) else { + return; + }; + + let Channel::Command { operation, cmd_id } = channel else { + return; + }; + + // don't process sub-workflow calls + if cmd_id.starts_with("sub:") { + return; + } + + let message = OperationMessage { + operation, + entity, + cmd_id: cmd_id.into(), + message, + }; + + context.update(message).await; + }) + } + /// A topic filter for operation types this object can handle. /// /// The MQTT client should subscribe to topics with this filter to receive MQTT messages that it @@ -253,6 +283,11 @@ impl OperationHandler { (AnyEntity, CommandMetadata(OperationType::FirmwareUpdate)), ]); } + topics.extend([ + (AnyEntity, Command(OperationType::Restart)), + (AnyEntity, Command(OperationType::SoftwareList)), + (AnyEntity, Command(OperationType::SoftwareUpdate)), + ]); if capabilities.device_profile { topics.extend([ @@ -264,17 +299,63 @@ impl OperationHandler { topics } } -struct RunningOperation { - handle: tokio::task::JoinHandle<()>, - status: String, + +#[derive(Debug, Clone)] +pub struct OperationHandlerConfig { + pub capabilities: Capabilities, + pub auto_log_upload: AutoLogUpload, + pub tedge_http_host: Arc, + pub tmp_dir: Arc, + pub software_management_api: SoftwareManagementApiFlag, + pub c8y_endpoint: C8yEndPoint, + pub mqtt_schema: MqttSchema, + pub c8y_prefix: TopicPrefix, + pub auth_proxy: ProxyUrlGenerator, + pub id_generator: IdGenerator, + pub smartrest_use_operation_id: bool, +} + +impl OperationHandlerConfig { + fn from_mapper_config(config: &C8yMapperConfig) -> OperationHandlerConfig { + OperationHandlerConfig { + capabilities: config.capabilities, + auto_log_upload: config.auto_log_upload, + tedge_http_host: config.tedge_http_host.clone(), + tmp_dir: config.tmp_dir.clone(), + software_management_api: config.software_management_api, + c8y_endpoint: C8yEndPoint::new(&config.c8y_host, &config.c8y_mqtt, &config.device_id), + mqtt_schema: config.mqtt_schema.clone(), + c8y_prefix: config.c8y_prefix.clone(), + auth_proxy: ProxyUrlGenerator::new( + config.auth_proxy_addr.clone(), + config.auth_proxy_port, + Protocol::Http, + ), + id_generator: config.id_generator(), + smartrest_use_operation_id: config.smartrest_use_operation_id, + } + } +} + +impl C8yMapperConfig { + pub fn to_operation_handler_config(&self) -> OperationHandlerConfig { + OperationHandlerConfig::from_mapper_config(self) + } +} + +#[derive(Debug)] +pub(super) struct RunningOperation { + pub(super) handle: tokio::task::JoinHandle<()>, + pub(super) status: String, } // TODO: logic of which status transitions are valid should be defined in tedge_api and be // considered together with custom statuses of custom workflows -fn is_operation_status_transition_valid(previous: &str, next: &str) -> bool { +pub fn is_operation_status_transition_valid(previous: &str, next: &str) -> bool { #[allow(clippy::match_like_matches_macro)] match (previous, next) { // not really a transition but false to make sure we're not sending multiple smartrest msgs + // FIXME: this will blow if prev and next are empty! (clearing messages) (prev, next) if prev == next => false, // successful and failed are terminal, can't change them @@ -291,13 +372,13 @@ mod tests { use std::time::Duration; - use c8y_auth_proxy::url::Protocol; use c8y_http_proxy::messages::C8YRestRequest; use c8y_http_proxy::messages::C8YRestResult; use tedge_actors::test_helpers::FakeServerBox; use tedge_actors::test_helpers::FakeServerBoxBuilder; use tedge_actors::test_helpers::MessageReceiverExt; use tedge_actors::Builder; + use tedge_actors::LoggingSender; use tedge_actors::MessageReceiver; use tedge_actors::MessageSink; use tedge_actors::Sender; @@ -836,17 +917,12 @@ mod tests { FakeServerBoxBuilder::default(); let downloader = ClientMessageBox::new(&mut downloader_builder); - let auth_proxy_addr = c8y_mapper_config.auth_proxy_addr.clone(); - let auth_proxy_port = c8y_mapper_config.auth_proxy_port; - let auth_proxy = ProxyUrlGenerator::new(auth_proxy_addr, auth_proxy_port, Protocol::Http); - let operation_handler = OperationHandler::new( - &c8y_mapper_config, + c8y_mapper_config.to_operation_handler_config(), downloader, uploader, - mqtt_publisher, + mqtt_publisher.into(), c8y_proxy, - auth_proxy, ); let mqtt = mqtt_builder.build(); diff --git a/crates/extensions/c8y_mapper_ext/src/operations/handlers/mod.rs b/crates/extensions/c8y_mapper_ext/src/operations/handlers/mod.rs index 3de054869f7..4450b1555af 100644 --- a/crates/extensions/c8y_mapper_ext/src/operations/handlers/mod.rs +++ b/crates/extensions/c8y_mapper_ext/src/operations/handlers/mod.rs @@ -31,7 +31,7 @@ use c8y_http_proxy::handle::C8YHttpProxy; use camino::Utf8Path; use std::sync::Arc; use tedge_actors::ClientMessageBox; -use tedge_actors::LoggingSender; +use tedge_actors::DynSender; use tedge_actors::Sender; use tedge_api::entity_store::EntityExternalId; use tedge_api::mqtt_topics::EntityTopicId; @@ -42,7 +42,6 @@ use tedge_api::workflow::GenericCommandState; use tedge_config::AutoLogUpload; use tedge_config::SoftwareManagementApiFlag; use tedge_mqtt_ext::MqttMessage; -use tedge_mqtt_ext::QoS; use tedge_mqtt_ext::Topic; use tracing::debug; use tracing::error; @@ -55,6 +54,7 @@ pub(super) struct OperationContext { pub(super) tmp_dir: Arc, pub(super) mqtt_schema: MqttSchema, pub(super) software_management_api: SoftwareManagementApiFlag, + pub(super) command_id: IdGenerator, pub(super) smart_rest_use_operation_id: bool, @@ -64,11 +64,33 @@ pub(super) struct OperationContext { pub(super) downloader: ClientMessageBox, pub(super) uploader: ClientMessageBox, - pub(super) mqtt_publisher: LoggingSender, + pub(super) mqtt_publisher: DynSender, } impl OperationContext { + // will be removed pub async fn update(&self, message: OperationMessage) { + let outcome = self.report(message.clone()).await; + let mut mqtt_publisher = self.mqtt_publisher.sender_clone(); + + match outcome { + OperationOutcome::Ignored => {} + OperationOutcome::Executing { extra_messages } => { + for message in extra_messages { + mqtt_publisher.send(message).await.unwrap(); + } + } + OperationOutcome::Finished { messages } => { + for message in messages { + mqtt_publisher.send(message).await.unwrap(); + } + let clearing_message = MqttMessage::new(&message.message.topic, []).with_retain(); + mqtt_publisher.send(clearing_message).await.unwrap(); + } + } + } + + pub async fn report(&self, message: OperationMessage) -> OperationOutcome { let OperationMessage { entity, cmd_id, @@ -81,7 +103,7 @@ impl OperationContext { Ok(command) => command, Err(err) => { error!(%err, ?message, "could not parse command payload"); - return; + return OperationOutcome::Ignored; } }; @@ -105,22 +127,17 @@ impl OperationContext { OperationType::SoftwareList => { let result = self.publish_software_list(&entity, &cmd_id, &message).await; - let mut mqtt_publisher = self.mqtt_publisher.clone(); match result { Err(err) => { error!("Fail to list installed software packages: {err}"); + return OperationOutcome::Finished { messages: vec![] }; } Ok(OperationOutcome::Finished { messages }) => { - for message in messages { - mqtt_publisher.send(message).await.unwrap(); - } + return OperationOutcome::Finished { messages }; } // command is not yet finished, avoid clearing the command topic - Ok(_) => return, + Ok(outcome) => return outcome, } - - clear_command_topic(command, &mut mqtt_publisher).await; - return; } OperationType::SoftwareUpdate => { self.publish_software_update_status(&entity, &cmd_id, &message) @@ -148,8 +165,6 @@ impl OperationContext { } }; - let mut mqtt_publisher = self.mqtt_publisher.clone(); - // unwrap is safe: at this point all local operations that are not regular c8y // operations should be handled above let c8y_operation = to_c8y_operation(&operation).unwrap(); @@ -160,7 +175,7 @@ impl OperationContext { &entity.smartrest_publish_topic, &cmd_id, ) { - OperationOutcome::Ignored => {} + OperationOutcome::Ignored => OperationOutcome::Ignored, OperationOutcome::Executing { mut extra_messages } => { let c8y_state_executing_payload = match self.get_operation_id(&cmd_id) { Some(op_id) if self.smart_rest_use_operation_id => { @@ -175,11 +190,12 @@ impl OperationContext { let mut messages = vec![c8y_state_executing_message]; messages.append(&mut extra_messages); - for message in messages { - mqtt_publisher.send(message).await.unwrap(); + OperationOutcome::Executing { + extra_messages: messages, } } OperationOutcome::Finished { messages } => { + // TODO(marcel): uploading logs should be pulled out if let Err(e) = self .upload_operation_log(&external_id, &cmd_id, &operation, &command) .await @@ -187,11 +203,7 @@ impl OperationContext { error!("failed to upload operation logs: {e}"); } - for message in messages { - mqtt_publisher.send(message).await.unwrap(); - } - - clear_command_topic(command, &mut mqtt_publisher).await; + OperationOutcome::Finished { messages } } } } @@ -258,18 +270,6 @@ impl OperationContext { } } -async fn clear_command_topic( - command: GenericCommandState, - mqtt_publisher: &mut LoggingSender, -) { - let command = command.clear(); - let clearing_message = command.into_message(); - assert!(clearing_message.payload_bytes().is_empty()); - assert!(clearing_message.retain); - assert_eq!(clearing_message.qos, QoS::AtLeastOnce); - mqtt_publisher.send(clearing_message).await.unwrap(); -} - /// Result of an update of operation's state. /// /// When a new MQTT message is received with an updated state of the operation, the mapper needs to diff --git a/crates/extensions/c8y_mapper_ext/src/operations/mod.rs b/crates/extensions/c8y_mapper_ext/src/operations/mod.rs index 9c59f1d56b0..d77006167a6 100644 --- a/crates/extensions/c8y_mapper_ext/src/operations/mod.rs +++ b/crates/extensions/c8y_mapper_ext/src/operations/mod.rs @@ -26,3 +26,9 @@ mod handlers; pub use handlers::EntityTarget; mod upload; + +mod actor; +mod builder; +pub use builder::OperationHandlerBuilder; + +mod c8y_operations; diff --git a/crates/extensions/c8y_mapper_ext/src/tests.rs b/crates/extensions/c8y_mapper_ext/src/tests.rs index b224bd75dc4..77129421a87 100644 --- a/crates/extensions/c8y_mapper_ext/src/tests.rs +++ b/crates/extensions/c8y_mapper_ext/src/tests.rs @@ -9,6 +9,7 @@ use crate::actor::IdUploadResult; use crate::actor::PublishMessage; use crate::availability::AvailabilityBuilder; use crate::operations::OperationHandler; +use crate::operations::OperationHandlerBuilder; use crate::Capabilities; use assert_json_diff::assert_json_include; use c8y_api::json_c8y_deserializer::C8yDeviceControlTopic; @@ -26,7 +27,6 @@ use std::time::SystemTime; use tedge_actors::test_helpers::FakeServerBox; use tedge_actors::test_helpers::FakeServerBoxBuilder; use tedge_actors::test_helpers::MessageReceiverExt; -use tedge_actors::Actor; use tedge_actors::Builder; use tedge_actors::MessageReceiver; use tedge_actors::MessageSink; @@ -35,6 +35,7 @@ use tedge_actors::NoMessage; use tedge_actors::Sender; use tedge_actors::SimpleMessageBox; use tedge_actors::SimpleMessageBoxBuilder; +use tedge_api::mqtt_topics::ChannelFilter; use tedge_api::mqtt_topics::EntityTopicId; use tedge_api::mqtt_topics::MqttSchema; use tedge_config::AutoLogUpload; @@ -2281,21 +2282,7 @@ async fn c8y_mapper_nested_child_service_event_mapping_to_smartrest() { async fn mapper_processes_operations_concurrently() { let num_operations = 20; - let mut fts_server = mockito::Server::new(); - let _mock = fts_server - .mock( - "GET", - "/tedge/file-transfer/test-device/config_snapshot/c8y-mapper-1234", - ) - // make each download block so it doesn't complete before we submit all operations - .with_chunked_body(|_w| { - std::thread::sleep(Duration::from_secs(5)); - Ok(()) - }) - .expect(num_operations) - .create_async() - .await; - let host_port = fts_server.host_with_port(); + let host_port = "localhost:8888"; let cfg_dir = TempTedgeDir::new(); let TestHandle { @@ -2796,12 +2783,10 @@ pub(crate) async fn spawn_c8y_mapper_actor_with_config( let bridge_health_topic = config.bridge_health_topic.clone(); let mut c8y_mapper_builder = C8yMapperBuilder::try_new( - config, + config.clone(), &mut mqtt_builder, &mut c8y_proxy_builder, &mut timer_builder, - &mut uploader_builder, - &mut downloader_builder, &mut fs_watcher_builder, &mut service_monitor_builder, ) @@ -2809,12 +2794,31 @@ pub(crate) async fn spawn_c8y_mapper_actor_with_config( let mut availability_box_builder: SimpleMessageBoxBuilder = SimpleMessageBoxBuilder::new("Availability", 10); - availability_box_builder - .connect_source(AvailabilityBuilder::channels(), &mut c8y_mapper_builder); + availability_box_builder.connect_source::>( + AvailabilityBuilder::channels(), + &mut c8y_mapper_builder, + ); c8y_mapper_builder.connect_source(NoConfig, &mut availability_box_builder); - let actor = c8y_mapper_builder.build(); - tokio::spawn(async move { actor.run().await }); + let operation_handler_builder = OperationHandlerBuilder::new( + config.to_operation_handler_config(), + &mut c8y_mapper_builder, + &mut uploader_builder, + &mut downloader_builder, + &mut c8y_proxy_builder, + ); + + let mut runtime = tedge_actors::Runtime::new(); + + runtime.spawn(operation_handler_builder).await.unwrap(); + runtime.spawn(c8y_mapper_builder).await.unwrap(); + + tokio::spawn(async move { + runtime.run_to_completion().await.unwrap(); + }); + + // let c8y_mapper_actor = c8y_mapper_builder.build(); + // tokio::spawn(async move { c8y_mapper_actor.run().await }); let mut service_monitor_box = service_monitor_builder.build(); let bridge_status_msg = MqttMessage::new(&bridge_health_topic, "1");