fix(event cache): fix back-pagination ordering in one specific case #4402

Merged · 5 commits · Dec 16, 2024

Changes from all commits
144 changes: 62 additions & 82 deletions crates/matrix-sdk/src/event_cache/pagination.rs
@@ -137,105 +137,85 @@ impl RoomPagination {

// Check that the previous token still exists; otherwise it's a sign that the
// room's timeline has been cleared.
let gap_identifier = if let Some(token) = prev_token {
let gap_identifier = state.events().chunk_identifier(|chunk| {
let prev_gap_id = if let Some(token) = prev_token {
let gap_id = state.events().chunk_identifier(|chunk| {
matches!(chunk.content(), ChunkContent::Gap(Gap { ref prev_token }) if *prev_token == token)
});

// The method has been called with `token` but it doesn't exist in `RoomEvents`,
// it's an error.
if gap_identifier.is_none() {
// We got a previous-batch token from the linked chunk *before* running the
// request, which is missing from the linked chunk *after*
// completing the request. It may be a sign the linked chunk has
// been reset, and it's an error in any case.
if gap_id.is_none() {
return Ok(None);
}

gap_identifier
gap_id
} else {
None
};

let prev_token = paginator.prev_batch_token().map(|prev_token| Gap { prev_token });

Ok(Some(state.with_events_mut(move |room_events| {
// Note: The chunk could be empty.
//
// If there's any event, they are presented in reverse order (i.e. the first one
// should be prepended first).

let sync_events = events
.iter()
// Reverse the order of the events as `/messages` has been called with `dir=b`
// (backward). The `RoomEvents` API expects the first event to be the oldest.
.rev()
.cloned()
.map(SyncTimelineEvent::from);


// There is a `token`/gap, let's replace it by new events!
if let Some(gap_identifier) = gap_identifier {
let new_position = {
// Replace the gap by new events.
let new_chunk = room_events
.replace_gap_at(sync_events, gap_identifier)
// SAFETY: we are sure that `gap_identifier` represents a valid
// `ChunkIdentifier` for a `Gap` chunk.
.expect("The `gap_identifier` must represent a `Gap`");

new_chunk.first_position()
};

// And insert a new gap if there is any `prev_token`.
if let Some(prev_token_gap) = prev_token {
let new_gap = paginator.prev_batch_token().map(|prev_token| Gap { prev_token });

let result = state
.with_events_mut(move |room_events| {
// Note: The chunk could be empty.
//
// If there's any event, they are presented in reverse order (i.e. the first one
// should be prepended first).

let sync_events = events
.iter()
// Reverse the order of the events as `/messages` has been called with `dir=b`
// (backward). The `RoomEvents` API expects the first event to be the oldest.
.rev()
.cloned()
.map(SyncTimelineEvent::from);

let first_event_pos = room_events.events().next().map(|(item_pos, _)| item_pos);

// First, insert events.
let insert_new_gap_pos = if let Some(gap_id) = prev_gap_id {
// There is a prior gap, let's replace it by new events!
trace!("replaced gap with new events from backpagination");
Some(
room_events
.replace_gap_at(sync_events, gap_id)
.expect("gap_identifier is a valid chunk id we read previously")
.first_position(),
)
} else if let Some(pos) = first_event_pos {
// No prior gap, but we had some events: assume we need to prepend events
// before those.
trace!("inserted events before the first known event");
room_events
.insert_gap_at(prev_token_gap, new_position)
// SAFETY: we are sure that `new_position` represents a valid
// `ChunkIdentifier` for an `Item` chunk.
.expect("The `new_position` must represent an `Item`");
}

trace!("replaced gap with new events from backpagination");

// TODO: implement smarter reconciliation later
//let _ = self.sender.send(RoomEventCacheUpdate::Prepend { events });

return BackPaginationOutcome { events, reached_start };
}

// There is no `token`/gap identifier. Let's assume we must prepend the new
// events.
let first_event_pos = room_events.events().next().map(|(item_pos, _)| item_pos);
.insert_events_at(sync_events, pos)
.expect("pos is a valid position we just read above");
Some(pos)
} else {
// No prior gap, and no prior events: push the events.
trace!("pushing events received from back-pagination");
room_events.push_events(sync_events);
// A new gap may be inserted before the new events, if there are any.
room_events.events().next().map(|(item_pos, _)| item_pos)
};

match first_event_pos {
// Is there a first item? Insert at this position.
Some(first_event_pos) => {
if let Some(prev_token_gap) = prev_token {
// And insert the new gap if needs be.
if let Some(new_gap) = new_gap {
if let Some(new_pos) = insert_new_gap_pos {
room_events
.insert_gap_at(prev_token_gap, first_event_pos)
// SAFETY: The `first_event_pos` can only be an `Item` chunk, it's
// an invariant of `LinkedChunk`. Also, it can only represent a valid
// `ChunkIdentifier` as the data structure isn't modified yet.
.expect("`first_event_pos` must point to a valid `Item` chunk when inserting a gap");
.insert_gap_at(new_gap, new_pos)
.expect("events_chunk_pos represents a valid chunk position");
} else {
room_events.push_gap(new_gap);
}

room_events
.insert_events_at(sync_events, first_event_pos)
// SAFETY: The `first_event_pos` can only be an `Item` chunk, it's
// an invariant of `LinkedChunk`. The chunk it points to has not been
// removed.
.expect("The `first_event_pos` must point to a valid `Item` chunk when inserting events");
}

// There is no first item. Let's simply push.
None => {
if let Some(prev_token_gap) = prev_token {
room_events.push_gap(prev_token_gap);
}

room_events.push_events(sync_events);
}
}
BackPaginationOutcome { events, reached_start }
})
.await?;

BackPaginationOutcome { events, reached_start }
}).await?))
Ok(Some(result))
}

/// Get the latest pagination token, as stored in the room events linked
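
For readers who find the interleaved before/after lines above hard to follow, here is a minimal, self-contained sketch of the insertion rule the new code implements. It is not SDK code: `Chunk` and `apply_back_pagination` are hypothetical stand-ins for the SDK's `LinkedChunk` machinery, and a plain `Vec` replaces the linked chunk. Only the branching mirrors the new `with_events_mut` closure: replace the matching gap if one exists, otherwise insert the paginated events before the first known event, otherwise push them; the new gap, if any, is placed in front of the inserted events afterwards.

    /// Simplified stand-in for the linked chunk: a flat list of gaps and events.
    #[derive(Debug, PartialEq)]
    enum Chunk {
        Gap { prev_token: String },
        Event(String),
    }

    /// Sketch of the new back-pagination insertion rule (not the SDK implementation).
    fn apply_back_pagination(
        timeline: &mut Vec<Chunk>,
        matched_gap: Option<usize>, // index of the gap whose token we paginated from
        new_events: Vec<String>,    // oldest first, i.e. already reversed from `/messages`
        new_gap: Option<String>,    // next `prev_batch` token, if the server returned one
    ) {
        // 1. Decide where the paginated events go, and insert them first.
        let insert_at = if let Some(gap_index) = matched_gap {
            // A prior gap exists: replace it with the new events.
            timeline.splice(gap_index..=gap_index, new_events.into_iter().map(Chunk::Event));
            gap_index
        } else if let Some(first_event) = timeline.iter().position(|c| matches!(c, Chunk::Event(_))) {
            // No prior gap, but known events: the paginated events are older,
            // so they go *before* the first known event.
            timeline.splice(first_event..first_event, new_events.into_iter().map(Chunk::Event));
            first_event
        } else {
            // Nothing known yet: just push.
            timeline.extend(new_events.into_iter().map(Chunk::Event));
            0
        };

        // 2. Only once the events are in place is the new gap (standing for even
        //    older history) inserted in front of them.
        if let Some(prev_token) = new_gap {
            timeline.insert(insert_at, Chunk::Gap { prev_token });
        }
    }

    fn main() {
        // Timeline known from sync: a single event "$3", no prev-batch token.
        let mut timeline = vec![Chunk::Event("$3".to_owned())];

        // First back-pagination (no gap matched): returns "$2" plus a new token.
        apply_back_pagination(&mut timeline, None, vec!["$2".to_owned()], Some("tok1".to_owned()));
        assert_eq!(
            timeline,
            vec![
                Chunk::Gap { prev_token: "tok1".to_owned() },
                Chunk::Event("$2".to_owned()),
                Chunk::Event("$3".to_owned()),
            ]
        );

        // Second back-pagination replaces that gap with "$1" and reaches the start.
        apply_back_pagination(&mut timeline, Some(0), vec!["$1".to_owned()], None);
        assert_eq!(
            timeline,
            vec![
                Chunk::Event("$1".to_owned()),
                Chunk::Event("$2".to_owned()),
                Chunk::Event("$3".to_owned()),
            ]
        );
    }

The `main` function walks a no-initial-token scenario in miniature: the first pagination leaves a gap in front of the prepended event, and the second pagination replaces that gap.
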
2 changes: 1 addition & 1 deletion crates/matrix-sdk/src/event_cache/room/events.rs
@@ -111,7 +111,7 @@ impl RoomEvents {

/// Push a gap after all events or gaps.
pub fn push_gap(&mut self, gap: Gap) {
self.chunks.push_gap_back(gap)
self.chunks.push_gap_back(gap);
}

/// Insert events at a specified position.
145 changes: 102 additions & 43 deletions crates/matrix-sdk/tests/integration/event_cache.rs
@@ -9,19 +9,16 @@ use matrix_sdk::{
paginator::PaginatorState, BackPaginationOutcome, EventCacheError, RoomEventCacheUpdate,
TimelineHasBeenResetWhilePaginating,
},
test_utils::{assert_event_matches_msg, logged_in_client_with_server, mocks::MatrixMockServer},
test_utils::{assert_event_matches_msg, mocks::MatrixMockServer},
};
use matrix_sdk_test::{
async_test, event_factory::EventFactory, GlobalAccountDataTestEvent, JoinedRoomBuilder,
SyncResponseBuilder,
};
use ruma::{event_id, room_id, user_id};
use serde_json::json;
use tokio::{spawn, sync::broadcast};
use wiremock::ResponseTemplate;

use crate::mock_sync;

async fn once(
outcome: BackPaginationOutcome,
_timeline_has_been_reset: TimelineHasBeenResetWhilePaginating,
@@ -31,24 +28,14 @@ async fn once(

#[async_test]
async fn test_must_explicitly_subscribe() {
let (client, server) = logged_in_client_with_server().await;
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;

let room_id = room_id!("!omelette:fromage.fr");

{
// Make sure the client is aware of the room.
let mut sync_builder = SyncResponseBuilder::new();
sync_builder.add_joined_room(JoinedRoomBuilder::new(room_id));
let response_body = sync_builder.build_json_sync_response();

mock_sync(&server, response_body, None).await;
client.sync_once(Default::default()).await.unwrap();
server.reset().await;
}

// If I create a room event subscriber for a room before subscribing the event
// cache,
let room = client.get_room(room_id).unwrap();
let room = server.sync_joined_room(&client, room_id).await;
let result = room.event_cache().await;

// Then it fails, because one must explicitly call `.subscribe()` on the event
@@ -58,49 +45,39 @@ async fn test_must_explicitly_subscribe() {

#[async_test]
async fn test_event_cache_receives_events() {
let (client, server) = logged_in_client_with_server().await;
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;

// Immediately subscribe the event cache to sync updates.
client.event_cache().subscribe().unwrap();

// If I sync and get informed I've joined The Room, but with no events,
let room_id = room_id!("!omelette:fromage.fr");

let mut sync_builder = SyncResponseBuilder::new();
sync_builder.add_joined_room(JoinedRoomBuilder::new(room_id));
let response_body = sync_builder.build_json_sync_response();

mock_sync(&server, response_body, None).await;
client.sync_once(Default::default()).await.unwrap();
server.reset().await;
let room = server.sync_joined_room(&client, room_id).await;

// If I create a room event subscriber,

let room = client.get_room(room_id).unwrap();
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (events, mut subscriber) = room_event_cache.subscribe().await.unwrap();

// Then at first it's empty, and the subscriber doesn't yield anything.
assert!(events.is_empty());
assert!(subscriber.is_empty());

let ev_factory = EventFactory::new().sender(user_id!("@dexter:lab.org"));
let f = EventFactory::new().sender(user_id!("@dexter:lab.org"));

// And after a sync, yielding updates to two rooms,
sync_builder.add_joined_room(
JoinedRoomBuilder::new(room_id).add_timeline_event(ev_factory.text_msg("bonjour monde")),
);

sync_builder.add_joined_room(
JoinedRoomBuilder::new(room_id!("!parallel:universe.uk"))
.add_timeline_event(ev_factory.text_msg("hi i'm learning French")),
);

let response_body = sync_builder.build_json_sync_response();

mock_sync(&server, response_body, None).await;
client.sync_once(Default::default()).await.unwrap();
server.reset().await;
server
.mock_sync()
.ok_and_run(&client, |sync_builder| {
sync_builder.add_joined_room(
JoinedRoomBuilder::new(room_id).add_timeline_event(f.text_msg("bonjour monde")),
);
sync_builder.add_joined_room(
JoinedRoomBuilder::new(room_id!("!parallel:universe.uk"))
.add_timeline_event(f.text_msg("hi i'm learning French")),
);
})
.await;

// It does receive one update,
assert_let_timeout!(
@@ -857,3 +834,85 @@ async fn test_limited_timeline_with_storage() {
// That's all, folks!
assert!(subscriber.is_empty());
}

#[async_test]
async fn test_backpaginate_with_no_initial_events() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;

let event_cache = client.event_cache();

// Immediately subscribe the event cache to sync updates.
event_cache.subscribe().unwrap();

let room_id = room_id!("!omelette:fromage.fr");

let f = EventFactory::new().room(room_id).sender(user_id!("@a:b.c"));

// Start with a room with an event, but no prev-batch token.
let room = server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_event(f.text_msg("hello").event_id(event_id!("$3"))),
)
.await;

let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

let (events, mut stream) = room_event_cache.subscribe().await.unwrap();
wait_for_initial_events(events, &mut stream).await;

// The first back-pagination will return these two events.
//
// Note: it's important to return the same event that came from sync: since we
// will back-paginate without a prev-batch token first, we'll back-paginate
// from the end of the timeline, which must include the event we got from
// sync.

server
.mock_room_messages()
.ok(
"start-token-unused1".to_owned(),
Some("prev_batch".to_owned()),
vec![
f.text_msg("world").event_id(event_id!("$2")),
f.text_msg("hello").event_id(event_id!("$3")),
],
Vec::new(),
)
.mock_once()
.mount()
.await;

// The second round of back-pagination will return this one.
server
.mock_room_messages()
.from("prev_batch")
.ok(
"start-token-unused2".to_owned(),
None,
vec![f.text_msg("oh well").event_id(event_id!("$1"))],
Vec::new(),
)
.mock_once()
.mount()
.await;

let pagination = room_event_cache.pagination();

// Run pagination: since there's no token, we'll wait a bit for a sync to return
// one, and since there's none, we'll end up starting from the end of the
// timeline.
pagination.run_backwards(20, once).await.unwrap();
// Second pagination will be instant.
pagination.run_backwards(20, once).await.unwrap();

// The linked chunk should contain the events in the correct order.
let (events, _stream) = room_event_cache.subscribe().await.unwrap();

assert_event_matches_msg(&events[0], "oh well");
assert_event_matches_msg(&events[1], "hello");
assert_event_matches_msg(&events[2], "world");
assert_eq!(events.len(), 3);
}
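
A usage note, separate from this PR: the two `run_backwards` calls above each stop after a single request because the `once` helper breaks out immediately. A caller that wants to drain the whole history could loop until `reached_start` is set. This is only a sketch and assumes, as this test does, that `run_backwards` resolves to the `BackPaginationOutcome` produced by the strategy passed to it.

    // Illustrative only; `pagination`, `run_backwards` and `once` are the ones used
    // in the test above, but the loop itself is an assumption about caller-side usage.
    let pagination = room_event_cache.pagination();
    loop {
        let outcome = pagination.run_backwards(20, once).await.unwrap();
        if outcome.reached_start {
            // The server reported the start of the timeline; nothing older to fetch.
            break;
        }
    }
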