Skip to content

Commit

Permalink
Services publish registration message on startup
Browse files Browse the repository at this point in the history
Signed-off-by: Marcel Guzik <[email protected]>
  • Loading branch information
Bravo555 committed Oct 16, 2023
1 parent 4629541 commit 7e6d64c
Show file tree
Hide file tree
Showing 23 changed files with 405 additions and 124 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/bin/c8y-device-management/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ clap = { workspace = true }
env_logger = { workspace = true }
log = { workspace = true }
tedge_actors = { workspace = true }
tedge_api = { workspace = true }
tedge_config = { workspace = true }
tedge_downloader_ext = { workspace = true }
tedge_file_system_ext = { workspace = true }
Expand Down
31 changes: 30 additions & 1 deletion crates/bin/c8y-device-management/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use anyhow::Context;
use c8y_config_manager::ConfigManagerBuilder;
use c8y_config_manager::ConfigManagerConfig;
use c8y_firmware_manager::FirmwareManagerBuilder;
Expand All @@ -9,6 +10,10 @@ use c8y_log_manager::LogManagerConfig;
use clap::Parser;
use std::path::PathBuf;
use tedge_actors::Runtime;
use tedge_api::mqtt_topics::DeviceTopicId;
use tedge_api::mqtt_topics::EntityTopicId;
use tedge_api::mqtt_topics::MqttSchema;
use tedge_api::mqtt_topics::Service;
use tedge_config::TEdgeConfigLocation;
use tedge_config::TEdgeConfigRepository;
use tedge_config::DEFAULT_TEDGE_CONFIG_PATH;
Expand Down Expand Up @@ -92,7 +97,31 @@ async fn main() -> anyhow::Result<()> {
)?;

// Instantiate health monitor actor
let health_actor = HealthMonitorBuilder::new(PLUGIN_NAME, &mut mqtt_actor);
// TODO: take a user-configurable service topic id
let mqtt_device_topic_id = &tedge_config
.mqtt
.device_topic_id
.parse::<EntityTopicId>()
.unwrap();

let service_topic_id = mqtt_device_topic_id
.to_default_service_topic_id(PLUGIN_NAME)
.with_context(|| {
format!(
"Device topic id {mqtt_device_topic_id} currently needs default scheme, e.g: 'device/DEVICE_NAME//'",
)
})?;
let service = Service {
service_topic_id,
device_topic_id: DeviceTopicId::new(mqtt_device_topic_id.clone()),
};
let mqtt_schema = MqttSchema::with_root(tedge_config.mqtt.topic_root.to_string());
let health_actor = HealthMonitorBuilder::from_service_topic_id(
service,
&mut mqtt_actor,
&mqtt_schema,
tedge_config.service.ty.clone(),
);

// Shutdown on SIGINT
let signal_actor = SignalActor::builder(&runtime.get_handle());
Expand Down
3 changes: 3 additions & 0 deletions crates/core/tedge_agent/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ pub struct AgentConfig {
pub data_dir: DataDir,
pub mqtt_device_topic_id: EntityTopicId,
pub mqtt_topic_root: Arc<str>,
pub service_type: String,
}

impl AgentConfig {
Expand Down Expand Up @@ -114,6 +115,7 @@ impl AgentConfig {
log_dir,
mqtt_topic_root,
mqtt_device_topic_id,
service_type: tedge_config.service.ty.clone(),
})
}
}
Expand Down Expand Up @@ -199,6 +201,7 @@ impl Agent {
service,
&mut mqtt_actor_builder,
&mqtt_schema,
self.config.service_type.clone(),
);

// Tedge to Te topic converter
Expand Down
49 changes: 43 additions & 6 deletions crates/core/tedge_api/src/entity_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,17 @@
// TODO: move entity business logic to its own module

use crate::entity_store;
use crate::mqtt_topics::Channel;
use crate::mqtt_topics::EntityTopicId;
use crate::mqtt_topics::MqttSchema;
use crate::mqtt_topics::TopicIdError;
use log::debug;
use mqtt_channel::Message;
use serde_json::json;
use serde_json::Value as JsonValue;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::fmt::Display;
use thiserror::Error;

/// Represents an "Entity topic identifier" portion of the MQTT topic
Expand Down Expand Up @@ -473,6 +476,16 @@ pub enum EntityType {
Service,
}

impl Display for EntityType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
EntityType::MainDevice => write!(f, "device"),
EntityType::ChildDevice => write!(f, "child-device"),
EntityType::Service => write!(f, "service"),
}
}
}

