Skip to content

Commit

Permalink
Merge pull request #2386 from PradeepKiruvale/te_error
Browse files Browse the repository at this point in the history
Use mqtt v1 topic to publish thin-edge.io errors to, e.g. te/errors
  • Loading branch information
reubenmiller authored Oct 31, 2023
2 parents 180681b + 0f0870e commit e11b64f
Show file tree
Hide file tree
Showing 18 changed files with 116 additions and 95 deletions.
45 changes: 21 additions & 24 deletions crates/core/tedge_api/src/health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::mqtt_topics::MqttSchema;
use crate::mqtt_topics::ServiceTopicId;
use clock::Clock;
use clock::WallClock;
use log::error;
use mqtt_channel::Message;
use mqtt_channel::Topic;
use serde_json::json;
Expand Down Expand Up @@ -45,32 +46,28 @@ impl ServiceHealthTopic {
}

pub fn up_message(&self) -> Message {
let timestamp = WallClock
.now()
.format(&time::format_description::well_known::Rfc3339);
match timestamp {
Ok(timestamp) => {
let health_status = json!({
"status": "up",
"pid": process::id(),
"time": timestamp
})
.to_string();
let now = WallClock.now();
let timestamp = now
.format(&time::format_description::well_known::Rfc3339)
.unwrap_or_else(|err| {
error!(
"Health message: Failed to convert timestamp to Rfc3339 format due to: {err}"
);
format!("{}", now)
});

let response_topic_health = Topic::new_unchecked(self.as_str());
let health_status = json!({
"status": "up",
"pid": process::id(),
"time": timestamp
})
.to_string();

Message::new(&response_topic_health, health_status)
.with_qos(mqtt_channel::QoS::AtLeastOnce)
.with_retain()
}
Err(e) => {
let error_topic = Topic::new_unchecked("tedge/errors");
let error_msg = format!(
"Health message: Failed to convert timestamp to Rfc3339 format due to: {e}"
);
Message::new(&error_topic, error_msg).with_qos(mqtt_channel::QoS::AtLeastOnce)
}
}
let response_topic_health = Topic::new_unchecked(self.as_str());

Message::new(&response_topic_health, health_status)
.with_qos(mqtt_channel::QoS::AtLeastOnce)
.with_retain()
}
}

Expand Down
10 changes: 10 additions & 0 deletions crates/core/tedge_api/src/mqtt_topics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,16 @@ impl MqttSchema {
pub fn capability_topic_for(&self, target: &EntityTopicId, operation: OperationType) -> Topic {
self.topic_for(target, &Channel::CommandMetadata { operation })
}

/// Build a new error topic using the given schema for the root prefix.
/// ```
/// use mqtt_channel::Topic;
/// let te = tedge_api::mqtt_topics::MqttSchema::with_root("thin-edge".to_string());
/// assert_eq!(te.error_topic(), Topic::new_unchecked("thin-edge/errors"));
/// ```
pub fn error_topic(&self) -> Topic {
Topic::new_unchecked(&format!("{0}/errors", self.root))
}
}

