Skip to content

Commit

Permalink
add OperationHandlerActor
Browse files Browse the repository at this point in the history
A preliminary OperationHandlerActor which still uses OperationHandler
underneath, but receives command messages using the actor runtime.

This initial implementation shows the limitations of the entity store,
which is now a part of CumulocityConverter, and other actors don't have
any access to it. Because OperationHandlerActor needs some information
about a registered entity (smartrest publish topic), I have added
another `MessageSource<(MqttMessage, EntityMetadata)>` impl to the
`C8yMapperActor`, but this obviously very hacky.

When moving more things out of CumulocityConverter into other actors,
we'll need some organized way to access entity metadata (and entity
store) from other actors. First thing that comes to mind is making an
`EntityStoreActor` that manages the entity store and allows queries and
updates to the store via incoming messages.

But we probably shouldn't allow all actors to modify the entity store
freely, and we'll need to make sure what to do when entities are deleted
or updated.

Signed-off-by: Marcel Guzik <[email protected]>
  • Loading branch information
Bravo555 committed Aug 14, 2024
1 parent bc45ad1 commit c345ca5
Show file tree
Hide file tree
Showing 11 changed files with 693 additions and 111 deletions.
12 changes: 11 additions & 1 deletion crates/core/tedge_mapper/src/c8y/mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ impl TEdgeComponent for CumulocityMapper {
MqttActorBuilder::new(service_monitor_client_config(&tedge_config)?);

let mut c8y_mapper_actor = C8yMapperBuilder::try_new(
c8y_mapper_config,
c8y_mapper_config.clone(),
&mut mqtt_actor,
&mut c8y_http_proxy_actor,
&mut timer_actor,
Expand All @@ -225,6 +225,15 @@ impl TEdgeComponent for CumulocityMapper {
)?;

let c8y_prefix = &tedge_config.c8y.bridge.topic_prefix;

let operation_handler_actor = c8y_mapper_ext::operations::OperationHandlerBuilder::new(
c8y_mapper_config.to_operation_handler_config(),
&mut c8y_mapper_actor,
&mut uploader_actor,
&mut downloader_actor,
&mut c8y_http_proxy_actor,
);

// Adaptor translating commands sent on te/device/main///cmd/+/+ into requests on tedge/commands/req/+/+
// and translating the responses received on tedge/commands/res/+/+ to te/device/main///cmd/+/+
let old_to_new_agent_adapter = OldAgentAdapter::builder(c8y_prefix, &mut mqtt_actor);
Expand Down Expand Up @@ -254,6 +263,7 @@ impl TEdgeComponent for CumulocityMapper {
if let Some(availability_actor) = availability_actor {
runtime.spawn(availability_actor).await?;
}
runtime.spawn(operation_handler_actor).await?;
runtime.run_to_completion().await?;

Ok(())
Expand Down
49 changes: 49 additions & 0 deletions crates/extensions/c8y_mapper_ext/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use tedge_actors::Sender;
use tedge_actors::Service;
use tedge_actors::SimpleMessageBox;
use tedge_actors::SimpleMessageBoxBuilder;
use tedge_api::entity_store::EntityMetadata;
use tedge_api::entity_store::EntityRegistrationMessage;
use tedge_api::mqtt_topics::Channel;
use tedge_api::mqtt_topics::ChannelFilter;
Expand Down Expand Up @@ -80,6 +81,9 @@ pub struct C8yMapperActor {
timer_sender: LoggingSender<SyncStart>,
bridge_status_messages: SimpleMessageBox<MqttMessage, MqttMessage>,
message_handlers: HashMap<ChannelFilter, Vec<LoggingSender<MqttMessage>>>,
// these handlers require entity metadata, so the entity already has to be registered
registered_message_handlers:
HashMap<ChannelFilter, Vec<LoggingSender<(MqttMessage, EntityMetadata)>>>,
}

#[async_trait]
Expand Down Expand Up @@ -140,6 +144,10 @@ impl C8yMapperActor {
timer_sender: LoggingSender<SyncStart>,
bridge_status_messages: SimpleMessageBox<MqttMessage, MqttMessage>,
message_handlers: HashMap<ChannelFilter, Vec<LoggingSender<MqttMessage>>>,
registered_message_handlers: HashMap<
ChannelFilter,
Vec<LoggingSender<(MqttMessage, EntityMetadata)>>,
>,
) -> Self {
Self {
converter,
Expand All @@ -148,6 +156,7 @@ impl C8yMapperActor {
timer_sender,
bridge_status_messages,
message_handlers,
registered_message_handlers,
}
}

Expand Down Expand Up @@ -267,6 +276,25 @@ impl C8yMapperActor {
sender.send(message.clone()).await?;
}
}

if let Some(message_handler) = self.registered_message_handlers.get_mut(&channel.into()) {
let (entity, _) = self
.converter
.mqtt_schema
.entity_channel_of(&message.topic)
.expect("message should've been confirmed to be using MQTT topic scheme v1");

let entity = self
.converter
.entity_store
.get(&entity)
.expect("entity should've already been registered");

for sender in message_handler {
sender.send((message.clone(), entity.clone())).await?;
}
}

Ok(())
}

Expand Down Expand Up @@ -336,6 +364,8 @@ pub struct C8yMapperBuilder {
auth_proxy: ProxyUrlGenerator,
bridge_monitor_builder: SimpleMessageBoxBuilder<MqttMessage, MqttMessage>,
message_handlers: HashMap<ChannelFilter, Vec<LoggingSender<MqttMessage>>>,
registered_message_handlers:
HashMap<ChannelFilter, Vec<LoggingSender<(MqttMessage, EntityMetadata)>>>,
}

impl C8yMapperBuilder {
Expand Down Expand Up @@ -382,6 +412,7 @@ impl C8yMapperBuilder {
);

let message_handlers = HashMap::new();
let registered_message_handlers = HashMap::new();

Ok(Self {
config,
Expand All @@ -394,6 +425,7 @@ impl C8yMapperBuilder {
auth_proxy,
bridge_monitor_builder,
message_handlers,
registered_message_handlers,
})
}

Expand Down Expand Up @@ -426,6 +458,22 @@ impl MessageSource<MqttMessage, Vec<ChannelFilter>> for C8yMapperBuilder {
}
}

impl MessageSource<(MqttMessage, EntityMetadata), Vec<ChannelFilter>> for C8yMapperBuilder {
fn connect_sink(
&mut self,
config: Vec<ChannelFilter>,
peer: &impl MessageSink<(MqttMessage, EntityMetadata)>,
) {
let sender = LoggingSender::new("Mapper MQTT".into(), peer.get_sender());
for channel in config {
self.registered_message_handlers
.entry(channel)
.or_default()
.push(sender.clone());
}
}
}

impl MessageSink<PublishMessage> for C8yMapperBuilder {
fn get_sender(&self) -> DynSender<PublishMessage> {
self.box_builder.get_sender().sender_clone()
Expand Down Expand Up @@ -459,6 +507,7 @@ impl Builder<C8yMapperActor> for C8yMapperBuilder {
timer_sender,
bridge_monitor_box,
self.message_handlers,
self.registered_message_handlers,
))
}
}
1 change: 1 addition & 0 deletions crates/extensions/c8y_mapper_ext/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ const STATE_DIR_NAME: &str = ".tedge-mapper-c8y";
const C8Y_CLOUD: &str = "c8y";
const SUPPORTED_OPERATIONS_DIRECTORY: &str = "operations";

#[derive(Clone)]
pub struct C8yMapperConfig {
pub device_id: String,
pub device_topic_id: EntityTopicId,
Expand Down
24 changes: 0 additions & 24 deletions crates/extensions/c8y_mapper_ext/src/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ use crate::actor::IdDownloadResult;
use crate::dynamic_discovery::DiscoverOp;
use crate::error::ConversionError;
use crate::json;
use crate::operations;
use crate::operations::OperationHandler;
use anyhow::anyhow;
use anyhow::Context;
use c8y_api::http_proxy::C8yEndPoint;
Expand Down Expand Up @@ -184,8 +182,6 @@ pub struct CumulocityConverter {
pub command_id: IdGenerator,
// Keep active command IDs to avoid creation of multiple commands for an operation
pub active_commands: HashSet<CmdId>,

pub operation_handler: OperationHandler,
}

impl CumulocityConverter {
Expand Down Expand Up @@ -245,15 +241,6 @@ impl CumulocityConverter {

let command_id = config.id_generator();

let operation_handler = OperationHandler::new(
&config,
downloader,
uploader,
mqtt_publisher.clone(),
http_proxy.clone(),
auth_proxy.clone(),
);

Ok(CumulocityConverter {
size_threshold,
config: Arc::new(config),
Expand All @@ -274,7 +261,6 @@ impl CumulocityConverter {
auth_proxy,
command_id,
active_commands: HashSet::new(),
operation_handler,
})
}

Expand Down Expand Up @@ -1169,16 +1155,6 @@ impl CumulocityConverter {
Channel::Command { cmd_id, .. } if self.command_id.is_generator_of(cmd_id) => {
self.active_commands.insert(cmd_id.clone());

let entity = self.entity_store.try_get(&source)?;
let external_id = entity.external_id.clone();
let entity = operations::EntityTarget {
topic_id: entity.topic_id.clone(),
external_id: external_id.clone(),
smartrest_publish_topic: self
.smartrest_publish_topic_for_entity(&entity.topic_id)?,
};

self.operation_handler.handle(entity, message.clone()).await;
Ok(vec![])
}

Expand Down
2 changes: 1 addition & 1 deletion crates/extensions/c8y_mapper_ext/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub mod error;
mod fragments;
mod inventory;
pub mod json;
mod operations;
pub mod operations;
mod serializer;
pub mod service_monitor;
#[cfg(test)]
Expand Down
Loading

0 comments on commit c345ca5

Please sign in to comment.