diff --git a/crates/matrix-sdk-base/src/event_cache/store/integration_tests.rs b/crates/matrix-sdk-base/src/event_cache/store/integration_tests.rs index 59b047c7395..76904fc731f 100644 --- a/crates/matrix-sdk-base/src/event_cache/store/integration_tests.rs +++ b/crates/matrix-sdk-base/src/event_cache/store/integration_tests.rs @@ -133,6 +133,9 @@ pub trait EventCacheStoreIntegrationTests { /// anything. async fn test_rebuild_empty_linked_chunk(&self); + /// Test that loading a linked chunk's metadata works as intended. + async fn test_load_all_chunks_metadata(&self); + /// Test that clear all the rooms' linked chunks works. async fn test_clear_all_linked_chunks(&self); @@ -417,6 +420,72 @@ impl EventCacheStoreIntegrationTests for DynEventCacheStore { assert!(chunks.next().is_none()); } + async fn test_load_all_chunks_metadata(&self) { + let room_id = room_id!("!r0:matrix.org"); + let linked_chunk_id = LinkedChunkId::Room(room_id); + + self.handle_linked_chunk_updates( + linked_chunk_id, + vec![ + // new chunk + Update::NewItemsChunk { previous: None, new: CId::new(0), next: None }, + // new items on 0 + Update::PushItems { + at: Position::new(CId::new(0), 0), + items: vec![ + make_test_event(room_id, "hello"), + make_test_event(room_id, "world"), + ], + }, + // a gap chunk + Update::NewGapChunk { + previous: Some(CId::new(0)), + new: CId::new(1), + next: None, + gap: Gap { prev_token: "parmesan".to_owned() }, + }, + // another items chunk + Update::NewItemsChunk { previous: Some(CId::new(1)), new: CId::new(2), next: None }, + // new items on 2 + Update::PushItems { + at: Position::new(CId::new(2), 0), + items: vec![make_test_event(room_id, "sup")], + }, + // and an empty items chunk to finish + Update::NewItemsChunk { previous: Some(CId::new(2)), new: CId::new(3), next: None }, + ], + ) + .await + .unwrap(); + + let metas = self.load_all_chunks_metadata(linked_chunk_id).await.unwrap(); + assert_eq!(metas.len(), 4); + + // The first chunk has two items. + assert_eq!(metas[0].identifier, CId::new(0)); + assert_eq!(metas[0].previous, None); + assert_eq!(metas[0].next, Some(CId::new(1))); + assert_eq!(metas[0].num_items, 2); + + // The second chunk is a gap, so it has 0 items. + assert_eq!(metas[1].identifier, CId::new(1)); + assert_eq!(metas[1].previous, Some(CId::new(0))); + assert_eq!(metas[1].next, Some(CId::new(2))); + assert_eq!(metas[1].num_items, 0); + + // The third event chunk has one item. + assert_eq!(metas[2].identifier, CId::new(2)); + assert_eq!(metas[2].previous, Some(CId::new(1))); + assert_eq!(metas[2].next, Some(CId::new(3))); + assert_eq!(metas[2].num_items, 1); + + // The final event chunk is empty. + assert_eq!(metas[3].identifier, CId::new(3)); + assert_eq!(metas[3].previous, Some(CId::new(2))); + assert_eq!(metas[3].next, None); + assert_eq!(metas[3].num_items, 0); + } + async fn test_linked_chunk_incremental_loading(&self) { let room_id = room_id!("!r0:matrix.org"); let linked_chunk_id = LinkedChunkId::Room(room_id); @@ -1105,6 +1174,13 @@ macro_rules! event_cache_store_integration_tests { event_cache_store.test_rebuild_empty_linked_chunk().await; } + #[async_test] + async fn test_load_all_chunks_metadata() { + let event_cache_store = + get_event_cache_store().await.unwrap().into_event_cache_store(); + event_cache_store.test_load_all_chunks_metadata().await; + } + #[async_test] async fn test_clear_all_linked_chunks() { let event_cache_store = diff --git a/crates/matrix-sdk-base/src/event_cache/store/memory_store.rs b/crates/matrix-sdk-base/src/event_cache/store/memory_store.rs index e252a9eae0a..dfec281b11c 100644 --- a/crates/matrix-sdk-base/src/event_cache/store/memory_store.rs +++ b/crates/matrix-sdk-base/src/event_cache/store/memory_store.rs @@ -22,7 +22,7 @@ use async_trait::async_trait; use matrix_sdk_common::{ linked_chunk::{ relational::RelationalLinkedChunk, ChunkIdentifier, ChunkIdentifierGenerator, - LinkedChunkId, Position, RawChunk, Update, + ChunkMetadata, LinkedChunkId, Position, RawChunk, Update, }, ring_buffer::RingBuffer, store_locks::memory_store_helper::try_take_leased_lock, @@ -148,6 +148,17 @@ impl EventCacheStore for MemoryStore { .map_err(|err| EventCacheStoreError::InvalidData { details: err }) } + async fn load_all_chunks_metadata( + &self, + linked_chunk_id: LinkedChunkId<'_>, + ) -> Result, Self::Error> { + let inner = self.inner.read().unwrap(); + inner + .events + .load_all_chunks_metadata(linked_chunk_id) + .map_err(|err| EventCacheStoreError::InvalidData { details: err }) + } + async fn load_last_chunk( &self, linked_chunk_id: LinkedChunkId<'_>, diff --git a/crates/matrix-sdk-base/src/event_cache/store/traits.rs b/crates/matrix-sdk-base/src/event_cache/store/traits.rs index e799eab4795..1a5f9f32150 100644 --- a/crates/matrix-sdk-base/src/event_cache/store/traits.rs +++ b/crates/matrix-sdk-base/src/event_cache/store/traits.rs @@ -17,7 +17,8 @@ use std::{fmt, sync::Arc}; use async_trait::async_trait; use matrix_sdk_common::{ linked_chunk::{ - ChunkIdentifier, ChunkIdentifierGenerator, LinkedChunkId, Position, RawChunk, Update, + ChunkIdentifier, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId, Position, + RawChunk, Update, }, AsyncTraitDeps, }; @@ -77,6 +78,15 @@ pub trait EventCacheStore: AsyncTraitDeps { linked_chunk_id: LinkedChunkId<'_>, ) -> Result>, Self::Error>; + /// Load all of the chunks' metadata for the given [`LinkedChunkId`]. + /// + /// Chunks are unordered, and there's no guarantee that the chunks would + /// form a valid linked chunk after reconstruction. + async fn load_all_chunks_metadata( + &self, + linked_chunk_id: LinkedChunkId<'_>, + ) -> Result, Self::Error>; + /// Load the last chunk of the `LinkedChunk` holding all events of the room /// identified by `room_id`. /// @@ -313,6 +323,13 @@ impl EventCacheStore for EraseEventCacheStoreError { self.0.load_all_chunks(linked_chunk_id).await.map_err(Into::into) } + async fn load_all_chunks_metadata( + &self, + linked_chunk_id: LinkedChunkId<'_>, + ) -> Result, Self::Error> { + self.0.load_all_chunks_metadata(linked_chunk_id).await.map_err(Into::into) + } + async fn load_last_chunk( &self, linked_chunk_id: LinkedChunkId<'_>, diff --git a/crates/matrix-sdk-common/src/linked_chunk/as_vector.rs b/crates/matrix-sdk-common/src/linked_chunk/as_vector.rs index d05ec323afe..4466168853b 100644 --- a/crates/matrix-sdk-common/src/linked_chunk/as_vector.rs +++ b/crates/matrix-sdk-common/src/linked_chunk/as_vector.rs @@ -15,7 +15,7 @@ use std::{ collections::VecDeque, iter::repeat_n, - ops::{ControlFlow, Not}, + ops::ControlFlow, sync::{Arc, RwLock}, }; @@ -25,6 +25,7 @@ use super::{ updates::{ReaderToken, Update, UpdatesInner}, ChunkContent, ChunkIdentifier, Iter, Position, }; +use crate::linked_chunk::ChunkMetadata; /// A type alias to represent a chunk's length. This is purely for commodity. type ChunkLength = usize; @@ -43,7 +44,7 @@ pub struct AsVector { token: ReaderToken, /// Mapper from `Update` to `VectorDiff`. - mapper: UpdateToVectorDiff, + mapper: UpdateToVectorDiff>>, } impl AsVector { @@ -83,20 +84,38 @@ impl AsVector { } } +/// Interface for a type accumulating updates from [`UpdateToVectorDiff::map`], +/// and being returned as a result of this. +pub(super) trait UpdatesAccumulator: Extend> { + /// Create a new accumulator with a rough estimation of the number of + /// updates this accumulator is going to receive. + fn new(num_updates_hint: usize) -> Self; +} + +// Simple implementation for a `Vec>` collection for +// `AsVector`. +impl UpdatesAccumulator for Vec> { + fn new(num_updates_hint: usize) -> Vec> { + Vec::with_capacity(num_updates_hint) + } +} + /// Internal type that converts [`Update`] into [`VectorDiff`]. #[derive(Debug)] -struct UpdateToVectorDiff { +pub(super) struct UpdateToVectorDiff> { /// Pairs of all known chunks and their respective length. This is the only /// required data for this algorithm. - chunks: VecDeque<(ChunkIdentifier, ChunkLength)>, + pub chunks: VecDeque<(ChunkIdentifier, ChunkLength)>, + + _phantom: std::marker::PhantomData<(Item, Acc)>, } -impl UpdateToVectorDiff { +impl> UpdateToVectorDiff { /// Construct [`UpdateToVectorDiff`], based on an iterator of /// [`Chunk`](super::Chunk)s, used to set up its own internal state. /// /// See [`Self::map`] to learn more about the algorithm. - fn new(chunk_iterator: Iter<'_, CAP, Item, Gap>) -> Self { + pub fn new(chunk_iterator: Iter<'_, CAP, Item, Gap>) -> Self { let mut initial_chunk_lengths = VecDeque::new(); for chunk in chunk_iterator { @@ -109,7 +128,18 @@ impl UpdateToVectorDiff { )) } - Self { chunks: initial_chunk_lengths } + Self { chunks: initial_chunk_lengths, _phantom: std::marker::PhantomData } + } + + /// Construct [`UpdateToVectorDiff`], based on a linked chunk's full + /// metadata, used to set up its own internal state. + /// + /// See [`Self::map`] to learn more about the algorithm. + pub fn from_metadata(metas: Vec) -> Self { + let initial_chunk_lengths = + metas.into_iter().map(|meta| (meta.identifier, meta.num_items)).collect(); + + Self { chunks: initial_chunk_lengths, _phantom: std::marker::PhantomData } } /// Map several [`Update`] into [`VectorDiff`]. @@ -172,13 +202,18 @@ impl UpdateToVectorDiff { /// [`LinkedChunk`]: super::LinkedChunk /// [`ChunkContent::Gap`]: super::ChunkContent::Gap /// [`ChunkContent::Content`]: super::ChunkContent::Content - fn map(&mut self, updates: &[Update]) -> Vec> + pub fn map(&mut self, updates: &[Update]) -> Acc where Item: Clone, { - let mut diffs = Vec::with_capacity(updates.len()); + let mut acc = Acc::new(updates.len()); - // A flag specifying when updates are reattaching detached items. + // Flags specifying when updates are reattaching detached items. + // + // TL;DR: This is an optimization to avoid that insertions in the middle of a + // chunk cause a large series of `VectorDiff::Remove` and + // `VectorDiff::Insert` updates for the elements placed after the + // inserted item. // // Why is it useful? // @@ -329,7 +364,7 @@ impl UpdateToVectorDiff { .expect("Removing an index out of the bounds"); // Removing at the same index because each `Remove` shifts items to the left. - diffs.extend(repeat_n(VectorDiff::Remove { index: offset }, number_of_items)); + acc.extend(repeat_n(VectorDiff::Remove { index: offset }, number_of_items)); } Update::PushItems { at: position, items } => { @@ -348,12 +383,12 @@ impl UpdateToVectorDiff { } // Optimisation: we can emit a `VectorDiff::Append` in this particular case. - if is_pushing_back && detaching.not() { - diffs.push(VectorDiff::Append { values: items.into() }); + if is_pushing_back && !detaching { + acc.extend([VectorDiff::Append { values: items.into() }]); } // No optimisation: let's emit `VectorDiff::Insert`. else { - diffs.extend(items.iter().enumerate().map(|(nth, item)| { + acc.extend(items.iter().enumerate().map(|(nth, item)| { VectorDiff::Insert { index: offset + nth, value: item.clone() } })); } @@ -364,7 +399,7 @@ impl UpdateToVectorDiff { // The chunk length doesn't change. - diffs.push(VectorDiff::Set { index: offset, value: item.clone() }); + acc.extend([VectorDiff::Set { index: offset, value: item.clone() }]); } Update::RemoveItem { at: position } => { @@ -379,7 +414,7 @@ impl UpdateToVectorDiff { } // Let's emit a `VectorDiff::Remove`. - diffs.push(VectorDiff::Remove { index: offset }); + acc.extend([VectorDiff::Remove { index: offset }]); } Update::DetachLastItems { at: position } => { @@ -422,12 +457,12 @@ impl UpdateToVectorDiff { self.chunks.clear(); // Let's straightforwardly emit a `VectorDiff::Clear`. - diffs.push(VectorDiff::Clear); + acc.extend([VectorDiff::Clear]); } } } - diffs + acc } fn map_to_offset(&mut self, position: &Position) -> (usize, (usize, &mut usize)) { diff --git a/crates/matrix-sdk-common/src/linked_chunk/mod.rs b/crates/matrix-sdk-common/src/linked_chunk/mod.rs index e5e7dfcb991..795a1612739 100644 --- a/crates/matrix-sdk-common/src/linked_chunk/mod.rs +++ b/crates/matrix-sdk-common/src/linked_chunk/mod.rs @@ -93,6 +93,7 @@ macro_rules! assert_items_eq { mod as_vector; pub mod lazy_loader; +mod order_tracker; pub mod relational; mod updates; @@ -104,6 +105,7 @@ use std::{ }; pub use as_vector::*; +pub use order_tracker::OrderTracker; use ruma::{OwnedRoomId, RoomId}; pub use updates::*; @@ -1085,6 +1087,42 @@ impl LinkedChunk { Some(AsVector::new(updates, token, chunk_iterator)) } + /// Get an [`OrderTracker`] for the linked chunk, which can be used to + /// compare the relative position of two events in this linked chunk. + /// + /// A pre-requisite is that the linked chunk has been constructed with + /// [`Self::new_with_update_history`], and that if the linked chunk is + /// lazily-loaded, an iterator over the fully-loaded linked chunk is + /// passed at construction time here. + pub fn order_tracker( + &mut self, + all_chunks: Option>, + ) -> Option> + where + Item: Clone, + { + let (updates, token) = self + .updates + .as_mut() + .map(|updates| (updates.inner.clone(), updates.new_reader_token()))?; + + Some(OrderTracker::new( + updates, + token, + all_chunks.unwrap_or_else(|| { + // Consider the linked chunk as fully loaded. + self.chunks() + .map(|chunk| ChunkMetadata { + identifier: chunk.identifier(), + num_items: chunk.num_items(), + previous: chunk.previous().map(|prev| prev.identifier()), + next: chunk.next().map(|next| next.identifier()), + }) + .collect() + }), + )) + } + /// Returns the number of items of the linked chunk. pub fn num_items(&self) -> usize { self.items().count() @@ -1721,6 +1759,25 @@ pub struct RawChunk { pub next: Option, } +/// A simplified [`RawChunk`] that only contains the number of items in a chunk, +/// instead of its type. +#[derive(Clone, Debug)] +pub struct ChunkMetadata { + /// The number of items in this chunk. + /// + /// By convention, a gap chunk contains 0 items. + pub num_items: usize, + + /// Link to the previous chunk, via its identifier. + pub previous: Option, + + /// Current chunk's identifier. + pub identifier: ChunkIdentifier, + + /// Link to the next chunk, via its identifier. + pub next: Option, +} + #[cfg(test)] mod tests { use std::{ diff --git a/crates/matrix-sdk-common/src/linked_chunk/order_tracker.rs b/crates/matrix-sdk-common/src/linked_chunk/order_tracker.rs new file mode 100644 index 00000000000..7d0d6668ac8 --- /dev/null +++ b/crates/matrix-sdk-common/src/linked_chunk/order_tracker.rs @@ -0,0 +1,569 @@ +// Copyright 2025 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::{Arc, RwLock}; + +use eyeball_im::VectorDiff; + +use super::{ + updates::{ReaderToken, Update, UpdatesInner}, + Position, +}; +use crate::linked_chunk::{ChunkMetadata, UpdateToVectorDiff}; + +/// A tracker for the order of items in a linked chunk. +/// +/// This can be used to determine the absolute ordering of an item, and thus the +/// relative ordering of two items in a linked chunk, in an +/// efficient manner, thanks to [`OrderTracker::ordering`]. Internally, it +/// keeps track of the relative ordering of the chunks themselves; given a +/// [`Position`] in a linked chunk, the item ordering is the lexicographic +/// ordering of the chunk in the linked chunk, and the internal position within +/// the chunk. For the sake of ease, we return the absolute vector index of the +/// item in the linked chunk. +/// +/// It requires the full links' metadata to be provided at creation time, so +/// that it can also give an order for an item that's not loaded yet, in the +/// context of lazy-loading. +#[derive(Debug)] +pub struct OrderTracker { + /// Strong reference to [`UpdatesInner`]. + updates: Arc>>, + + /// The token to read the updates. + token: ReaderToken, + + /// Mapper from `Update` to `VectorDiff`. + mapper: UpdateToVectorDiff>, +} + +struct NullAccumulator { + _phantom: std::marker::PhantomData, +} + +#[cfg(not(tarpaulin_include))] +impl std::fmt::Debug for NullAccumulator { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("NullAccumulator") + } +} + +impl super::UpdatesAccumulator for NullAccumulator { + fn new(_num_updates_hint: usize) -> Self { + Self { _phantom: std::marker::PhantomData } + } +} + +impl Extend> for NullAccumulator { + fn extend>>(&mut self, _iter: T) { + // This is a no-op, as we don't want to accumulate anything. + } +} + +impl OrderTracker +where + Item: Clone, +{ + /// Create a new [`OrderTracker`]. + /// + /// The `all_chunks_metadata` parameter must include the metadata for *all* + /// chunks (the full collection, even if the linked chunk is + /// lazy-loaded). + /// + /// They must be ordered by their links in the linked chunk, i.e. the first + /// chunk in the vector is the first chunk in the linked chunk, the + /// second in the vector is the first's next chunk, and so on. If that + /// precondition doesn't hold, then the ordering of items will be undefined. + pub(super) fn new( + updates: Arc>>, + token: ReaderToken, + all_chunks_metadata: Vec, + ) -> Self { + // Drain previous updates so that this type is synced with `Updates`. + { + let mut updates = updates.write().unwrap(); + let _ = updates.take_with_token(token); + } + + Self { updates, token, mapper: UpdateToVectorDiff::from_metadata(all_chunks_metadata) } + } + + /// Force flushing of the updates manually. + /// + /// If `inhibit` is `true` (which is useful in the case of lazy-loading + /// related updates, which shouldn't affect the canonical, persisted + /// linked chunk), the updates are ignored; otherwise, they are consumed + /// normally. + pub fn flush_updates(&mut self, inhibit: bool) { + if inhibit { + // Ignore the updates. + let _ = self.updates.write().unwrap().take_with_token(self.token); + } else { + // Consume the updates. + let mut updater = self.updates.write().unwrap(); + let updates = updater.take_with_token(self.token); + let _ = self.mapper.map(updates); + } + } + + /// Apply some out-of-band updates to the ordering tracker. + /// + /// This must only be used when the updates do not affect the observed + /// linked chunk, but would affect the fully-loaded collection. + pub fn map_updates(&mut self, updates: &[Update]) { + let _ = self.mapper.map(updates); + } + + /// Given an event's position, returns its final ordering in the current + /// state of the linked chunk as a vector. + /// + /// Useful to compare the ordering of multiple events. + /// + /// Precondition: the reader must be up to date, i.e. + /// [`Self::flush_updates`] must have been called before this method. + /// + /// Will return `None` if the position doesn't match a known chunk in the + /// linked chunk, or if the chunk is a gap. + pub fn ordering(&self, event_pos: Position) -> Option { + // Check the precondition: there must not be any pending updates for this + // reader. + debug_assert!(self.updates.read().unwrap().is_reader_up_to_date(self.token)); + + // Find the chunk that contained the event. + let mut ordering = 0; + for (chunk_id, chunk_length) in &self.mapper.chunks { + if *chunk_id == event_pos.chunk_identifier() { + let offset_within_chunk = event_pos.index(); + if offset_within_chunk >= *chunk_length { + // The event is out of bounds for this chunk, return None. + return None; + } + // The final ordering is the number of items before the event, plus its own + // index within the chunk. + return Some(ordering + offset_within_chunk); + } + // This is not the target chunk yet, so add the size of the current chunk to the + // number of seen items, and continue. + ordering += *chunk_length; + } + + None + } +} + +#[cfg(test)] +mod tests { + use assert_matches::assert_matches; + use matrix_sdk_test_macros::async_test; + + use crate::linked_chunk::{ + lazy_loader::from_last_chunk, ChunkContent, ChunkIdentifier, ChunkIdentifierGenerator, + ChunkMetadata, LinkedChunk, OrderTracker, Position, RawChunk, Update, + }; + + #[async_test] + async fn test_linked_chunk_without_update_history_no_tracking() { + let mut linked_chunk = LinkedChunk::<10, char, ()>::new(); + assert_matches!(linked_chunk.order_tracker(None), None); + } + + /// Given a fully-loaded linked chunk, checks that the ordering of an item + /// is effectively the same as its index in an iteration of items. + fn assert_order_fully_loaded( + linked_chunk: &LinkedChunk<3, char, ()>, + tracker: &OrderTracker, + ) { + assert_order(linked_chunk, tracker, 0); + } + + /// Given a linked chunk with an offset representing the number of items not + /// loaded yet, checks that the ordering of an item is effectively the + /// same as its index+offset in an iteration of items. + fn assert_order( + linked_chunk: &LinkedChunk<3, char, ()>, + tracker: &OrderTracker, + offset: usize, + ) { + for (i, (item_pos, _value)) in linked_chunk.items().enumerate() { + assert_eq!(tracker.ordering(item_pos), Some(i + offset)); + } + } + + #[async_test] + async fn test_non_lazy_updates() { + // Assume the linked chunk is fully loaded, so we have all the chunks at + // our disposal. + let mut linked_chunk = LinkedChunk::<3, _, _>::new_with_update_history(); + + let mut tracker = linked_chunk.order_tracker(None).unwrap(); + + // Let's apply some updates to the live linked chunk. + + // Pushing new items. + { + linked_chunk.push_items_back(['a', 'b', 'c']); + tracker.flush_updates(false); + assert_order_fully_loaded(&linked_chunk, &tracker); + } + + // Pushing a gap. + { + linked_chunk.push_gap_back(()); + tracker.flush_updates(false); + assert_order_fully_loaded(&linked_chunk, &tracker); + } + + // Inserting items in the middle. + { + let b_pos = linked_chunk.item_position(|c| *c == 'b').unwrap(); + linked_chunk.insert_items_at(['d', 'e'], b_pos).unwrap(); + tracker.flush_updates(false); + assert_order_fully_loaded(&linked_chunk, &tracker); + } + + // Inserting a gap in the middle. + { + let c_pos = linked_chunk.item_position(|c| *c == 'c').unwrap(); + linked_chunk.insert_gap_at((), c_pos).unwrap(); + tracker.flush_updates(false); + assert_order_fully_loaded(&linked_chunk, &tracker); + } + + // Replacing a gap with items. + { + let last_gap = + linked_chunk.rchunks().filter(|c| c.is_gap()).last().unwrap().identifier(); + linked_chunk.replace_gap_at(['f', 'g'], last_gap).unwrap(); + tracker.flush_updates(false); + assert_order_fully_loaded(&linked_chunk, &tracker); + } + + // Removing an item. + { + let a_pos = linked_chunk.item_position(|c| *c == 'd').unwrap(); + linked_chunk.remove_item_at(a_pos).unwrap(); + tracker.flush_updates(false); + assert_order_fully_loaded(&linked_chunk, &tracker); + } + + // Replacing an item. + { + let b_pos = linked_chunk.item_position(|c| *c == 'e').unwrap(); + linked_chunk.replace_item_at(b_pos, 'E').unwrap(); + tracker.flush_updates(false); + assert_order_fully_loaded(&linked_chunk, &tracker); + } + + // Clearing all items. + { + linked_chunk.clear(); + tracker.flush_updates(false); + assert_eq!(tracker.ordering(Position::new(ChunkIdentifier::new(0), 0)), None); + assert_eq!(tracker.ordering(Position::new(ChunkIdentifier::new(3), 0)), None); + } + } + + #[async_test] + async fn test_lazy_loading() { + // Assume that all the chunks haven't been loaded yet, so we have a few of them + // in some memory, and some of them are still in an hypothetical + // database. + let db_metadata = vec![ + // Hypothetical non-empty items chunk with items 'a', 'b', 'c'. + ChunkMetadata { + previous: None, + identifier: ChunkIdentifier(0), + next: Some(ChunkIdentifier(1)), + num_items: 3, + }, + // Hypothetical gap chunk. + ChunkMetadata { + previous: Some(ChunkIdentifier(0)), + identifier: ChunkIdentifier(1), + next: Some(ChunkIdentifier(2)), + num_items: 0, + }, + // Hypothetical non-empty items chunk with items 'd', 'e', 'f'. + ChunkMetadata { + previous: Some(ChunkIdentifier(1)), + identifier: ChunkIdentifier(2), + next: Some(ChunkIdentifier(3)), + num_items: 3, + }, + // Hypothetical non-empty items chunk with items 'g'. + ChunkMetadata { + previous: Some(ChunkIdentifier(2)), + identifier: ChunkIdentifier(3), + next: None, + num_items: 1, + }, + ]; + + // The in-memory linked chunk contains the latest chunk only. + let mut linked_chunk = from_last_chunk::<3, _, ()>( + Some(RawChunk { + content: ChunkContent::Items(vec!['g']), + previous: Some(ChunkIdentifier(2)), + identifier: ChunkIdentifier(3), + next: None, + }), + ChunkIdentifierGenerator::new_from_previous_chunk_identifier(ChunkIdentifier(3)), + ) + .expect("could recreate the linked chunk") + .expect("the linked chunk isn't empty"); + + let tracker = linked_chunk.order_tracker(Some(db_metadata)).unwrap(); + + // At first, even if the main linked chunk is empty, the order tracker can + // compute the position for unloaded items. + + // Order of 'a': + assert_eq!(tracker.ordering(Position::new(ChunkIdentifier::new(0), 0)), Some(0)); + // Order of 'b': + assert_eq!(tracker.ordering(Position::new(ChunkIdentifier::new(0), 1)), Some(1)); + // Order of 'c': + assert_eq!(tracker.ordering(Position::new(ChunkIdentifier::new(0), 2)), Some(2)); + // An invalid position in a known chunk returns no ordering. + assert_eq!(tracker.ordering(Position::new(ChunkIdentifier::new(0), 42)), None); + + // A gap chunk doesn't have an ordering. + assert_eq!(tracker.ordering(Position::new(ChunkIdentifier::new(1), 0)), None); + assert_eq!(tracker.ordering(Position::new(ChunkIdentifier::new(1), 42)), None); + + // Order of 'd': + assert_eq!(tracker.ordering(Position::new(ChunkIdentifier::new(2), 0)), Some(3)); + // Order of 'e': + assert_eq!(tracker.ordering(Position::new(ChunkIdentifier::new(2), 1)), Some(4)); + // Order of 'f': + assert_eq!(tracker.ordering(Position::new(ChunkIdentifier::new(2), 2)), Some(5)); + // No subsequent entry in the same chunk, it's been split when inserting g. + assert_eq!(tracker.ordering(Position::new(ChunkIdentifier::new(2), 3)), None); + + // Order of 'g': + assert_eq!(tracker.ordering(Position::new(ChunkIdentifier::new(3), 0)), Some(6)); + // This was the final entry so far. + assert_eq!(tracker.ordering(Position::new(ChunkIdentifier::new(3), 1)), None); + } + + #[async_test] + async fn test_lazy_updates() { + // Assume that all the chunks haven't been loaded yet, so we have a few of them + // in some memory, and some of them are still in an hypothetical + // database. + let db_metadata = vec![ + // Hypothetical non-empty items chunk with items 'a', 'b'. + ChunkMetadata { + previous: None, + identifier: ChunkIdentifier(0), + next: Some(ChunkIdentifier(1)), + num_items: 2, + }, + // Hypothetical gap chunk. + ChunkMetadata { + previous: Some(ChunkIdentifier(0)), + identifier: ChunkIdentifier(1), + next: Some(ChunkIdentifier(2)), + num_items: 0, + }, + // Hypothetical non-empty items chunk with items 'd', 'e', 'f'. + ChunkMetadata { + previous: Some(ChunkIdentifier(1)), + identifier: ChunkIdentifier(2), + next: Some(ChunkIdentifier(3)), + num_items: 3, + }, + // Hypothetical non-empty items chunk with items 'g'. + ChunkMetadata { + previous: Some(ChunkIdentifier(2)), + identifier: ChunkIdentifier(3), + next: None, + num_items: 1, + }, + ]; + + // The in-memory linked chunk contains the latest chunk only. + let mut linked_chunk = from_last_chunk( + Some(RawChunk { + content: ChunkContent::Items(vec!['g']), + previous: Some(ChunkIdentifier(2)), + identifier: ChunkIdentifier(3), + next: None, + }), + ChunkIdentifierGenerator::new_from_previous_chunk_identifier(ChunkIdentifier(3)), + ) + .expect("could recreate the linked chunk") + .expect("the linked chunk isn't empty"); + + let mut tracker = linked_chunk.order_tracker(Some(db_metadata)).unwrap(); + + // Sanity checks on the initial state. + { + // Order of 'b': + assert_eq!(tracker.ordering(Position::new(ChunkIdentifier::new(0), 1)), Some(1)); + // Order of 'g': + assert_eq!(tracker.ordering(Position::new(ChunkIdentifier::new(3), 0)), Some(5)); + } + + // Let's apply some updates to the live linked chunk. + + // Pushing new items. + { + linked_chunk.push_items_back(['h', 'i']); + tracker.flush_updates(false); + + // Order of items not loaded: + assert_eq!(tracker.ordering(Position::new(ChunkIdentifier::new(0), 1)), Some(1)); + assert_eq!(tracker.ordering(Position::new(ChunkIdentifier::new(3), 0)), Some(5)); + // The loaded items are off by 5 (the absolute order of g). + assert_order(&linked_chunk, &tracker, 5); + } + + // Pushing a gap. + let gap_id = { + linked_chunk.push_gap_back(()); + tracker.flush_updates(false); + + // The gap doesn't have an ordering. + let last_chunk = linked_chunk.rchunks().next().unwrap(); + assert!(last_chunk.is_gap()); + assert_eq!(tracker.ordering(Position::new(last_chunk.identifier(), 0)), None); + assert_eq!(tracker.ordering(Position::new(last_chunk.identifier(), 42)), None); + + // The previous items are still ordered. + assert_eq!(tracker.ordering(Position::new(ChunkIdentifier::new(0), 1)), Some(1)); + assert_eq!(tracker.ordering(Position::new(ChunkIdentifier::new(3), 0)), Some(5)); + // The loaded items are off by 5 (the absolute order of g). + assert_order(&linked_chunk, &tracker, 5); + + last_chunk.identifier() + }; + + // Inserting items in the middle. + { + let h_pos = linked_chunk.item_position(|c| *c == 'h').unwrap(); + linked_chunk.insert_items_at(['j', 'k'], h_pos).unwrap(); + tracker.flush_updates(false); + + // The previous items are still ordered. + assert_eq!(tracker.ordering(Position::new(ChunkIdentifier::new(0), 1)), Some(1)); + assert_eq!(tracker.ordering(Position::new(ChunkIdentifier::new(3), 0)), Some(5)); + // The loaded items are off by 5 (the absolute order of g). + assert_order(&linked_chunk, &tracker, 5); + } + + // Replacing a gap with items. + { + linked_chunk.replace_gap_at(['l', 'm'], gap_id).unwrap(); + tracker.flush_updates(false); + + // The previous items are still ordered. + assert_eq!(tracker.ordering(Position::new(ChunkIdentifier::new(0), 1)), Some(1)); + assert_eq!(tracker.ordering(Position::new(ChunkIdentifier::new(3), 0)), Some(5)); + // The loaded items are off by 5 (the absolute order of g). + assert_order(&linked_chunk, &tracker, 5); + } + + // Removing an item. + { + let j_pos = linked_chunk.item_position(|c| *c == 'j').unwrap(); + linked_chunk.remove_item_at(j_pos).unwrap(); + tracker.flush_updates(false); + + // The previous items are still ordered. + assert_eq!(tracker.ordering(Position::new(ChunkIdentifier::new(0), 1)), Some(1)); + assert_eq!(tracker.ordering(Position::new(ChunkIdentifier::new(3), 0)), Some(5)); + // The loaded items are off by 5 (the absolute order of g). + assert_order(&linked_chunk, &tracker, 5); + } + + // Replacing an item. + { + let k_pos = linked_chunk.item_position(|c| *c == 'k').unwrap(); + linked_chunk.replace_item_at(k_pos, 'K').unwrap(); + tracker.flush_updates(false); + + // The previous items are still ordered. + assert_eq!(tracker.ordering(Position::new(ChunkIdentifier::new(0), 1)), Some(1)); + assert_eq!(tracker.ordering(Position::new(ChunkIdentifier::new(3), 0)), Some(5)); + // The loaded items are off by 5 (the absolute order of g). + assert_order(&linked_chunk, &tracker, 5); + } + + // Clearing all items. + { + linked_chunk.clear(); + tracker.flush_updates(false); + assert_eq!(tracker.ordering(Position::new(ChunkIdentifier::new(0), 0)), None); + assert_eq!(tracker.ordering(Position::new(ChunkIdentifier::new(3), 0)), None); + } + } + + #[async_test] + async fn test_out_of_band_updates() { + // Assume that all the chunks haven't been loaded yet, so we have a few of them + // in some memory, and some of them are still in an hypothetical + // database. + let db_metadata = vec![ + // Hypothetical non-empty items chunk with items 'a', 'b'. + ChunkMetadata { + previous: None, + identifier: ChunkIdentifier(0), + next: Some(ChunkIdentifier(1)), + num_items: 2, + }, + // Hypothetical gap chunk. + ChunkMetadata { + previous: Some(ChunkIdentifier(0)), + identifier: ChunkIdentifier(1), + next: Some(ChunkIdentifier(2)), + num_items: 0, + }, + // Hypothetical non-empty items chunk with items 'd', 'e', 'f'. + ChunkMetadata { + previous: Some(ChunkIdentifier(1)), + identifier: ChunkIdentifier(2), + next: Some(ChunkIdentifier(3)), + num_items: 3, + }, + // Hypothetical non-empty items chunk with items 'g'. + ChunkMetadata { + previous: Some(ChunkIdentifier(2)), + identifier: ChunkIdentifier(3), + next: None, + num_items: 1, + }, + ]; + + let mut linked_chunk = LinkedChunk::<3, char, ()>::new_with_update_history(); + + let mut tracker = linked_chunk.order_tracker(Some(db_metadata)).unwrap(); + + // Sanity checks. + // Order of 'b': + assert_eq!(tracker.ordering(Position::new(ChunkIdentifier::new(0), 1)), Some(1)); + // Order of 'e': + assert_eq!(tracker.ordering(Position::new(ChunkIdentifier::new(2), 1)), Some(3)); + + // It's possible to apply updates out of band, i.e. without affecting the + // observed linked chunk. This can be useful when an update only applies + // to a database, but not to the in-memory linked chunk. + tracker.map_updates(&[Update::RemoveChunk(ChunkIdentifier::new(0))]); + + // 'b' doesn't exist anymore, so its ordering is now undefined. + assert_eq!(tracker.ordering(Position::new(ChunkIdentifier::new(0), 1)), None); + // 'e' has been shifted back by 2 places, aka the number of items in the first + // chunk. + assert_eq!(tracker.ordering(Position::new(ChunkIdentifier::new(2), 1)), Some(1)); + } +} diff --git a/crates/matrix-sdk-common/src/linked_chunk/relational.rs b/crates/matrix-sdk-common/src/linked_chunk/relational.rs index ae9b67b7bd9..7b1ecec4ce1 100644 --- a/crates/matrix-sdk-common/src/linked_chunk/relational.rs +++ b/crates/matrix-sdk-common/src/linked_chunk/relational.rs @@ -22,7 +22,9 @@ use ruma::{OwnedEventId, OwnedRoomId}; use super::{ChunkContent, ChunkIdentifierGenerator, RawChunk}; use crate::{ deserialized_responses::TimelineEvent, - linked_chunk::{ChunkIdentifier, LinkedChunkId, OwnedLinkedChunkId, Position, Update}, + linked_chunk::{ + ChunkIdentifier, ChunkMetadata, LinkedChunkId, OwnedLinkedChunkId, Position, Update, + }, }; /// A row of the [`RelationalLinkedChunk::chunks`]. @@ -427,6 +429,22 @@ where .collect::, String>>() } + /// Loads all the chunks' metadata. + /// + /// Return an error result if the data was malformed in the struct, with a + /// string message explaining details about the error. + #[doc(hidden)] + pub fn load_all_chunks_metadata( + &self, + linked_chunk_id: LinkedChunkId<'_>, + ) -> Result, String> { + self.chunks + .iter() + .filter(|chunk| chunk.linked_chunk_id == linked_chunk_id) + .map(|chunk_row| load_raw_chunk_metadata(self, chunk_row, linked_chunk_id)) + .collect::, String>>() + } + pub fn load_last_chunk( &self, linked_chunk_id: LinkedChunkId<'_>, @@ -518,6 +536,10 @@ where } } +/// Loads a single chunk along all its items. +/// +/// The code of this method must be kept in sync with that of +/// [`load_raw_chunk_metadata`] below. fn load_raw_chunk( relational_linked_chunk: &RelationalLinkedChunk, chunk_row: &ChunkRow, @@ -612,6 +634,89 @@ where }) } +/// Loads the metadata for a single chunk. +/// +/// The code of this method must be kept in sync with that of [`load_raw_chunk`] +/// above. +fn load_raw_chunk_metadata( + relational_linked_chunk: &RelationalLinkedChunk, + chunk_row: &ChunkRow, + linked_chunk_id: LinkedChunkId<'_>, +) -> Result +where + Item: Clone, + Gap: Clone, + ItemId: Hash + PartialEq + Eq, +{ + // Find all items that correspond to the chunk. + let mut items = relational_linked_chunk + .items_chunks + .iter() + .filter(|item_row| { + item_row.linked_chunk_id == linked_chunk_id + && item_row.position.chunk_identifier() == chunk_row.chunk + }) + .peekable(); + + let Some(first_item) = items.peek() else { + // No item. It means it is a chunk of kind `Items` and that it is empty! + return Ok(ChunkMetadata { + num_items: 0, + previous: chunk_row.previous_chunk, + identifier: chunk_row.chunk, + next: chunk_row.next_chunk, + }); + }; + + Ok(match first_item.item { + // This is a chunk of kind `Items`. + Either::Item(_) => { + // Count all the items. We add an additional filter that will exclude gaps, in + // case the chunk is malformed, but we should not have to, in theory. + + let mut num_items = 0; + for item in items { + match &item.item { + Either::Item(_) => num_items += 1, + Either::Gap(_) => { + return Err(format!( + "unexpected gap in items chunk {}", + chunk_row.chunk.index() + )); + } + } + } + + ChunkMetadata { + num_items, + previous: chunk_row.previous_chunk, + identifier: chunk_row.chunk, + next: chunk_row.next_chunk, + } + } + + Either::Gap(..) => { + assert!(items.next().is_some(), "we just peeked the gap"); + + // We shouldn't have more than one item row for this chunk. + if items.next().is_some() { + return Err(format!( + "there shouldn't be more than one item row attached in gap chunk {}", + chunk_row.chunk.index() + )); + } + + ChunkMetadata { + // By convention, a gap has 0 items. + num_items: 0, + previous: chunk_row.previous_chunk, + identifier: chunk_row.chunk, + next: chunk_row.next_chunk, + } + } + }) +} + #[cfg(test)] mod tests { use assert_matches::assert_matches; diff --git a/crates/matrix-sdk-common/src/linked_chunk/updates.rs b/crates/matrix-sdk-common/src/linked_chunk/updates.rs index c2eabc335e7..2ff7c72a0bf 100644 --- a/crates/matrix-sdk-common/src/linked_chunk/updates.rs +++ b/crates/matrix-sdk-common/src/linked_chunk/updates.rs @@ -294,6 +294,12 @@ impl UpdatesInner { slice } + /// Has the given reader, identified by its [`ReaderToken`], some pending + /// updates, or has it consumed all the pending updates? + pub(super) fn is_reader_up_to_date(&self, token: ReaderToken) -> bool { + *self.last_index_per_reader.get(&token).expect("unknown reader token") == self.updates.len() + } + /// Return the number of updates in the buffer. #[cfg(test)] fn len(&self) -> usize { diff --git a/crates/matrix-sdk-sqlite/src/event_cache_store.rs b/crates/matrix-sdk-sqlite/src/event_cache_store.rs index dc80055a917..f3f0f62980e 100644 --- a/crates/matrix-sdk-sqlite/src/event_cache_store.rs +++ b/crates/matrix-sdk-sqlite/src/event_cache_store.rs @@ -32,8 +32,8 @@ use matrix_sdk_base::{ Event, Gap, }, linked_chunk::{ - ChunkContent, ChunkIdentifier, ChunkIdentifierGenerator, LinkedChunkId, Position, RawChunk, - Update, + ChunkContent, ChunkIdentifier, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId, + Position, RawChunk, Update, }, media::{MediaRequestParameters, UniqueKey}, }; @@ -836,6 +836,66 @@ impl EventCacheStore for SqliteEventCacheStore { Ok(result) } + async fn load_all_chunks_metadata( + &self, + linked_chunk_id: LinkedChunkId<'_>, + ) -> Result, Self::Error> { + let hashed_linked_chunk_id = + self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key()); + + self.acquire() + .await? + .with_transaction(move |txn| -> Result<_> { + // I'm not a DB analyst, so for my own future sanity: this query joins the + // linked_chunks and events_chunks tables together, with a few specificities: + // + // - the `GROUP BY` clause will regroup the joined item lines by chunk. + // - the `COUNT(ec.event_id)` counts the number of unique non-NULL lines from + // the events_chunks table, aka the number of events in the chunk. + // - using a `LEFT JOIN` makes it so that if there's a chunk that has no events + // (because it's a gap, or an empty events chunk), there will still be a + // result for that chunk, and the count will be `0` (because the joined lines + // would be `NULL`). + // + // Overall, this query will return what we want: + // - for a gap or an empty item chunk: a count of 0, + // - otherwise, the number of related lines in `event_chunks` for that chunk, + // i.e. the number of events in that chunk. + // + // Also, use `ORDER BY id` to get a deterministic ordering for testing purposes. + + txn.prepare( + r#" + SELECT lc.id, lc.previous, lc.next, COUNT(ec.event_id) + FROM linked_chunks as lc + LEFT JOIN event_chunks as ec ON ec.chunk_id = lc.id + WHERE lc.linked_chunk_id = ? + GROUP BY lc.id + ORDER BY lc.id"#, + )? + .query_map((&hashed_linked_chunk_id,), |row| { + Ok(( + row.get::<_, u64>(0)?, + row.get::<_, Option>(1)?, + row.get::<_, Option>(2)?, + row.get::<_, usize>(3)?, + )) + })? + .map(|data| -> Result<_> { + let (id, previous, next, num_items) = data?; + + Ok(ChunkMetadata { + identifier: ChunkIdentifier::new(id), + previous: previous.map(ChunkIdentifier::new), + next: next.map(ChunkIdentifier::new), + num_items, + }) + }) + .collect::, _>>() + }) + .await + } + async fn load_last_chunk( &self, linked_chunk_id: LinkedChunkId<'_>, diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index 823fff8fb78..f45ee063867 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -104,6 +104,14 @@ pub enum EventCacheError { /// [`LinkedChunk`]: matrix_sdk_common::linked_chunk::LinkedChunk #[error(transparent)] LinkedChunkLoader(#[from] LazyLoaderError), + + /// An error happened when reading the metadata of a linked chunk, upon + /// reload. + #[error("the linked chunk metadata is invalid: {details}")] + InvalidLinkedChunkMetadata { + /// A string containing details about the error. + details: String, + }, } /// A result using the [`EventCacheError`]. diff --git a/crates/matrix-sdk/src/event_cache/room/events.rs b/crates/matrix-sdk/src/event_cache/room/events.rs index 556417f92fd..34a56a688aa 100644 --- a/crates/matrix-sdk/src/event_cache/room/events.rs +++ b/crates/matrix-sdk/src/event_cache/room/events.rs @@ -19,7 +19,7 @@ use matrix_sdk_base::{ event_cache::store::DEFAULT_CHUNK_CAPACITY, linked_chunk::{ lazy_loader::{self, LazyLoaderError}, - ChunkContent, ChunkIdentifierGenerator, RawChunk, + ChunkContent, ChunkIdentifierGenerator, ChunkMetadata, OrderTracker, RawChunk, }, }; use matrix_sdk_common::linked_chunk::{ @@ -37,6 +37,9 @@ pub struct RoomEvents { /// /// [`Update`]: matrix_sdk_base::linked_chunk::Update chunks_updates_as_vectordiffs: AsVector, + + /// Tracker of the events ordering in this room. + pub order_tracker: OrderTracker, } impl Default for RoomEvents { @@ -48,7 +51,7 @@ impl Default for RoomEvents { impl RoomEvents { /// Build a new [`RoomEvents`] struct with zero events. pub fn new() -> Self { - Self::with_initial_linked_chunk(None) + Self::with_initial_linked_chunk(None, None) } /// Build a new [`RoomEvents`] struct with prior chunks knowledge. @@ -56,16 +59,19 @@ impl RoomEvents { /// The provided [`LinkedChunk`] must have been built with update history. pub fn with_initial_linked_chunk( linked_chunk: Option>, + full_linked_chunk_metadata: Option>, ) -> Self { let mut linked_chunk = linked_chunk.unwrap_or_else(LinkedChunk::new_with_update_history); let chunks_updates_as_vectordiffs = linked_chunk .as_vector() - // SAFETY: The `LinkedChunk` has been built with `new_with_update_history`, so - // `as_vector` must return `Some(…)`. .expect("`LinkedChunk` must have been built with `new_with_update_history`"); - Self { chunks: linked_chunk, chunks_updates_as_vectordiffs } + let order_tracker = linked_chunk + .order_tracker(full_linked_chunk_metadata) + .expect("`LinkedChunk` must have been built with `new_with_update_history`"); + + Self { chunks: linked_chunk, chunks_updates_as_vectordiffs, order_tracker } } /// Clear all events. @@ -76,18 +82,6 @@ impl RoomEvents { self.chunks.clear(); } - /// Replace the events with the given last chunk of events and generator. - /// - /// This clears all the chunks in memory before resetting to the new chunk, - /// if provided. - pub(super) fn replace_with( - &mut self, - last_chunk: Option>, - chunk_identifier_generator: ChunkIdentifierGenerator, - ) -> Result<(), LazyLoaderError> { - lazy_loader::replace_with(&mut self.chunks, last_chunk, chunk_identifier_generator) - } - /// Push events after all events or gaps. /// /// The last event in `events` is the most recent one. @@ -231,6 +225,35 @@ impl RoomEvents { self.chunks.items() } + /// Return the order of an event in the room linked chunk. + /// + /// Can return `None` if the event can't be found in the linked chunk. + pub fn event_order(&self, event_pos: Position) -> Option { + self.order_tracker.ordering(event_pos) + } + + #[cfg(any(test, debug_assertions))] + fn assert_event_ordering(&self) { + let mut iter = self.chunks.items().enumerate(); + let Some((i, (first_event_pos, _))) = iter.next() else { + return; + }; + + // Sanity check. + assert_eq!(i, 0); + + // That's the offset in the full linked chunk. Will be 0 if the linked chunk is + // entirely loaded, may be non-zero otherwise. + let offset = + self.event_order(first_event_pos).expect("first event's ordering must be known"); + + for (i, (next_pos, _)) in iter { + let next_index = + self.event_order(next_pos).expect("next event's ordering must be known"); + assert_eq!(offset + i, next_index, "event ordering must be continuous"); + } + } + /// Get all updates from the room events as [`VectorDiff`]. /// /// Be careful that each `VectorDiff` is returned only once! @@ -239,7 +262,17 @@ impl RoomEvents { /// /// [`Update`]: matrix_sdk_base::linked_chunk::Update pub fn updates_as_vector_diffs(&mut self) -> Vec> { - self.chunks_updates_as_vectordiffs.take() + let updates = self.chunks_updates_as_vectordiffs.take(); + + self.order_tracker.flush_updates(false); + + if cfg!(any(test, debug_assertions)) { + // Assert that the orderings are fully correct for all the events present in the + // in-memory linked chunk. + self.assert_event_ordering(); + } + + updates } /// Get a mutable reference to the [`LinkedChunk`] updates, aka @@ -258,8 +291,9 @@ impl RoomEvents { pub fn debug_string(&self) -> Vec { let mut result = Vec::new(); - for chunk in self.chunks() { - let content = chunk_debug_string(chunk.content()); + for chunk in self.chunks.chunks() { + let content = + chunk_debug_string(chunk.identifier(), chunk.content(), &self.order_tracker); let lazy_previous = if let Some(cid) = chunk.lazy_previous() { format!(" (lazy previous = {})", cid.index()) } else { @@ -283,18 +317,67 @@ impl RoomEvents { } } -// Private implementations, implementation specific. +// Methods related to lazy-loading. impl RoomEvents { + /// Inhibits all the linked chunk updates caused by the function `f` on the + /// ordering tracker. + /// + /// Updates to the linked chunk that happen because of lazy loading must not + /// be taken into account by the order tracker, otherwise the + /// fully-loaded state (tracked by the order tracker) wouldn't match + /// reality anymore. This provides a facility to help applying such + /// updates. + fn inhibit_updates_to_ordering_tracker R, R>(&mut self, f: F) -> R { + // Start by flushing previous pending updates to the chunk ordering, if any. + self.order_tracker.flush_updates(false); + + // Call the function. + let r = f(self); + + // Now, flush other pending updates which have been caused by the function, and + // ignore them. + self.order_tracker.flush_updates(true); + + r + } + + /// Replace the events with the given last chunk of events and generator. + /// + /// Happens only during lazy loading. + /// + /// This clears all the chunks in memory before resetting to the new chunk, + /// if provided. + pub(super) fn replace_with( + &mut self, + last_chunk: Option>, + chunk_identifier_generator: ChunkIdentifierGenerator, + ) -> Result<(), LazyLoaderError> { + // Since `replace_with` is used only to unload some chunks, we don't want it to + // affect the chunk ordering. + self.inhibit_updates_to_ordering_tracker(move |this| { + lazy_loader::replace_with(&mut this.chunks, last_chunk, chunk_identifier_generator) + }) + } + + /// Prepends a lazily-loaded chunk at the beginning of the linked chunk. pub(super) fn insert_new_chunk_as_first( &mut self, raw_new_first_chunk: RawChunk, ) -> Result<(), LazyLoaderError> { - lazy_loader::insert_new_first_chunk(&mut self.chunks, raw_new_first_chunk) + // This is only used when reinserting a chunk that was in persisted storage, so + // we don't need to touch the chunk ordering for this. + self.inhibit_updates_to_ordering_tracker(move |this| { + lazy_loader::insert_new_first_chunk(&mut this.chunks, raw_new_first_chunk) + }) } } /// Create a debug string for a [`ChunkContent`] for an event/gap pair. -fn chunk_debug_string(content: &ChunkContent) -> String { +fn chunk_debug_string( + chunk_id: ChunkIdentifier, + content: &ChunkContent, + order_tracker: &OrderTracker, +) -> String { match content { ChunkContent::Gap(Gap { prev_token }) => { format!("gap['{prev_token}']") @@ -302,11 +385,19 @@ fn chunk_debug_string(content: &ChunkContent) -> String { ChunkContent::Items(vec) => { let items = vec .iter() - .map(|event| { - // Limit event ids to 8 chars *after* the $. + .enumerate() + .map(|(i, event)| { event.event_id().map_or_else( || "".to_owned(), - |id| id.as_str().chars().take(1 + 8).collect(), + |id| { + let pos = Position::new(chunk_id, i); + let order = format!("#{}: ", order_tracker.ordering(pos).unwrap()); + + // Limit event ids to 8 chars *after* the $. + let event_id = id.as_str().chars().take(1 + 8).collect::(); + + format!("{order}{event_id}") + }, ) }) .collect::>() @@ -719,10 +810,13 @@ mod tests { ]); room_events.push_gap(Gap { prev_token: "raclette".to_owned() }); + // Flush updates to the order tracker. + let _ = room_events.updates_as_vector_diffs(); + let output = room_events.debug_string(); assert_eq!(output.len(), 2); - assert_eq!(&output[0], "chunk #0: events[$12345678, $2]"); + assert_eq!(&output[0], "chunk #0: events[#0: $12345678, #1: $2]"); assert_eq!(&output[1], "chunk #1: gap['raclette']"); } diff --git a/crates/matrix-sdk/src/event_cache/room/mod.rs b/crates/matrix-sdk/src/event_cache/room/mod.rs index 6b427fd4fd6..3490f78d22c 100644 --- a/crates/matrix-sdk/src/event_cache/room/mod.rs +++ b/crates/matrix-sdk/src/event_cache/room/mod.rs @@ -477,7 +477,7 @@ pub(super) enum LoadMoreEventsBackwardsOutcome { // Use a private module to hide `events` to this parent module. mod private { use std::{ - collections::HashSet, + collections::{HashMap, HashSet}, sync::{atomic::AtomicUsize, Arc}, }; @@ -486,9 +486,13 @@ mod private { use matrix_sdk_base::{ apply_redaction, deserialized_responses::{ThreadSummary, ThreadSummaryStatus, TimelineEventKind}, - event_cache::{store::EventCacheStoreLock, Event, Gap}, + event_cache::{ + store::{DynEventCacheStore, EventCacheStoreLock}, + Event, Gap, + }, linked_chunk::{ - lazy_loader, ChunkContent, ChunkIdentifier, ChunkIdentifierGenerator, LinkedChunkId, + lazy_loader::{self}, + ChunkContent, ChunkIdentifier, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId, Position, Update, }, serde_helpers::extract_thread_root, @@ -563,30 +567,41 @@ mod private { let store_lock = store.lock().await?; let linked_chunk_id = LinkedChunkId::Room(&room_id); - let linked_chunk = match store_lock + + // Load the full linked chunk's metadata, so as to feed the order tracker. + // + // If loading the full linked chunk failed, we'll clear the event cache, as it + // indicates that at some point, there's some malformed data. + let full_linked_chunk_metadata = + match Self::load_linked_chunk_metadata(&*store_lock, linked_chunk_id).await { + Ok(metas) => Some(metas), + Err(err) => { + error!( + "error when loading a linked chunk's metadata from the store: {err}" + ); + + // Clear storage for this room. + store_lock + .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear]) + .await?; + + // Restart with an empty linked chunk. + None + } + }; + + let linked_chunk = store_lock .load_last_chunk(linked_chunk_id) .await .map_err(EventCacheError::from) .and_then(|(last_chunk, chunk_identifier_generator)| { lazy_loader::from_last_chunk(last_chunk, chunk_identifier_generator) .map_err(EventCacheError::from) - }) { - Ok(linked_chunk) => linked_chunk, - - Err(err) => { - error!("error when reloading a linked chunk from memory: {err}"); - - // Clear storage for this room. - store_lock - .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear]) - .await?; - - // Restart with an empty linked chunk. - None - } - }; + }) + .expect("fully loading the linked chunk just worked, so loading it partially should also work"); - let events = RoomEvents::with_initial_linked_chunk(linked_chunk); + let events = + RoomEvents::with_initial_linked_chunk(linked_chunk, full_linked_chunk_metadata); Ok(Self { room: room_id, @@ -599,6 +614,126 @@ mod private { }) } + /// Load a linked chunk's full metadata, making sure the chunks are + /// according to their their links. + /// + /// Returns `None` if there's no such linked chunk in the store, or an + /// error if the linked chunk is malformed. + async fn load_linked_chunk_metadata( + store: &DynEventCacheStore, + linked_chunk_id: LinkedChunkId<'_>, + ) -> Result, EventCacheError> { + let mut all_chunks = store + .load_all_chunks_metadata(linked_chunk_id) + .await + .map_err(EventCacheError::from)?; + + // Transform the vector into a hashmap, for quick lookup of the predecessors. + let chunk_map: HashMap<_, _> = + all_chunks.iter().map(|meta| (meta.identifier, meta)).collect(); + + // Find a last chunk. + let mut iter = all_chunks.iter().filter(|meta| meta.next.is_none()); + let Some(last) = iter.next() else { + return Err(EventCacheError::InvalidLinkedChunkMetadata { + details: "no last chunk found".to_owned(), + }); + }; + + // There must at most one last chunk. + if let Some(other_last) = iter.next() { + return Err(EventCacheError::InvalidLinkedChunkMetadata { + details: format!( + "chunks {} and {} both claim to be last chunks", + last.identifier.index(), + other_last.identifier.index() + ), + }); + } + + // Rewind the chain back to the first chunk, and do some checks at the same + // time. + let mut seen = HashSet::new(); + let mut current = last; + loop { + // If we've already seen this chunk, there's a cycle somewhere. + if !seen.insert(current.identifier) { + return Err(EventCacheError::InvalidLinkedChunkMetadata { + details: format!( + "cycle detected in linked chunk at {}", + current.identifier.index() + ), + }); + } + + let Some(prev_id) = current.previous else { + // If there's no previous chunk, we're done. + if seen.len() != all_chunks.len() { + return Err(EventCacheError::InvalidLinkedChunkMetadata { + details: format!( + "linked chunk likely has multiple components: {} chunks seen through the chain of predecessors, but {} expected", + seen.len(), + all_chunks.len() + ), + }); + } + break; + }; + + // If the previous chunk is not in the map, then it's unknown + // and missing. + let Some(pred_meta) = chunk_map.get(&prev_id) else { + return Err(EventCacheError::InvalidLinkedChunkMetadata { + details: format!( + "missing predecessor {} chunk for {}", + prev_id.index(), + current.identifier.index() + ), + }); + }; + + // If the previous chunk isn't connected to the next, then the link is invalid. + if pred_meta.next != Some(current.identifier) { + return Err(EventCacheError::InvalidLinkedChunkMetadata { + details: format!( + "chunk {}'s next ({:?}) doesn't match the current chunk ({})", + pred_meta.identifier.index(), + pred_meta.next.map(|chunk_id| chunk_id.index()), + current.identifier.index() + ), + }); + } + + current = *pred_meta; + } + + // At this point, `current` is the identifier of the first chunk. + // + // Reorder the resulting vector, by going through the chain of `next` links, and + // swapping items into their final position. + // + // Invariant in this loop: all items in [0..i[ are in their final, correct + // position. + let mut current = current.identifier; + for i in 0..all_chunks.len() { + // Find the target metadata. + let j = all_chunks + .iter() + .rev() + .position(|meta| meta.identifier == current) + .map(|j| all_chunks.len() - 1 - j) + .expect("the target chunk must be present in the metadata"); + if i != j { + all_chunks.swap(i, j); + } + if let Some(next) = all_chunks[i].next { + current = next; + } + } + + Ok(all_chunks) + } + /// Deduplicate `events` considering all events in `Self::events`. /// /// The returned tuple contains: @@ -833,6 +968,11 @@ mod private { Ok(self.events.updates_as_vector_diffs()) } + #[cfg(test)] + pub(crate) fn room_event_order(&self, event_pos: Position) -> Option { + self.events.event_order(event_pos) + } + /// Removes the bundled relations from an event, if they were present. /// /// Only replaces the present if it contained bundled relations. @@ -897,13 +1037,12 @@ mod private { sort_positions_descending(&mut positions); - self.send_updates_to_store( - positions - .into_iter() - .map(|position| Update::RemoveItem { at: position }) - .collect(), - ) - .await?; + let updates = positions + .into_iter() + .map(|pos| Update::RemoveItem { at: pos }) + .collect::>(); + + self.apply_store_only_updates(updates).await?; } // In-memory events. @@ -928,6 +1067,20 @@ mod private { self.send_updates_to_store(updates).await } + /// Apply some updates that are effective only on the store itself. + /// + /// This method should be used only for updates that happen *outside* + /// the in-memory linked chunk. Such updates must be applied + /// onto the ordering tracker as well as to the persistent + /// storage. + async fn apply_store_only_updates( + &mut self, + updates: Vec>, + ) -> Result<(), EventCacheError> { + self.events.order_tracker.map_updates(&updates); + self.send_updates_to_store(updates).await + } + async fn send_updates_to_store( &mut self, mut updates: Vec>, @@ -2593,6 +2746,141 @@ mod timed_tests { assert!(outcome.reached_start); } + #[async_test] + async fn test_room_ordering() { + let room_id = room_id!("!galette:saucisse.bzh"); + + let client = MockClientBuilder::new("http://localhost".to_owned()).build().await; + + let f = EventFactory::new().room(room_id).sender(*ALICE); + + let evid1 = event_id!("$1"); + let evid2 = event_id!("$2"); + let evid3 = event_id!("$3"); + + let ev1 = f.text_msg("hello world").event_id(evid1).into_event(); + let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event(); + let ev3 = f.text_msg("yo").event_id(evid3).into_event(); + + // Fill the event cache store with an initial linked chunk with 2 events chunks. + { + let store = client.event_cache_store(); + let store = store.lock().await.unwrap(); + store + .handle_linked_chunk_updates( + LinkedChunkId::Room(room_id), + vec![ + Update::NewItemsChunk { + previous: None, + new: ChunkIdentifier::new(0), + next: None, + }, + Update::PushItems { + at: Position::new(ChunkIdentifier::new(0), 0), + items: vec![ev1, ev2], + }, + Update::NewItemsChunk { + previous: Some(ChunkIdentifier::new(0)), + new: ChunkIdentifier::new(1), + next: None, + }, + Update::PushItems { + at: Position::new(ChunkIdentifier::new(1), 0), + items: vec![ev3.clone()], + }, + ], + ) + .await + .unwrap(); + } + + let event_cache = client.event_cache(); + event_cache.subscribe().unwrap(); + + client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined); + let room = client.get_room(room_id).unwrap(); + let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); + + // Initially, the linked chunk only contains the last chunk, so only ev3 is + // loaded. + { + let state = room_event_cache.inner.state.read().await; + + // But we can get the order of ev1. + assert_eq!(state.room_event_order(Position::new(ChunkIdentifier::new(0), 0)), Some(0)); + + // And that of ev2 as well. + assert_eq!(state.room_event_order(Position::new(ChunkIdentifier::new(0), 1)), Some(1)); + + // ev3, which is loaded, also has a known ordering. + let mut events = state.events().events(); + let (pos, ev) = events.next().unwrap(); + assert_eq!(pos, Position::new(ChunkIdentifier::new(1), 0)); + assert_eq!(ev.event_id().as_deref(), Some(evid3)); + assert_eq!(state.room_event_order(pos), Some(2)); + + // No other loaded events. + assert!(events.next().is_none()); + } + + // Force loading the full linked chunk by back-paginating. + let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap(); + assert!(outcome.reached_start); + + // All events are now loaded, so their order is precisely their enumerated index + // in a linear iteration. + { + let state = room_event_cache.inner.state.read().await; + for (i, (pos, _)) in state.events().events().enumerate() { + assert_eq!(state.room_event_order(pos), Some(i)); + } + } + + // Handle a gappy sync with two events (including one duplicate, so + // deduplication kicks in), so that the linked chunk is shrunk to the + // last chunk, and that the linked chunk only contains the last two + // events. + let evid4 = event_id!("$4"); + room_event_cache + .inner + .handle_joined_room_update(JoinedRoomUpdate { + timeline: Timeline { + limited: true, + prev_batch: Some("fondue".to_owned()), + events: vec![ev3, f.text_msg("sup").event_id(evid4).into_event()], + }, + ..Default::default() + }) + .await + .unwrap(); + + { + let state = room_event_cache.inner.state.read().await; + + // After the shrink, only evid3 and evid4 are loaded. + let mut events = state.events().events(); + + let (pos, ev) = events.next().unwrap(); + assert_eq!(ev.event_id().as_deref(), Some(evid3)); + assert_eq!(state.room_event_order(pos), Some(2)); + + let (pos, ev) = events.next().unwrap(); + assert_eq!(ev.event_id().as_deref(), Some(evid4)); + assert_eq!(state.room_event_order(pos), Some(3)); + + // No other loaded events. + assert!(events.next().is_none()); + + // But we can still get the order of previous events. + assert_eq!(state.room_event_order(Position::new(ChunkIdentifier::new(0), 0)), Some(0)); + assert_eq!(state.room_event_order(Position::new(ChunkIdentifier::new(0), 1)), Some(1)); + + // ev3 doesn't have an order with its previous position, since it's been + // deduplicated. + assert_eq!(state.room_event_order(Position::new(ChunkIdentifier::new(1), 0)), None); + } + } + #[async_test] async fn test_auto_shrink_after_all_subscribers_are_gone() { let room_id = room_id!("!galette:saucisse.bzh");