impl MqttSchema {
Expand Down
5 changes: 4 additions & 1 deletion crates/core/tedge_mapper/src/aws/mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use tedge_actors::ConvertingActor;
use tedge_actors::MessageSink;
use tedge_actors::MessageSource;
use tedge_actors::NoConfig;
use tedge_api::mqtt_topics::MqttSchema;
use tedge_config::TEdgeConfig;
use tracing::warn;

Expand All @@ -30,7 +31,9 @@ impl TEdgeComponent for AwsMapper {
let (mut runtime, mut mqtt_actor) =
start_basic_actors(self.session_name(), &tedge_config).await?;
let clock = Box::new(WallClock);
let aws_converter = AwsConverter::new(tedge_config.aws.mapper.timestamp, clock);
let mqtt_schema = MqttSchema::with_root(tedge_config.mqtt.topic_root.clone());
let aws_converter =
AwsConverter::new(tedge_config.aws.mapper.timestamp, clock, mqtt_schema);
let mut aws_converting_actor = ConvertingActor::builder(
"AwsConverter",
aws_converter,
Expand Down
10 changes: 7 additions & 3 deletions crates/core/tedge_mapper/src/az/mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use tedge_actors::ConvertingActor;
use tedge_actors::MessageSink;
use tedge_actors::MessageSource;
use tedge_actors::NoConfig;
use tedge_api::mqtt_topics::MqttSchema;
use tedge_config::TEdgeConfig;
use tracing::warn;

Expand All @@ -29,9 +30,12 @@ impl TEdgeComponent for AzureMapper {
) -> Result<(), anyhow::Error> {
let (mut runtime, mut mqtt_actor) =
start_basic_actors(self.session_name(), &tedge_config).await?;

let az_converter =
AzureConverter::new(tedge_config.az.mapper.timestamp, Box::new(WallClock));
let mqtt_schema = MqttSchema::with_root(tedge_config.mqtt.topic_root.clone());
let az_converter = AzureConverter::new(
tedge_config.az.mapper.timestamp,
Box::new(WallClock),
mqtt_schema,
);
let mut az_converting_actor =
ConvertingActor::builder("AzConverter", az_converter, get_topic_filter(&tedge_config));
az_converting_actor.add_input(&mut mqtt_actor);
Expand Down
2 changes: 1 addition & 1 deletion crates/core/tedge_watchdog/src/systemd_watchdog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ async fn monitor_tedge_service(
.mqtt_config()?
.with_session_name(mqtt_session_name)
.with_subscriptions(res_topic.into())
.with_initial_message(move || _service_health_topic.clone().up_message())
.with_initial_message(move || _service_health_topic.up_message())
.with_last_will_message(service_health_topic.down_message());

let client = mqtt_channel::Connection::new(&mqtt_config).await?;
Expand Down
54 changes: 27 additions & 27 deletions crates/extensions/aws_mapper_ext/src/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ pub struct AwsConverter {
}

impl AwsConverter {
pub fn new(add_timestamp: bool, clock: Box<dyn Clock>) -> Self {
pub fn new(add_timestamp: bool, clock: Box<dyn Clock>, mqtt_schema: MqttSchema) -> Self {
let size_threshold = SizeThreshold(AWS_MQTT_THRESHOLD);
AwsConverter {
add_timestamp,
clock,
size_threshold,
mqtt_schema: MqttSchema::default(),
mqtt_schema: mqtt_schema.clone(),
}
}

Expand Down Expand Up @@ -153,7 +153,7 @@ impl AwsConverter {

fn new_error_message(&self, error: ConversionError) -> MqttMessage {
error!("Mapping error: {}", error);
MqttMessage::new(&Topic::new_unchecked("tedge/errors"), error.to_string())
MqttMessage::new(&self.mqtt_schema.error_topic(), error.to_string())
}
}

Expand Down Expand Up @@ -210,13 +210,13 @@ mod tests {

#[test]
fn convert_error() {
let mut converter = AwsConverter::new(true, Box::new(TestClock));
let mut converter = AwsConverter::new(true, Box::new(TestClock), MqttSchema::default());

let input = "Invalid JSON";

let output = converter.convert(&new_tedge_message(input)).unwrap();

assert_eq!(output.first().unwrap().topic.name, "tedge/errors");
assert_eq!(output.first().unwrap().topic.name, "te/errors");
assert_eq!(
extract_first_message_payload(output),
"expected value at line 1 column 1"
Expand All @@ -225,7 +225,7 @@ mod tests {

#[test]
fn try_convert_invalid_json_returns_error() {
let mut converter = AwsConverter::new(false, Box::new(TestClock));
let mut converter = AwsConverter::new(false, Box::new(TestClock), MqttSchema::default());

let input = "This is not Thin Edge JSON";
let result = converter.try_convert(&new_tedge_message(input));
Expand All @@ -234,8 +234,8 @@ mod tests {

#[test]
fn try_convert_exceeding_threshold_returns_error() {
let mut converter =
AwsConverter::new(false, Box::new(TestClock)).with_threshold(SizeThreshold(1));
let mut converter = AwsConverter::new(false, Box::new(TestClock), MqttSchema::default())
.with_threshold(SizeThreshold(1));

let _topic = "te/device/main///m/".to_string();
let input = r#"{"temperature": 21.3}"#;
Expand All @@ -255,7 +255,7 @@ mod tests {
#[test]
fn converting_input_without_timestamp_produces_output_without_timestamp_given_add_timestamp_is_false(
) {
let mut converter = AwsConverter::new(false, Box::new(TestClock));
let mut converter = AwsConverter::new(false, Box::new(TestClock), MqttSchema::default());

let input = r#"{
"temperature": 23.0
Expand All @@ -277,7 +277,7 @@ mod tests {
#[test]
fn converting_input_with_timestamp_produces_output_with_timestamp_given_add_timestamp_is_false()
{
let mut converter = AwsConverter::new(false, Box::new(TestClock));
let mut converter = AwsConverter::new(false, Box::new(TestClock), MqttSchema::default());

let input = r#"{
"time" : "2013-06-22T17:03:14.000+02:00",
Expand All @@ -301,7 +301,7 @@ mod tests {
#[test]
fn converting_input_with_timestamp_produces_output_with_timestamp_given_add_timestamp_is_true()
{
let mut converter = AwsConverter::new(true, Box::new(TestClock));
let mut converter = AwsConverter::new(true, Box::new(TestClock), MqttSchema::default());

let input = r#"{
"time" : "2013-06-22T17:03:14.000+02:00",
Expand All @@ -325,7 +325,7 @@ mod tests {
#[test]
fn converting_input_without_timestamp_produces_output_with_timestamp_given_add_timestamp_is_true(
) {
let mut converter = AwsConverter::new(true, Box::new(TestClock));
let mut converter = AwsConverter::new(true, Box::new(TestClock), MqttSchema::default());

let input = r#"{
"temperature": 23.0
Expand All @@ -348,7 +348,7 @@ mod tests {

#[test]
fn converting_input_with_measurement_type() {
let mut converter = AwsConverter::new(true, Box::new(TestClock));
let mut converter = AwsConverter::new(true, Box::new(TestClock), MqttSchema::default());

let input = r#"{
"temperature": 23.0
Expand All @@ -371,7 +371,7 @@ mod tests {

#[test]
fn converting_input_for_child_device_with_measurement_type() {
let mut converter = AwsConverter::new(true, Box::new(TestClock));
let mut converter = AwsConverter::new(true, Box::new(TestClock), MqttSchema::default());

let input = r#"{
"temperature": 23.0
Expand All @@ -397,7 +397,7 @@ mod tests {

#[test]
fn converting_input_for_main_device_service_with_measurement_type() {
let mut converter = AwsConverter::new(true, Box::new(TestClock));
let mut converter = AwsConverter::new(true, Box::new(TestClock), MqttSchema::default());

let input = r#"{
"temperature": 23.0
Expand Down Expand Up @@ -426,7 +426,7 @@ mod tests {

#[test]
fn converting_input_for_child_device_service_with_measurement_type() {
let mut converter = AwsConverter::new(true, Box::new(TestClock));
let mut converter = AwsConverter::new(true, Box::new(TestClock), MqttSchema::default());

let input = r#"{
"temperature": 23.0
Expand Down Expand Up @@ -455,7 +455,7 @@ mod tests {

#[test]
fn converting_bridge_health_status() {
let mut converter = AwsConverter::new(false, Box::new(TestClock));
let mut converter = AwsConverter::new(false, Box::new(TestClock), MqttSchema::default());

let input = "0";
let result = converter.try_convert(&MqttMessage::new(
Expand All @@ -468,7 +468,7 @@ mod tests {

#[test]
fn converting_event_for_main_device() {
let mut converter = AwsConverter::new(true, Box::new(TestClock));
let mut converter = AwsConverter::new(true, Box::new(TestClock), MqttSchema::default());

let input = r#"{
"text": "I raised it",
Expand Down Expand Up @@ -496,7 +496,7 @@ mod tests {

#[test]
fn converting_event_for_child_device() {
let mut converter = AwsConverter::new(true, Box::new(TestClock));
let mut converter = AwsConverter::new(true, Box::new(TestClock), MqttSchema::default());

let input = r#"{
"text": "I raised it",
Expand Down Expand Up @@ -524,7 +524,7 @@ mod tests {

#[test]
fn converting_event_for_main_device_service() {
let mut converter = AwsConverter::new(true, Box::new(TestClock));
let mut converter = AwsConverter::new(true, Box::new(TestClock), MqttSchema::default());

let input = r#"{
"text": "I raised it",
Expand Down Expand Up @@ -555,7 +555,7 @@ mod tests {

#[test]
fn converting_event_for_child_device_service() {
let mut converter = AwsConverter::new(true, Box::new(TestClock));
let mut converter = AwsConverter::new(true, Box::new(TestClock), MqttSchema::default());

let input = r#"{
"text": "I raised it",
Expand Down Expand Up @@ -586,7 +586,7 @@ mod tests {

#[test]
fn converting_alarm_for_main_device() {
let mut converter = AwsConverter::new(true, Box::new(TestClock));
let mut converter = AwsConverter::new(true, Box::new(TestClock), MqttSchema::default());

let input = r#"{
"text":"I raised it",
Expand Down Expand Up @@ -616,7 +616,7 @@ mod tests {

#[test]
fn converting_alarm_for_main_service() {
let mut converter = AwsConverter::new(true, Box::new(TestClock));
let mut converter = AwsConverter::new(true, Box::new(TestClock), MqttSchema::default());

let input = r#"{
"text":"I raised it",
Expand Down Expand Up @@ -649,7 +649,7 @@ mod tests {

#[test]
fn converting_alarm_for_child_device() {
let mut converter = AwsConverter::new(true, Box::new(TestClock));
let mut converter = AwsConverter::new(true, Box::new(TestClock), MqttSchema::default());

let input = r#"{
"text":"I raised it",
Expand Down Expand Up @@ -679,7 +679,7 @@ mod tests {

#[test]
fn converting_alarm_for_child_service() {
let mut converter = AwsConverter::new(true, Box::new(TestClock));
let mut converter = AwsConverter::new(true, Box::new(TestClock), MqttSchema::default());

let input = r#"{
"text":"I raised it",
Expand Down Expand Up @@ -712,7 +712,7 @@ mod tests {

#[test]
fn converting_service_health_status_up_message() {
let mut converter = AwsConverter::new(false, Box::new(TestClock));
let mut converter = AwsConverter::new(false, Box::new(TestClock), MqttSchema::default());

let input = r#"{"pid":1234,"status":"up"}"#;
let result = converter.try_convert(&MqttMessage::new(
Expand All @@ -730,7 +730,7 @@ mod tests {

#[test]
fn converting_service_health_status_down_message() {
let mut converter = AwsConverter::new(false, Box::new(TestClock));
let mut converter = AwsConverter::new(false, Box::new(TestClock), MqttSchema::default());

let input = r#"{"pid":1234,"status":"up"}"#;
let result = converter.try_convert(&MqttMessage::new(
Expand Down
Loading

1 comment on commit e11b64f

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Robot Results

✅ Passed ❌ Failed ⏭️ Skipped Total Pass % ⏱️ Duration
349 0 3 349 100 59m7.226999999s

Please sign in to comment.