Skip to content

Commit

Permalink
Bypass duplicate message filtering on mapper startup with config flag
Browse files Browse the repository at this point in the history
Signed-off-by: Krzysztof Piotrowski <[email protected]>
  • Loading branch information
Ruadhri17 committed Jan 31, 2024
1 parent 947d999 commit 36bda79
Show file tree
Hide file tree
Showing 9 changed files with 111 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,10 @@ define_tedge_config! {
/// Enable auto registration feature
#[tedge_config(example = "true", default(value = true))]
auto_register: bool,

/// Determines if entity state on tedge should be sent to cloud on startup
#[tedge_config(example = "true", default(value = true))]
clean_start: bool,
},
},

Expand Down
2 changes: 2 additions & 0 deletions crates/core/c8y_api/src/json_c8y.rs
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,7 @@ mod tests {
dummy_external_id_validator,
5,
&temp_dir,
true,
)
.unwrap();

Expand Down Expand Up @@ -756,6 +757,7 @@ mod tests {
dummy_external_id_validator,
5,
&temp_dir,
true,
)
.unwrap();

Expand Down
57 changes: 32 additions & 25 deletions crates/core/tedge_api/src/entity_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ type ExternalIdValidatorFn =
/// |tid, xid| tid.to_string().into(),
/// |xid| Ok(xid.into()),
/// 5,
/// "/tmp"
/// "/tmp",
/// true
/// );
/// ```
pub struct EntityStore {
Expand All @@ -153,6 +154,7 @@ pub struct EntityStore {
}

impl EntityStore {
#[allow(clippy::too_many_arguments)]
pub fn with_main_device_and_default_service_type<MF, SF, P>(
mqtt_schema: MqttSchema,
main_device: EntityRegistrationMessage,
Expand All @@ -161,6 +163,7 @@ impl EntityStore {
external_id_validator_fn: SF,
telemetry_cache_size: usize,
log_dir: P,
clean_start: bool,
) -> Result<Self, InitError>
where
MF: Fn(&EntityTopicId, &EntityExternalId) -> EntityExternalId,
Expand All @@ -187,7 +190,7 @@ impl EntityStore {
twin_data: Map::new(),
};

let message_log = MessageLogWriter::new(log_dir.as_ref())?;
let message_log = MessageLogWriter::new(log_dir.as_ref(), clean_start)?;

let mut entity_store = EntityStore {
mqtt_schema: mqtt_schema.clone(),
Expand All @@ -201,7 +204,9 @@ impl EntityStore {
message_log,
};

entity_store.load_from_message_log(log_dir.as_ref());
if !clean_start {
entity_store.load_from_message_log(log_dir.as_ref());
}

Ok(entity_store)
}
Expand Down Expand Up @@ -1068,7 +1073,7 @@ mod tests {
#[test]
fn registers_main_device() {
let temp_dir = tempfile::tempdir().unwrap();
let store = new_entity_store(&temp_dir);
let store = new_entity_store(&temp_dir, true);

assert_eq!(store.main_device(), &EntityTopicId::default_main_device());
assert!(store.get(&EntityTopicId::default_main_device()).is_some());
Expand All @@ -1077,7 +1082,7 @@ mod tests {
#[test]
fn lists_child_devices() {
let temp_dir = tempfile::tempdir().unwrap();
let mut store = new_entity_store(&temp_dir);
let mut store = new_entity_store(&temp_dir, true);

// If the @parent info is not provided, it is assumed to be an immediate
// child of the main device.
Expand Down Expand Up @@ -1115,7 +1120,7 @@ mod tests {
#[test]
fn lists_services() {
let temp_dir = tempfile::tempdir().unwrap();
let mut store = new_entity_store(&temp_dir);
let mut store = new_entity_store(&temp_dir, true);

// Services are namespaced under devices, so `parent` is not necessary
let updated_entities = store
Expand Down Expand Up @@ -1157,7 +1162,7 @@ mod tests {
#[test]
fn list_ancestors() {
let temp_dir = tempfile::tempdir().unwrap();
let mut store = new_entity_store(&temp_dir);
let mut store = new_entity_store(&temp_dir, true);

// Assert no ancestors of main device
assert!(store
Expand Down Expand Up @@ -1264,7 +1269,7 @@ mod tests {
#[test]
fn list_ancestors_external_ids() {
let temp_dir = tempfile::tempdir().unwrap();
let mut store = new_entity_store(&temp_dir);
let mut store = new_entity_store(&temp_dir, true);

// Assert ancestor external ids of main device
assert!(store
Expand Down Expand Up @@ -1375,7 +1380,7 @@ mod tests {
#[test]
fn auto_register_service() {
let temp_dir = tempfile::tempdir().unwrap();
let mut store = new_entity_store(&temp_dir);
let mut store = new_entity_store(&temp_dir, true);

let service_topic_id = EntityTopicId::default_child_service("child1", "service1").unwrap();
let res = store.auto_register_entity(&service_topic_id).unwrap();
Expand Down Expand Up @@ -1406,7 +1411,7 @@ mod tests {
#[test]
fn auto_register_child_device() {
let temp_dir = tempfile::tempdir().unwrap();
let mut store = new_entity_store(&temp_dir);
let mut store = new_entity_store(&temp_dir, true);

let child_topic_id = EntityTopicId::default_child_device("child2").unwrap();
let res = store.auto_register_entity(&child_topic_id).unwrap();
Expand All @@ -1426,7 +1431,7 @@ mod tests {
#[test]
fn auto_register_custom_topic_scheme_not_supported() {
let temp_dir = tempfile::tempdir().unwrap();
let mut store = new_entity_store(&temp_dir);
let mut store = new_entity_store(&temp_dir, true);
assert_matches!(
store.auto_register_entity(&EntityTopicId::from_str("custom/child2//").unwrap()),
Err(Error::NonDefaultTopicScheme(_))
Expand All @@ -1436,7 +1441,7 @@ mod tests {
#[test]
fn register_main_device_custom_scheme() {
let temp_dir = tempfile::tempdir().unwrap();
let mut store = new_entity_store(&temp_dir);
let mut store = new_entity_store(&temp_dir, true);

// Register main device with custom topic scheme
let main_topic_id = EntityTopicId::from_str("custom/main//").unwrap();
Expand Down Expand Up @@ -1504,7 +1509,7 @@ mod tests {
#[test]
fn external_id_validation() {
let temp_dir = tempfile::tempdir().unwrap();
let mut store = new_entity_store(&temp_dir);
let mut store = new_entity_store(&temp_dir, true);

let entity_topic_id = EntityTopicId::default_child_device("child1").unwrap();
let res = store.update(EntityRegistrationMessage {
Expand All @@ -1522,7 +1527,7 @@ mod tests {
#[test]
fn update_twin_data_new_fragment() {
let temp_dir = tempfile::tempdir().unwrap();
let mut store = new_entity_store(&temp_dir);
let mut store = new_entity_store(&temp_dir, true);

let topic_id = EntityTopicId::default_main_device();
let updated = store
Expand All @@ -1547,7 +1552,7 @@ mod tests {
#[test]
fn update_twin_data_update_existing_fragment() {
let temp_dir = tempfile::tempdir().unwrap();
let mut store = new_entity_store(&temp_dir);
let mut store = new_entity_store(&temp_dir, true);

let topic_id = EntityTopicId::default_main_device();
let _ = store
Expand Down Expand Up @@ -1580,7 +1585,7 @@ mod tests {
#[test]
fn update_twin_data_remove_fragment() {
let temp_dir = tempfile::tempdir().unwrap();
let mut store = new_entity_store(&temp_dir);
let mut store = new_entity_store(&temp_dir, true);

let topic_id = EntityTopicId::default_main_device();

Expand Down Expand Up @@ -1630,6 +1635,7 @@ mod tests {
dummy_external_id_sanitizer,
5,
&temp_dir,
true,
)
.unwrap();

Expand Down Expand Up @@ -1677,7 +1683,7 @@ mod tests {
#[test]
fn duplicate_registration_message_ignored() {
let temp_dir = tempfile::tempdir().unwrap();
let mut store = new_entity_store(&temp_dir);
let mut store = new_entity_store(&temp_dir, false);
let entity_topic_id = EntityTopicId::default_child_device("child1").unwrap();
let reg_message = EntityRegistrationMessage {
topic_id: entity_topic_id.clone(),
Expand All @@ -1694,15 +1700,15 @@ mod tests {
assert!(affected_entities.0.is_empty());

// Duplicate registration ignore even after the entity store is restored from the disk
let mut store = new_entity_store(&temp_dir);
let mut store = new_entity_store(&temp_dir, false);
let affected_entities = store.update(reg_message).unwrap();
assert!(affected_entities.0.is_empty());
}

#[test]
fn duplicate_registration_message_ignored_after_twin_update() {
let temp_dir = tempfile::tempdir().unwrap();
let mut store = new_entity_store(&temp_dir);
let mut store = new_entity_store(&temp_dir, false);
let entity_topic_id = EntityTopicId::default_child_device("child1").unwrap();
let reg_message = EntityRegistrationMessage {
topic_id: entity_topic_id.clone(),
Expand All @@ -1729,15 +1735,15 @@ mod tests {
assert!(affected_entities.0.is_empty());

// Duplicate registration ignore even after the entity store is restored from the disk
let mut store = new_entity_store(&temp_dir);
let mut store = new_entity_store(&temp_dir, false);
let affected_entities = store.update(reg_message).unwrap();
assert!(affected_entities.0.is_empty());
}

#[test]
fn early_child_device_registrations_processed_only_after_parent_registration() {
let temp_dir = tempfile::tempdir().unwrap();
let mut store = new_entity_store(&temp_dir);
let mut store = new_entity_store(&temp_dir, true);

let child0_topic_id = EntityTopicId::default_child_device("child0").unwrap();
let child000_topic_id = EntityTopicId::default_child_device("child000").unwrap();
Expand Down Expand Up @@ -1773,7 +1779,7 @@ mod tests {
assert!(affected_entities.0.is_empty());

// Reload the entity store from the persistent log
let mut store = new_entity_store(&temp_dir);
let mut store = new_entity_store(&temp_dir, true);

// Assert that duplicate registrations are still ignored
let affected_entities = store.update(child000_reg_message).unwrap();
Expand All @@ -1791,7 +1797,7 @@ mod tests {
let twin_fragment_value = json!("bar");

{
let mut store = new_entity_store(&temp_dir);
let mut store = new_entity_store(&temp_dir, false);
store
.update(
EntityRegistrationMessage::new_custom(
Expand Down Expand Up @@ -1822,7 +1828,7 @@ mod tests {

{
// Reload the entity store using the same persistent file
let store = new_entity_store(&temp_dir);
let store = new_entity_store(&temp_dir, false);
let mut expected_entity_metadata =
EntityMetadata::child_device("child1".into()).unwrap();
expected_entity_metadata
Expand All @@ -1843,7 +1849,7 @@ mod tests {
}
}

fn new_entity_store(temp_dir: &TempDir) -> EntityStore {
fn new_entity_store(temp_dir: &TempDir, clean_start: bool) -> EntityStore {
EntityStore::with_main_device_and_default_service_type(
MqttSchema::default(),
EntityRegistrationMessage {
Expand All @@ -1858,6 +1864,7 @@ mod tests {
dummy_external_id_sanitizer,
5,
temp_dir,
clean_start,
)
.unwrap()
}
Expand Down
21 changes: 14 additions & 7 deletions crates/core/tedge_api/src/message_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,22 @@ pub struct MessageLogWriter {
}

impl MessageLogWriter {
pub fn new<P>(log_dir: P) -> Result<MessageLogWriter, std::io::Error>
pub fn new<P>(log_dir: P, truncate: bool) -> Result<MessageLogWriter, std::io::Error>
where
P: AsRef<Path>,
{
let file = OpenOptions::new()
.create(true)
.append(true)
.open(log_dir.as_ref().join(LOG_FILE_NAME))?;

let file = if truncate {
OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(log_dir.as_ref().join(LOG_FILE_NAME))?
} else {
OpenOptions::new()
.create(true)
.append(true)
.open(log_dir.as_ref().join(LOG_FILE_NAME))?
};
// If the file is empty append the version information as a header
let metadata = file.metadata()?;
let file_is_empty = metadata.len() == 0;
Expand Down Expand Up @@ -127,7 +134,7 @@ mod tests {

// Populate the log
{
let mut message_log = MessageLogWriter::new(&temp_dir).unwrap();
let mut message_log = MessageLogWriter::new(&temp_dir, true).unwrap();
let mut message_log_reader = MessageLogReader::new(&temp_dir).unwrap();

assert_eq!(message_log_reader.next_message().unwrap(), None);
Expand Down
5 changes: 5 additions & 0 deletions crates/extensions/c8y_mapper_ext/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ pub struct C8yMapperConfig {
pub auth_proxy_protocol: Protocol,
pub mqtt_schema: MqttSchema,
pub enable_auto_register: bool,
pub clean_start: bool,
}

impl C8yMapperConfig {
Expand All @@ -70,6 +71,7 @@ impl C8yMapperConfig {
auth_proxy_protocol: Protocol,
mqtt_schema: MqttSchema,
enable_auto_register: bool,
clean_start: bool,
) -> Self {
let ops_dir = config_dir.join("operations").join("c8y");
let state_dir = config_dir.join(STATE_DIR_NAME);
Expand All @@ -94,6 +96,7 @@ impl C8yMapperConfig {
auth_proxy_protocol,
mqtt_schema,
enable_auto_register,
clean_start,
}
}

Expand Down Expand Up @@ -135,6 +138,7 @@ impl C8yMapperConfig {

let mut topics = Self::default_internal_topic_filter(&config_dir)?;
let enable_auto_register = tedge_config.c8y.entity_store.auto_register;
let clean_start = tedge_config.c8y.entity_store.clean_start;

// Add feature topic filters
for cmd in [
Expand Down Expand Up @@ -190,6 +194,7 @@ impl C8yMapperConfig {
auth_proxy_protocol,
mqtt_schema,
enable_auto_register,
clean_start,
))
}

Expand Down
2 changes: 2 additions & 0 deletions crates/extensions/c8y_mapper_ext/src/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ impl CumulocityConverter {
Self::validate_external_id,
EARLY_MESSAGE_BUFFER_SIZE,
config.state_dir.clone(),
config.clean_start,
)
.unwrap();

Expand Down Expand Up @@ -3412,6 +3413,7 @@ pub(crate) mod tests {
auth_proxy_protocol,
MqttSchema::default(),
true,
true,
)
}
fn create_c8y_converter_from_config(
Expand Down
1 change: 1 addition & 0 deletions crates/extensions/c8y_mapper_ext/src/service_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ mod tests {
crate::converter::CumulocityConverter::validate_external_id,
5,
&temp_dir,
true,
)
.unwrap();

Expand Down
1 change: 1 addition & 0 deletions crates/extensions/c8y_mapper_ext/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2623,6 +2623,7 @@ pub(crate) async fn spawn_c8y_mapper_actor(
Protocol::Http,
MqttSchema::default(),
true,
true,
);

let mut mqtt_builder: SimpleMessageBoxBuilder<MqttMessage, MqttMessage> =
Expand Down
Loading

0 comments on commit 36bda79

Please sign in to comment.