diff --git a/crates/extensions/c8y_mapper_ext/src/config.rs b/crates/extensions/c8y_mapper_ext/src/config.rs index 2080c40ec3b..393de82eb3a 100644 --- a/crates/extensions/c8y_mapper_ext/src/config.rs +++ b/crates/extensions/c8y_mapper_ext/src/config.rs @@ -1,3 +1,4 @@ +use crate::operations::OperationHandler; use crate::Capabilities; use c8y_api::json_c8y_deserializer::C8yDeviceControlTopic; use c8y_api::smartrest::error::OperationsError; @@ -198,26 +199,11 @@ impl C8yMapperConfig { topics.add_all(mqtt_schema.topics(AnyEntity, CommandMetadata(cmd))); } - if capabilities.log_upload { - topics.add_all(crate::operations::topic_filter::log_upload_topic_filter( - &mqtt_schema, - )); - } - if capabilities.config_snapshot { - topics.add_all( - crate::operations::topic_filter::config_snapshot_topic_filter(&mqtt_schema), - ); - } - if capabilities.config_update { - topics.add_all(crate::operations::topic_filter::config_update_topic_filter( - &mqtt_schema, - )); - } - if capabilities.firmware_update { - topics.add_all( - crate::operations::topic_filter::firmware_update_topic_filter(&mqtt_schema), - ); - } + let operation_topics = OperationHandler::topic_filter(&capabilities) + .into_iter() + .map(|(e, c)| mqtt_schema.topics(e, c)) + .collect(); + topics.add_all(operation_topics); // Add user configurable external topic filters for topic in tedge_config.c8y.topics.0.clone() { diff --git a/crates/extensions/c8y_mapper_ext/src/lib.rs b/crates/extensions/c8y_mapper_ext/src/lib.rs index 1697ff488cc..26d8c6bf723 100644 --- a/crates/extensions/c8y_mapper_ext/src/lib.rs +++ b/crates/extensions/c8y_mapper_ext/src/lib.rs @@ -17,10 +17,10 @@ mod tests; #[derive(Debug, Clone, Copy, serde::Deserialize)] pub struct Capabilities { - log_upload: bool, - config_snapshot: bool, - config_update: bool, - firmware_update: bool, + pub log_upload: bool, + pub config_snapshot: bool, + pub config_update: bool, + pub firmware_update: bool, } #[cfg(test)] diff --git a/crates/extensions/c8y_mapper_ext/src/operations/handler.rs b/crates/extensions/c8y_mapper_ext/src/operations/handler.rs index 6e19246c398..b930d3b8d4a 100644 --- a/crates/extensions/c8y_mapper_ext/src/operations/handler.rs +++ b/crates/extensions/c8y_mapper_ext/src/operations/handler.rs @@ -7,6 +7,7 @@ use crate::actor::IdDownloadResult; use crate::actor::IdUploadRequest; use crate::actor::IdUploadResult; use crate::config::C8yMapperConfig; +use crate::Capabilities; use c8y_api::http_proxy::C8yEndPoint; use c8y_auth_proxy::url::ProxyUrlGenerator; use c8y_http_proxy::handle::C8YHttpProxy; @@ -15,6 +16,8 @@ use std::sync::Arc; use tedge_actors::ClientMessageBox; use tedge_actors::LoggingSender; use tedge_api::mqtt_topics::Channel; +use tedge_api::mqtt_topics::ChannelFilter; +use tedge_api::mqtt_topics::EntityFilter; use tedge_api::mqtt_topics::IdGenerator; use tedge_mqtt_ext::MqttMessage; use tokio::task::JoinError; @@ -149,6 +152,47 @@ impl OperationHandler { OperationStatus::Terminated => {} } } + + /// A topic filter for operation types this object can handle. + /// + /// The MQTT client should subscribe to topics with this filter to receive MQTT messages that it + /// should then pass to the [`Self::handle`] method. Depending on the tedge configuration, some + /// operations may be disabled and therefore absent in the filter. + pub fn topic_filter(capabilities: &Capabilities) -> Vec<(EntityFilter, ChannelFilter)> { + use tedge_api::mqtt_topics::ChannelFilter::Command; + use tedge_api::mqtt_topics::ChannelFilter::CommandMetadata; + use tedge_api::mqtt_topics::EntityFilter::AnyEntity; + use tedge_api::mqtt_topics::OperationType; + + let mut topics = vec![]; + + if capabilities.log_upload { + topics.extend([ + (AnyEntity, Command(OperationType::LogUpload)), + (AnyEntity, CommandMetadata(OperationType::LogUpload)), + ]); + } + if capabilities.config_snapshot { + topics.extend([ + (AnyEntity, Command(OperationType::ConfigSnapshot)), + (AnyEntity, CommandMetadata(OperationType::ConfigSnapshot)), + ]); + } + if capabilities.config_update { + topics.extend([ + (AnyEntity, Command(OperationType::ConfigUpdate)), + (AnyEntity, CommandMetadata(OperationType::ConfigUpdate)), + ]); + } + if capabilities.firmware_update { + topics.extend([ + (AnyEntity, Command(OperationType::FirmwareUpdate)), + (AnyEntity, CommandMetadata(OperationType::FirmwareUpdate)), + ]); + } + + topics + } } struct RunningOperation { diff --git a/crates/extensions/c8y_mapper_ext/src/operations/mod.rs b/crates/extensions/c8y_mapper_ext/src/operations/mod.rs index 47c9587a3b7..30c4ce6e528 100644 --- a/crates/extensions/c8y_mapper_ext/src/operations/mod.rs +++ b/crates/extensions/c8y_mapper_ext/src/operations/mod.rs @@ -26,5 +26,4 @@ pub use handler::OperationHandler; mod handlers; pub use handlers::EntityTarget; -pub mod topic_filter; mod upload; diff --git a/crates/extensions/c8y_mapper_ext/src/operations/topic_filter.rs b/crates/extensions/c8y_mapper_ext/src/operations/topic_filter.rs deleted file mode 100644 index eae3d866272..00000000000 --- a/crates/extensions/c8y_mapper_ext/src/operations/topic_filter.rs +++ /dev/null @@ -1,43 +0,0 @@ -//! Operations' topic filters. - -use tedge_api::mqtt_topics::ChannelFilter::Command; -use tedge_api::mqtt_topics::ChannelFilter::CommandMetadata; -use tedge_api::mqtt_topics::EntityFilter::AnyEntity; -use tedge_api::mqtt_topics::MqttSchema; -use tedge_api::mqtt_topics::OperationType; -use tedge_mqtt_ext::TopicFilter; - -pub fn config_snapshot_topic_filter(mqtt_schema: &MqttSchema) -> TopicFilter { - [ - mqtt_schema.topics(AnyEntity, Command(OperationType::ConfigSnapshot)), - mqtt_schema.topics(AnyEntity, CommandMetadata(OperationType::ConfigSnapshot)), - ] - .into_iter() - .collect() -} -pub fn log_upload_topic_filter(mqtt_schema: &MqttSchema) -> TopicFilter { - [ - mqtt_schema.topics(AnyEntity, Command(OperationType::LogUpload)), - mqtt_schema.topics(AnyEntity, CommandMetadata(OperationType::LogUpload)), - ] - .into_iter() - .collect() -} - -pub fn config_update_topic_filter(mqtt_schema: &MqttSchema) -> TopicFilter { - [ - mqtt_schema.topics(AnyEntity, Command(OperationType::ConfigUpdate)), - mqtt_schema.topics(AnyEntity, CommandMetadata(OperationType::ConfigUpdate)), - ] - .into_iter() - .collect() -} - -pub fn firmware_update_topic_filter(mqtt_schema: &MqttSchema) -> TopicFilter { - [ - mqtt_schema.topics(AnyEntity, Command(OperationType::FirmwareUpdate)), - mqtt_schema.topics(AnyEntity, CommandMetadata(OperationType::FirmwareUpdate)), - ] - .into_iter() - .collect() -} diff --git a/crates/extensions/c8y_mapper_ext/src/tests.rs b/crates/extensions/c8y_mapper_ext/src/tests.rs index 5800bfe40f9..daef3881056 100644 --- a/crates/extensions/c8y_mapper_ext/src/tests.rs +++ b/crates/extensions/c8y_mapper_ext/src/tests.rs @@ -8,6 +8,7 @@ use crate::actor::IdUploadRequest; use crate::actor::IdUploadResult; use crate::actor::PublishMessage; use crate::availability::AvailabilityBuilder; +use crate::operations::OperationHandler; use crate::Capabilities; use assert_json_diff::assert_json_include; use c8y_api::json_c8y_deserializer::C8yDeviceControlTopic; @@ -2779,13 +2780,20 @@ pub(crate) fn test_mapper_config(tmp_dir: &TempTedgeDir) -> C8yMapperConfig { let mut topics = C8yMapperConfig::default_internal_topic_filter(tmp_dir.path(), &"c8y".try_into().unwrap()) .unwrap(); - topics.add_all(crate::operations::topic_filter::log_upload_topic_filter( - &mqtt_schema, - )); - topics.add_all(crate::operations::topic_filter::config_snapshot_topic_filter(&mqtt_schema)); - topics.add_all(crate::operations::topic_filter::config_update_topic_filter( - &mqtt_schema, - )); + + let capabilities = Capabilities { + log_upload: true, + config_snapshot: true, + config_update: true, + firmware_update: true, + }; + + let operation_topics = OperationHandler::topic_filter(&capabilities) + .into_iter() + .map(|(e, c)| mqtt_schema.topics(e, c)) + .collect(); + topics.add_all(operation_topics); + topics.add_all(C8yMapperConfig::default_external_topic_filter()); C8yMapperConfig::new( @@ -2801,7 +2809,7 @@ pub(crate) fn test_mapper_config(tmp_dir: &TempTedgeDir) -> C8yMapperConfig { c8y_host, tedge_http_host, topics, - Capabilities::default(), + capabilities, auth_proxy_addr, auth_proxy_port, Protocol::Http,