diff --git a/Cargo.lock b/Cargo.lock index f562bfe965a..bd81ba80764 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3127,6 +3127,7 @@ dependencies = [ "futures-executor", "futures-util", "gloo-timers", + "growable-bloom-filter", "http", "image", "imbl", diff --git a/crates/matrix-sdk/Cargo.toml b/crates/matrix-sdk/Cargo.toml index 2b2a5cd7062..ed7fe4f63aa 100644 --- a/crates/matrix-sdk/Cargo.toml +++ b/crates/matrix-sdk/Cargo.toml @@ -84,6 +84,7 @@ eyeball-im = { workspace = true } eyre = { version = "0.6.8", optional = true } futures-core = { workspace = true } futures-util = { workspace = true } +growable-bloom-filter = { workspace = true } http = { workspace = true } imbl = { workspace = true, features = ["serde"] } indexmap = "2.0.2" diff --git a/crates/matrix-sdk/src/event_cache/deduplicator.rs b/crates/matrix-sdk/src/event_cache/deduplicator.rs new file mode 100644 index 00000000000..ca252230fee --- /dev/null +++ b/crates/matrix-sdk/src/event_cache/deduplicator.rs @@ -0,0 +1,262 @@ +// Copyright 2024 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{collections::BTreeSet, sync::Mutex}; + +use growable_bloom_filter::{GrowableBloom, GrowableBloomBuilder}; + +use super::store::{Event, RoomEvents}; + +/// Use `Deduplicator` to find duplicates. +/// +/// This type uses a [bloom filter] to efficiently detect duplicates. 
Every time +/// [`Self::scan_and_learn`] is called, the bloom filter is updated (hence the +/// _learn_ part). Because a bloom filter has (rare) false positives, it is +/// still necessary to provide all existing events to apply a linear search. +/// +/// [bloom filter]: https://en.wikipedia.org/wiki/Bloom_filter +pub(super) struct Deduplicator { + bloom_filter: Mutex, +} + +impl Deduplicator { + const APPROXIMATED_MAXIMUM_NUMBER_OF_EVENTS: usize = 800_000; + const DESIRED_FALSE_POSITIVE_RATE: f64 = 0.001; + + /// Create a new `Self`. + pub fn new() -> Self { + Self { + bloom_filter: Mutex::new( + GrowableBloomBuilder::new() + .estimated_insertions(Self::APPROXIMATED_MAXIMUM_NUMBER_OF_EVENTS) + .desired_error_ratio(Self::DESIRED_FALSE_POSITIVE_RATE) + .build(), + ), + } + } + + /// Scan a collection of events and detect duplications. + /// + /// This method takes a collection of events `events_to_scan` and returns a + /// new collection of events, where each event is decorated by a + /// [`Decoration`], so that the caller can decide what to do with these + /// events. + /// + /// Each scanned event will update `Self`'s internal state. + /// + /// `existing_events` represents all events of a room that already exist. + pub fn scan_and_learn<'a, I>( + &'a self, + events_to_scan: I, + existing_events: &'a RoomEvents, + ) -> impl Iterator> + 'a + where + I: Iterator + 'a, + { + let mut already_seen = BTreeSet::new(); + + events_to_scan.map(move |event| { + let Some(event_id) = event.event_id() else { + // The event has no `event_id`. + return Decoration::Invalid(event); + }; + + if self.bloom_filter.lock().unwrap().check_and_set(&event_id) { + // Bloom filter has false positives. We are NOT sure the event is NOT present. + // Even if the false positive rate is low, we need to iterate over all events to + // ensure it isn't present. + + // But first, let's ensure `event` is not a duplicate from `events`, i.e. if the + // iterator itself contains duplicated events! 
We use a `BTreeSet`, otherwise + // using a bloom filter again may generate false positives. + if already_seen.contains(&event_id) { + // The iterator contains a duplicated `event`. + return Decoration::Duplicated(event); + } + + // Now we can iterate over all events to ensure `event` is not present in + // `existing_events`. + let duplicated = existing_events.revents().any(|(_position, other_event)| { + other_event.event_id().as_ref() == Some(&event_id) + }); + + already_seen.insert(event_id); + + if duplicated { + Decoration::Duplicated(event) + } else { + Decoration::Unique(event) + } + } else { + already_seen.insert(event_id); + + // Bloom filter has no false negatives. We are sure the event is NOT present: we + // can keep it in the iterator. + Decoration::Unique(event) + } + }) + } +} + +/// Information about the scanned collection of events. +#[derive(Debug)] +pub(super) enum Decoration { + /// This event is not duplicated. + Unique(I), + + /// This event is duplicated. + Duplicated(I), + + /// This event is invalid (i.e. not well formed). 
+ Invalid(I), +} + +#[cfg(test)] +mod tests { + use assert_matches2::assert_let; + use matrix_sdk_base::deserialized_responses::SyncTimelineEvent; + use matrix_sdk_test::{EventBuilder, ALICE}; + use ruma::{events::room::message::RoomMessageEventContent, owned_event_id, EventId}; + + use super::*; + + fn sync_timeline_event(event_builder: &EventBuilder, event_id: &EventId) -> SyncTimelineEvent { + SyncTimelineEvent::new(event_builder.make_sync_message_event_with_id( + *ALICE, + event_id, + RoomMessageEventContent::text_plain("foo"), + )) + } + + #[test] + fn test_filter_no_duplicate() { + let event_builder = EventBuilder::new(); + + let event_id_0 = owned_event_id!("$ev0"); + let event_id_1 = owned_event_id!("$ev1"); + let event_id_2 = owned_event_id!("$ev2"); + + let event_0 = sync_timeline_event(&event_builder, &event_id_0); + let event_1 = sync_timeline_event(&event_builder, &event_id_1); + let event_2 = sync_timeline_event(&event_builder, &event_id_2); + + let deduplicator = Deduplicator::new(); + let existing_events = RoomEvents::new(); + + let mut events = + deduplicator.scan_and_learn([event_0, event_1, event_2].into_iter(), &existing_events); + + assert_let!(Some(Decoration::Unique(event)) = events.next()); + assert_eq!(event.event_id(), Some(event_id_0)); + + assert_let!(Some(Decoration::Unique(event)) = events.next()); + assert_eq!(event.event_id(), Some(event_id_1)); + + assert_let!(Some(Decoration::Unique(event)) = events.next()); + assert_eq!(event.event_id(), Some(event_id_2)); + + assert!(events.next().is_none()); + } + + #[test] + fn test_filter_duplicates_in_new_events() { + let event_builder = EventBuilder::new(); + + let event_id_0 = owned_event_id!("$ev0"); + let event_id_1 = owned_event_id!("$ev1"); + + let event_0 = sync_timeline_event(&event_builder, &event_id_0); + let event_1 = sync_timeline_event(&event_builder, &event_id_1); + + let deduplicator = Deduplicator::new(); + let existing_events = RoomEvents::new(); + + let mut events = 
deduplicator.scan_and_learn( + [ + event_0.clone(), // OK + event_0, // Not OK + event_1, // OK + ] + .into_iter(), + &existing_events, + ); + + assert_let!(Some(Decoration::Unique(event)) = events.next()); + assert_eq!(event.event_id(), Some(event_id_0.clone())); + + assert_let!(Some(Decoration::Duplicated(event)) = events.next()); + assert_eq!(event.event_id(), Some(event_id_0)); + + assert_let!(Some(Decoration::Unique(event)) = events.next()); + assert_eq!(event.event_id(), Some(event_id_1)); + + assert!(events.next().is_none()); + } + + #[test] + fn test_filter_duplicates_with_existing_events() { + let event_builder = EventBuilder::new(); + + let event_id_0 = owned_event_id!("$ev0"); + let event_id_1 = owned_event_id!("$ev1"); + let event_id_2 = owned_event_id!("$ev2"); + + let event_0 = sync_timeline_event(&event_builder, &event_id_0); + let event_1 = sync_timeline_event(&event_builder, &event_id_1); + let event_2 = sync_timeline_event(&event_builder, &event_id_2); + + let deduplicator = Deduplicator::new(); + let mut existing_events = RoomEvents::new(); + + // Simulate `event_1` is inserted inside `existing_events`. + { + let mut events = + deduplicator.scan_and_learn([event_1.clone()].into_iter(), &existing_events); + + assert_let!(Some(Decoration::Unique(event_1)) = events.next()); + assert_eq!(event_1.event_id(), Some(event_id_1.clone())); + + assert!(events.next().is_none()); + + drop(events); // make the borrow checker happy. + + // Now we can push `event_1` inside `existing_events`. + existing_events.push_events([event_1]); + } + + // `event_1` will be duplicated. 
+ { + let mut events = deduplicator.scan_and_learn( + [ + event_0, // OK + event_1, // Not OK + event_2, // Ok + ] + .into_iter(), + &existing_events, + ); + + assert_let!(Some(Decoration::Unique(event)) = events.next()); + assert_eq!(event.event_id(), Some(event_id_0)); + + assert_let!(Some(Decoration::Duplicated(event)) = events.next()); + assert_eq!(event.event_id(), Some(event_id_1)); + + assert_let!(Some(Decoration::Unique(event)) = events.next()); + assert_eq!(event.event_id(), Some(event_id_2)); + + assert!(events.next().is_none()); + } + } +} diff --git a/crates/matrix-sdk/src/event_cache/linked_chunk/as_vector.rs b/crates/matrix-sdk/src/event_cache/linked_chunk/as_vector.rs index bff2eecde6b..4a39e5d5a74 100644 --- a/crates/matrix-sdk/src/event_cache/linked_chunk/as_vector.rs +++ b/crates/matrix-sdk/src/event_cache/linked_chunk/as_vector.rs @@ -14,7 +14,7 @@ use std::{ collections::VecDeque, - ops::ControlFlow, + ops::{ControlFlow, Not}, sync::{Arc, RwLock}, }; @@ -22,7 +22,7 @@ use eyeball_im::VectorDiff; use super::{ updates::{ReaderToken, Update, UpdatesInner}, - ChunkContent, ChunkIdentifier, Iter, + ChunkContent, ChunkIdentifier, Iter, Position, }; /// A type alias to represent a chunk's length. This is purely for commodity. @@ -253,7 +253,8 @@ impl UpdateToVectorDiff { // // From the `VectorDiff` “point of view”, this optimisation aims at avoiding // removing items to push them again later. 
- let mut mute_push_items = false; + let mut reattaching = false; + let mut detaching = false; for update in updates { match update { @@ -329,46 +330,22 @@ impl UpdateToVectorDiff { } Update::PushItems { at: position, items } => { - let expected_chunk_identifier = position.chunk_identifier(); + let number_of_chunks = self.chunks.len(); + let (offset, (chunk_index, chunk_length)) = self.map_to_offset(position); - let (chunk_index, offset, chunk_length) = { - let control_flow = self.chunks.iter_mut().enumerate().try_fold( - position.index(), - |offset, (chunk_index, (chunk_identifier, chunk_length))| { - if chunk_identifier == &expected_chunk_identifier { - ControlFlow::Break((chunk_index, offset, chunk_length)) - } else { - ControlFlow::Continue(offset + *chunk_length) - } - }, - ); - - match control_flow { - // Chunk has been found, and all values have been calculated as - // expected. - ControlFlow::Break(values) => values, - - // Chunk has not been found. - ControlFlow::Continue(..) => { - // SAFETY: Assuming `LinkedChunk` and `ObservableUpdates` are not - // buggy, and assuming `Self::chunks` is correctly initialized, it - // is not possible to push items on a chunk that does not exist. If - // this predicate fails, it means `LinkedChunk` or - // `ObservableUpdates` contain a bug. - panic!("Pushing items: The chunk is not found"); - } - } - }; + let is_pushing_back = + chunk_index + 1 == number_of_chunks && position.index() >= *chunk_length; + // Add the number of items to the chunk in `self.chunks`. *chunk_length += items.len(); - // See `mute_push_items` to learn more. - if mute_push_items { + // See `reattaching` to learn more. + if reattaching { continue; } // Optimisation: we can emit a `VectorDiff::Append` in this particular case. - if chunk_index + 1 == self.chunks.len() { + if is_pushing_back && detaching.not() { diffs.push(VectorDiff::Append { values: items.into() }); } // No optimisation: let's emit `VectorDiff::Insert`. 
@@ -379,15 +356,30 @@ impl UpdateToVectorDiff { } } - Update::DetachLastItems { at } => { - let expected_chunk_identifier = at.chunk_identifier(); - let new_length = at.index(); + Update::RemoveItem { at: position } => { + let (offset, (_chunk_index, chunk_length)) = self.map_to_offset(position); + + // Remove one item to the chunk in `self.chunks`. + *chunk_length -= 1; - let length = self + // See `reattaching` to learn more. + if reattaching { + continue; + } + + // Let's emit a `VectorDiff::Remove`. + diffs.push(VectorDiff::Remove { index: offset }); + } + + Update::DetachLastItems { at: position } => { + let expected_chunk_identifier = position.chunk_identifier(); + let new_length = position.index(); + + let chunk_length = self .chunks .iter_mut() - .find_map(|(chunk_identifier, length)| { - (*chunk_identifier == expected_chunk_identifier).then_some(length) + .find_map(|(chunk_identifier, chunk_length)| { + (*chunk_identifier == expected_chunk_identifier).then_some(chunk_length) }) // SAFETY: Assuming `LinkedChunk` and `ObservableUpdates` are not buggy, and // assuming `Self::chunks` is correctly initialized, it is not possible to @@ -395,23 +387,63 @@ impl UpdateToVectorDiff { // it means `LinkedChunk` or `ObservableUpdates` contain a bug. .expect("Detach last items: The chunk is not found"); - *length = new_length; + *chunk_length = new_length; + + // Entering the _detaching_ mode. + detaching = true; } Update::StartReattachItems => { - // Entering the `reattaching` mode. - mute_push_items = true; + // Entering the _reattaching_ mode. + reattaching = true; } Update::EndReattachItems => { - // Exiting the `reattaching` mode. - mute_push_items = false; + // Exiting the _reattaching_ mode. + reattaching = false; + + // Exiting the _detaching_ mode. 
+ detaching = false; } } } diffs } + + fn map_to_offset(&mut self, position: &Position) -> (usize, (usize, &mut usize)) { + let expected_chunk_identifier = position.chunk_identifier(); + + let (offset, (chunk_index, chunk_length)) = { + let control_flow = self.chunks.iter_mut().enumerate().try_fold( + position.index(), + |offset, (chunk_index, (chunk_identifier, chunk_length))| { + if chunk_identifier == &expected_chunk_identifier { + ControlFlow::Break((offset, (chunk_index, chunk_length))) + } else { + ControlFlow::Continue(offset + *chunk_length) + } + }, + ); + + match control_flow { + // Chunk has been found, and all values have been calculated as + // expected. + ControlFlow::Break(values) => values, + + // Chunk has not been found. + ControlFlow::Continue(..) => { + // SAFETY: Assuming `LinkedChunk` and `ObservableUpdates` are not buggy, and + // assuming `Self::chunks` is correctly initialized, it is not possible to work + // on a chunk that does not exist. If this predicate fails, it means + // `LinkedChunk` or `ObservableUpdates` contain a bug. 
+ panic!("The chunk is not found"); + } + } + }; + + (offset, (chunk_index, chunk_length)) + } } #[cfg(test)] @@ -435,6 +467,9 @@ mod tests { match diff { VectorDiff::Insert { index, value } => accumulator.insert(index, value), VectorDiff::Append { values } => accumulator.append(values), + VectorDiff::Remove { index } => { + accumulator.remove(index); + } diff => unimplemented!("{diff:?}"), } } @@ -578,15 +613,77 @@ mod tests { &[VectorDiff::Insert { index: 0, value: 'm' }], ); + let removed_item = linked_chunk + .remove_item_at(linked_chunk.item_position(|item| *item == 'c').unwrap()) + .unwrap(); + assert_eq!(removed_item, 'c'); + assert_items_eq!( + linked_chunk, + ['m', 'a', 'w'] ['x'] ['y', 'z', 'b'] ['d'] ['i', 'j', 'k'] ['l'] ['e', 'f', 'g'] ['h'] + ); + + // From an `ObservableVector` point of view, it would look like: + // + // 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 + // +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+ + // | m | a | w | x | y | z | b | d | i | j | k | l | e | f | g | h | + // +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+ + // ^ + // | + // `c` has been removed + apply_and_assert_eq(&mut accumulator, as_vector.take(), &[VectorDiff::Remove { index: 7 }]); + + let removed_item = linked_chunk + .remove_item_at(linked_chunk.item_position(|item| *item == 'z').unwrap()) + .unwrap(); + assert_eq!(removed_item, 'z'); + assert_items_eq!( + linked_chunk, + ['m', 'a', 'w'] ['x'] ['y', 'b'] ['d'] ['i', 'j', 'k'] ['l'] ['e', 'f', 'g'] ['h'] + ); + + // From an `ObservableVector` point of view, it would look like: + // + // 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 + // +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+ + // | m | a | w | x | y | b | d | i | j | k | l | e | f | g | h | + // +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+ + // ^ + // | + // `z` has been removed + apply_and_assert_eq(&mut accumulator, as_vector.take(), &[VectorDiff::Remove { index: 5 }]); + + linked_chunk + 
.insert_items_at(['z'], linked_chunk.item_position(|item| *item == 'h').unwrap()) + .unwrap(); + + assert_items_eq!( + linked_chunk, + ['m', 'a', 'w'] ['x'] ['y', 'b'] ['d'] ['i', 'j', 'k'] ['l'] ['e', 'f', 'g'] ['z', 'h'] + ); + + // From an `ObservableVector` point of view, it would look like: + // + // 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 + // +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+ + // | m | a | w | x | y | b | d | i | j | k | l | e | f | g | z | h | + // +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+ + // ^^^^ + // | + // new! + apply_and_assert_eq( + &mut accumulator, + as_vector.take(), + &[VectorDiff::Insert { index: 14, value: 'z' }], + ); + drop(linked_chunk); assert!(as_vector.take().is_empty()); // Finally, ensure the “reconstitued” vector is the one expected. assert_eq!( accumulator, - vector![ - 'm', 'a', 'w', 'x', 'y', 'z', 'b', 'c', 'd', 'i', 'j', 'k', 'l', 'e', 'f', 'g', 'h' - ] + vector!['m', 'a', 'w', 'x', 'y', 'b', 'd', 'i', 'j', 'k', 'l', 'e', 'f', 'g', 'z', 'h'] ); } @@ -622,6 +719,7 @@ mod tests { PushItems { items: Vec }, PushGap, ReplaceLastGap { items: Vec }, + RemoveItem { item: char }, } fn as_vector_operation_strategy() -> impl Strategy { @@ -633,13 +731,16 @@ mod tests { 1 => prop::collection::vec(prop::char::ranges(vec!['a'..='z', 'A'..='Z'].into()), 0..=25) .prop_map(|items| AsVectorOperation::ReplaceLastGap { items }), + + 1 => prop::char::ranges(vec!['a'..='z', 'A'..='Z'].into()) + .prop_map(|item| AsVectorOperation::RemoveItem { item }), ] } proptest! 
{ #[test] fn as_vector_is_correct( - operations in prop::collection::vec(as_vector_operation_strategy(), 10..=50) + operations in prop::collection::vec(as_vector_operation_strategy(), 50..=200) ) { let mut linked_chunk = LinkedChunk::<10, char, ()>::new_with_update_history(); let mut as_vector = linked_chunk.as_vector().unwrap(); @@ -662,7 +763,17 @@ mod tests { continue; }; - linked_chunk.replace_gap_at(items, gap_identifier).unwrap(); + linked_chunk.replace_gap_at(items, gap_identifier).expect("Failed to replace a gap"); + } + + AsVectorOperation::RemoveItem { item: expected_item } => { + let Some(position) = linked_chunk + .items().find_map(|(position, item)| (*item == expected_item).then_some(position)) + else { + continue; + }; + + linked_chunk.remove_item_at(position).expect("Failed to remove an item"); } } } @@ -678,6 +789,9 @@ mod tests { vector_from_diffs.append(&mut values); } + VectorDiff::Remove { index } => { + vector_from_diffs.remove(index); + } _ => unreachable!(), } } diff --git a/crates/matrix-sdk/src/event_cache/linked_chunk/mod.rs b/crates/matrix-sdk/src/event_cache/linked_chunk/mod.rs index 644ceb09768..c0730c660f7 100644 --- a/crates/matrix-sdk/src/event_cache/linked_chunk/mod.rs +++ b/crates/matrix-sdk/src/event_cache/linked_chunk/mod.rs @@ -406,6 +406,79 @@ impl LinkedChunk { Ok(()) } + /// Remove item at a specified position in the [`LinkedChunk`]. + /// + /// Because the `position` can be invalid, this method returns a + /// `Result`. + pub fn remove_item_at(&mut self, position: Position) -> Result { + let chunk_identifier = position.chunk_identifier(); + let item_index = position.index(); + + let mut chunk_ptr = None; + let removed_item; + + { + let chunk = self + .links + .chunk_mut(chunk_identifier) + .ok_or(Error::InvalidChunkIdentifier { identifier: chunk_identifier })?; + + let can_unlink_chunk = match &mut chunk.content { + ChunkContent::Gap(..) 
=> { + return Err(Error::ChunkIsAGap { identifier: chunk_identifier }) + } + + ChunkContent::Items(current_items) => { + let current_items_length = current_items.len(); + // `Vec::remove` panics when `index >= len`: reject `len` itself too. + if item_index >= current_items_length { + return Err(Error::InvalidItemIndex { index: item_index }); + } + + removed_item = current_items.remove(item_index); + + if let Some(updates) = self.updates.as_mut() { + updates + .push(Update::RemoveItem { at: Position(chunk_identifier, item_index) }) + } + + current_items.is_empty() + } + }; + + // If the `chunk` can be unlinked, and if the `chunk` is not the first one, we + // can remove it. + if can_unlink_chunk && chunk.is_first_chunk().not() { + // Unlink `chunk`. + chunk.unlink(&mut self.updates); + + chunk_ptr = Some(chunk.as_ptr()); + + // We need to update `self.last` if and only if `chunk` _is_ the last chunk. The + // new last chunk is the chunk before `chunk`. + if chunk.is_last_chunk() { + self.links.last = chunk.previous; + } + } + + self.length -= 1; + + // Stop borrowing `chunk`. + } + + if let Some(chunk_ptr) = chunk_ptr { + // `chunk` has been unlinked. + + // Re-box the chunk, and let Rust do its job. + // + // SAFETY: `chunk` is unlinked and not borrowed anymore. `LinkedChunk` doesn't + // use it anymore, it's a leak. It is time to re-`Box` it and drop it. + let _chunk_boxed = unsafe { Box::from_raw(chunk_ptr.as_ptr()) }; + } + + Ok(removed_item) + } + /// Insert a gap at a specified position in the [`LinkedChunk`]. /// /// Because the `position` can be invalid, this method returns a @@ -852,6 +925,12 @@ impl ChunkIdentifierGenerator { #[repr(transparent)] pub struct ChunkIdentifier(u64); +impl PartialEq for ChunkIdentifier { + fn eq(&self, other: &u64) -> bool { + self.0 == *other + } +} + /// The position of something inside a [`Chunk`]. /// /// It's a pair of a chunk position and an item index. 
@@ -868,6 +947,18 @@ impl Position { pub fn index(&self) -> usize { self.1 } + + /// Move the index part (see [`Self::index`]) to the left, i.e. subtract 1. + /// + /// # Panic + /// + /// This method will panic if it will overflow, i.e. if the index is 0. + pub(super) fn move_index_to_the_left(&mut self) { + self.1 = self + .1 + .checked_sub(1) + .expect("Cannot move position's index to the left because it's already 0"); + } } /// An iterator over a [`LinkedChunk`] that traverses the chunk in backward @@ -1845,6 +1936,206 @@ mod tests { Ok(()) } + #[test] + fn test_remove_item_at() -> Result<(), Error> { + use super::Update::*; + + let mut linked_chunk = LinkedChunk::<3, char, ()>::new_with_update_history(); + linked_chunk.push_items_back(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k']); + assert_items_eq!(linked_chunk, ['a', 'b', 'c'] ['d', 'e', 'f'] ['g', 'h', 'i'] ['j', 'k']); + assert_eq!(linked_chunk.len(), 11); + + // Ignore previous updates. + let _ = linked_chunk.updates().unwrap().take(); + + // Remove the last item of the middle chunk, 3 times. The chunk is empty after + // that. The chunk is removed. 
+ { + let position_of_f = linked_chunk.item_position(|item| *item == 'f').unwrap(); + let removed_item = linked_chunk.remove_item_at(position_of_f)?; + + assert_eq!(removed_item, 'f'); + assert_items_eq!(linked_chunk, ['a', 'b', 'c'] ['d', 'e'] ['g', 'h', 'i'] ['j', 'k']); + assert_eq!(linked_chunk.len(), 10); + + let position_of_e = linked_chunk.item_position(|item| *item == 'e').unwrap(); + let removed_item = linked_chunk.remove_item_at(position_of_e)?; + + assert_eq!(removed_item, 'e'); + assert_items_eq!(linked_chunk, ['a', 'b', 'c'] ['d'] ['g', 'h', 'i'] ['j', 'k']); + assert_eq!(linked_chunk.len(), 9); + + let position_of_d = linked_chunk.item_position(|item| *item == 'd').unwrap(); + let removed_item = linked_chunk.remove_item_at(position_of_d)?; + + assert_eq!(removed_item, 'd'); + assert_items_eq!(linked_chunk, ['a', 'b', 'c'] ['g', 'h', 'i'] ['j', 'k']); + assert_eq!(linked_chunk.len(), 8); + + assert_eq!( + linked_chunk.updates().unwrap().take(), + &[ + RemoveItem { at: Position(ChunkIdentifier(1), 2) }, + RemoveItem { at: Position(ChunkIdentifier(1), 1) }, + RemoveItem { at: Position(ChunkIdentifier(1), 0) }, + RemoveChunk(ChunkIdentifier(1)), + ] + ); + } + + // Remove the first item of the first chunk, 3 times. The chunk is empty after + // that. The chunk is NOT removed because it's the first chunk. 
+ { + let first_position = linked_chunk.item_position(|item| *item == 'a').unwrap(); + let removed_item = linked_chunk.remove_item_at(first_position)?; + + assert_eq!(removed_item, 'a'); + assert_items_eq!(linked_chunk, ['b', 'c'] ['g', 'h', 'i'] ['j', 'k']); + assert_eq!(linked_chunk.len(), 7); + + let removed_item = linked_chunk.remove_item_at(first_position)?; + + assert_eq!(removed_item, 'b'); + assert_items_eq!(linked_chunk, ['c'] ['g', 'h', 'i'] ['j', 'k']); + assert_eq!(linked_chunk.len(), 6); + + let removed_item = linked_chunk.remove_item_at(first_position)?; + + assert_eq!(removed_item, 'c'); + assert_items_eq!(linked_chunk, [] ['g', 'h', 'i'] ['j', 'k']); + assert_eq!(linked_chunk.len(), 5); + + assert_eq!( + linked_chunk.updates().unwrap().take(), + &[ + RemoveItem { at: Position(ChunkIdentifier(0), 0) }, + RemoveItem { at: Position(ChunkIdentifier(0), 0) }, + RemoveItem { at: Position(ChunkIdentifier(0), 0) }, + ] + ); + } + + // Remove the first item of the middle chunk, 3 times. The chunk is empty after + // that. The chunk is removed. 
+ { + let first_position = linked_chunk.item_position(|item| *item == 'g').unwrap(); + let removed_item = linked_chunk.remove_item_at(first_position)?; + + assert_eq!(removed_item, 'g'); + assert_items_eq!(linked_chunk, [] ['h', 'i'] ['j', 'k']); + assert_eq!(linked_chunk.len(), 4); + + let removed_item = linked_chunk.remove_item_at(first_position)?; + + assert_eq!(removed_item, 'h'); + assert_items_eq!(linked_chunk, [] ['i'] ['j', 'k']); + assert_eq!(linked_chunk.len(), 3); + + let removed_item = linked_chunk.remove_item_at(first_position)?; + + assert_eq!(removed_item, 'i'); + assert_items_eq!(linked_chunk, [] ['j', 'k']); + assert_eq!(linked_chunk.len(), 2); + + assert_eq!( + linked_chunk.updates().unwrap().take(), + &[ + RemoveItem { at: Position(ChunkIdentifier(2), 0) }, + RemoveItem { at: Position(ChunkIdentifier(2), 0) }, + RemoveItem { at: Position(ChunkIdentifier(2), 0) }, + RemoveChunk(ChunkIdentifier(2)), + ] + ); + } + + // Remove the last item of the last chunk, twice. The chunk is empty after that. + // The chunk is removed. + { + let position_of_k = linked_chunk.item_position(|item| *item == 'k').unwrap(); + let removed_item = linked_chunk.remove_item_at(position_of_k)?; + + assert_eq!(removed_item, 'k'); + #[rustfmt::skip] + assert_items_eq!(linked_chunk, [] ['j']); + assert_eq!(linked_chunk.len(), 1); + + let position_of_j = linked_chunk.item_position(|item| *item == 'j').unwrap(); + let removed_item = linked_chunk.remove_item_at(position_of_j)?; + + assert_eq!(removed_item, 'j'); + assert_items_eq!(linked_chunk, []); + assert_eq!(linked_chunk.len(), 0); + + assert_eq!( + linked_chunk.updates().unwrap().take(), + &[ + RemoveItem { at: Position(ChunkIdentifier(3), 1) }, + RemoveItem { at: Position(ChunkIdentifier(3), 0) }, + RemoveChunk(ChunkIdentifier(3)), + ] + ); + } + + // Add a couple more items, delete one, add a gap, and delete more items. 
+ { + linked_chunk.push_items_back(['a', 'b', 'c', 'd']); + + #[rustfmt::skip] + assert_items_eq!(linked_chunk, ['a', 'b', 'c'] ['d']); + assert_eq!(linked_chunk.len(), 4); + + let position_of_c = linked_chunk.item_position(|item| *item == 'c').unwrap(); + linked_chunk.insert_gap_at((), position_of_c)?; + + assert_items_eq!(linked_chunk, ['a', 'b'] [-] ['c'] ['d']); + assert_eq!(linked_chunk.len(), 4); + + // Ignore updates. + let _ = linked_chunk.updates().unwrap().take(); + + let position_of_c = linked_chunk.item_position(|item| *item == 'c').unwrap(); + let removed_item = linked_chunk.remove_item_at(position_of_c)?; + + assert_eq!(removed_item, 'c'); + assert_items_eq!(linked_chunk, ['a', 'b'] [-] ['d']); + assert_eq!(linked_chunk.len(), 3); + + let position_of_d = linked_chunk.item_position(|item| *item == 'd').unwrap(); + let removed_item = linked_chunk.remove_item_at(position_of_d)?; + + assert_eq!(removed_item, 'd'); + assert_items_eq!(linked_chunk, ['a', 'b'] [-]); + assert_eq!(linked_chunk.len(), 2); + + let first_position = linked_chunk.item_position(|item| *item == 'a').unwrap(); + let removed_item = linked_chunk.remove_item_at(first_position)?; + + assert_eq!(removed_item, 'a'); + assert_items_eq!(linked_chunk, ['b'] [-]); + assert_eq!(linked_chunk.len(), 1); + + let removed_item = linked_chunk.remove_item_at(first_position)?; + + assert_eq!(removed_item, 'b'); + assert_items_eq!(linked_chunk, [] [-]); + assert_eq!(linked_chunk.len(), 0); + + assert_eq!( + linked_chunk.updates().unwrap().take(), + &[ + RemoveItem { at: Position(ChunkIdentifier(6), 0) }, + RemoveChunk(ChunkIdentifier(6)), + RemoveItem { at: Position(ChunkIdentifier(4), 0) }, + RemoveChunk(ChunkIdentifier(4)), + RemoveItem { at: Position(ChunkIdentifier(0), 0) }, + RemoveItem { at: Position(ChunkIdentifier(0), 0) }, + ] + ); + } + + Ok(()) + } + #[test] fn test_insert_gap_at() -> Result<(), Error> { use super::Update::*; diff --git 
a/crates/matrix-sdk/src/event_cache/linked_chunk/updates.rs b/crates/matrix-sdk/src/event_cache/linked_chunk/updates.rs index edf3f992127..d2591b7e9fc 100644 --- a/crates/matrix-sdk/src/event_cache/linked_chunk/updates.rs +++ b/crates/matrix-sdk/src/event_cache/linked_chunk/updates.rs @@ -76,6 +76,12 @@ pub enum Update { items: Vec, }, + /// An item has been removed inside a chunk of kind Items. + RemoveItem { + /// The [`Position`] of the item. + at: Position, + }, + /// The last items of a chunk have been detached, i.e. the chunk has been /// truncated. DetachLastItems { diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index 1de623c4b86..672b55ea58f 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -77,6 +77,7 @@ use self::{ }; use crate::{client::WeakClient, room::WeakRoom, Client}; +mod deduplicator; mod linked_chunk; mod pagination; mod store; diff --git a/crates/matrix-sdk/src/event_cache/store.rs b/crates/matrix-sdk/src/event_cache/store.rs index 34565b14193..6361b400c25 100644 --- a/crates/matrix-sdk/src/event_cache/store.rs +++ b/crates/matrix-sdk/src/event_cache/store.rs @@ -12,11 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fmt; +use std::{cmp::Ordering, fmt}; use matrix_sdk_common::deserialized_responses::SyncTimelineEvent; +use ruma::OwnedEventId; +use tracing::error; -use super::linked_chunk::{Chunk, ChunkIdentifier, Error, Iter, LinkedChunk, Position}; +use super::{ + deduplicator::{Decoration, Deduplicator}, + linked_chunk::{Chunk, ChunkIdentifier, Error, Iter, LinkedChunk, Position}, +}; + +/// An alias for the real event type. +pub type Event = SyncTimelineEvent; #[derive(Clone, Debug)] pub struct Gap { @@ -27,8 +35,13 @@ pub struct Gap { const DEFAULT_CHUNK_CAPACITY: usize = 128; +/// This type represents all events of a single room. 
pub struct RoomEvents { - chunks: LinkedChunk, + /// The real in-memory storage for all the events. + chunks: LinkedChunk, + + /// The events deduplicator instance to help finding duplicates. + deduplicator: Deduplicator, } impl Default for RoomEvents { @@ -38,8 +51,9 @@ impl Default for RoomEvents { } impl RoomEvents { + /// Build a new `Self` with zero events. pub fn new() -> Self { - Self { chunks: LinkedChunk::new() } + Self { chunks: LinkedChunk::new(), deduplicator: Deduplicator::new() } } /// Clear all events. @@ -52,10 +66,19 @@ impl RoomEvents { /// The last event in `events` is the most recent one. pub fn push_events(&mut self, events: I) where - I: IntoIterator, - I::IntoIter: ExactSizeIterator, + I: IntoIterator, { - self.chunks.push_items_back(events) + let (unique_events, duplicated_event_ids) = + self.filter_duplicated_events(events.into_iter()); + + // Remove the _old_ duplicated events! + // + // We don't have to worry the removals can change the position of the existing + // events, because we are pushing all _new_ `events` at the back. + self.remove_events(duplicated_event_ids); + + // Push new `events`. + self.chunks.push_items_back(unique_events); } /// Push a gap after all events or gaps. @@ -64,12 +87,21 @@ impl RoomEvents { } /// Insert events at a specified position. - pub fn insert_events_at(&mut self, events: I, position: Position) -> Result<(), Error> + pub fn insert_events_at(&mut self, events: I, mut position: Position) -> Result<(), Error> where - I: IntoIterator, - I::IntoIter: ExactSizeIterator, + I: IntoIterator, { - self.chunks.insert_items_at(events, position) + let (unique_events, duplicated_event_ids) = + self.filter_duplicated_events(events.into_iter()); + + // Remove the _old_ duplicated events! + // + // We **have to worry** the removals can change the position of the + // existing events. We **have** to update the `position` + // argument value for each removal. 
+ self.remove_events_and_update_position(duplicated_event_ids, &mut position); + + self.chunks.insert_items_at(unique_events, position) } /// Insert a gap at a specified position. @@ -88,18 +120,28 @@ impl RoomEvents { &mut self, events: I, gap_identifier: ChunkIdentifier, - ) -> Result<&Chunk, Error> + ) -> Result<&Chunk, Error> where - I: IntoIterator, - I::IntoIter: ExactSizeIterator, + I: IntoIterator, { - self.chunks.replace_gap_at(events, gap_identifier) + let (unique_events, duplicated_event_ids) = + self.filter_duplicated_events(events.into_iter()); + + // Remove the _old_ duplicated events! + // + // We don't have to worry the removals can change the position of the existing + // events, because we are replacing a gap: its identifier will not change + // because of the removals. + self.remove_events(duplicated_event_ids); + + // Replace the gap by new events. + self.chunks.replace_gap_at(unique_events, gap_identifier) } /// Search for a chunk, and return its identifier. pub fn chunk_identifier<'a, P>(&'a self, predicate: P) -> Option where - P: FnMut(&'a Chunk) -> bool, + P: FnMut(&'a Chunk) -> bool, { self.chunks.chunk_identifier(predicate) } @@ -107,23 +149,143 @@ impl RoomEvents { /// Iterate over the chunks, forward. /// /// The oldest chunk comes first. - pub fn chunks(&self) -> Iter<'_, DEFAULT_CHUNK_CAPACITY, SyncTimelineEvent, Gap> { + pub fn chunks(&self) -> Iter<'_, DEFAULT_CHUNK_CAPACITY, Event, Gap> { self.chunks.chunks() } /// Iterate over the events, backward. /// /// The most recent event comes first. - pub fn revents(&self) -> impl Iterator { + pub fn revents(&self) -> impl Iterator { self.chunks.ritems() } /// Iterate over the events, forward. /// /// The oldest event comes first. - pub fn events(&self) -> impl Iterator { + pub fn events(&self) -> impl Iterator { self.chunks.items() } + + /// Deduplicate `events` considering all events in `Self::chunks`. 
+ /// + /// The returned tuple contains (i) the unique events, and (ii) the + /// duplicated events (by ID). + fn filter_duplicated_events<'a, I>(&'a mut self, events: I) -> (Vec, Vec) + where + I: Iterator + 'a, + { + let mut duplicated_event_ids = Vec::new(); + + let deduplicated_events = self + .deduplicator + .scan_and_learn(events, self) + .filter_map(|decorated_event| match decorated_event { + Decoration::Unique(event) => Some(event), + Decoration::Duplicated(event) => { + error!(event_id = ?event.event_id(), "Found a duplicated event"); + + duplicated_event_ids.push(event.event_id().expect("The event has no ID")); + + // Keep the new event! + Some(event) + } + Decoration::Invalid(event) => { + error!(?event, "Found an invalid event"); + + None + } + }) + .collect(); + + (deduplicated_events, duplicated_event_ids) + } +} + +// Private implementations, implementation specific. +impl RoomEvents { + /// Remove some events from `Self::chunks`. + /// + /// This method iterates over all event IDs in `event_ids` and removes the + /// associated event (if it exists) from `Self::chunks`. + /// + /// This is used to remove duplicated events, see + /// [`Self::filter_duplicated_events`]. + fn remove_events(&mut self, event_ids: Vec) { + for event_id in event_ids { + let Some(event_position) = self.revents().find_map(|(position, event)| { + (event.event_id().as_ref() == Some(&event_id)).then_some(position) + }) else { + error!(?event_id, "An event has been detected, but it's position seems unknown"); + + continue; + }; + + self.chunks + .remove_item_at(event_position) + .expect("Failed to remove an event we have just found"); + } + } + + /// Remove some events from `Self::chunks` and update a given [`Position`]. + /// + /// This method iterates over all event IDs in `event_ids` and removes the + /// associated event (if it exists) from `Self::chunks`, exactly like + /// [`Self::remove_events`].
The difference is that it will update a + /// [`Position`] according to the removals. + /// + /// This is used to remove duplicated events, see + /// [`Self::filter_duplicated_events`]. + fn remove_events_and_update_position( + &mut self, + event_ids: Vec, + position: &mut Position, + ) { + for event_id in event_ids { + let Some(event_position) = self.revents().find_map(|(position, event)| { + (event.event_id().as_ref() == Some(&event_id)).then_some(position) + }) else { + error!(?event_id, "An event has been detected, but it's position seems unknown"); + + continue; + }; + + self.chunks + .remove_item_at(event_position) + .expect("Failed to remove an event we have just found"); + + // A `Position` is composed of a `ChunkIdentifier` and an index. + // The `ChunkIdentifier` is stable, i.e. it won't change if an + // event is removed in another chunk. It means we only need to + // update `position` if the removal happened in **the same + // chunk**. + if event_position.chunk_identifier() == position.chunk_identifier() { + // Now we can compare the position indices. + match event_position.index().cmp(&position.index()) { + // `event_position`'s index < `position`'s index + Ordering::Less => { + // An event has been removed _before_ the new + // events: `position` needs to be shifted to the + // left by 1. + position.move_index_to_the_left(); + } + + // `event_position`'s index == `position`'s index + Ordering::Equal => { + // An event has been removed at the same position as + // the new events: `position` does _NOT_ need to be + // modified. + } + + // `event_position`'s index > `position`'s index + Ordering::Greater => { + // An event has been removed _after_ the new events: + // `position` does _NOT_ need to be modified.
+ } + } + } + } + } } impl fmt::Debug for RoomEvents { @@ -131,3 +293,708 @@ impl fmt::Debug for RoomEvents { formatter.debug_struct("RoomEvents").field("chunk", &self.chunks).finish() } } + +#[cfg(test)] +mod tests { + use assert_matches2::assert_let; + use matrix_sdk_test::{EventBuilder, ALICE}; + use ruma::{events::room::message::RoomMessageEventContent, EventId, OwnedEventId}; + + use super::*; + + fn new_event(event_builder: &EventBuilder, event_id: &str) -> (OwnedEventId, Event) { + let event_id = EventId::parse(event_id).unwrap(); + + let event = SyncTimelineEvent::new(event_builder.make_sync_message_event_with_id( + *ALICE, + &event_id, + RoomMessageEventContent::text_plain("foo"), + )); + + (event_id, event) + } + + #[test] + fn test_new_room_events_has_zero_events() { + let room_events = RoomEvents::new(); + + assert_eq!(room_events.chunks.len(), 0); + } + + #[test] + fn test_push_events() { + let event_builder = EventBuilder::new(); + + let (event_id_0, event_0) = new_event(&event_builder, "$ev0"); + let (event_id_1, event_1) = new_event(&event_builder, "$ev1"); + let (event_id_2, event_2) = new_event(&event_builder, "$ev2"); + + let mut room_events = RoomEvents::new(); + + room_events.push_events([event_0, event_1]); + room_events.push_events([event_2]); + + { + let mut events = room_events.events(); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 0); + assert_eq!(position.index(), 0); + assert_eq!(event.event_id().unwrap(), event_id_0); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 0); + assert_eq!(position.index(), 1); + assert_eq!(event.event_id().unwrap(), event_id_1); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 0); + assert_eq!(position.index(), 2); + assert_eq!(event.event_id().unwrap(), event_id_2); + + assert!(events.next().is_none()); + } + } + + #[test] + fn 
test_push_events_with_duplicates() { + let event_builder = EventBuilder::new(); + + let (event_id_0, event_0) = new_event(&event_builder, "$ev0"); + let (event_id_1, event_1) = new_event(&event_builder, "$ev1"); + + let mut room_events = RoomEvents::new(); + + room_events.push_events([event_0.clone(), event_1]); + + { + let mut events = room_events.events(); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 0); + assert_eq!(position.index(), 0); + assert_eq!(event.event_id().unwrap(), event_id_0); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 0); + assert_eq!(position.index(), 1); + assert_eq!(event.event_id().unwrap(), event_id_1); + + assert!(events.next().is_none()); + } + + // Everything is alright. Now let's push a duplicated event. + room_events.push_events([event_0]); + + { + let mut events = room_events.events(); + + // The first `event_id_0` has been removed. + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 0); + assert_eq!(position.index(), 0); + assert_eq!(event.event_id().unwrap(), event_id_1); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 0); + assert_eq!(position.index(), 1); + assert_eq!(event.event_id().unwrap(), event_id_0); + + assert!(events.next().is_none()); + } + } + + #[test] + fn test_push_gap() { + let event_builder = EventBuilder::new(); + + let (event_id_0, event_0) = new_event(&event_builder, "$ev0"); + let (event_id_1, event_1) = new_event(&event_builder, "$ev1"); + + let mut room_events = RoomEvents::new(); + + room_events.push_events([event_0]); + room_events.push_gap(Gap { prev_token: "hello".to_owned() }); + room_events.push_events([event_1]); + + { + let mut events = room_events.events(); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 0); + assert_eq!(position.index(), 0); + 
assert_eq!(event.event_id().unwrap(), event_id_0); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 2); + assert_eq!(position.index(), 0); + assert_eq!(event.event_id().unwrap(), event_id_1); + + assert!(events.next().is_none()); + } + + { + let mut chunks = room_events.chunks(); + + assert_let!(Some(chunk) = chunks.next()); + assert!(chunk.is_items()); + + assert_let!(Some(chunk) = chunks.next()); + assert!(chunk.is_gap()); + + assert_let!(Some(chunk) = chunks.next()); + assert!(chunk.is_items()); + + assert!(chunks.next().is_none()); + } + } + + #[test] + fn test_insert_events_at() { + let event_builder = EventBuilder::new(); + + let (event_id_0, event_0) = new_event(&event_builder, "$ev0"); + let (event_id_1, event_1) = new_event(&event_builder, "$ev1"); + let (event_id_2, event_2) = new_event(&event_builder, "$ev2"); + + let mut room_events = RoomEvents::new(); + + room_events.push_events([event_0, event_1]); + + let position_of_event_1 = room_events + .events() + .find_map(|(position, event)| { + (event.event_id().unwrap() == event_id_1).then_some(position) + }) + .unwrap(); + + room_events.insert_events_at([event_2], position_of_event_1).unwrap(); + + { + let mut events = room_events.events(); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 0); + assert_eq!(position.index(), 0); + assert_eq!(event.event_id().unwrap(), event_id_0); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 0); + assert_eq!(position.index(), 1); + assert_eq!(event.event_id().unwrap(), event_id_2); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 0); + assert_eq!(position.index(), 2); + assert_eq!(event.event_id().unwrap(), event_id_1); + + assert!(events.next().is_none()); + } + } + + #[test] + fn test_insert_events_at_with_duplicates() { + let event_builder = EventBuilder::new(); + + 
let (event_id_0, event_0) = new_event(&event_builder, "$ev0"); + let (event_id_1, event_1) = new_event(&event_builder, "$ev1"); + let (event_id_2, event_2) = new_event(&event_builder, "$ev2"); + let (event_id_3, event_3) = new_event(&event_builder, "$ev3"); + + let mut room_events = RoomEvents::new(); + + room_events.push_events([event_0.clone(), event_1, event_2]); + + let position_of_event_2 = room_events + .events() + .find_map(|(position, event)| { + (event.event_id().unwrap() == event_id_2).then_some(position) + }) + .unwrap(); + + { + let mut events = room_events.events(); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 0); + assert_eq!(position.index(), 0); + assert_eq!(event.event_id().unwrap(), event_id_0); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 0); + assert_eq!(position.index(), 1); + assert_eq!(event.event_id().unwrap(), event_id_1); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 0); + assert_eq!(position.index(), 2); + assert_eq!(event.event_id().unwrap(), event_id_2); + + assert!(events.next().is_none()); + } + + // Everything is alright. Now let's insert a duplicated event! + room_events.insert_events_at([event_0, event_3], position_of_event_2).unwrap(); + + { + let mut events = room_events.events(); + + // The first `event_id_0` has been removed.
+ assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 0); + assert_eq!(position.index(), 0); + assert_eq!(event.event_id().unwrap(), event_id_1); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 0); + assert_eq!(position.index(), 1); + assert_eq!(event.event_id().unwrap(), event_id_0); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 0); + assert_eq!(position.index(), 2); + assert_eq!(event.event_id().unwrap(), event_id_3); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 0); + assert_eq!(position.index(), 3); + assert_eq!(event.event_id().unwrap(), event_id_2); + + assert!(events.next().is_none()); + } + } + #[test] + fn test_insert_gap_at() { + let event_builder = EventBuilder::new(); + + let (event_id_0, event_0) = new_event(&event_builder, "$ev0"); + let (event_id_1, event_1) = new_event(&event_builder, "$ev1"); + + let mut room_events = RoomEvents::new(); + + room_events.push_events([event_0, event_1]); + + let position_of_event_1 = room_events + .events() + .find_map(|(position, event)| { + (event.event_id().unwrap() == event_id_1).then_some(position) + }) + .unwrap(); + + room_events + .insert_gap_at(Gap { prev_token: "hello".to_owned() }, position_of_event_1) + .unwrap(); + + { + let mut events = room_events.events(); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 0); + assert_eq!(position.index(), 0); + assert_eq!(event.event_id().unwrap(), event_id_0); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 2); + assert_eq!(position.index(), 0); + assert_eq!(event.event_id().unwrap(), event_id_1); + + assert!(events.next().is_none()); + } + + { + let mut chunks = room_events.chunks(); + + assert_let!(Some(chunk) = chunks.next()); + assert!(chunk.is_items()); + + 
assert_let!(Some(chunk) = chunks.next()); + assert!(chunk.is_gap()); + + assert_let!(Some(chunk) = chunks.next()); + assert!(chunk.is_items()); + + assert!(chunks.next().is_none()); + } + } + + #[test] + fn test_replace_gap_at() { + let event_builder = EventBuilder::new(); + + let (event_id_0, event_0) = new_event(&event_builder, "$ev0"); + let (event_id_1, event_1) = new_event(&event_builder, "$ev1"); + let (event_id_2, event_2) = new_event(&event_builder, "$ev2"); + + let mut room_events = RoomEvents::new(); + + room_events.push_events([event_0]); + room_events.push_gap(Gap { prev_token: "hello".to_owned() }); + + let chunk_identifier_of_gap = room_events + .chunks() + .find_map(|chunk| chunk.is_gap().then_some(chunk.first_position())) + .unwrap() + .chunk_identifier(); + + room_events.replace_gap_at([event_1, event_2], chunk_identifier_of_gap).unwrap(); + + { + let mut events = room_events.events(); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 0); + assert_eq!(position.index(), 0); + assert_eq!(event.event_id().unwrap(), event_id_0); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 2); + assert_eq!(position.index(), 0); + assert_eq!(event.event_id().unwrap(), event_id_1); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 2); + assert_eq!(position.index(), 1); + assert_eq!(event.event_id().unwrap(), event_id_2); + + assert!(events.next().is_none()); + } + + { + let mut chunks = room_events.chunks(); + + assert_let!(Some(chunk) = chunks.next()); + assert!(chunk.is_items()); + + assert_let!(Some(chunk) = chunks.next()); + assert!(chunk.is_items()); + + assert!(chunks.next().is_none()); + } + } + + #[test] + fn test_replace_gap_at_with_duplicates() { + let event_builder = EventBuilder::new(); + + let (event_id_0, event_0) = new_event(&event_builder, "$ev0"); + let (event_id_1, event_1) = new_event(&event_builder, 
"$ev1"); + let (event_id_2, event_2) = new_event(&event_builder, "$ev2"); + + let mut room_events = RoomEvents::new(); + + room_events.push_events([event_0.clone(), event_1]); + room_events.push_gap(Gap { prev_token: "hello".to_owned() }); + + let chunk_identifier_of_gap = room_events + .chunks() + .find_map(|chunk| chunk.is_gap().then_some(chunk.first_position())) + .unwrap() + .chunk_identifier(); + + { + let mut events = room_events.events(); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 0); + assert_eq!(position.index(), 0); + assert_eq!(event.event_id().unwrap(), event_id_0); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 0); + assert_eq!(position.index(), 1); + assert_eq!(event.event_id().unwrap(), event_id_1); + + assert!(events.next().is_none()); + } + + // Everything is alright. Now let's replace a gap with a duplicated event. + room_events.replace_gap_at([event_0, event_2], chunk_identifier_of_gap).unwrap(); + + { + let mut events = room_events.events(); + + // The first `event_id_0` has been removed. 
+ assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 0); + assert_eq!(position.index(), 0); + assert_eq!(event.event_id().unwrap(), event_id_1); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 2); + assert_eq!(position.index(), 0); + assert_eq!(event.event_id().unwrap(), event_id_0); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 2); + assert_eq!(position.index(), 1); + assert_eq!(event.event_id().unwrap(), event_id_2); + + assert!(events.next().is_none()); + } + + { + let mut chunks = room_events.chunks(); + + assert_let!(Some(chunk) = chunks.next()); + assert!(chunk.is_items()); + + assert_let!(Some(chunk) = chunks.next()); + assert!(chunk.is_items()); + + assert!(chunks.next().is_none()); + } + } + + #[test] + fn test_remove_events() { + let event_builder = EventBuilder::new(); + + let (event_id_0, event_0) = new_event(&event_builder, "$ev0"); + let (event_id_1, event_1) = new_event(&event_builder, "$ev1"); + let (event_id_2, event_2) = new_event(&event_builder, "$ev2"); + let (event_id_3, event_3) = new_event(&event_builder, "$ev3"); + + // Push some events. + let mut room_events = RoomEvents::new(); + room_events.push_events([event_0, event_1, event_2, event_3]); + + // Remove some events. 
+ room_events.remove_events(vec![event_id_1, event_id_3]); + + let mut events = room_events.events(); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 0); + assert_eq!(position.index(), 0); + assert_eq!(event.event_id().unwrap(), event_id_0); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 0); + assert_eq!(position.index(), 1); + assert_eq!(event.event_id().unwrap(), event_id_2); + + assert!(events.next().is_none()); + } + + #[test] + fn test_remove_events_unknown_event() { + let event_builder = EventBuilder::new(); + + let (event_id_0, _event_0) = new_event(&event_builder, "$ev0"); + + // Push ZERO event. + let mut room_events = RoomEvents::new(); + + // Remove one undefined event. + // No error is expected. + room_events.remove_events(vec![event_id_0]); + + let mut events = room_events.events(); + assert!(events.next().is_none()); + } + + #[test] + fn test_remove_events_and_update_position() { + let event_builder = EventBuilder::new(); + + let (event_id_0, event_0) = new_event(&event_builder, "$ev0"); + let (event_id_1, event_1) = new_event(&event_builder, "$ev1"); + let (event_id_2, event_2) = new_event(&event_builder, "$ev2"); + let (event_id_3, event_3) = new_event(&event_builder, "$ev3"); + let (event_id_4, event_4) = new_event(&event_builder, "$ev4"); + let (event_id_5, event_5) = new_event(&event_builder, "$ev5"); + let (event_id_6, event_6) = new_event(&event_builder, "$ev6"); + let (event_id_7, event_7) = new_event(&event_builder, "$ev7"); + let (event_id_8, event_8) = new_event(&event_builder, "$ev8"); + + // Push some events. 
+ let mut room_events = RoomEvents::new(); + room_events.push_events([event_0, event_1, event_2, event_3, event_4, event_5, event_6]); + room_events.push_gap(Gap { prev_token: "raclette".to_owned() }); + room_events.push_events([event_7, event_8]); + + fn position_of(room_events: &RoomEvents, event_id: &EventId) -> Position { + room_events + .events() + .find_map(|(position, event)| { + (event.event_id().unwrap() == event_id).then_some(position) + }) + .unwrap() + } + + // In the same chunk… + { + // Get the position of `event_4`. + let mut position = position_of(&room_events, &event_id_4); + + // Remove one event BEFORE `event_4`. + // + // The position must move to the left by 1. + { + let previous_position = position.clone(); + room_events.remove_events_and_update_position(vec![event_id_0], &mut position); + + assert_eq!(previous_position.chunk_identifier(), position.chunk_identifier()); + assert_eq!(previous_position.index() - 1, position.index()); + + // It still represents the position of `event_4`. + assert_eq!(position, position_of(&room_events, &event_id_4)); + } + + // Remove one event AFTER `event_4`. + // + // The position must not move. + { + let previous_position = position.clone(); + room_events.remove_events_and_update_position(vec![event_id_5], &mut position); + + assert_eq!(previous_position.chunk_identifier(), position.chunk_identifier()); + assert_eq!(previous_position.index(), position.index()); + + // It still represents the position of `event_4`. + assert_eq!(position, position_of(&room_events, &event_id_4)); + } + + // Remove one event: `event_4`. + // + // The position must not move. + { + let previous_position = position.clone(); + room_events.remove_events_and_update_position(vec![event_id_4], &mut position); + + assert_eq!(previous_position.chunk_identifier(), position.chunk_identifier()); + assert_eq!(previous_position.index(), position.index()); + } + + // Check the events. 
+ { + let mut events = room_events.events(); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 0); + assert_eq!(position.index(), 0); + assert_eq!(event.event_id().unwrap(), event_id_1); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 0); + assert_eq!(position.index(), 1); + assert_eq!(event.event_id().unwrap(), event_id_2); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 0); + assert_eq!(position.index(), 2); + assert_eq!(event.event_id().unwrap(), event_id_3); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 0); + assert_eq!(position.index(), 3); + assert_eq!(event.event_id().unwrap(), event_id_6); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 2); + assert_eq!(position.index(), 0); + assert_eq!(event.event_id().unwrap(), event_id_7); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 2); + assert_eq!(position.index(), 1); + assert_eq!(event.event_id().unwrap(), event_id_8); + + assert!(events.next().is_none()); + } + } + + // In another chunk… + { + // Get the position of `event_7`. + let mut position = position_of(&room_events, &event_id_7); + + // Remove one event BEFORE `event_7`. + // + // The position must not move because it happens in another chunk. + { + let previous_position = position.clone(); + room_events.remove_events_and_update_position(vec![event_id_1], &mut position); + + assert_eq!(previous_position.chunk_identifier(), position.chunk_identifier()); + assert_eq!(previous_position.index(), position.index()); + + // It still represents the position of `event_7`. + assert_eq!(position, position_of(&room_events, &event_id_7)); + } + + // Check the events. 
+ { + let mut events = room_events.events(); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 0); + assert_eq!(position.index(), 0); + assert_eq!(event.event_id().unwrap(), event_id_2); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 0); + assert_eq!(position.index(), 1); + assert_eq!(event.event_id().unwrap(), event_id_3); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 0); + assert_eq!(position.index(), 2); + assert_eq!(event.event_id().unwrap(), event_id_6); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 2); + assert_eq!(position.index(), 0); + assert_eq!(event.event_id().unwrap(), event_id_7); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 2); + assert_eq!(position.index(), 1); + assert_eq!(event.event_id().unwrap(), event_id_8); + + assert!(events.next().is_none()); + } + } + + // In the same chunk, but remove multiple events, just for the fun and to ensure + // the loop works correctly. + { + // Get the position of `event_6`. + let mut position = position_of(&room_events, &event_id_6); + + // Remove three events: two BEFORE `event_6` and one in another chunk. + // + // The position must move to the left by 2. + { + let previous_position = position.clone(); + room_events.remove_events_and_update_position( + vec![event_id_2, event_id_3, event_id_7], + &mut position, + ); + + assert_eq!(previous_position.chunk_identifier(), position.chunk_identifier()); + assert_eq!(previous_position.index() - 2, position.index()); + + // It still represents the position of `event_6`. + assert_eq!(position, position_of(&room_events, &event_id_6)); + } + + // Check the events.
+ { + let mut events = room_events.events(); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 0); + assert_eq!(position.index(), 0); + assert_eq!(event.event_id().unwrap(), event_id_6); + + assert_let!(Some((position, event)) = events.next()); + assert_eq!(position.chunk_identifier(), 2); + assert_eq!(position.index(), 0); + assert_eq!(event.event_id().unwrap(), event_id_8); + + assert!(events.next().is_none()); + } + } + } +}