impl EntityMetadata {
/// Creates a entity metadata for the main device.
pub fn main_device(device_id: String) -> Self {
Expand Down Expand Up @@ -550,6 +563,14 @@ impl EntityRegistrationMessage {
// Serialize/Deserialize.
#[must_use]
pub fn new(message: &Message) -> Option<Self> {
let topic_id = message
.topic
.name
.strip_prefix(MQTT_ROOT)
.and_then(|s| s.strip_prefix('/'))?;

dbg!(topic_id);

let payload = parse_entity_register_payload(message.payload_bytes())?;

let JsonValue::Object(mut properties) = payload else {
Expand Down Expand Up @@ -596,12 +617,6 @@ impl EntityRegistrationMessage {
None
};

let topic_id = message
.topic
.name
.strip_prefix(MQTT_ROOT)
.and_then(|s| s.strip_prefix('/'))?;

let other = JsonValue::Object(properties);

assert_eq!(other.get("@id"), None);
Expand All @@ -627,6 +642,28 @@ impl EntityRegistrationMessage {
other: serde_json::json!({}),
}
}

// TODO: manual serialize impl
pub fn to_mqtt_message(mut self, mqtt_schema: &MqttSchema) -> Message {
let mut props = serde_json::Map::new();

props.insert("@type".to_string(), self.r#type.to_string().into());

if let Some(external_id) = self.external_id {
props.insert("@id".to_string(), external_id.as_ref().to_string().into());
}

if let Some(parent) = self.parent {
props.insert("@parent".to_string(), parent.to_string().into());
}

props.append(self.other.as_object_mut().unwrap());

let message = serde_json::to_string(&props).unwrap();

let message_topic = mqtt_schema.topic_for(&self.topic_id, &Channel::EntityMetadata);
Message::new(&message_topic, message).with_retain()
}
}

impl TryFrom<&Message> for EntityRegistrationMessage {
Expand Down
43 changes: 33 additions & 10 deletions crates/core/tedge_api/src/mqtt_topics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,9 @@ impl MqttSchema {
/// "te/device/child001///a/sensors/meta"
/// );
/// ```
// TODO: add test
pub fn topic_for(&self, entity: &EntityTopicId, channel: &Channel) -> mqtt_channel::Topic {
let topic = format!("{}/{}/{}", self.root, entity, channel);
let topic = format!("{}/{}{}", self.root, entity, channel);
mqtt_channel::Topic::new(&topic).unwrap()
}

Expand Down Expand Up @@ -272,6 +273,16 @@ impl EntityTopicId {
format!("device/{child}/service/{service}").parse()
}

/// Assuming `self` is a device in default MQTT scheme, create an
/// `EntityTopicId` for a service on that device.
///
/// Returns `None` if `self` is not in default MQTT scheme or if `service`
/// is an invalid service name.
pub fn default_service_for_device(&self, service: &str) -> Option<Self> {
let device_name = self.default_device_name()?;
Self::default_child_service(device_name, service).ok()
}

/// Returns true if the current topic id matches the default topic scheme:
/// - device/<device-id>// : for devices
/// - device/<device-id>/service/<service-id> : for services
Expand Down Expand Up @@ -400,6 +411,12 @@ impl ServiceTopicId {
}
}

impl From<EntityTopicId> for ServiceTopicId {
fn from(value: EntityTopicId) -> Self {
Self::new(value)
}
}

impl Display for ServiceTopicId {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_str(self.as_str())
Expand Down Expand Up @@ -427,6 +444,12 @@ impl DeviceTopicId {
}
}

impl From<EntityTopicId> for DeviceTopicId {
fn from(value: EntityTopicId) -> Self {
Self::new(value)
}
}

#[derive(Debug, thiserror::Error, PartialEq, Eq, Clone)]
pub enum TopicIdError {
#[error("An entity topic identifier has at most 4 segments")]
Expand Down Expand Up @@ -517,20 +540,20 @@ impl Display for Channel {
match self {
Channel::EntityMetadata => Ok(()),

Channel::Measurement { measurement_type } => write!(f, "m/{measurement_type}"),
Channel::Measurement { measurement_type } => write!(f, "/m/{measurement_type}"),
Channel::MeasurementMetadata { measurement_type } => {
write!(f, "m/{measurement_type}/meta")
write!(f, "/m/{measurement_type}/meta")
}

Channel::Event { event_type } => write!(f, "e/{event_type}"),
Channel::EventMetadata { event_type } => write!(f, "e/{event_type}/meta"),
Channel::Event { event_type } => write!(f, "/e/{event_type}"),
Channel::EventMetadata { event_type } => write!(f, "/e/{event_type}/meta"),

Channel::Alarm { alarm_type } => write!(f, "a/{alarm_type}"),
Channel::AlarmMetadata { alarm_type } => write!(f, "a/{alarm_type}/meta"),
Channel::Alarm { alarm_type } => write!(f, "/a/{alarm_type}"),
Channel::AlarmMetadata { alarm_type } => write!(f, "/a/{alarm_type}/meta"),

Channel::Command { operation, cmd_id } => write!(f, "cmd/{operation}/{cmd_id}"),
Channel::CommandMetadata { operation } => write!(f, "cmd/{operation}"),
Channel::Health => write!(f, "status/health"),
Channel::Command { operation, cmd_id } => write!(f, "/cmd/{operation}/{cmd_id}"),
Channel::CommandMetadata { operation } => write!(f, "/cmd/{operation}"),
Channel::Health => write!(f, "/status/health"),
}
}
}
Expand Down
8 changes: 6 additions & 2 deletions crates/core/tedge_mapper/src/core/mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,12 @@ pub async fn start_basic_actors(
device_topic_id: DeviceTopicId::new("device/main//".parse::<EntityTopicId>().unwrap()),
};
let mqtt_schema = MqttSchema::with_root(config.mqtt.topic_root.clone());
let health_actor =
HealthMonitorBuilder::from_service_topic_id(service, &mut mqtt_actor, &mqtt_schema);
let health_actor = HealthMonitorBuilder::from_service_topic_id(
service,
&mut mqtt_actor,
&mqtt_schema,
config.service.ty.clone(),
);

// Shutdown on SIGINT
let signal_actor = SignalActor::builder(&runtime.get_handle());
Expand Down
Loading

0 comments on commit 7e6d64c

Please sign in to comment.