Skip to content

Commit

Permalink
refactor: Refactor health status code
Browse files Browse the repository at this point in the history
  • Loading branch information
albinsuresh committed Jun 7, 2024
1 parent 0d49756 commit 7530af5
Show file tree
Hide file tree
Showing 12 changed files with 142 additions and 141 deletions.
55 changes: 0 additions & 55 deletions crates/core/c8y_api/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,33 +1,3 @@
pub mod bridge {
use mqtt_channel::MqttMessage;
use tedge_api::main_device_health_topic;
use tedge_api::MQTT_BRIDGE_DOWN_PAYLOAD;
use tedge_api::MQTT_BRIDGE_UP_PAYLOAD;

pub fn is_c8y_bridge_established(message: &MqttMessage, service: &str) -> bool {
let c8y_bridge_health_topic = main_device_health_topic(service);
match message.payload_str() {
Ok(payload) => {
message.topic.name == c8y_bridge_health_topic
&& (payload == MQTT_BRIDGE_UP_PAYLOAD
|| payload == MQTT_BRIDGE_DOWN_PAYLOAD
|| is_valid_status_payload(payload))
}
Err(_err) => false,
}
}

#[derive(serde::Deserialize)]
struct HealthStatus<'a> {
status: &'a str,
}

fn is_valid_status_payload(payload: &str) -> bool {
serde_json::from_str::<HealthStatus>(payload)
.map_or(false, |h| h.status == "up" || h.status == "down")
}
}

pub mod child_device {
use crate::smartrest::topic::C8yTopic;
use mqtt_channel::MqttMessage;
Expand All @@ -40,28 +10,3 @@ pub mod child_device {
)
}
}

#[cfg(test)]
mod tests {
use mqtt_channel::MqttMessage;
use mqtt_channel::Topic;
use test_case::test_case;

use crate::utils::bridge::is_c8y_bridge_established;

const C8Y_BRIDGE_HEALTH_TOPIC: &str =
"te/device/main/service/tedge-mapper-bridge-c8y/status/health";

#[test_case(C8Y_BRIDGE_HEALTH_TOPIC, "1", true)]
#[test_case(C8Y_BRIDGE_HEALTH_TOPIC, "0", true)]
#[test_case(C8Y_BRIDGE_HEALTH_TOPIC, "bad payload", false)]
#[test_case("tedge/not/health/topic", "1", false)]
#[test_case("tedge/not/health/topic", "0", false)]
fn test_bridge_is_established(topic: &str, payload: &str, expected: bool) {
let topic = Topic::new(topic).unwrap();
let message = MqttMessage::new(&topic, payload);

let actual = is_c8y_bridge_established(&message, "tedge-mapper-bridge-c8y");
assert_eq!(actual, expected);
}
}
58 changes: 53 additions & 5 deletions crates/core/tedge_api/src/health.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,37 @@
use crate::mqtt_topics::Channel;
use crate::mqtt_topics::EntityTopicId;
use crate::mqtt_topics::MqttSchema;
use crate::mqtt_topics::ServiceTopicId;
use clock::Clock;
use clock::WallClock;
use log::error;
use mqtt_channel::MqttMessage;
use mqtt_channel::Topic;
use serde::Deserialize;
use serde::Serialize;
use serde_json::json;
use std::process;
use std::sync::Arc;
use tedge_utils::timestamp::TimeFormat;

pub const MQTT_BRIDGE_UP_PAYLOAD: &str = "1";
pub const MQTT_BRIDGE_DOWN_PAYLOAD: &str = "0";
pub const MOSQUITTO_BRIDGE_PREFIX: &str = "mosquitto-";
pub const MOSQUITTO_BRIDGE_SUFFIX: &str = "-bridge";
pub const MOSQUITTO_BRIDGE_UP_PAYLOAD: &str = "1";
pub const MOSQUITTO_BRIDGE_DOWN_PAYLOAD: &str = "0";

pub const UP_STATUS: &str = "up";
pub const DOWN_STATUS: &str = "down";
pub const UNKNOWN_STATUS: &str = "unknown";

