Skip to content

Commit

Permalink
Health status parsing with serde
Browse files Browse the repository at this point in the history
  • Loading branch information
albinsuresh committed Jun 7, 2024
1 parent 7530af5 commit 4914cff
Show file tree
Hide file tree
Showing 9 changed files with 186 additions and 82 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

162 changes: 134 additions & 28 deletions crates/core/tedge_api/src/health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,12 @@ use mqtt_channel::Topic;
use serde::Deserialize;
use serde::Serialize;
use serde_json::json;
use serde_json::Value as JsonValue;
use std::fmt::Display;
use std::process;
use std::sync::Arc;
use tedge_utils::timestamp::TimeFormat;

pub const MOSQUITTO_BRIDGE_PREFIX: &str = "mosquitto-";
pub const MOSQUITTO_BRIDGE_SUFFIX: &str = "-bridge";
pub const MOSQUITTO_BRIDGE_UP_PAYLOAD: &str = "1";
pub const MOSQUITTO_BRIDGE_DOWN_PAYLOAD: &str = "0";

pub const UP_STATUS: &str = "up";
pub const DOWN_STATUS: &str = "down";
pub const UNKNOWN_STATUS: &str = "unknown";

pub fn service_health_topic(
mqtt_schema: &MqttSchema,
device_topic_id: &EntityTopicId,
Expand Down Expand Up @@ -104,42 +97,78 @@ impl ServiceHealthTopic {
}
}

#[derive(Debug)]
pub struct HealthTopicError;

/// Health status of a service, as carried in a health status message.
///
/// `pid` and `time` are optional: a payload such as `{"pid":1234,"status":"up"}`
/// (no `time`) still parses.
#[derive(Deserialize, Serialize, Debug, Default)]
pub struct HealthStatus {
#[serde(default = "default_status")]
pub status: String,
pub status: Status,
/// Process id reported by the service, if the message carried one.
pub pid: Option<u32>,
/// Timestamp reported by the service, kept as a raw JSON value.
// NOTE(review): presumably an RFC 3339 string or a unix timestamp — confirm against the producers.
pub time: Option<JsonValue>,
}

fn default_status() -> String {
"unknown".to_string()
/// Status value of a health message.
///
/// `Up` and `Down` are (de)serialized as the lowercase strings `"up"` and `"down"`;
/// any other string, including the empty one, is kept verbatim in `Other`.
#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum Status {
Up,
Down,
/// Catch-all for any status string that is neither `"up"` nor `"down"`.
// `untagged` makes serde read and write this variant as the bare string it wraps.
#[serde(untagged)]
Other(String),
}

