Skip to content

feat(event cache): introduce an absolute local event ordering #5225

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ pub trait EventCacheStoreIntegrationTests {
/// anything.
async fn test_rebuild_empty_linked_chunk(&self);

/// Test that loading a linked chunk's metadata works as intended.
async fn test_load_all_chunks_metadata(&self);

/// Test that clear all the rooms' linked chunks works.
async fn test_clear_all_linked_chunks(&self);

Expand Down Expand Up @@ -417,6 +420,72 @@ impl EventCacheStoreIntegrationTests for DynEventCacheStore {
assert!(chunks.next().is_none());
}

async fn test_load_all_chunks_metadata(&self) {
    let room_id = room_id!("!r0:matrix.org");
    let linked_chunk_id = LinkedChunkId::Room(room_id);

    // Build a linked chunk shaped as: items(2) -> gap -> items(1) -> items(0),
    // so the metadata covers item chunks, a gap chunk, and an empty chunk.
    self.handle_linked_chunk_updates(
        linked_chunk_id,
        vec![
            // New chunk.
            Update::NewItemsChunk { previous: None, new: CId::new(0), next: None },
            // New items on 0.
            Update::PushItems {
                at: Position::new(CId::new(0), 0),
                items: vec![
                    make_test_event(room_id, "hello"),
                    make_test_event(room_id, "world"),
                ],
            },
            // A gap chunk.
            Update::NewGapChunk {
                previous: Some(CId::new(0)),
                new: CId::new(1),
                next: None,
                gap: Gap { prev_token: "parmesan".to_owned() },
            },
            // Another items chunk.
            Update::NewItemsChunk { previous: Some(CId::new(1)), new: CId::new(2), next: None },
            // New items on 2.
            Update::PushItems {
                at: Position::new(CId::new(2), 0),
                items: vec![make_test_event(room_id, "sup")],
            },
            // And an empty items chunk to finish.
            Update::NewItemsChunk { previous: Some(CId::new(2)), new: CId::new(3), next: None },
        ],
    )
    .await
    .unwrap();

    let metas = self.load_all_chunks_metadata(linked_chunk_id).await.unwrap();
    assert_eq!(metas.len(), 4);

    // Expected (identifier, previous, next, num_items) for each chunk, in
    // order: two items, then the gap (0 items), then one item, then empty.
    let expected = [
        (CId::new(0), None, Some(CId::new(1)), 2),
        (CId::new(1), Some(CId::new(0)), Some(CId::new(2)), 0),
        (CId::new(2), Some(CId::new(1)), Some(CId::new(3)), 1),
        (CId::new(3), Some(CId::new(2)), None, 0),
    ];

    for (meta, (identifier, previous, next, num_items)) in metas.iter().zip(expected) {
        assert_eq!(meta.identifier, identifier);
        assert_eq!(meta.previous, previous);
        assert_eq!(meta.next, next);
        assert_eq!(meta.num_items, num_items);
    }
}

async fn test_linked_chunk_incremental_loading(&self) {
let room_id = room_id!("!r0:matrix.org");
let linked_chunk_id = LinkedChunkId::Room(room_id);
Expand Down Expand Up @@ -1105,6 +1174,13 @@ macro_rules! event_cache_store_integration_tests {
event_cache_store.test_rebuild_empty_linked_chunk().await;
}

// Runs the shared integration test for loading linked-chunk metadata
// against the concrete store built by `get_event_cache_store`.
#[async_test]
async fn test_load_all_chunks_metadata() {
    let event_cache_store =
        get_event_cache_store().await.unwrap().into_event_cache_store();
    event_cache_store.test_load_all_chunks_metadata().await;
}

#[async_test]
async fn test_clear_all_linked_chunks() {
let event_cache_store =
Expand Down
13 changes: 12 additions & 1 deletion crates/matrix-sdk-base/src/event_cache/store/memory_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use async_trait::async_trait;
use matrix_sdk_common::{
linked_chunk::{
relational::RelationalLinkedChunk, ChunkIdentifier, ChunkIdentifierGenerator,
LinkedChunkId, Position, RawChunk, Update,
ChunkMetadata, LinkedChunkId, Position, RawChunk, Update,
},
ring_buffer::RingBuffer,
store_locks::memory_store_helper::try_take_leased_lock,
Expand Down Expand Up @@ -148,6 +148,17 @@ impl EventCacheStore for MemoryStore {
.map_err(|err| EventCacheStoreError::InvalidData { details: err })
}

async fn load_all_chunks_metadata(
    &self,
    linked_chunk_id: LinkedChunkId<'_>,
) -> Result<Vec<ChunkMetadata>, Self::Error> {
    // A read lock is sufficient: loading metadata never mutates the store.
    let guard = self.inner.read().unwrap();

    guard
        .events
        .load_all_chunks_metadata(linked_chunk_id)
        .map_err(|details| EventCacheStoreError::InvalidData { details })
}

async fn load_last_chunk(
&self,
linked_chunk_id: LinkedChunkId<'_>,
Expand Down
19 changes: 18 additions & 1 deletion crates/matrix-sdk-base/src/event_cache/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ use std::{fmt, sync::Arc};
use async_trait::async_trait;
use matrix_sdk_common::{
linked_chunk::{
ChunkIdentifier, ChunkIdentifierGenerator, LinkedChunkId, Position, RawChunk, Update,
ChunkIdentifier, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId, Position,
RawChunk, Update,
},
AsyncTraitDeps,
};
Expand Down Expand Up @@ -77,6 +78,15 @@ pub trait EventCacheStore: AsyncTraitDeps {
linked_chunk_id: LinkedChunkId<'_>,
) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error>;

/// Load all of the chunks' metadata for the given [`LinkedChunkId`].
///
/// Returns one [`ChunkMetadata`] entry per stored chunk, without loading
/// the chunks' contents.
///
/// Chunks are unordered, and there's no guarantee that the chunks would
/// form a valid linked chunk after reconstruction.
async fn load_all_chunks_metadata(
    &self,
    linked_chunk_id: LinkedChunkId<'_>,
) -> Result<Vec<ChunkMetadata>, Self::Error>;

/// Load the last chunk of the `LinkedChunk` holding all events of the room
/// identified by `room_id`.
///
Expand Down Expand Up @@ -313,6 +323,13 @@ impl<T: EventCacheStore> EventCacheStore for EraseEventCacheStoreError<T> {
self.0.load_all_chunks(linked_chunk_id).await.map_err(Into::into)
}

async fn load_all_chunks_metadata(
    &self,
    linked_chunk_id: LinkedChunkId<'_>,
) -> Result<Vec<ChunkMetadata>, Self::Error> {
    // Delegate to the wrapped store, erasing its concrete error type.
    let result = self.0.load_all_chunks_metadata(linked_chunk_id).await;
    result.map_err(Into::into)
}

async fn load_last_chunk(
&self,
linked_chunk_id: LinkedChunkId<'_>,
Expand Down
71 changes: 53 additions & 18 deletions crates/matrix-sdk-common/src/linked_chunk/as_vector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::{
collections::VecDeque,
iter::repeat_n,
ops::{ControlFlow, Not},
ops::ControlFlow,
sync::{Arc, RwLock},
};

Expand All @@ -25,6 +25,7 @@ use super::{
updates::{ReaderToken, Update, UpdatesInner},
ChunkContent, ChunkIdentifier, Iter, Position,
};
use crate::linked_chunk::ChunkMetadata;

/// A type alias to represent a chunk's length. This is purely for commodity.
type ChunkLength = usize;
Expand All @@ -43,7 +44,7 @@ pub struct AsVector<Item, Gap> {
token: ReaderToken,

/// Mapper from `Update` to `VectorDiff`.
mapper: UpdateToVectorDiff,
mapper: UpdateToVectorDiff<Item, Vec<VectorDiff<Item>>>,
}

impl<Item, Gap> AsVector<Item, Gap> {
Expand Down Expand Up @@ -83,20 +84,38 @@ impl<Item, Gap> AsVector<Item, Gap> {
}
}

/// Interface for a type accumulating updates from [`UpdateToVectorDiff::map`],
/// and being returned as a result of this.
pub(super) trait UpdatesAccumulator<Item>: Extend<VectorDiff<Item>> {
    /// Create a new accumulator with a rough estimation of the number of
    /// updates this accumulator is going to receive.
    ///
    /// The hint is only an estimate (callers pass the number of `Update`s,
    /// not the number of resulting `VectorDiff`s), so implementations must
    /// still grow on demand.
    fn new(num_updates_hint: usize) -> Self;
}

// Simple implementation for a `Vec<VectorDiff<Item>>` collection for
// `AsVector<Item, Gap>`.
impl<Item> UpdatesAccumulator<Item> for Vec<VectorDiff<Item>> {
    fn new(num_updates_hint: usize) -> Vec<VectorDiff<Item>> {
        // Preallocate using the hint to avoid repeated reallocations while
        // the diffs are accumulated.
        Vec::with_capacity(num_updates_hint)
    }
}

/// Internal type that converts [`Update`] into [`VectorDiff`].
#[derive(Debug)]
struct UpdateToVectorDiff {
pub(super) struct UpdateToVectorDiff<Item, Acc: UpdatesAccumulator<Item>> {
/// Pairs of all known chunks and their respective length. This is the only
/// required data for this algorithm.
chunks: VecDeque<(ChunkIdentifier, ChunkLength)>,
pub chunks: VecDeque<(ChunkIdentifier, ChunkLength)>,

_phantom: std::marker::PhantomData<(Item, Acc)>,
}

impl UpdateToVectorDiff {
impl<Item, Acc: UpdatesAccumulator<Item>> UpdateToVectorDiff<Item, Acc> {
/// Construct [`UpdateToVectorDiff`], based on an iterator of
/// [`Chunk`](super::Chunk)s, used to set up its own internal state.
///
/// See [`Self::map`] to learn more about the algorithm.
fn new<const CAP: usize, Item, Gap>(chunk_iterator: Iter<'_, CAP, Item, Gap>) -> Self {
pub fn new<const CAP: usize, Gap>(chunk_iterator: Iter<'_, CAP, Item, Gap>) -> Self {
let mut initial_chunk_lengths = VecDeque::new();

for chunk in chunk_iterator {
Expand All @@ -109,7 +128,18 @@ impl UpdateToVectorDiff {
))
}

Self { chunks: initial_chunk_lengths }
Self { chunks: initial_chunk_lengths, _phantom: std::marker::PhantomData }
}

/// Construct [`UpdateToVectorDiff`], based on a linked chunk's full
/// metadata, used to set up its own internal state.
///
/// See [`Self::map`] to learn more about the algorithm.
pub fn from_metadata(metas: Vec<ChunkMetadata>) -> Self {
let initial_chunk_lengths =
metas.into_iter().map(|meta| (meta.identifier, meta.num_items)).collect();

Self { chunks: initial_chunk_lengths, _phantom: std::marker::PhantomData }
}

/// Map several [`Update`] into [`VectorDiff`].
Expand Down Expand Up @@ -172,13 +202,18 @@ impl UpdateToVectorDiff {
/// [`LinkedChunk`]: super::LinkedChunk
/// [`ChunkContent::Gap`]: super::ChunkContent::Gap
/// [`ChunkContent::Content`]: super::ChunkContent::Content
fn map<Item, Gap>(&mut self, updates: &[Update<Item, Gap>]) -> Vec<VectorDiff<Item>>
pub fn map<Gap>(&mut self, updates: &[Update<Item, Gap>]) -> Acc
where
Item: Clone,
{
let mut diffs = Vec::with_capacity(updates.len());
let mut acc = Acc::new(updates.len());

// A flag specifying when updates are reattaching detached items.
// Flags specifying when updates are reattaching detached items.
//
// TL;DR: This is an optimization to avoid that insertions in the middle of a
// chunk cause a large series of `VectorDiff::Remove` and
// `VectorDiff::Insert` updates for the elements placed after the
// inserted item.
//
// Why is it useful?
//
Expand Down Expand Up @@ -329,7 +364,7 @@ impl UpdateToVectorDiff {
.expect("Removing an index out of the bounds");

// Removing at the same index because each `Remove` shifts items to the left.
diffs.extend(repeat_n(VectorDiff::Remove { index: offset }, number_of_items));
acc.extend(repeat_n(VectorDiff::Remove { index: offset }, number_of_items));
}

Update::PushItems { at: position, items } => {
Expand All @@ -348,12 +383,12 @@ impl UpdateToVectorDiff {
}

// Optimisation: we can emit a `VectorDiff::Append` in this particular case.
if is_pushing_back && detaching.not() {
diffs.push(VectorDiff::Append { values: items.into() });
if is_pushing_back && !detaching {
acc.extend([VectorDiff::Append { values: items.into() }]);
}
// No optimisation: let's emit `VectorDiff::Insert`.
else {
diffs.extend(items.iter().enumerate().map(|(nth, item)| {
acc.extend(items.iter().enumerate().map(|(nth, item)| {
VectorDiff::Insert { index: offset + nth, value: item.clone() }
}));
}
Expand All @@ -364,7 +399,7 @@ impl UpdateToVectorDiff {

// The chunk length doesn't change.

diffs.push(VectorDiff::Set { index: offset, value: item.clone() });
acc.extend([VectorDiff::Set { index: offset, value: item.clone() }]);
}

Update::RemoveItem { at: position } => {
Expand All @@ -379,7 +414,7 @@ impl UpdateToVectorDiff {
}

// Let's emit a `VectorDiff::Remove`.
diffs.push(VectorDiff::Remove { index: offset });
acc.extend([VectorDiff::Remove { index: offset }]);
}

Update::DetachLastItems { at: position } => {
Expand Down Expand Up @@ -422,12 +457,12 @@ impl UpdateToVectorDiff {
self.chunks.clear();

// Let's straightforwardly emit a `VectorDiff::Clear`.
diffs.push(VectorDiff::Clear);
acc.extend([VectorDiff::Clear]);
}
}
}

diffs
acc
}

fn map_to_offset(&mut self, position: &Position) -> (usize, (usize, &mut usize)) {
Expand Down
Loading
Loading