Skip to content

Commit

Permalink
Bypass duplicate message filtering on mapper startup with config flag
Browse files Browse the repository at this point in the history
Signed-off-by: Krzysztof Piotrowski <[email protected]>
  • Loading branch information
Ruadhri17 committed Feb 2, 2024
1 parent 0f3ab0c commit a1c8bda
Show file tree
Hide file tree
Showing 9 changed files with 110 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,10 @@ define_tedge_config! {
/// Enable auto registration feature
#[tedge_config(example = "true", default(value = true))]
auto_register: bool,

/// On a clean start, the whole state of the device, services and child-devices is resent to the cloud
#[tedge_config(example = "true", default(value = true))]
clean_start: bool,
},
},

Expand Down
2 changes: 2 additions & 0 deletions crates/core/c8y_api/src/json_c8y.rs
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,7 @@ mod tests {
dummy_external_id_validator,
5,
&temp_dir,
true,
)
.unwrap();

Expand Down Expand Up @@ -756,6 +757,7 @@ mod tests {
dummy_external_id_validator,
5,
&temp_dir,
true,
)
.unwrap();

Expand Down
69 changes: 41 additions & 28 deletions crates/core/tedge_api/src/entity_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ type ExternalIdValidatorFn =
/// |tid, xid| tid.to_string().into(),
/// |xid| Ok(xid.into()),
/// 5,
/// "/tmp"
/// "/tmp",
/// true
/// );
/// ```
pub struct EntityStore {
Expand All @@ -153,6 +154,7 @@ pub struct EntityStore {
}

impl EntityStore {
#[allow(clippy::too_many_arguments)]
pub fn with_main_device_and_default_service_type<MF, SF, P>(
mqtt_schema: MqttSchema,
main_device: EntityRegistrationMessage,
Expand All @@ -161,6 +163,7 @@ impl EntityStore {
external_id_validator_fn: SF,
telemetry_cache_size: usize,
log_dir: P,
clean_start: bool,
) -> Result<Self, InitError>
where
MF: Fn(&EntityTopicId, &EntityExternalId) -> EntityExternalId,
Expand All @@ -187,11 +190,19 @@ impl EntityStore {
twin_data: Map::new(),
};

let message_log = MessageLogWriter::new(log_dir.as_ref()).map_err(|err| {
InitError::Custom(format!(
"Loading the entity store log for writes failed with {err}",
))
})?;
let message_log = if clean_start {
MessageLogWriter::new_truncated(log_dir.as_ref()).map_err(|err| {
InitError::Custom(format!(
"Loading the entity store log for writes failed with {err}",
))
})?
} else {
MessageLogWriter::new(log_dir.as_ref()).map_err(|err| {
InitError::Custom(format!(
"Loading the entity store log for writes failed with {err}",
))
})?
};

let mut entity_store = EntityStore {
mqtt_schema: mqtt_schema.clone(),
Expand Down Expand Up @@ -1072,7 +1083,7 @@ mod tests {
#[test]
fn registers_main_device() {
let temp_dir = tempfile::tempdir().unwrap();
let store = new_entity_store(&temp_dir);
let store = new_entity_store(&temp_dir, true);

assert_eq!(store.main_device(), &EntityTopicId::default_main_device());
assert!(store.get(&EntityTopicId::default_main_device()).is_some());
Expand All @@ -1081,7 +1092,7 @@ mod tests {
#[test]
fn lists_child_devices() {
let temp_dir = tempfile::tempdir().unwrap();
let mut store = new_entity_store(&temp_dir);
let mut store = new_entity_store(&temp_dir, true);

// If the @parent info is not provided, it is assumed to be an immediate
// child of the main device.
Expand Down Expand Up @@ -1119,7 +1130,7 @@ mod tests {
#[test]
fn lists_services() {
let temp_dir = tempfile::tempdir().unwrap();
let mut store = new_entity_store(&temp_dir);
let mut store = new_entity_store(&temp_dir, true);

// Services are namespaced under devices, so `parent` is not necessary
let updated_entities = store
Expand Down Expand Up @@ -1161,7 +1172,7 @@ mod tests {
#[test]
fn list_ancestors() {
let temp_dir = tempfile::tempdir().unwrap();
let mut store = new_entity_store(&temp_dir);
let mut store = new_entity_store(&temp_dir, true);

// Assert no ancestors of main device
assert!(store
Expand Down Expand Up @@ -1268,7 +1279,7 @@ mod tests {
#[test]
fn list_ancestors_external_ids() {
let temp_dir = tempfile::tempdir().unwrap();
let mut store = new_entity_store(&temp_dir);
let mut store = new_entity_store(&temp_dir, true);

// Assert ancestor external ids of main device
assert!(store
Expand Down Expand Up @@ -1379,7 +1390,7 @@ mod tests {
#[test]
fn auto_register_service() {
let temp_dir = tempfile::tempdir().unwrap();
let mut store = new_entity_store(&temp_dir);
let mut store = new_entity_store(&temp_dir, true);

let service_topic_id = EntityTopicId::default_child_service("child1", "service1").unwrap();
let res = store.auto_register_entity(&service_topic_id).unwrap();
Expand Down Expand Up @@ -1410,7 +1421,7 @@ mod tests {
#[test]
fn auto_register_child_device() {
let temp_dir = tempfile::tempdir().unwrap();
let mut store = new_entity_store(&temp_dir);
let mut store = new_entity_store(&temp_dir, true);

let child_topic_id = EntityTopicId::default_child_device("child2").unwrap();
let res = store.auto_register_entity(&child_topic_id).unwrap();
Expand All @@ -1430,7 +1441,7 @@ mod tests {
#[test]
fn auto_register_custom_topic_scheme_not_supported() {
let temp_dir = tempfile::tempdir().unwrap();
let mut store = new_entity_store(&temp_dir);
let mut store = new_entity_store(&temp_dir, true);
assert_matches!(
store.auto_register_entity(&EntityTopicId::from_str("custom/child2//").unwrap()),
Err(Error::NonDefaultTopicScheme(_))
Expand All @@ -1440,7 +1451,7 @@ mod tests {
#[test]
fn register_main_device_custom_scheme() {
let temp_dir = tempfile::tempdir().unwrap();
let mut store = new_entity_store(&temp_dir);
let mut store = new_entity_store(&temp_dir, true);

// Register main device with custom topic scheme
let main_topic_id = EntityTopicId::from_str("custom/main//").unwrap();
Expand Down Expand Up @@ -1508,7 +1519,7 @@ mod tests {
#[test]
fn external_id_validation() {
let temp_dir = tempfile::tempdir().unwrap();
let mut store = new_entity_store(&temp_dir);
let mut store = new_entity_store(&temp_dir, true);

let entity_topic_id = EntityTopicId::default_child_device("child1").unwrap();
let res = store.update(EntityRegistrationMessage {
Expand All @@ -1526,7 +1537,7 @@ mod tests {
#[test]
fn update_twin_data_new_fragment() {
let temp_dir = tempfile::tempdir().unwrap();
let mut store = new_entity_store(&temp_dir);
let mut store = new_entity_store(&temp_dir, true);

let topic_id = EntityTopicId::default_main_device();
let updated = store
Expand All @@ -1551,7 +1562,7 @@ mod tests {
#[test]
fn update_twin_data_update_existing_fragment() {
let temp_dir = tempfile::tempdir().unwrap();
let mut store = new_entity_store(&temp_dir);
let mut store = new_entity_store(&temp_dir, true);

let topic_id = EntityTopicId::default_main_device();
let _ = store
Expand Down Expand Up @@ -1584,7 +1595,7 @@ mod tests {
#[test]
fn update_twin_data_remove_fragment() {
let temp_dir = tempfile::tempdir().unwrap();
let mut store = new_entity_store(&temp_dir);
let mut store = new_entity_store(&temp_dir, true);

let topic_id = EntityTopicId::default_main_device();

Expand Down Expand Up @@ -1634,6 +1645,7 @@ mod tests {
dummy_external_id_sanitizer,
5,
&temp_dir,
true,
)
.unwrap();

Expand Down Expand Up @@ -1681,7 +1693,7 @@ mod tests {
#[test]
fn duplicate_registration_message_ignored() {
let temp_dir = tempfile::tempdir().unwrap();
let mut store = new_entity_store(&temp_dir);
let mut store = new_entity_store(&temp_dir, false);
let entity_topic_id = EntityTopicId::default_child_device("child1").unwrap();
let reg_message = EntityRegistrationMessage {
topic_id: entity_topic_id.clone(),
Expand All @@ -1698,15 +1710,15 @@ mod tests {
assert!(affected_entities.0.is_empty());

// Duplicate registration ignore even after the entity store is restored from the disk
let mut store = new_entity_store(&temp_dir);
let mut store = new_entity_store(&temp_dir, false);
let affected_entities = store.update(reg_message).unwrap();
assert!(affected_entities.0.is_empty());
}

#[test]
fn duplicate_registration_message_ignored_after_twin_update() {
let temp_dir = tempfile::tempdir().unwrap();
let mut store = new_entity_store(&temp_dir);
let mut store = new_entity_store(&temp_dir, false);
let entity_topic_id = EntityTopicId::default_child_device("child1").unwrap();
let reg_message = EntityRegistrationMessage {
topic_id: entity_topic_id.clone(),
Expand All @@ -1733,15 +1745,15 @@ mod tests {
assert!(affected_entities.0.is_empty());

// Duplicate registration ignore even after the entity store is restored from the disk
let mut store = new_entity_store(&temp_dir);
let mut store = new_entity_store(&temp_dir, false);
let affected_entities = store.update(reg_message).unwrap();
assert!(affected_entities.0.is_empty());
}

#[test]
fn early_child_device_registrations_processed_only_after_parent_registration() {
let temp_dir = tempfile::tempdir().unwrap();
let mut store = new_entity_store(&temp_dir);
let mut store = new_entity_store(&temp_dir, true);

let child0_topic_id = EntityTopicId::default_child_device("child0").unwrap();
let child000_topic_id = EntityTopicId::default_child_device("child000").unwrap();
Expand Down Expand Up @@ -1777,7 +1789,7 @@ mod tests {
assert!(affected_entities.0.is_empty());

// Reload the entity store from the persistent log
let mut store = new_entity_store(&temp_dir);
let mut store = new_entity_store(&temp_dir, true);

// Assert that duplicate registrations are still ignored
let affected_entities = store.update(child000_reg_message).unwrap();
Expand All @@ -1795,7 +1807,7 @@ mod tests {
let twin_fragment_value = json!("bar");

{
let mut store = new_entity_store(&temp_dir);
let mut store = new_entity_store(&temp_dir, false);
store
.update(
EntityRegistrationMessage::new_custom(
Expand Down Expand Up @@ -1826,7 +1838,7 @@ mod tests {

{
// Reload the entity store using the same persistent file
let store = new_entity_store(&temp_dir);
let store = new_entity_store(&temp_dir, false);
let mut expected_entity_metadata =
EntityMetadata::child_device("child1".into()).unwrap();
expected_entity_metadata
Expand All @@ -1847,7 +1859,7 @@ mod tests {
}
}

fn new_entity_store(temp_dir: &TempDir) -> EntityStore {
fn new_entity_store(temp_dir: &TempDir, clean_start: bool) -> EntityStore {
EntityStore::with_main_device_and_default_service_type(
MqttSchema::default(),
EntityRegistrationMessage {
Expand All @@ -1862,6 +1874,7 @@ mod tests {
dummy_external_id_sanitizer,
5,
temp_dir,
clean_start,
)
.unwrap()
}
Expand Down
13 changes: 13 additions & 0 deletions crates/core/tedge_api/src/message_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,19 @@ impl MessageLogWriter {
Ok(MessageLogWriter { writer })
}

pub fn new_truncated<P>(log_dir: P) -> Result<MessageLogWriter, std::io::Error>
where
P: AsRef<Path>,
{
let _ = OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(log_dir.as_ref().join(LOG_FILE_NAME))?;

MessageLogWriter::new(log_dir)
}

/// Append the JSON representation of the given message to the log.
/// Each message is appended on a new line.
pub fn append_message(&mut self, message: &MqttMessage) -> Result<(), std::io::Error> {
Expand Down
5 changes: 5 additions & 0 deletions crates/extensions/c8y_mapper_ext/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ pub struct C8yMapperConfig {
pub auth_proxy_protocol: Protocol,
pub mqtt_schema: MqttSchema,
pub enable_auto_register: bool,
pub clean_start: bool,
}

impl C8yMapperConfig {
Expand All @@ -72,6 +73,7 @@ impl C8yMapperConfig {
auth_proxy_protocol: Protocol,
mqtt_schema: MqttSchema,
enable_auto_register: bool,
clean_start: bool,
) -> Self {
let ops_dir = config_dir
.join(SUPPORTED_OPERATIONS_DIRECTORY)
Expand All @@ -98,6 +100,7 @@ impl C8yMapperConfig {
auth_proxy_protocol,
mqtt_schema,
enable_auto_register,
clean_start,
}
}

Expand Down Expand Up @@ -139,6 +142,7 @@ impl C8yMapperConfig {

let mut topics = Self::default_internal_topic_filter(&config_dir)?;
let enable_auto_register = tedge_config.c8y.entity_store.auto_register;
let clean_start = tedge_config.c8y.entity_store.clean_start;

// Add feature topic filters
for cmd in [
Expand Down Expand Up @@ -194,6 +198,7 @@ impl C8yMapperConfig {
auth_proxy_protocol,
mqtt_schema,
enable_auto_register,
clean_start,
))
}

Expand Down
2 changes: 2 additions & 0 deletions crates/extensions/c8y_mapper_ext/src/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ impl CumulocityConverter {
Self::validate_external_id,
EARLY_MESSAGE_BUFFER_SIZE,
config.state_dir.clone(),
config.clean_start,
)
.unwrap();

Expand Down Expand Up @@ -3162,6 +3163,7 @@ pub(crate) mod tests {
auth_proxy_protocol,
MqttSchema::default(),
true,
true,
)
}
fn create_c8y_converter_from_config(
Expand Down
1 change: 1 addition & 0 deletions crates/extensions/c8y_mapper_ext/src/service_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ mod tests {
crate::converter::CumulocityConverter::validate_external_id,
5,
&temp_dir,
true,
)
.unwrap();

Expand Down
1 change: 1 addition & 0 deletions crates/extensions/c8y_mapper_ext/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2414,6 +2414,7 @@ pub(crate) async fn spawn_c8y_mapper_actor(
Protocol::Http,
MqttSchema::default(),
true,
true,
);

let mut mqtt_builder: SimpleMessageBoxBuilder<MqttMessage, MqttMessage> =
Expand Down
Loading

0 comments on commit a1c8bda

Please sign in to comment.