impl HealthStatus {
pub fn from_mosquitto_bridge_payload_str(payload: &str) -> Self {
let status = match payload {
MOSQUITTO_BRIDGE_UP_PAYLOAD => UP_STATUS,
MOSQUITTO_BRIDGE_DOWN_PAYLOAD => DOWN_STATUS,
_ => UNKNOWN_STATUS,
impl Default for Status {
fn default() -> Self {
Status::Other("unknown".to_string())
}
}

/// Textual form of a status: `up`, `down`, or the custom string carried by `Other`.
impl Display for Status {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let status = match self {
Status::Up => "up",
Status::Down => "down",
// An empty custom status is shown as "unknown" instead of an empty string.
Status::Other(val) if val.is_empty() => "unknown",
Status::Other(val) => val,
};
HealthStatus {
status: status.into(),
write!(f, "{}", status)
}
}

/// Error returned by `HealthStatus::try_from_health_status_message` when the
/// message topic does not resolve to a health channel.
#[derive(Debug)]
pub struct HealthTopicError;

impl HealthStatus {
/// Builds a `HealthStatus` from an MQTT message received on a health topic.
///
/// - For a mosquitto bridge service the payload is a bare string: `"1"` maps to
///   `Status::Up`, `"0"` to `Status::Down`, and anything else (including a payload
///   that is not valid UTF-8) to `Status::default()`. `pid` and `time` are left unset.
/// - For any other service the payload is parsed as JSON; a payload that fails to
///   parse yields `HealthStatus::default()` instead of an error.
///
/// # Errors
///
/// Returns `HealthTopicError` when `mqtt_schema` does not resolve the message topic
/// to an entity with a `Channel::Health` channel.
pub fn try_from_health_status_message(
message: &MqttMessage,
mqtt_schema: &MqttSchema,
) -> Result<Self, HealthTopicError> {
if let Ok((topic_id, Channel::Health)) = mqtt_schema.entity_channel_of(&message.topic) {
let health_status = if entity_is_mosquitto_bridge_service(&topic_id) {
// Mosquitto bridges publish a plain "1"/"0" flag rather than a JSON document.
let status = match message.payload_str() {
Ok("1") => Status::Up,
Ok("0") => Status::Down,
_ => Status::default(),
};
HealthStatus {
status,
pid: None,
time: None,
}
} else {
// Deliberately lenient: malformed JSON degrades to the default status.
serde_json::from_slice(message.payload()).unwrap_or_default()
};
Ok(health_status)
} else {
Err(HealthTopicError)
}
}

/// Returns `true` only when the status is one of the two well-known values, up or down.
pub fn is_valid(&self) -> bool {
self.status == UP_STATUS || self.status == DOWN_STATUS
self.status == Status::Up || self.status == Status::Down
}
}

/// Tells whether an entity is a mosquitto bridge service, judging by its name.
///
/// Returns `true` when the entity has a default service name that starts with
/// `mosquitto-` and ends with `-bridge` (e.g. `mosquitto-c8y-bridge`), and `false`
/// when the name does not match or the entity has no default service name.
pub fn entity_is_mosquitto_bridge_service(entity_topic_id: &EntityTopicId) -> bool {
entity_topic_id
.default_service_name()
.filter(|name| {
name.starts_with(MOSQUITTO_BRIDGE_PREFIX) && name.ends_with(MOSQUITTO_BRIDGE_SUFFIX)
})
.filter(|name| name.starts_with("mosquitto-") && name.ends_with("-bridge"))
.is_some()
}

Expand All @@ -148,6 +177,83 @@ mod tests {
use super::*;
use assert_matches::assert_matches;
use serde_json::Value;
use test_case::test_case;

#[test_case(
"te/device/main/service/tedge-mapper-c8y/status/health",
r#"{"status":"up"}"#,
Status::Up;
"service-health-status-up"
)]
#[test_case(
"te/device/main/service/tedge-mapper-c8y/status/health",
r#"{"status":"down"}"#,
Status::Down;
"service-health-status-down"
)]
#[test_case(
"te/device/main/service/tedge-mapper-c8y/status/health",
r#"{"status":"foo"}"#,
Status::Other("foo".into());
"service-health-status-other-value"
)]
#[test_case(
"te/device/child/service/tedge-mapper-c8y/status/health",
r#"{"pid":1234,"status":"up"}"#,
Status::Up;
"service-health-status-with-extra-fields"
)]
#[test_case(
"te/device/main/service/tedge-mapper-c8y/status/health",
r#"{"pid":"123456"}"#,
Status::Other("unknown".into());
"service-health-status-no-value"
)]
#[test_case(
"te/device/main/service/tedge-mapper-c8y/status/health",
r#"{"status":""}"#,
Status::Other("".into());
"service-health-status-empty-value"
)]
#[test_case(
"te/device/main/service/tedge-mapper-c8y/status/health",
"{}",
Status::default();
"service-health-status-empty-message"
)]
#[test_case(
"te/device/main/service/mosquitto-xyz-bridge/status/health",
"1",
Status::Up;
"mosquitto-bridge-service-health-status-up"
)]
#[test_case(
"te/device/main/service/mosquitto-xyz-bridge/status/health",
"0",
Status::Down;
"mosquitto-bridge-service-health-status-down"
)]
#[test_case(
"te/device/main/service/mosquitto-xyz-bridge/status/health",
"invalid payload",
Status::default();
"mosquitto-bridge-service-health-status-invalid-payload"
)]
#[test_case(
"te/device/main/service/tedge-mapper-bridge-c8y/status/health",
r#"{"status":"up"}"#,
Status::Up;
"builtin-bridge-service-health-status-up"
)]
// Parses `health_payload` as if it had been received on `health_topic` and checks
// that the resulting status equals `expected_status`.
fn parse_heath_status(health_topic: &str, health_payload: &str, expected_status: Status) {
    let message = MqttMessage::new(
        &Topic::new_unchecked(health_topic),
        health_payload.as_bytes().to_owned(),
    );

    let parsed = HealthStatus::try_from_health_status_message(&message, &MqttSchema::new());

    assert_eq!(parsed.unwrap().status, expected_status);
}

