From a453080aa88411095b1818d9c215e66e5c452f2e Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Mon, 28 Oct 2024 12:08:43 +0100 Subject: [PATCH] feat(sdk): Find and remove duplicated events in `RoomEvents`. This patch uses the new `Deduplicator` type, along with `LinkeChunk::remove_item_at` to remove duplicated events. When a new event is received, the older one is removed. --- .../src/event_cache/linked_chunk/mod.rs | 12 + crates/matrix-sdk/src/event_cache/store.rs | 522 +++++++++++++++++- 2 files changed, 512 insertions(+), 22 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/linked_chunk/mod.rs b/crates/matrix-sdk/src/event_cache/linked_chunk/mod.rs index 1bfda5f9df7..c39f15b68ca 100644 --- a/crates/matrix-sdk/src/event_cache/linked_chunk/mod.rs +++ b/crates/matrix-sdk/src/event_cache/linked_chunk/mod.rs @@ -948,6 +948,18 @@ impl Position { pub fn index(&self) -> usize { self.1 } + + /// Move the index part (see [`Self::index`]) to the left, i.e. subtract 1. + /// + /// # Panic + /// + /// This method will panic if it will overflow, i.e. if the index is 0. + pub(super) fn move_index_to_the_left(&mut self) { + self.1 = self + .1 + .checked_sub(1) + .expect("Cannot move position's index to the left because it's already 0"); + } } /// An iterator over a [`LinkedChunk`] that traverses the chunk in backward diff --git a/crates/matrix-sdk/src/event_cache/store.rs b/crates/matrix-sdk/src/event_cache/store.rs index eb7e771dbd4..cbf7c20689f 100644 --- a/crates/matrix-sdk/src/event_cache/store.rs +++ b/crates/matrix-sdk/src/event_cache/store.rs @@ -12,11 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fmt; +use std::{cmp::Ordering, fmt}; use matrix_sdk_common::deserialized_responses::SyncTimelineEvent; +use ruma::OwnedEventId; +use tracing::error; -use super::linked_chunk::{Chunk, ChunkIdentifier, Error, Iter, LinkedChunk, Position}; +use super::{ + deduplicator::{Decoration, Deduplicator}, + linked_chunk::{Chunk, ChunkIdentifier, Error, Iter, LinkedChunk, Position}, +}; /// An alias for the real event type. pub(crate) type Event = SyncTimelineEvent; @@ -34,6 +39,9 @@ const DEFAULT_CHUNK_CAPACITY: usize = 128; pub struct RoomEvents { /// The real in-memory storage for all the events. chunks: LinkedChunk, + + /// The events deduplicator instance to help finding duplicates. + deduplicator: Deduplicator, } impl Default for RoomEvents { @@ -45,7 +53,7 @@ impl Default for RoomEvents { impl RoomEvents { /// Build a new [`RoomEvents`] struct with zero events. pub fn new() -> Self { - Self { chunks: LinkedChunk::new() } + Self { chunks: LinkedChunk::new(), deduplicator: Deduplicator::new() } } /// Clear all events. @@ -59,9 +67,18 @@ impl RoomEvents { pub fn push_events(&mut self, events: I) where I: IntoIterator, - I::IntoIter: ExactSizeIterator, { - self.chunks.push_items_back(events) + let (unique_events, duplicated_event_ids) = + self.filter_duplicated_events(events.into_iter()); + + // Remove the _old_ duplicated events! + // + // We don't have to worry the removals can change the position of the existing + // events, because we are pushing all _new_ `events` at the back. + self.remove_events(duplicated_event_ids); + + // Push new `events`. + self.chunks.push_items_back(unique_events); } /// Push a gap after all events or gaps. @@ -70,12 +87,21 @@ impl RoomEvents { } /// Insert events at a specified position. - pub fn insert_events_at(&mut self, events: I, position: Position) -> Result<(), Error> + pub fn insert_events_at(&mut self, events: I, mut position: Position) -> Result<(), Error> where I: IntoIterator, - I::IntoIter: ExactSizeIterator, { - self.chunks.insert_items_at(events, position) + let (unique_events, duplicated_event_ids) = + self.filter_duplicated_events(events.into_iter()); + + // Remove the _old_ duplicated events! + // + // We **have to worry* the removals can change the position of the + // existing events. We **have** to update the `position` + // argument value for each removal. + self.remove_events_and_update_position(duplicated_event_ids, &mut position); + + self.chunks.insert_items_at(unique_events, position) } /// Insert a gap at a specified position. @@ -97,9 +123,19 @@ impl RoomEvents { ) -> Result<&Chunk, Error> where I: IntoIterator, - I::IntoIter: ExactSizeIterator, { - self.chunks.replace_gap_at(events, gap_identifier) + let (unique_events, duplicated_event_ids) = + self.filter_duplicated_events(events.into_iter()); + + // Remove the _old_ duplicated events! + // + // We don't have to worry the removals can change the position of the existing + // events, because we are replacing a gap: its identifier will not change + // because of the removals. + self.remove_events(duplicated_event_ids); + + // Replace the gap by new events. + self.chunks.replace_gap_at(unique_events, gap_identifier) } /// Search for a chunk, and return its identifier. @@ -130,6 +166,126 @@ impl RoomEvents { pub fn events(&self) -> impl Iterator { self.chunks.items() } + + /// Deduplicate `events` considering all events in `Self::chunks`. + /// + /// The returned tuple contains (i) the unique events, and (ii) the + /// duplicated events (by ID). + fn filter_duplicated_events<'a, I>(&'a mut self, events: I) -> (Vec, Vec) + where + I: Iterator + 'a, + { + let mut duplicated_event_ids = Vec::new(); + + let deduplicated_events = self + .deduplicator + .scan_and_learn(events, self) + .filter_map(|decorated_event| match decorated_event { + Decoration::Unique(event) => Some(event), + Decoration::Duplicated(event) => { + error!(event_id = ?event.event_id(), "Found a duplicated event"); + + duplicated_event_ids.push(event.event_id().expect("The event has no ID")); + + // Keep the new event! + Some(event) + } + Decoration::Invalid(event) => { + error!(?event, "Found an invalid event"); + + None + } + }) + .collect(); + + (deduplicated_events, duplicated_event_ids) + } +} + +// Private implementations, implementation specific. +impl RoomEvents { + /// Remove some events from `Self::chunks`. + /// + /// This method iterates over all event IDs in `event_ids` and removes the + /// associated event (if it exists) from `Self::chunks`. + /// + /// This is used to remove duplicated events, see + /// [`Self::filter_duplicated_events`]. + fn remove_events(&mut self, event_ids: Vec) { + for event_id in event_ids { + let Some(event_position) = self.revents().find_map(|(position, event)| { + (event.event_id().as_ref() == Some(&event_id)).then_some(position) + }) else { + error!(?event_id, "An event has been detected, but it's position seems unknown"); + + continue; + }; + + self.chunks + .remove_item_at(event_position) + .expect("Failed to remove an event we have just found"); + } + } + + /// Remove all events from `Self::chunks` and update a fix [`Position`]. + /// + /// This method iterates over all event IDs in `event_ids` and removes the + /// associated event (if it exists) from `Self::chunks`, exactly like + /// [`Self::remove_events`]. The difference is that it will update a + /// [`Position`] according to the removals. + /// + /// This is used to remove duplicated events, see + /// [`Self::filter_duplicated_events`]. + fn remove_events_and_update_position( + &mut self, + event_ids: Vec, + position: &mut Position, + ) { + for event_id in event_ids { + let Some(event_position) = self.revents().find_map(|(position, event)| { + (event.event_id().as_ref() == Some(&event_id)).then_some(position) + }) else { + error!(?event_id, "An event has been detected, but it's position seems unknown"); + + continue; + }; + + self.chunks + .remove_item_at(event_position) + .expect("Failed to remove an event we have just found"); + + // A `Position` is composed of a `ChunkIdentifier` and an index. + // The `ChunkIdentifier` is stable, i.e. it won't change if an + // event is removed in another chunk. It means we only need to + // update `position` if the removal happened in **the same + // chunk**. + if event_position.chunk_identifier() == position.chunk_identifier() { + // Now we can compare the the position indices. + match event_position.index().cmp(&position.index()) { + // `event_position`'s index < `position`'s index + Ordering::Less => { + // An event has been removed _before_ the new + // events: `position` needs to be shifted to the + // left by 1. + position.move_index_to_the_left(); + } + + // `event_position`'s index == `position`'s index + Ordering::Equal => { + // An event has been removed at the same position of + // the new events: `position` does _NOT_ need tp be + // modified. + } + + // `event_position`'s index > `position`'s index + Ordering::Greater => { + // An event has been removed _after_ the new events: + // `position` does _NOT_ need to be modified. + } + } + } + } + } } impl fmt::Debug for RoomEvents { @@ -205,11 +361,11 @@ mod tests { let event_builder = EventBuilder::new(); let (event_id_0, event_0) = new_event(&event_builder, "$ev0"); + let (event_id_1, event_1) = new_event(&event_builder, "$ev1"); let mut room_events = RoomEvents::new(); - room_events.push_events([event_0.clone()]); - room_events.push_events([event_0]); + room_events.push_events([event_0.clone(), event_1]); { let mut events = room_events.events(); @@ -219,6 +375,26 @@ mod tests { assert_eq!(position.index(), 0); assert_eq!(event.event_id().unwrap(), event_id_0); + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 0); + assert_eq!(position.index(), 1); + assert_eq!(event.event_id().unwrap(), event_id_1); + + assert!(events.next().is_none()); + } + + // Everything is alright. Now let's push a duplicated event. + room_events.push_events([event_0]); + + { + let mut events = room_events.events(); + + // The first `event_id_0` has been removed. + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 0); + assert_eq!(position.index(), 0); + assert_eq!(event.event_id().unwrap(), event_id_1); + assert_let!(Some((position, event)) = events.next()); assert_eq!(position.chunk_identifier(), 0); assert_eq!(position.index(), 1); @@ -317,25 +493,25 @@ mod tests { } #[test] - fn test_insert_events_at_with_dupicates() { + fn test_insert_events_at_with_duplicates() { let event_builder = EventBuilder::new(); let (event_id_0, event_0) = new_event(&event_builder, "$ev0"); let (event_id_1, event_1) = new_event(&event_builder, "$ev1"); + let (event_id_2, event_2) = new_event(&event_builder, "$ev2"); + let (event_id_3, event_3) = new_event(&event_builder, "$ev3"); let mut room_events = RoomEvents::new(); - room_events.push_events([event_0, event_1.clone()]); + room_events.push_events([event_0.clone(), event_1, event_2]); - let position_of_event_1 = room_events + let position_of_event_2 = room_events .events() .find_map(|(position, event)| { - (event.event_id().unwrap() == event_id_1).then_some(position) + (event.event_id().unwrap() == event_id_2).then_some(position) }) .unwrap(); - room_events.insert_events_at([event_1], position_of_event_1).unwrap(); - { let mut events = room_events.events(); @@ -352,8 +528,38 @@ mod tests { assert_let!(Some((position, event)) = events.next()); assert_eq!(position.chunk_identifier(), 0); assert_eq!(position.index(), 2); + assert_eq!(event.event_id().unwrap(), event_id_2); + + assert!(events.next().is_none()); + } + + // Everything is alright. Now let's insert a duplicated events! + room_events.insert_events_at([event_0, event_3], position_of_event_2).unwrap(); + + { + let mut events = room_events.events(); + + // The first `event_id_0` has been removed. + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 0); + assert_eq!(position.index(), 0); assert_eq!(event.event_id().unwrap(), event_id_1); + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 0); + assert_eq!(position.index(), 1); + assert_eq!(event.event_id().unwrap(), event_id_0); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 0); + assert_eq!(position.index(), 2); + assert_eq!(event.event_id().unwrap(), event_id_3); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 0); + assert_eq!(position.index(), 3); + assert_eq!(event.event_id().unwrap(), event_id_2); + assert!(events.next().is_none()); } } @@ -472,10 +678,11 @@ mod tests { let (event_id_0, event_0) = new_event(&event_builder, "$ev0"); let (event_id_1, event_1) = new_event(&event_builder, "$ev1"); + let (event_id_2, event_2) = new_event(&event_builder, "$ev2"); let mut room_events = RoomEvents::new(); - room_events.push_events([event_0.clone()]); + room_events.push_events([event_0.clone(), event_1]); room_events.push_gap(Gap { prev_token: "hello".to_owned() }); let chunk_identifier_of_gap = room_events @@ -484,8 +691,6 @@ mod tests { .unwrap() .chunk_identifier(); - room_events.replace_gap_at([event_0, event_1], chunk_identifier_of_gap).unwrap(); - { let mut events = room_events.events(); @@ -494,6 +699,26 @@ mod tests { assert_eq!(position.index(), 0); assert_eq!(event.event_id().unwrap(), event_id_0); + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 0); + assert_eq!(position.index(), 1); + assert_eq!(event.event_id().unwrap(), event_id_1); + + assert!(events.next().is_none()); + } + + // Everything is alright. Now let's replace a gap with a duplicated event. + room_events.replace_gap_at([event_0, event_2], chunk_identifier_of_gap).unwrap(); + + { + let mut events = room_events.events(); + + // The first `event_id_0` has been removed. + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 0); + assert_eq!(position.index(), 0); + assert_eq!(event.event_id().unwrap(), event_id_1); + assert_let!(Some((position, event)) = events.next()); assert_eq!(position.chunk_identifier(), 2); assert_eq!(position.index(), 0); @@ -502,7 +727,7 @@ mod tests { assert_let!(Some((position, event)) = events.next()); assert_eq!(position.chunk_identifier(), 2); assert_eq!(position.index(), 1); - assert_eq!(event.event_id().unwrap(), event_id_1); + assert_eq!(event.event_id().unwrap(), event_id_2); assert!(events.next().is_none()); } @@ -519,4 +744,257 @@ mod tests { assert!(chunks.next().is_none()); } } + + #[test] + fn test_remove_events() { + let event_builder = EventBuilder::new(); + + let (event_id_0, event_0) = new_event(&event_builder, "$ev0"); + let (event_id_1, event_1) = new_event(&event_builder, "$ev1"); + let (event_id_2, event_2) = new_event(&event_builder, "$ev2"); + let (event_id_3, event_3) = new_event(&event_builder, "$ev3"); + + // Push some events. + let mut room_events = RoomEvents::new(); + room_events.push_events([event_0, event_1, event_2, event_3]); + + // Remove some events. + room_events.remove_events(vec![event_id_1, event_id_3]); + + let mut events = room_events.events(); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 0); + assert_eq!(position.index(), 0); + assert_eq!(event.event_id().unwrap(), event_id_0); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 0); + assert_eq!(position.index(), 1); + assert_eq!(event.event_id().unwrap(), event_id_2); + + assert!(events.next().is_none()); + } + + #[test] + fn test_remove_events_unknown_event() { + let event_builder = EventBuilder::new(); + + let (event_id_0, _event_0) = new_event(&event_builder, "$ev0"); + + // Push ZERO event. + let mut room_events = RoomEvents::new(); + + // Remove one undefined event. + // No error is expected. + room_events.remove_events(vec![event_id_0]); + + let mut events = room_events.events(); + assert!(events.next().is_none()); + } + + #[test] + fn test_remove_events_and_update_position() { + let event_builder = EventBuilder::new(); + + let (event_id_0, event_0) = new_event(&event_builder, "$ev0"); + let (event_id_1, event_1) = new_event(&event_builder, "$ev1"); + let (event_id_2, event_2) = new_event(&event_builder, "$ev2"); + let (event_id_3, event_3) = new_event(&event_builder, "$ev3"); + let (event_id_4, event_4) = new_event(&event_builder, "$ev4"); + let (event_id_5, event_5) = new_event(&event_builder, "$ev5"); + let (event_id_6, event_6) = new_event(&event_builder, "$ev6"); + let (event_id_7, event_7) = new_event(&event_builder, "$ev7"); + let (event_id_8, event_8) = new_event(&event_builder, "$ev8"); + + // Push some events. + let mut room_events = RoomEvents::new(); + room_events.push_events([event_0, event_1, event_2, event_3, event_4, event_5, event_6]); + room_events.push_gap(Gap { prev_token: "raclette".to_owned() }); + room_events.push_events([event_7, event_8]); + + fn position_of(room_events: &RoomEvents, event_id: &EventId) -> Position { + room_events + .events() + .find_map(|(position, event)| { + (event.event_id().unwrap() == event_id).then_some(position) + }) + .unwrap() + } + + // In the same chunk… + { + // Get the position of `event_4`. + let mut position = position_of(&room_events, &event_id_4); + + // Remove one event BEFORE `event_4`. + // + // The position must move to the left by 1. + { + let previous_position = position; + room_events.remove_events_and_update_position(vec![event_id_0], &mut position); + + assert_eq!(previous_position.chunk_identifier(), position.chunk_identifier()); + assert_eq!(previous_position.index() - 1, position.index()); + + // It still represents the position of `event_4`. + assert_eq!(position, position_of(&room_events, &event_id_4)); + } + + // Remove one event AFTER `event_4`. + // + // The position must not move. + { + let previous_position = position; + room_events.remove_events_and_update_position(vec![event_id_5], &mut position); + + assert_eq!(previous_position.chunk_identifier(), position.chunk_identifier()); + assert_eq!(previous_position.index(), position.index()); + + // It still represents the position of `event_4`. + assert_eq!(position, position_of(&room_events, &event_id_4)); + } + + // Remove one event: `event_4`. + // + // The position must not move. + { + let previous_position = position; + room_events.remove_events_and_update_position(vec![event_id_4], &mut position); + + assert_eq!(previous_position.chunk_identifier(), position.chunk_identifier()); + assert_eq!(previous_position.index(), position.index()); + } + + // Check the events. + { + let mut events = room_events.events(); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 0); + assert_eq!(position.index(), 0); + assert_eq!(event.event_id().unwrap(), event_id_1); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 0); + assert_eq!(position.index(), 1); + assert_eq!(event.event_id().unwrap(), event_id_2); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 0); + assert_eq!(position.index(), 2); + assert_eq!(event.event_id().unwrap(), event_id_3); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 0); + assert_eq!(position.index(), 3); + assert_eq!(event.event_id().unwrap(), event_id_6); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 2); + assert_eq!(position.index(), 0); + assert_eq!(event.event_id().unwrap(), event_id_7); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 2); + assert_eq!(position.index(), 1); + assert_eq!(event.event_id().unwrap(), event_id_8); + + assert!(events.next().is_none()); + } + } + + // In another chunk… + { + // Get the position of `event_7`. + let mut position = position_of(&room_events, &event_id_7); + + // Remove one event BEFORE `event_7`. + // + // The position must not move because it happens in another chunk. + { + let previous_position = position; + room_events.remove_events_and_update_position(vec![event_id_1], &mut position); + + assert_eq!(previous_position.chunk_identifier(), position.chunk_identifier()); + assert_eq!(previous_position.index(), position.index()); + + // It still represents the position of `event_7`. + assert_eq!(position, position_of(&room_events, &event_id_7)); + } + + // Check the events. + { + let mut events = room_events.events(); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 0); + assert_eq!(position.index(), 0); + assert_eq!(event.event_id().unwrap(), event_id_2); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 0); + assert_eq!(position.index(), 1); + assert_eq!(event.event_id().unwrap(), event_id_3); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 0); + assert_eq!(position.index(), 2); + assert_eq!(event.event_id().unwrap(), event_id_6); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 2); + assert_eq!(position.index(), 0); + assert_eq!(event.event_id().unwrap(), event_id_7); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 2); + assert_eq!(position.index(), 1); + assert_eq!(event.event_id().unwrap(), event_id_8); + + assert!(events.next().is_none()); + } + } + + // In the same chunk, but remove multiple events, just for the fun and to ensure + // the loop works correctly. + { + // Get the position of `event_6`. + let mut position = position_of(&room_events, &event_id_6); + + // Remove three events BEFORE `event_6`. + // + // The position must move. + { + let previous_position = position; + room_events.remove_events_and_update_position( + vec![event_id_2, event_id_3, event_id_7], + &mut position, + ); + + assert_eq!(previous_position.chunk_identifier(), position.chunk_identifier()); + assert_eq!(previous_position.index() - 2, position.index()); + + // It still represents the position of `event_6`. + assert_eq!(position, position_of(&room_events, &event_id_6)); + } + + // Check the events. + { + let mut events = room_events.events(); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 0); + assert_eq!(position.index(), 0); + assert_eq!(event.event_id().unwrap(), event_id_6); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 2); + assert_eq!(position.index(), 0); + assert_eq!(event.event_id().unwrap(), event_id_8); + + assert!(events.next().is_none()); + } + } + } }