Skip to content

Commit

Permalink
OperationHandler::topic_filter
Browse files Browse the repository at this point in the history
Signed-off-by: Marcel Guzik <[email protected]>
  • Loading branch information
Bravo555 committed Jul 29, 2024
1 parent 80e6962 commit 8c484d0
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 76 deletions.
26 changes: 6 additions & 20 deletions crates/extensions/c8y_mapper_ext/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::operations::OperationHandler;
use crate::Capabilities;
use c8y_api::json_c8y_deserializer::C8yDeviceControlTopic;
use c8y_api::smartrest::error::OperationsError;
Expand Down Expand Up @@ -198,26 +199,11 @@ impl C8yMapperConfig {
topics.add_all(mqtt_schema.topics(AnyEntity, CommandMetadata(cmd)));
}

if capabilities.log_upload {
topics.add_all(crate::operations::topic_filter::log_upload_topic_filter(
&mqtt_schema,
));
}
if capabilities.config_snapshot {
topics.add_all(
crate::operations::topic_filter::config_snapshot_topic_filter(&mqtt_schema),
);
}
if capabilities.config_update {
topics.add_all(crate::operations::topic_filter::config_update_topic_filter(
&mqtt_schema,
));
}
if capabilities.firmware_update {
topics.add_all(
crate::operations::topic_filter::firmware_update_topic_filter(&mqtt_schema),
);
}
let operation_topics = OperationHandler::topic_filter(&capabilities)
.into_iter()
.map(|(e, c)| mqtt_schema.topics(e, c))
.collect();
topics.add_all(operation_topics);

// Add user configurable external topic filters
for topic in tedge_config.c8y.topics.0.clone() {
Expand Down
8 changes: 4 additions & 4 deletions crates/extensions/c8y_mapper_ext/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ mod tests;

#[derive(Debug, Clone, Copy, serde::Deserialize)]
pub struct Capabilities {
log_upload: bool,
config_snapshot: bool,
config_update: bool,
firmware_update: bool,
pub log_upload: bool,
pub config_snapshot: bool,
pub config_update: bool,
pub firmware_update: bool,
}

#[cfg(test)]
Expand Down
44 changes: 44 additions & 0 deletions crates/extensions/c8y_mapper_ext/src/operations/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::actor::IdDownloadResult;
use crate::actor::IdUploadRequest;
use crate::actor::IdUploadResult;
use crate::config::C8yMapperConfig;
use crate::Capabilities;
use c8y_api::http_proxy::C8yEndPoint;
use c8y_auth_proxy::url::ProxyUrlGenerator;
use c8y_http_proxy::handle::C8YHttpProxy;
Expand All @@ -15,6 +16,8 @@ use std::sync::Arc;
use tedge_actors::ClientMessageBox;
use tedge_actors::LoggingSender;
use tedge_api::mqtt_topics::Channel;
use tedge_api::mqtt_topics::ChannelFilter;
use tedge_api::mqtt_topics::EntityFilter;
use tedge_api::mqtt_topics::IdGenerator;
use tedge_mqtt_ext::MqttMessage;
use tokio::task::JoinError;
Expand Down Expand Up @@ -149,6 +152,47 @@ impl OperationHandler {
OperationStatus::Terminated => {}
}
}

/// A topic filter for operation types this object can handle.
///
/// The MQTT client should subscribe to topics with this filter to receive MQTT messages that it
/// should then pass to the [`Self::handle`] method. Depending on the tedge configuration, some
/// operations may be disabled and therefore absent in the filter.
pub fn topic_filter(capabilities: &Capabilities) -> Vec<(EntityFilter, ChannelFilter)> {
use tedge_api::mqtt_topics::ChannelFilter::Command;
use tedge_api::mqtt_topics::ChannelFilter::CommandMetadata;
use tedge_api::mqtt_topics::EntityFilter::AnyEntity;
use tedge_api::mqtt_topics::OperationType;

let mut topics = vec![];

if capabilities.log_upload {
topics.extend([
(AnyEntity, Command(OperationType::LogUpload)),
(AnyEntity, CommandMetadata(OperationType::LogUpload)),
]);
}
if capabilities.config_snapshot {
topics.extend([
(AnyEntity, Command(OperationType::ConfigSnapshot)),
(AnyEntity, CommandMetadata(OperationType::ConfigSnapshot)),
]);
}
if capabilities.config_update {
topics.extend([
(AnyEntity, Command(OperationType::ConfigUpdate)),
(AnyEntity, CommandMetadata(OperationType::ConfigUpdate)),
]);
}
if capabilities.firmware_update {
topics.extend([
(AnyEntity, Command(OperationType::FirmwareUpdate)),
(AnyEntity, CommandMetadata(OperationType::FirmwareUpdate)),
]);
}

topics
}
}

struct RunningOperation {
Expand Down
1 change: 0 additions & 1 deletion crates/extensions/c8y_mapper_ext/src/operations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,4 @@ pub use handler::OperationHandler;
mod handlers;
pub use handlers::EntityTarget;

pub mod topic_filter;
mod upload;
43 changes: 0 additions & 43 deletions crates/extensions/c8y_mapper_ext/src/operations/topic_filter.rs

This file was deleted.

24 changes: 16 additions & 8 deletions crates/extensions/c8y_mapper_ext/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::actor::IdUploadRequest;
use crate::actor::IdUploadResult;
use crate::actor::PublishMessage;
use crate::availability::AvailabilityBuilder;
use crate::operations::OperationHandler;
use crate::Capabilities;
use assert_json_diff::assert_json_include;
use c8y_api::json_c8y_deserializer::C8yDeviceControlTopic;
Expand Down Expand Up @@ -2779,13 +2780,20 @@ pub(crate) fn test_mapper_config(tmp_dir: &TempTedgeDir) -> C8yMapperConfig {
let mut topics =
C8yMapperConfig::default_internal_topic_filter(tmp_dir.path(), &"c8y".try_into().unwrap())
.unwrap();
topics.add_all(crate::operations::topic_filter::log_upload_topic_filter(
&mqtt_schema,
));
topics.add_all(crate::operations::topic_filter::config_snapshot_topic_filter(&mqtt_schema));
topics.add_all(crate::operations::topic_filter::config_update_topic_filter(
&mqtt_schema,
));

let capabilities = Capabilities {
log_upload: true,
config_snapshot: true,
config_update: true,
firmware_update: true,
};

let operation_topics = OperationHandler::topic_filter(&capabilities)
.into_iter()
.map(|(e, c)| mqtt_schema.topics(e, c))
.collect();
topics.add_all(operation_topics);

topics.add_all(C8yMapperConfig::default_external_topic_filter());

C8yMapperConfig::new(
Expand All @@ -2801,7 +2809,7 @@ pub(crate) fn test_mapper_config(tmp_dir: &TempTedgeDir) -> C8yMapperConfig {
c8y_host,
tedge_http_host,
topics,
Capabilities::default(),
capabilities,
auth_proxy_addr,
auth_proxy_port,
Protocol::Http,
Expand Down

0 comments on commit 8c484d0

Please sign in to comment.