diff --git a/crates/matrix-sdk/src/event_cache/pagination.rs b/crates/matrix-sdk/src/event_cache/pagination.rs index 793c7d700af..064acd5ae00 100644 --- a/crates/matrix-sdk/src/event_cache/pagination.rs +++ b/crates/matrix-sdk/src/event_cache/pagination.rs @@ -137,105 +137,85 @@ impl RoomPagination { // Check that the previous token still exists; otherwise it's a sign that the // room's timeline has been cleared. - let gap_identifier = if let Some(token) = prev_token { - let gap_identifier = state.events().chunk_identifier(|chunk| { + let prev_gap_id = if let Some(token) = prev_token { + let gap_id = state.events().chunk_identifier(|chunk| { matches!(chunk.content(), ChunkContent::Gap(Gap { ref prev_token }) if *prev_token == token) }); - // The method has been called with `token` but it doesn't exist in `RoomEvents`, - // it's an error. - if gap_identifier.is_none() { + // We got a previous-batch token from the linked chunk *before* running the + // request, which is missing from the linked chunk *after* + // completing the request. It may be a sign the linked chunk has + // been reset, and it's an error in any case. + if gap_id.is_none() { return Ok(None); } - gap_identifier + gap_id } else { None }; - let prev_token = paginator.prev_batch_token().map(|prev_token| Gap { prev_token }); - - Ok(Some(state.with_events_mut(move |room_events| { - // Note: The chunk could be empty. - // - // If there's any event, they are presented in reverse order (i.e. the first one - // should be prepended first). - - let sync_events = events - .iter() - // Reverse the order of the events as `/messages` has been called with `dir=b` - // (backward). The `RoomEvents` API expects the first event to be the oldest. - .rev() - .cloned() - .map(SyncTimelineEvent::from); - - - // There is a `token`/gap, let's replace it by new events! - if let Some(gap_identifier) = gap_identifier { - let new_position = { - // Replace the gap by new events. - let new_chunk = room_events - .replace_gap_at(sync_events, gap_identifier) - // SAFETY: we are sure that `gap_identifier` represents a valid - // `ChunkIdentifier` for a `Gap` chunk. - .expect("The `gap_identifier` must represent a `Gap`"); - - new_chunk.first_position() - }; - - // And insert a new gap if there is any `prev_token`. - if let Some(prev_token_gap) = prev_token { + let new_gap = paginator.prev_batch_token().map(|prev_token| Gap { prev_token }); + + let result = state + .with_events_mut(move |room_events| { + // Note: The chunk could be empty. + // + // If there's any event, they are presented in reverse order (i.e. the first one + // should be prepended first). + + let sync_events = events + .iter() + // Reverse the order of the events as `/messages` has been called with `dir=b` + // (backward). The `RoomEvents` API expects the first event to be the oldest. + .rev() + .cloned() + .map(SyncTimelineEvent::from); + + let first_event_pos = room_events.events().next().map(|(item_pos, _)| item_pos); + + // First, insert events. + let insert_new_gap_pos = if let Some(gap_id) = prev_gap_id { + // There is a prior gap, let's replace it by new events! + trace!("replaced gap with new events from backpagination"); + Some( + room_events + .replace_gap_at(sync_events, gap_id) + .expect("gap_identifier is a valid chunk id we read previously") + .first_position(), + ) + } else if let Some(pos) = first_event_pos { + // No prior gap, but we had some events: assume we need to prepend events + // before those. + trace!("inserted events before the first known event"); room_events - .insert_gap_at(prev_token_gap, new_position) - // SAFETY: we are sure that `new_position` represents a valid - // `ChunkIdentifier` for an `Item` chunk. - .expect("The `new_position` must represent an `Item`"); - } - - trace!("replaced gap with new events from backpagination"); - - // TODO: implement smarter reconciliation later - //let _ = self.sender.send(RoomEventCacheUpdate::Prepend { events }); - - return BackPaginationOutcome { events, reached_start }; - } - - // There is no `token`/gap identifier. Let's assume we must prepend the new - // events. - let first_event_pos = room_events.events().next().map(|(item_pos, _)| item_pos); + .insert_events_at(sync_events, pos) + .expect("pos is a valid position we just read above"); + Some(pos) + } else { + // No prior gap, and no prior events: push the events. + trace!("pushing events received from back-pagination"); + room_events.push_events(sync_events); + // A new gap may be inserted before the new events, if there are any. + room_events.events().next().map(|(item_pos, _)| item_pos) + }; - match first_event_pos { - // Is there a first item? Insert at this position. - Some(first_event_pos) => { - if let Some(prev_token_gap) = prev_token { + // And insert the new gap if needs be. + if let Some(new_gap) = new_gap { + if let Some(new_pos) = insert_new_gap_pos { room_events - .insert_gap_at(prev_token_gap, first_event_pos) - // SAFETY: The `first_event_pos` can only be an `Item` chunk, it's - // an invariant of `LinkedChunk`. Also, it can only represent a valid - // `ChunkIdentifier` as the data structure isn't modified yet. - .expect("`first_event_pos` must point to a valid `Item` chunk when inserting a gap"); + .insert_gap_at(new_gap, new_pos) + .expect("events_chunk_pos represents a valid chunk position"); + } else { + room_events.push_gap(new_gap); } - - room_events - .insert_events_at(sync_events, first_event_pos) - // SAFETY: The `first_event_pos` can only be an `Item` chunk, it's - // an invariant of `LinkedChunk`. The chunk it points to has not been - // removed. - .expect("The `first_event_pos` must point to a valid `Item` chunk when inserting events"); } - // There is no first item. Let's simply push. - None => { - if let Some(prev_token_gap) = prev_token { - room_events.push_gap(prev_token_gap); - } - - room_events.push_events(sync_events); - } - } + BackPaginationOutcome { events, reached_start } + }) + .await?; - BackPaginationOutcome { events, reached_start } - }).await?)) + Ok(Some(result)) } /// Get the latest pagination token, as stored in the room events linked diff --git a/crates/matrix-sdk/src/event_cache/room/events.rs b/crates/matrix-sdk/src/event_cache/room/events.rs index f194349ff35..24c1d9ba3b7 100644 --- a/crates/matrix-sdk/src/event_cache/room/events.rs +++ b/crates/matrix-sdk/src/event_cache/room/events.rs @@ -111,7 +111,7 @@ impl RoomEvents { /// Push a gap after all events or gaps. pub fn push_gap(&mut self, gap: Gap) { - self.chunks.push_gap_back(gap) + self.chunks.push_gap_back(gap); } /// Insert events at a specified position. diff --git a/crates/matrix-sdk/tests/integration/event_cache.rs b/crates/matrix-sdk/tests/integration/event_cache.rs index 95490990741..0832fea4b5d 100644 --- a/crates/matrix-sdk/tests/integration/event_cache.rs +++ b/crates/matrix-sdk/tests/integration/event_cache.rs @@ -9,19 +9,16 @@ use matrix_sdk::{ paginator::PaginatorState, BackPaginationOutcome, EventCacheError, RoomEventCacheUpdate, TimelineHasBeenResetWhilePaginating, }, - test_utils::{assert_event_matches_msg, logged_in_client_with_server, mocks::MatrixMockServer}, + test_utils::{assert_event_matches_msg, mocks::MatrixMockServer}, }; use matrix_sdk_test::{ async_test, event_factory::EventFactory, GlobalAccountDataTestEvent, JoinedRoomBuilder, - SyncResponseBuilder, }; use ruma::{event_id, room_id, user_id}; use serde_json::json; use tokio::{spawn, sync::broadcast}; use wiremock::ResponseTemplate; -use crate::mock_sync; - async fn once( outcome: BackPaginationOutcome, _timeline_has_been_reset: TimelineHasBeenResetWhilePaginating, @@ -31,24 +28,14 @@ async fn once( #[async_test] async fn test_must_explicitly_subscribe() { - let (client, server) = logged_in_client_with_server().await; + let server = MatrixMockServer::new().await; + let client = server.client_builder().build().await; let room_id = room_id!("!omelette:fromage.fr"); - { - // Make sure the client is aware of the room. - let mut sync_builder = SyncResponseBuilder::new(); - sync_builder.add_joined_room(JoinedRoomBuilder::new(room_id)); - let response_body = sync_builder.build_json_sync_response(); - - mock_sync(&server, response_body, None).await; - client.sync_once(Default::default()).await.unwrap(); - server.reset().await; - } - // If I create a room event subscriber for a room before subscribing the event // cache, - let room = client.get_room(room_id).unwrap(); + let room = server.sync_joined_room(&client, room_id).await; let result = room.event_cache().await; // Then it fails, because one must explicitly call `.subscribe()` on the event @@ -58,25 +45,17 @@ async fn test_must_explicitly_subscribe() { #[async_test] async fn test_event_cache_receives_events() { - let (client, server) = logged_in_client_with_server().await; + let server = MatrixMockServer::new().await; + let client = server.client_builder().build().await; // Immediately subscribe the event cache to sync updates. client.event_cache().subscribe().unwrap(); // If I sync and get informed I've joined The Room, but with no events, let room_id = room_id!("!omelette:fromage.fr"); - - let mut sync_builder = SyncResponseBuilder::new(); - sync_builder.add_joined_room(JoinedRoomBuilder::new(room_id)); - let response_body = sync_builder.build_json_sync_response(); - - mock_sync(&server, response_body, None).await; - client.sync_once(Default::default()).await.unwrap(); - server.reset().await; + let room = server.sync_joined_room(&client, room_id).await; // If I create a room event subscriber, - - let room = client.get_room(room_id).unwrap(); let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); let (events, mut subscriber) = room_event_cache.subscribe().await.unwrap(); @@ -84,23 +63,21 @@ async fn test_event_cache_receives_events() { assert!(events.is_empty()); assert!(subscriber.is_empty()); - let ev_factory = EventFactory::new().sender(user_id!("@dexter:lab.org")); + let f = EventFactory::new().sender(user_id!("@dexter:lab.org")); // And after a sync, yielding updates to two rooms, - sync_builder.add_joined_room( - JoinedRoomBuilder::new(room_id).add_timeline_event(ev_factory.text_msg("bonjour monde")), - ); - - sync_builder.add_joined_room( - JoinedRoomBuilder::new(room_id!("!parallel:universe.uk")) - .add_timeline_event(ev_factory.text_msg("hi i'm learning French")), - ); - - let response_body = sync_builder.build_json_sync_response(); - - mock_sync(&server, response_body, None).await; - client.sync_once(Default::default()).await.unwrap(); - server.reset().await; + server + .mock_sync() + .ok_and_run(&client, |sync_builder| { + sync_builder.add_joined_room( + JoinedRoomBuilder::new(room_id).add_timeline_event(f.text_msg("bonjour monde")), + ); + sync_builder.add_joined_room( + JoinedRoomBuilder::new(room_id!("!parallel:universe.uk")) + .add_timeline_event(f.text_msg("hi i'm learning French")), + ); + }) + .await; // It does receive one update, assert_let_timeout!( @@ -857,3 +834,85 @@ async fn test_limited_timeline_with_storage() { // That's all, folks! assert!(subscriber.is_empty()); } + +#[async_test] +async fn test_backpaginate_with_no_initial_events() { + let server = MatrixMockServer::new().await; + let client = server.client_builder().build().await; + + let event_cache = client.event_cache(); + + // Immediately subscribe the event cache to sync updates. + event_cache.subscribe().unwrap(); + + let room_id = room_id!("!omelette:fromage.fr"); + + let f = EventFactory::new().room(room_id).sender(user_id!("@a:b.c")); + + // Start with a room with an event, but no prev-batch token. + let room = server + .sync_room( + &client, + JoinedRoomBuilder::new(room_id) + .add_timeline_event(f.text_msg("hello").event_id(event_id!("$3"))), + ) + .await; + + let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); + + let (events, mut stream) = room_event_cache.subscribe().await.unwrap(); + wait_for_initial_events(events, &mut stream).await; + + // The first back-pagination will return these two events. + // + // Note: it's important to return the same event that came from sync: since we + // will back-paginate without a prev-batch token first, we'll back-paginate + // from the end of the timeline, which must include the event we got from + // sync. + + server + .mock_room_messages() + .ok( + "start-token-unused1".to_owned(), + Some("prev_batch".to_owned()), + vec![ + f.text_msg("world").event_id(event_id!("$2")), + f.text_msg("hello").event_id(event_id!("$3")), + ], + Vec::new(), + ) + .mock_once() + .mount() + .await; + + // The second round of back-pagination will return this one. + server + .mock_room_messages() + .from("prev_batch") + .ok( + "start-token-unused2".to_owned(), + None, + vec![f.text_msg("oh well").event_id(event_id!("$1"))], + Vec::new(), + ) + .mock_once() + .mount() + .await; + + let pagination = room_event_cache.pagination(); + + // Run pagination: since there's no token, we'll wait a bit for a sync to return + // one, and since there's none, we'll end up starting from the end of the + // timeline. + pagination.run_backwards(20, once).await.unwrap(); + // Second pagination will be instant. + pagination.run_backwards(20, once).await.unwrap(); + + // The linked chunk should contain the events in the correct order. + let (events, _stream) = room_event_cache.subscribe().await.unwrap(); + + assert_event_matches_msg(&events[0], "oh well"); + assert_event_matches_msg(&events[1], "hello"); + assert_event_matches_msg(&events[2], "world"); + assert_eq!(events.len(), 3); +}