Skip to content

Commit

Permalink
Remove old health topics from c8y mapper (#2353)
Browse files Browse the repository at this point in the history
  • Loading branch information
Bravo555 authored Oct 20, 2023
1 parent 57ae569 commit f96086d
Show file tree
Hide file tree
Showing 9 changed files with 72 additions and 96 deletions.
4 changes: 3 additions & 1 deletion crates/core/c8y_api/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ pub mod bridge {

use mqtt_channel::Message;

pub const C8Y_BRIDGE_HEALTH_TOPIC: &str = "tedge/health/mosquitto-c8y-bridge";
// FIXME: this hard-coded topic doesn't account for a custom topic root; build it with the MQTT schema API (`MqttSchema`) instead
pub const C8Y_BRIDGE_HEALTH_TOPIC: &str =
"te/device/main/service/mosquitto-c8y-bridge/status/health";
const C8Y_BRIDGE_UP_PAYLOAD: &str = "1";

pub fn is_c8y_bridge_up(message: &Message) -> bool {
Expand Down
8 changes: 6 additions & 2 deletions crates/core/tedge/src/cli/connect/bridge_config_c8y.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use tedge_config::HostPort;
use tedge_config::TemplatesSet;
use tedge_config::MQTT_TLS_PORT;

const C8Y_BRIDGE_HEALTH_TOPIC: &str = "te/device/main/service/mosquitto-c8y-bridge/status/health";

#[derive(Debug, Eq, PartialEq)]
pub struct BridgeConfigC8yParams {
pub mqtt_host: HostPort<MQTT_TLS_PORT>,
Expand Down Expand Up @@ -95,7 +97,9 @@ impl From<BridgeConfigC8yParams> for BridgeConfig {
local_clean_session: false,
notifications: true,
notifications_local_only: true,
notification_topic: "tedge/health/mosquitto-c8y-bridge".into(),

// FIXME: this hard-coded topic doesn't account for a custom topic root; build it with the MQTT schema API (`MqttSchema`) instead
notification_topic: C8Y_BRIDGE_HEALTH_TOPIC.into(),
bridge_attempt_unsubscribe: false,
topics,
}
Expand Down Expand Up @@ -171,7 +175,7 @@ fn test_bridge_config_from_c8y_params() -> anyhow::Result<()> {
local_clean_session: false,
notifications: true,
notifications_local_only: true,
notification_topic: "tedge/health/mosquitto-c8y-bridge".into(),
notification_topic: C8Y_BRIDGE_HEALTH_TOPIC.into(),
bridge_attempt_unsubscribe: false,
};

Expand Down
2 changes: 0 additions & 2 deletions crates/core/tedge_agent/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,6 @@ pub fn create_tedge_to_te_converter(
"tedge/events/+/+",
"tedge/alarms/+/+",
"tedge/alarms/+/+/+",
"tedge/health/+",
"tedge/health/+/+",
]
.try_into()?;

Expand Down
80 changes: 10 additions & 70 deletions crates/core/tedge_api/src/health.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
use clock::Clock;
use clock::WallClock;
use std::process;
use std::sync::Arc;

use crate::mqtt_topics::Channel;
use crate::mqtt_topics::MqttSchema;
use crate::mqtt_topics::ServiceTopicId;
use clock::Clock;
use clock::WallClock;
use mqtt_channel::Message;
use mqtt_channel::PubChannel;
use mqtt_channel::Topic;
use mqtt_channel::TopicFilter;
use serde_json::json;
use std::process;
use std::sync::Arc;

/// Encodes a valid health topic.
///
Expand All @@ -30,20 +28,6 @@ impl ServiceHealthTopic {
Self(health_topic.name.into())
}

/// Builds a `ServiceHealthTopic` from a topic that follows the old topic scheme.
///
/// Only two shapes are accepted: "tedge/health/SERVICE_NAME" and
/// "tedge/health/CHILD_ID/SERVICE_NAME". Any other topic yields a
/// `HealthTopicError`. An accepted topic string is stored unchanged.
pub fn from_old_topic(topic: String) -> Result<Self, HealthTopicError> {
    // Old health topics have exactly three or four `/`-separated segments,
    // the first two being the fixed `tedge/health` prefix.
    let follows_old_scheme = matches!(
        topic.split('/').collect::<Vec<_>>().as_slice(),
        ["tedge", "health", _] | ["tedge", "health", _, _]
    );

    if !follows_old_scheme {
        return Err(HealthTopicError);
    }

    Ok(Self(Arc::from(topic)))
}

/// Returns the wrapped health topic as a string slice.
pub fn as_str(&self) -> &str {
    self.0.as_ref()
}
Expand Down Expand Up @@ -105,54 +89,6 @@ pub fn health_check_topics(daemon_name: &str) -> TopicFilter {
.expect("Invalid topic filter")
}

/// Publishes the "up" health status message of `daemon_name` on `responses`.
///
/// The outcome of the send is discarded, so a publishing failure is not
/// reported to the caller.
pub async fn send_health_status(responses: &mut impl PubChannel, daemon_name: &str) {
    let _ = responses
        .send(health_status_up_message(daemon_name))
        .await;
}

/// Builds the retained "down" health status message for `daemon_name`.
///
/// The message targets the `tedge/health/<daemon_name>` topic with QoS
/// `AtLeastOnce`; its JSON payload carries the status and the id of the
/// current process.
pub fn health_status_down_message(daemon_name: &str) -> Message {
    let topic_name = format!("tedge/health/{daemon_name}");
    let status = json!({
        "status": "down",
        "pid": process::id(),
    });

    Message {
        topic: Topic::new_unchecked(&topic_name),
        payload: status.to_string().into(),
        qos: mqtt_channel::QoS::AtLeastOnce,
        retain: true,
    }
}

/// Builds the "up" health status message for `daemon_name`.
///
/// On success the message targets `tedge/health/<daemon_name>` with QoS
/// `AtLeastOnce` and `with_retain()` applied; its JSON payload carries the
/// status, the id of the current process and the current wall-clock time
/// formatted as RFC 3339.
///
/// If the timestamp cannot be formatted, a message for the `tedge/errors`
/// topic describing the failure is returned instead.
pub fn health_status_up_message(daemon_name: &str) -> Message {
    let timestamp = WallClock
        .now()
        .format(&time::format_description::well_known::Rfc3339);

    match timestamp {
        Ok(time_stamp) => {
            let health_status = json!({
                "status": "up",
                "pid": process::id(),
                "time": time_stamp,
            })
            .to_string();
            let response_topic_health =
                Topic::new_unchecked(&format!("tedge/health/{daemon_name}"));

            Message::new(&response_topic_health, health_status)
                .with_qos(mqtt_channel::QoS::AtLeastOnce)
                .with_retain()
        }
        Err(e) => {
            // The failure is reported as a message on the error topic rather than by panicking.
            let error_topic = Topic::new_unchecked("tedge/errors");
            let error_msg = format!(
                "Health message: Failed to convert timestamp to Rfc3339 format due to: {e}"
            );
            Message::new(&error_topic, error_msg).with_qos(mqtt_channel::QoS::AtLeastOnce)
        }
    }
}

pub fn is_bridge_health(topic: &str) -> bool {
if topic.starts_with("tedge/health") {
let substrings: Vec<String> = topic.split('/').map(String::from).collect();
Expand All @@ -169,12 +105,16 @@ pub fn is_bridge_health(topic: &str) -> bool {

#[cfg(test)]
mod tests {
use super::*;
use serde_json::Value;

use super::health_status_up_message;
#[test]
fn is_rfc3339_timestamp() {
let msg = health_status_up_message("test_daemon");
let health_topic = ServiceHealthTopic(Arc::from(
"te/device/main/service/test_daemon/status/health",
));
let msg = health_topic.up_message();

let health_msg_str = msg.payload_str().unwrap();
let deserialized_value: Value =
serde_json::from_str(health_msg_str).expect("Failed to parse JSON");
Expand Down
3 changes: 3 additions & 0 deletions crates/core/tedge_watchdog/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,7 @@ pub enum WatchdogError {

#[error(transparent)]
ParseError(#[from] Parse),

#[error(transparent)]
CustomError(#[from] anyhow::Error),
}
50 changes: 39 additions & 11 deletions crates/core/tedge_watchdog/src/systemd_watchdog.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,22 @@
use crate::error::WatchdogError;
use anyhow::Context;
use freedesktop_entry_parser::parse_entry;
use futures::channel::mpsc;
use futures::stream::FuturesUnordered;
use futures::SinkExt;
use futures::StreamExt;
use mqtt_channel::Message;
use mqtt_channel::PubChannel;
use mqtt_channel::Topic;
use nanoid::nanoid;
use serde::Deserialize;
use serde::Serialize;
use std::path::PathBuf;
use std::process;
use std::process::Command;
use std::process::ExitStatus;
use std::process::Stdio;
use std::process::{self};
use std::time::Instant;
use tedge_api::health::health_status_down_message;
use tedge_api::health::health_status_up_message;
use tedge_api::health::send_health_status;
use tedge_api::health::ServiceHealthTopic;
use tedge_api::mqtt_topics::Channel;
use tedge_api::mqtt_topics::EntityTopicId;
use tedge_api::mqtt_topics::MqttSchema;
Expand All @@ -29,6 +28,8 @@ use tracing::error;
use tracing::info;
use tracing::warn;

const SERVICE_NAME: &str = "tedge-watchdog";

// TODO: extract to common module
#[derive(Debug, Serialize, Deserialize)]
pub struct HealthStatus {
Expand Down Expand Up @@ -149,23 +150,49 @@ async fn monitor_tedge_service(
res_topic: Topic,
interval: u64,
) -> Result<(), WatchdogError> {
let client_id: &str = &format!("{}_{}", name, nanoid!());
let config_repository = tedge_config::TEdgeConfigRepository::new(tedge_config_location);
let tedge_config = config_repository.load()?;

let mqtt_device_topic_id: EntityTopicId = tedge_config
.mqtt
.device_topic_id
.parse()
.context("Can't parse as device topic id")?;

let mqtt_topic_root = &tedge_config.mqtt.topic_root;

let mqtt_session_name = format!("{SERVICE_NAME}#{mqtt_topic_root}/{mqtt_device_topic_id}");

let mqtt_schema = MqttSchema::with_root(mqtt_topic_root.clone());

let service_topic_id = mqtt_device_topic_id
.default_service_for_device(SERVICE_NAME)
.unwrap();
let service_health_topic =
ServiceHealthTopic::from_new_topic(&service_topic_id.into(), &mqtt_schema);

let _service_health_topic = service_health_topic.clone();

let mqtt_config = tedge_config
.mqtt_config()?
.with_session_name(client_id)
.with_session_name(mqtt_session_name)
.with_subscriptions(res_topic.into())
.with_initial_message(|| health_status_up_message("tedge-watchdog"))
.with_last_will_message(health_status_down_message("tedge-watchdog"));
.with_initial_message(move || _service_health_topic.clone().up_message())
.with_last_will_message(service_health_topic.down_message());

let client = mqtt_channel::Connection::new(&mqtt_config).await?;

let mut received = client.received;
let mut publisher = client.published;

info!("Starting watchdog for {} service", name);

// Now the systemd watchdog is done with the initialization and ready for processing the messages
send_health_status(&mut publisher, "tedge-watchdog").await;
let health_status_message = service_health_topic.up_message();
publisher
.send(health_status_message)
.await
.context("Could not send initial health status message")?;

loop {
let message = Message::new(&Topic::new(req_topic)?, "");
Expand Down Expand Up @@ -288,7 +315,8 @@ mod tests {
#[tokio::test]
async fn test_get_latest_health_status_message() -> Result<()> {
let (mut sender, mut receiver) = mpsc::unbounded::<Message>();
let health_topic = Topic::new("tedge/health/test-service").expect("Valid topic");
let health_topic =
Topic::new("te/device/main/service/test-service/status/health").expect("Valid topic");
let base_timestamp = OffsetDateTime::now_utc();

for x in 1..5u64 {
Expand Down
3 changes: 2 additions & 1 deletion crates/extensions/c8y_log_manager/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,8 @@ mod tests {
let (mut mqtt, _http, mut fs) = spawn_log_manager_actor(tempdir.path());

let c8y_s_us = Topic::new_unchecked("c8y/s/us");
let bridge = Topic::new_unchecked("tedge/health/mosquitto-c8y-bridge");
let bridge =
Topic::new_unchecked("te/device/main/service/mosquitto-c8y-bridge/status/health");

assert_eq!(
mqtt.recv().await,
Expand Down
8 changes: 4 additions & 4 deletions tests/RobotFramework/libraries/ThinEdgeIO/ThinEdgeIO.py
Original file line number Diff line number Diff line change
Expand Up @@ -542,10 +542,10 @@ def mqtt_should_have_topic(
*Examples:*
| ${listen}= | `Should Have MQTT Message` | topic=tedge/${CHILD_SN}/commands/req/config_snapshot | date_from=-5s |
| ${messages}= | `Should Have MQTT Message` | tedge/health/tedge-log-plugin | minimum=1 | minimum=2 |
| ${messages}= | `Should Have MQTT Message` | tedge/health/tedge-log-plugin | minimum=1 | minimum=2 | message_contains="time" |
| ${messages}= | `Should Have MQTT Message` | tedge/health/tedge-log-plugin | minimum=1 | minimum=2 | message_pattern="value":\s*\d+ |
| ${listen}= | `Should Have MQTT Message` | topic=tedge/${CHILD_SN}/commands/req/config_snapshot | date_from=-5s |
| ${messages}= | `Should Have MQTT Message` | te/device/main/service/tedge-log-plugin/status/health | minimum=1 | maximum=2 |
| ${messages}= | `Should Have MQTT Message` | te/device/main/service/tedge-log-plugin/status/health | minimum=1 | maximum=2 | message_contains="time" |
| ${messages}= | `Should Have MQTT Message` | te/device/main/service/tedge-log-plugin/status/health | minimum=1 | maximum=2 | message_pattern="value":\s*\d+ |
"""
result = self._assert_mqtt_topic_messages(
topic,
Expand Down
10 changes: 5 additions & 5 deletions tests/RobotFramework/tests/cumulocity/jwt/jwt_request.robot
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
*** Settings ***
Resource ../../../resources/common.resource
Library Cumulocity
Library ThinEdgeIO
Library ThinEdgeIO

Test Tags theme:c8y theme:tokens
Test Setup Custom Setup
Test Teardown Get Logs

*** Test Cases ***
Retrieve a JWT tokens
${start_time}= Get Unix Timestamp
Execute Command tedge mqtt pub c8y/s/uat ''
${start_time}= Get Unix Timestamp
Execute Command tedge mqtt pub c8y/s/uat ''
${messages}= Should Have MQTT Messages c8y/s/dat maximum=1 date_from=${start_time}
Should Contain ${messages[0]} 71

Expand All @@ -23,5 +23,5 @@ Custom Setup
Stop Service tedge-agent
Stop Service c8y-configuration-plugin
Stop Service tedge-log-plugin
Should Have MQTT Messages tedge/health/mosquitto-c8y-bridge
Sleep 1s wait just in case that the server responds to already sent messages
Should Have MQTT Messages te/device/main/service/mosquitto-c8y-bridge/status/health
Sleep 1s wait just in case that the server responds to already sent messages

2 comments on commit f96086d

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Robot Results

✅ Passed ❌ Failed ⏭️ Skipped Total Pass % ⏱️ Duration
320 1 3 321 99.69 1h6m4.089s

Failed Tests

Name Message ⏱️ Duration Suite
Publish measurements varying period benchmark.py run --count 1000 ... returned an unexpected exit code stdout: { "ok": false, "iterations": 5, "passed": 3, "failed": 2, "results": [ { "worker": 0, "messages": 1000, "dropped_percent": 0.0, "dropped_messages": 0, "total": 0.12067, "total_non_idle": 0.12067, "idle": 0.0, "parameters": { "count": 1000, "beats": 100, "beats_delay": 0, [ Message content over the limit has been removed. ] 2023-10-20 15:47:06,892 - root - INFO - Starting benchmark: count=1000, beats=100, beats_delay=0ms, period=75ms 2023-10-20 15:47:06,974 - root - INFO - Subscribing to cloud topic 2023-10-20 15:47:07,111 - root - WARNING - Burst time is exceeding the period. Skipping delay. diff=-0.051 2023-10-20 15:47:08,092 - root - INFO - Waiting for last message to be published 2023-10-20 15:47:10,139 - root - INFO - Stopping mqtt client 2023-10-20 15:47:10,206 - root - WARNING - Detected dropped messages 2023-10-20 15:47:11,211 - root - INFO - Starting benchmark: count=1000, beats=100, beats_delay=0ms, period=100ms 2023-10-20 15:47:11,281 - root - INFO - Subscribing to cloud topic 2023-10-20 15:47:12,473 - root - WARNING - Burst time is exceeding the period. Skipping delay. diff=-0.025 2023-10-20 15:47:12,473 - root - INFO - Waiting for last message to be published 2023-10-20 15:47:14,556 - root - INFO - Stopping mqtt client 2023-10-20 15:47:14,607 - root - WARNING - Detected dropped messages 2023-10-20 15:47:15,610 - root - INFO - Finished benchmark Benchmark failed 54.436 s Benchmarks

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Robot Results

✅ Passed ❌ Failed ⏭️ Skipped Total Pass % ⏱️ Duration
321 0 3 321 100 51m7.169s

Please sign in to comment.