Skip to content

Commit

Permalink
Fix service status reporting when the mapper goes down
Browse files Browse the repository at this point in the history
Signed-off-by: James Rhodes <[email protected]>
  • Loading branch information
jarhodes314 committed May 17, 2024
1 parent 746ced9 commit fc33478
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 58 deletions.
32 changes: 22 additions & 10 deletions crates/core/c8y_api/src/smartrest/inventory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,21 @@ pub fn service_creation_message(
ancestors: &[String],
prefix: &TopicPrefix,
) -> Result<MqttMessage, InvalidValueError> {
Ok(MqttMessage::new(
&publish_topic_from_ancestors(ancestors, prefix),
service_creation_message_payload(service_id, service_name, service_type, service_status)?,
))
}

/// Create a SmartREST message for creating a service on device.
/// The provided ancestors list must contain all the parents of the given service
/// starting from its immediate parent device.
pub fn service_creation_message_payload(
service_id: &str,
service_name: &str,
service_type: &str,
service_status: &str,
) -> Result<String, InvalidValueError> {
// TODO: most of this noise can be eliminated by implementing `Serialize`/`Deserialize` for smartrest format
if service_id.is_empty() {
return Err(InvalidValueError {
Expand All @@ -94,16 +109,13 @@ pub fn service_creation_message(
});
}

Ok(MqttMessage::new(
&publish_topic_from_ancestors(ancestors, prefix),
fields_to_csv_string(&[
"102",
service_id,
service_type,
service_name,
service_status,
]),
))
Ok(fields_to_csv_string(&[
"102",
service_id,
service_type,
service_name,
service_status,
]))
}

/// Create a SmartREST message for updating service status.
Expand Down
113 changes: 65 additions & 48 deletions crates/core/tedge_mapper/src/c8y/mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ use tedge_config::TEdgeConfig;
use tedge_downloader_ext::DownloaderActor;
use tedge_file_system_ext::FsWatchActorBuilder;
use tedge_http_ext::HttpActor;
use tedge_mqtt_bridge::rumqttc::LastWill;
use tedge_mqtt_bridge::use_key_and_cert;
use tedge_mqtt_bridge::BridgeConfig;
use tedge_mqtt_bridge::MqttBridgeActorBuilder;
use tedge_mqtt_bridge::QoS;
use tedge_mqtt_ext::MqttActorBuilder;
use tedge_timer_ext::TimerActor;
use tedge_uploader_ext::UploaderActor;
Expand Down Expand Up @@ -89,14 +91,68 @@ impl TEdgeComponent for CumulocityMapper {
&tedge_config,
)?;

let bridge_actor = MqttBridgeActorBuilder::new(
&tedge_config,
c8y_mapper_config.bridge_service_name(),
tc,
cloud_config,
)
.await;
runtime.spawn(bridge_actor).await?;
let main_device_xid: EntityExternalId =
tedge_config.device.id.try_read(&tedge_config)?.into();
let service_type = &tedge_config.service.ty;
let service_type = if service_type.is_empty() {
"service".to_string()
} else {
service_type.to_string()
};

// FIXME: this will not work if `mqtt.device_topic_id` is not in default scheme

// there is one mapper instance per cloud per thin-edge instance, perhaps we should use some
// predefined topic id instead of trying to derive it from current device?
let entity_topic_id: EntityTopicId = tedge_config
.mqtt
.device_topic_id
.clone()
.parse()
.context("Invalid device_topic_id")?;

let mapper_service_topic_id = entity_topic_id
.default_service_for_device(CUMULOCITY_MAPPER_NAME)
.context("Can't derive service name if device topic id not in default scheme")?;

let mapper_service_external_id = CumulocityConverter::map_to_c8y_external_id(
&mapper_service_topic_id,
&main_device_xid,
);

let last_will_message_mapper =
c8y_api::smartrest::inventory::service_creation_message_payload(
mapper_service_external_id.as_ref(),
CUMULOCITY_MAPPER_NAME,
service_type.as_str(),
"down",
)?;
let last_will_message_bridge =
c8y_api::smartrest::inventory::service_creation_message_payload(
mapper_service_external_id.as_ref(),
&c8y_mapper_config.bridge_service_name(),
service_type.as_str(),
"down",
)?;

cloud_config.set_last_will(LastWill {
topic: "s/us".into(),
qos: QoS::AtLeastOnce,
message: format!("{last_will_message_bridge}\n{last_will_message_mapper}").into(),
retain: false,
});

runtime
.spawn(
MqttBridgeActorBuilder::new(
&tedge_config,
c8y_mapper_config.bridge_service_name(),
tc,
cloud_config,
)
.await,
)
.await?;
}
let mut jwt_actor = C8YJwtRetriever::builder(
mqtt_config.clone(),
Expand Down Expand Up @@ -157,44 +213,5 @@ impl TEdgeComponent for CumulocityMapper {
}

pub fn service_monitor_client_config(tedge_config: &TEdgeConfig) -> Result<Config, anyhow::Error> {
let main_device_xid: EntityExternalId = tedge_config.device.id.try_read(tedge_config)?.into();
let service_type = &tedge_config.service.ty;
let service_type = if service_type.is_empty() {
"service".to_string()
} else {
service_type.to_string()
};

// FIXME: this will not work if `mqtt.device_topic_id` is not in default scheme

// there is one mapper instance per cloud per thin-edge instance, perhaps we should use some
// predefined topic id instead of trying to derive it from current device?
let entity_topic_id: EntityTopicId = tedge_config
.mqtt
.device_topic_id
.clone()
.parse()
.context("Invalid device_topic_id")?;

let mapper_service_topic_id = entity_topic_id
.default_service_for_device(CUMULOCITY_MAPPER_NAME)
.context("Can't derive service name if device topic id not in default scheme")?;

let mapper_service_external_id =
CumulocityConverter::map_to_c8y_external_id(&mapper_service_topic_id, &main_device_xid);

let last_will_message = c8y_api::smartrest::inventory::service_creation_message(
mapper_service_external_id.as_ref(),
CUMULOCITY_MAPPER_NAME,
service_type.as_str(),
"down",
&[],
&tedge_config.c8y.bridge.topic_prefix,
)?;

let mqtt_config = tedge_config
.mqtt_config()?
.with_session_name("last_will_c8y_mapper")
.with_last_will_message(last_will_message);
Ok(mqtt_config)
Ok(tedge_config.mqtt_config()?)
}
1 change: 1 addition & 0 deletions crates/extensions/tedge_mqtt_bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use async_trait::async_trait;
use certificate::parse_root_certificate::create_tls_config;
use futures::SinkExt;
use futures::StreamExt;
pub use rumqttc;
use rumqttc::AsyncClient;
use rumqttc::ClientError;
use rumqttc::Event;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ Bridge stops if mapper stops running
${measurements}= Device Should Have Measurements minimum=1 maximum=1 type=CustomMeasurement series=temperature
Log ${measurements}
Execute Command systemctl stop tedge-mapper-c8y
Service Health Status Should Be Down tedge-mapper-bridge-custom-c8y-prefix
Execute Command tedge mqtt pub ${C8Y_TOPIC_PREFIX}/s/us '200,CustomMeasurement,temperature,25'
${measurements}= Device Should Have Measurements minimum=1 maximum=1 type=CustomMeasurement series=temperature
Log ${measurements}
Expand Down

0 comments on commit fc33478

Please sign in to comment.