
Commit

refactor(event cache): get the *most recent* pagination token, not the *oldest* one

Whenever it needs to back-paginate, the event cache should start with
the *most recent* back-pagination token, not the oldest one.

This isn't a functional change until persistent storage is enabled.
The reason is that, currently, there is only one previous-batch token
alive at a time; after it's used, it's replaced with another gap and
the events it served to request from the server.

When persistent storage is enabled, we'll have situations like the one
shown in the test code, where multiple previous-batch tokens can be
alive at the same time. In that case, we'll need to back-paginate from
the most recent events to the least recent ones, not the other way
around, or we'll have holes in the timeline that won't be filled until
we get to the start of the timeline.
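
To make that reasoning concrete, here is a minimal sketch of why a backward scan picks the token that fills the most recent hole. The `Chunk` enum and `latest_prev_batch` helper are simplified stand-ins invented for illustration, not the SDK's linked-chunk types:

// Illustration only: a simplified stand-in for the event cache's chunk and gap
// types; the real ones live in the linked chunk held by `RoomEvents`.
#[derive(Debug, PartialEq)]
enum Chunk {
    /// A hole in the timeline, with the prev-batch token needed to fill it.
    Gap { prev_token: String },
    /// Events we already know about.
    Items(Vec<String>),
}

/// Scanning the chunks backward yields the *most recent* gap first.
fn latest_prev_batch(chunks: &[Chunk]) -> Option<String> {
    chunks.iter().rev().find_map(|chunk| match chunk {
        Chunk::Gap { prev_token } => Some(prev_token.clone()),
        Chunk::Items(..) => None,
    })
}

fn main() {
    // Two previous-batch tokens alive at once: one restored from storage, one
    // created by a later gappy sync.
    let timeline = vec![
        Chunk::Gap { prev_token: "old".to_owned() },
        Chunk::Items(vec!["oldest events, restored from the cache".to_owned()]),
        Chunk::Gap { prev_token: "new".to_owned() },
        Chunk::Items(vec!["events received by the latest sync".to_owned()]),
    ];

    // Back-paginating with "new" first fills the hole right below the most
    // recent events; starting with "old" would instead grow the start of the
    // timeline and leave that middle hole unfilled.
    assert_eq!(latest_prev_batch(&timeline), Some("new".to_owned()));
}

The `get_latest` helper in the diff below does the same thing over the room's actual chunks, via the new `rchunks()` accessor.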
bnjbvr committed Dec 9, 2024
1 parent a1a04ee commit e402ed4
Showing 2 changed files with 65 additions and 8 deletions.
57 changes: 52 additions & 5 deletions crates/matrix-sdk/src/event_cache/pagination.rs
@@ -245,8 +245,8 @@ impl RoomPagination {
    /// Otherwise, it will immediately skip.
    #[doc(hidden)]
    pub async fn get_or_wait_for_token(&self, wait_time: Option<Duration>) -> Option<String> {
-        fn get_oldest(events: &RoomEvents) -> Option<String> {
-            events.chunks().find_map(|chunk| match chunk.content() {
+        fn get_latest(events: &RoomEvents) -> Option<String> {
+            events.rchunks().find_map(|chunk| match chunk.content() {
                ChunkContent::Gap(gap) => Some(gap.prev_token.clone()),
                ChunkContent::Items(..) => None,
            })
@@ -256,7 +256,7 @@ impl RoomPagination {
        // Scope for the lock guard.
        let state = self.inner.state.read().await;
        // Fast-path: we do have a previous-batch token already.
-        if let Some(found) = get_oldest(state.events()) {
+        if let Some(found) = get_latest(state.events()) {
            return Some(found);
        }
        // If we've already waited for an initial previous-batch token before,
@@ -275,7 +275,7 @@ impl RoomPagination {
        let _ = timeout(wait_time, self.inner.pagination_batch_token_notifier.notified()).await;

        let mut state = self.inner.state.write().await;
-        let token = get_oldest(state.events());
+        let token = get_latest(state.events());
        state.waited_for_initial_prev_token = true;
        token
    }
@@ -321,7 +321,9 @@ mod tests {
    use std::time::{Duration, Instant};

    use matrix_sdk_base::RoomState;
-    use matrix_sdk_test::{async_test, sync_timeline_event};
+    use matrix_sdk_test::{
+        async_test, event_factory::EventFactory, sync_timeline_event, ALICE,
+    };
    use ruma::room_id;
    use tokio::{spawn, time::sleep};

Expand Down Expand Up @@ -506,5 +508,50 @@ mod tests {
// The task succeeded.
insert_token_task.await.unwrap();
}

#[async_test]
async fn test_get_latest_token() {
let client = logged_in_client(None).await;
let room_id = room_id!("!galette:saucisse.bzh");
client.base_client().get_or_create_room(room_id, RoomState::Joined);

let event_cache = client.event_cache();

event_cache.subscribe().unwrap();

let (room_event_cache, _drop_handles) = event_cache.for_room(room_id).await.unwrap();

let old_token = "old".to_owned();
let new_token = "new".to_owned();

// Assuming a room event cache that contains both an old and a new pagination
// token, and events in between,
room_event_cache
.inner
.state
.write()
.await
.with_events_mut(|events| {
let f = EventFactory::new().room(room_id).sender(*ALICE);

// This simulates a valid representation of a room: first group of gap+events
// were e.g. restored from the cache; second group of gap+events was received
// from a subsequent sync.
events.push_gap(Gap { prev_token: old_token });
events.push_events([f.text_msg("oldest from cache").into()]);

events.push_gap(Gap { prev_token: new_token.clone() });
events.push_events([f.text_msg("sync'd gappy timeline").into()]);
})
.await
.unwrap();

let pagination = room_event_cache.pagination();

// Retrieving the pagination token will return the most recent one, not the old
// one.
let found = pagination.get_or_wait_for_token(None).await;
assert_eq!(found, Some(new_token));
}
}
}
16 changes: 13 additions & 3 deletions crates/matrix-sdk/src/event_cache/room/events.rs
@@ -18,10 +18,10 @@ use eyeball_im::VectorDiff;
pub use matrix_sdk_base::event_cache::{Event, Gap};
use matrix_sdk_base::{
    event_cache::store::DEFAULT_CHUNK_CAPACITY,
-    linked_chunk::{AsVector, ObservableUpdates},
+    linked_chunk::{AsVector, IterBackward, ObservableUpdates},
};
use matrix_sdk_common::linked_chunk::{
-    Chunk, ChunkIdentifier, EmptyChunk, Error, Iter, LinkedChunk, Position,
+    Chunk, ChunkIdentifier, EmptyChunk, Error, LinkedChunk, Position,
};
use ruma::OwnedEventId;
use tracing::{debug, error, warn};
@@ -177,10 +177,20 @@ impl RoomEvents {
    /// Iterate over the chunks, forward.
    ///
    /// The oldest chunk comes first.
-    pub fn chunks(&self) -> Iter<'_, DEFAULT_CHUNK_CAPACITY, Event, Gap> {
+    #[cfg(test)]
+    pub fn chunks(
+        &self,
+    ) -> matrix_sdk_common::linked_chunk::Iter<'_, DEFAULT_CHUNK_CAPACITY, Event, Gap> {
        self.chunks.chunks()
    }

+    /// Iterate over the chunks, backward.
+    ///
+    /// The most recent chunk comes first.
+    pub fn rchunks(&self) -> IterBackward<'_, DEFAULT_CHUNK_CAPACITY, Event, Gap> {
+        self.chunks.rchunks()
+    }
+
    /// Iterate over the events, backward.
    ///
    /// The most recent event comes first.
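For reference, the contract of the new accessor can be mirrored with a plain vector and standard iterators. This is a rough sketch under the assumption that chunks are stored oldest-first; it is not the SDK's `LinkedChunk` implementation:

fn main() {
    // Previous-batch tokens in timeline order, oldest first.
    let prev_batch_tokens = vec!["old", "middle", "new"];

    // chunks()-like order: forward iteration visits the oldest entry first.
    assert_eq!(prev_batch_tokens.iter().next(), Some(&"old"));

    // rchunks()-like order: backward iteration visits the most recent entry
    // first, so the first gap found by `get_or_wait_for_token` is the most
    // recent one.
    assert_eq!(prev_batch_tokens.iter().rev().next(), Some(&"new"));
}

In the real code, `rchunks()` simply delegates to the underlying linked chunk's backward iterator, as the diff above shows.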