#[test]
fn is_rfc3339_timestamp() {
Expand Down
35 changes: 16 additions & 19 deletions crates/core/tedge_watchdog/src/systemd_watchdog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@ use futures::StreamExt;
use mqtt_channel::MqttMessage;
use mqtt_channel::PubChannel;
use mqtt_channel::Topic;
use serde::Deserialize;
use serde::Serialize;
use serde_json::Value;
use std::path::PathBuf;
use std::process;
use std::process::Command;
Expand All @@ -23,6 +20,7 @@ use tedge_api::mqtt_topics::Channel;
use tedge_api::mqtt_topics::EntityTopicId;
use tedge_api::mqtt_topics::MqttSchema;
use tedge_api::mqtt_topics::OperationType;
use tedge_api::HealthStatus;
use tedge_config::TEdgeConfigLocation;
use tedge_utils::timestamp::IsoOrUnix;
use time::OffsetDateTime;
Expand All @@ -40,14 +38,6 @@ const SERVICE_NAME: &str = "tedge-watchdog";
/// a timing misalignment.
const NOTIFY_SEND_FREQ_RATIO: u64 = 4;

// TODO: extract to common module
#[derive(Debug, Serialize, Deserialize)]
pub struct HealthStatus {
status: String,
pid: u32,
time: Value,
}

pub async fn start_watchdog(tedge_config_dir: PathBuf) -> Result<(), anyhow::Error> {
// Send ready notification to systemd.
notify_systemd(process::id(), "--ready")?;
Expand Down Expand Up @@ -228,11 +218,14 @@ async fn monitor_tedge_service(
{
Ok(health_status) => {
let health_status = health_status?;
debug!(
"Sending notification for {} with pid: {}",
name, health_status.pid
);
notify_systemd(health_status.pid, "WATCHDOG=1")?;
if let Some(pid) = health_status.pid {
debug!("Sending notification for {} with pid: {}", name, pid);
notify_systemd(pid, "WATCHDOG=1")?;
} else {
error!(
"Ignoring invalid health status message from {name} without a `pid` field in it"
)
}
}
Err(_) => {
warn!("No health check response received from {name} in time");
Expand All @@ -255,7 +248,11 @@ async fn get_latest_health_status_message(
if let Ok(message) = message.payload_str() {
debug!("Health response received: {message}");
if let Ok(health_status) = serde_json::from_str::<HealthStatus>(message) {
let datetime = IsoOrUnix::try_from(&health_status.time)?;
if health_status.time.is_none() {
error!("Ignoring invalid health response: {health_status:?} without a `time` field in it");
continue;
}
let datetime = IsoOrUnix::try_from(&health_status.time.clone().unwrap())?;

// Compare to a slightly old timestamp to avoid false negatives from floating-point error in unix timestamps
if datetime.into_inner() >= request_timestamp - Duration::from_millis(10) {
Expand Down Expand Up @@ -339,7 +336,7 @@ mod tests {
get_latest_health_status_message(request_timestamp, &mut receiver).await;

let expected_timestamp = TimeFormat::Rfc3339.to_json(request_timestamp).unwrap();
assert_eq!(health_status.unwrap().time, expected_timestamp);
assert_eq!(health_status.unwrap().time, Some(expected_timestamp));

sender.close_channel();
let base_timestamp = base_timestamp + Duration::from_secs(5);
Expand Down Expand Up @@ -378,6 +375,6 @@ mod tests {

let health_status =
get_latest_health_status_message(request_timestamp, &mut receiver).await;
assert_eq!(health_status.unwrap().time, payload_timestamp);
assert_eq!(health_status.unwrap().time, Some(payload_timestamp));
}
}
6 changes: 5 additions & 1 deletion crates/extensions/c8y_mapper_ext/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,11 @@ impl Actor for C8yMapperActor {
if !self.converter.config.bridge_in_mapper {
// Wait till the c8y bridge is established
while let Some(message) = self.bridge_status_messages.recv().await {
if is_c8y_bridge_established(&message, &self.converter.config.bridge_health_topic) {
if is_c8y_bridge_established(
&message,
&self.converter.config.mqtt_schema,
&self.converter.config.bridge_health_topic,
) {
break;
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/extensions/c8y_mapper_ext/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ impl C8yMapperConfig {
let state_dir = config_dir.join(STATE_DIR_NAME).into();

let bridge_service_name = if bridge_in_mapper {
format!("tedge-mapper-bridge-{}", c8y_prefix)
format!("tedge-mapper-bridge-{c8y_prefix}")
} else {
"mosquitto-c8y-bridge".into()
};
Expand Down
5 changes: 3 additions & 2 deletions crates/extensions/c8y_mapper_ext/src/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,7 @@ impl CumulocityConverter {

let ancestors_external_ids = self.entity_store.ancestors_external_ids(entity)?;
Ok(convert_health_status_message(
&self.config.mqtt_schema,
entity_metadata,
&ancestors_external_ids,
message,
Expand Down Expand Up @@ -2877,7 +2878,7 @@ pub(crate) mod tests {
let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await;

let in_topic = "te/device/child1/service/child-service-c8y/status/health";
let in_payload = r#"{"pid":"1234","status":"up","time":"2021-11-16T17:45:40.571760714+01:00","type":"thin-edge.io"}"#;
let in_payload = r#"{"pid":1234,"status":"up","time":"2021-11-16T17:45:40.571760714+01:00","type":"thin-edge.io"}"#;
let in_message = MqttMessage::new(&Topic::new_unchecked(in_topic), in_payload);

let mqtt_schema = MqttSchema::new();
Expand Down Expand Up @@ -2933,7 +2934,7 @@ pub(crate) mod tests {
let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await;

let in_topic = "te/device/main/service/test-tedge-mapper-c8y/status/health";
let in_payload = r#"{"pid":"1234","status":"up","time":"2021-11-16T17:45:40.571760714+01:00","type":"thin-edge.io"}"#;
let in_payload = r#"{"pid":1234,"status":"up","time":"2021-11-16T17:45:40.571760714+01:00","type":"thin-edge.io"}"#;
let in_message = MqttMessage::new(&Topic::new_unchecked(in_topic), in_payload);

let mqtt_schema = MqttSchema::new();
Expand Down
Loading

0 comments on commit 4914cff

Please sign in to comment.