// FIXME: doesn't account for custom topic root, use MQTT scheme API here
pub fn main_device_health_topic(service: &str) -> String {
format!("te/device/main/service/{service}/status/health")
pub fn service_health_topic(
mqtt_schema: &MqttSchema,
device_topic_id: &EntityTopicId,
service: &str,
) -> Topic {
mqtt_schema.topic_for(
&device_topic_id.default_service_for_device(service).unwrap(),
&Channel::Health,
)
}

/// Encodes a valid health topic.
Expand Down Expand Up @@ -95,6 +107,42 @@ impl ServiceHealthTopic {
#[derive(Debug)]
pub struct HealthTopicError;

#[derive(Deserialize, Serialize, Debug, Default)]
pub struct HealthStatus {
#[serde(default = "default_status")]
pub status: String,
}

fn default_status() -> String {
"unknown".to_string()
}

impl HealthStatus {
pub fn from_mosquitto_bridge_payload_str(payload: &str) -> Self {
let status = match payload {
MOSQUITTO_BRIDGE_UP_PAYLOAD => UP_STATUS,
MOSQUITTO_BRIDGE_DOWN_PAYLOAD => DOWN_STATUS,
_ => UNKNOWN_STATUS,
};
HealthStatus {
status: status.into(),
}
}

pub fn is_valid(&self) -> bool {
self.status == UP_STATUS || self.status == DOWN_STATUS
}
}

pub fn entity_is_mosquitto_bridge_service(entity_topic_id: &EntityTopicId) -> bool {
entity_topic_id
.default_service_name()
.filter(|name| {
name.starts_with(MOSQUITTO_BRIDGE_PREFIX) && name.ends_with(MOSQUITTO_BRIDGE_SUFFIX)
})
.is_some()
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
6 changes: 0 additions & 6 deletions crates/core/tedge_api/src/mqtt_topics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,12 +427,6 @@ impl EntityTopicId {
pub fn as_str(&self) -> &str {
self.0.as_str()
}

// FIXME: can also match "device/bridge//" or "/device/main/service/my_custom_bridge"
// should match ONLY the single mapper bridge
pub fn is_bridge_health_topic(&self) -> bool {
self.as_str().contains("bridge")
}
}

/// Contains a topic id of the service itself and the associated device.
Expand Down
16 changes: 14 additions & 2 deletions crates/core/tedge_mapper/src/aws/mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,22 @@ use async_trait::async_trait;
use aws_mapper_ext::converter::AwsConverter;
use clock::WallClock;
use mqtt_channel::TopicFilter;
use std::str::FromStr;
use tedge_actors::ConvertingActor;
use tedge_actors::MessageSink;
use tedge_actors::MessageSource;
use tedge_actors::NoConfig;
use tedge_api::mqtt_topics::EntityTopicId;
use tedge_api::mqtt_topics::MqttSchema;
use tedge_api::service_health_topic;
use tedge_config::TEdgeConfig;
use tedge_mqtt_bridge::use_key_and_cert;
use tedge_mqtt_bridge::BridgeConfig;
use tedge_mqtt_bridge::MqttBridgeActorBuilder;
use tracing::warn;

const AWS_MAPPER_NAME: &str = "tedge-mapper-aws";
const BUILT_IN_BRIDGE_NAME: &str = "tedge-mapper-bridge-aws";

pub struct AwsMapper;

Expand All @@ -32,8 +36,12 @@ impl TEdgeComponent for AwsMapper {
) -> Result<(), anyhow::Error> {
let (mut runtime, mut mqtt_actor) =
start_basic_actors(self.session_name(), &tedge_config).await?;

let mqtt_schema = MqttSchema::with_root(tedge_config.mqtt.topic_root.clone());
if tedge_config.mqtt.bridge.built_in {
let device_id = tedge_config.device.id.try_read(&tedge_config)?;
let device_topic_id = EntityTopicId::from_str(&tedge_config.mqtt.device_topic_id)?;

let rules = built_in_bridge_rules(device_id)?;

let mut cloud_config = tedge_mqtt_bridge::MqttOptions::new(
Expand All @@ -47,17 +55,21 @@ impl TEdgeComponent for AwsMapper {
&tedge_config.aws.root_cert_path,
&tedge_config,
)?;

let health_topic =
service_health_topic(&mqtt_schema, &device_topic_id, BUILT_IN_BRIDGE_NAME);

let bridge_actor = MqttBridgeActorBuilder::new(
&tedge_config,
"tedge-mapper-bridge-aws".to_owned(),
BUILT_IN_BRIDGE_NAME,
&health_topic,
rules,
cloud_config,
)
.await;
runtime.spawn(bridge_actor).await?;
}
let clock = Box::new(WallClock);
let mqtt_schema = MqttSchema::with_root(tedge_config.mqtt.topic_root.clone());
let aws_converter = AwsConverter::new(
tedge_config.aws.mapper.timestamp,
clock,
Expand Down
14 changes: 13 additions & 1 deletion crates/core/tedge_mapper/src/az/mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,22 @@ use async_trait::async_trait;
use az_mapper_ext::converter::AzureConverter;
use clock::WallClock;
use mqtt_channel::TopicFilter;
use std::str::FromStr;
use tedge_actors::ConvertingActor;
use tedge_actors::MessageSink;
use tedge_actors::MessageSource;
use tedge_actors::NoConfig;
use tedge_api::mqtt_topics::EntityTopicId;
use tedge_api::mqtt_topics::MqttSchema;
use tedge_api::service_health_topic;
use tedge_config::TEdgeConfig;
use tedge_mqtt_bridge::use_key_and_cert;
use tedge_mqtt_bridge::BridgeConfig;
use tedge_mqtt_bridge::MqttBridgeActorBuilder;
use tracing::warn;

const AZURE_MAPPER_NAME: &str = "tedge-mapper-az";
const BUILT_IN_BRIDGE_NAME: &str = "tedge-mapper-bridge-az";

pub struct AzureMapper;

Expand All @@ -32,7 +36,11 @@ impl TEdgeComponent for AzureMapper {
) -> Result<(), anyhow::Error> {
let (mut runtime, mut mqtt_actor) =
start_basic_actors(self.session_name(), &tedge_config).await?;
let mqtt_schema = MqttSchema::with_root(tedge_config.mqtt.topic_root.clone());

if tedge_config.mqtt.bridge.built_in {
let device_topic_id = EntityTopicId::from_str(&tedge_config.mqtt.device_topic_id)?;

let remote_clientid = tedge_config.device.id.try_read(&tedge_config)?;
let rules = built_in_bridge_rules(remote_clientid)?;

Expand All @@ -55,9 +63,13 @@ impl TEdgeComponent for AzureMapper {
&tedge_config,
)?;

let health_topic =
service_health_topic(&mqtt_schema, &device_topic_id, BUILT_IN_BRIDGE_NAME);

let bridge_actor = MqttBridgeActorBuilder::new(
&tedge_config,
"tedge-mapper-bridge-az".to_owned(),
BUILT_IN_BRIDGE_NAME,
&health_topic,
rules,
cloud_config,
)
Expand Down
5 changes: 3 additions & 2 deletions crates/core/tedge_mapper/src/c8y/mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ impl TEdgeComponent for CumulocityMapper {
let last_will_message_bridge =
c8y_api::smartrest::inventory::service_creation_message_payload(
mapper_service_external_id.as_ref(),
&c8y_mapper_config.bridge_service_name(),
&c8y_mapper_config.bridge_service_name,
service_type.as_str(),
"down",
)?;
Expand All @@ -175,7 +175,8 @@ impl TEdgeComponent for CumulocityMapper {
.spawn(
MqttBridgeActorBuilder::new(
&tedge_config,
c8y_mapper_config.bridge_service_name(),
&c8y_mapper_config.bridge_service_name,
&c8y_mapper_config.bridge_health_topic,
tc,
cloud_config,
)
Expand Down
14 changes: 4 additions & 10 deletions crates/extensions/c8y_mapper_ext/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ use super::dynamic_discovery::process_inotify_events;
use crate::converter::UploadContext;
use crate::converter::UploadOperationLog;
use crate::operations::FtsDownloadOperationType;
use crate::service_monitor::is_c8y_bridge_established;
use async_trait::async_trait;
use c8y_api::smartrest::smartrest_serializer::fail_operation;
use c8y_api::smartrest::smartrest_serializer::succeed_static_operation;
use c8y_api::smartrest::smartrest_serializer::CumulocitySupportedOperations;
use c8y_api::smartrest::smartrest_serializer::SmartRest;
use c8y_api::utils::bridge::is_c8y_bridge_established;
use c8y_auth_proxy::url::ProxyUrlGenerator;
use c8y_http_proxy::handle::C8YHttpProxy;
use c8y_http_proxy::messages::C8YRestRequest;
Expand All @@ -32,7 +32,6 @@ use tedge_actors::Sender;
use tedge_actors::Service;
use tedge_actors::SimpleMessageBox;
use tedge_actors::SimpleMessageBoxBuilder;
use tedge_api::main_device_health_topic;
use tedge_downloader_ext::DownloadRequest;
use tedge_downloader_ext::DownloadResult;
use tedge_file_system_ext::FsWatchEvent;
Expand Down Expand Up @@ -68,7 +67,6 @@ pub struct C8yMapperActor {
mqtt_publisher: LoggingSender<MqttMessage>,
timer_sender: LoggingSender<SyncStart>,
bridge_status_messages: SimpleMessageBox<MqttMessage, MqttMessage>,
c8y_bridge_service_name: String,
}

#[async_trait]
Expand All @@ -81,7 +79,7 @@ impl Actor for C8yMapperActor {
if !self.converter.config.bridge_in_mapper {
// Wait till the c8y bridge is established
while let Some(message) = self.bridge_status_messages.recv().await {
if is_c8y_bridge_established(&message, &self.c8y_bridge_service_name) {
if is_c8y_bridge_established(&message, &self.converter.config.bridge_health_topic) {
break;
}
}
Expand Down Expand Up @@ -127,15 +125,13 @@ impl C8yMapperActor {
mqtt_publisher: LoggingSender<MqttMessage>,
timer_sender: LoggingSender<SyncStart>,
bridge_status_messages: SimpleMessageBox<MqttMessage, MqttMessage>,
c8y_bridge_service_name: String,
) -> Self {
Self {
converter,
messages,
mqtt_publisher,
timer_sender,
bridge_status_messages,
c8y_bridge_service_name,
}
}

Expand Down Expand Up @@ -351,9 +347,9 @@ impl C8yMapperBuilder {

let bridge_monitor_builder: SimpleMessageBoxBuilder<MqttMessage, MqttMessage> =
SimpleMessageBoxBuilder::new("ServiceMonitor", 1);
let bridge_health_topic = main_device_health_topic(&config.bridge_service_name());

service_monitor.connect_sink(
bridge_health_topic.as_str().try_into().unwrap(),
config.bridge_health_topic.clone().into(),
&bridge_monitor_builder,
);

Expand Down Expand Up @@ -397,7 +393,6 @@ impl Builder<C8yMapperActor> for C8yMapperBuilder {
LoggingSender::new("C8yMapper => Uploader".into(), self.upload_sender);
let downloader_sender =
LoggingSender::new("C8yMapper => Downloader".into(), self.download_sender);
let c8y_bridge_service_name = self.config.bridge_service_name();

let converter = CumulocityConverter::new(
self.config,
Expand All @@ -418,7 +413,6 @@ impl Builder<C8yMapperActor> for C8yMapperBuilder {
mqtt_publisher,
timer_sender,
bridge_monitor_box,
c8y_bridge_service_name,
))
}
}
Loading

0 comments on commit 7530af5

Please sign in to comment.