From 26fa7f2fffc4458be0a0d6ab6e536501ffa6a0d5 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Thu, 6 Jun 2024 16:06:36 +0200 Subject: [PATCH] feat(sdk,ui): `Timeline` receives backpaginated events via `RoomEventCacheUpdate`! Prior to this patch, when the `Timeline` was doing a pagination, the new events were inside the `EventCache` and also added directly onto the timeline. However, we already have a mechanism to receive new events in the `Timeline`, and it's a task run by the `Timeline` builder. New events are received via `RoomEventCacheUpdate`. The problem is: we have two entry points to add events: one with `RoomEventCacheUpdate`, and one with paginations. This patch removes the second entry point! Now the `Timeline` receives **all** its events via `RoomEventCacheUpdate`. This patch updates all the appropriate tests accordingly. --- .../src/timeline/event_handler.rs | 2 +- .../matrix-sdk-ui/src/timeline/inner/mod.rs | 2 +- .../matrix-sdk-ui/src/timeline/pagination.rs | 8 - .../tests/integration/timeline/pagination.rs | 187 ++++++++---------- .../matrix-sdk/src/event_cache/pagination.rs | 28 ++- .../tests/integration/event_cache.rs | 84 +++++++- 6 files changed, 195 insertions(+), 116 deletions(-) diff --git a/crates/matrix-sdk-ui/src/timeline/event_handler.rs b/crates/matrix-sdk-ui/src/timeline/event_handler.rs index a0ea817bf2c..fd35015147a 100644 --- a/crates/matrix-sdk-ui/src/timeline/event_handler.rs +++ b/crates/matrix-sdk-ui/src/timeline/event_handler.rs @@ -1122,7 +1122,7 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> { // The event index maps to a timeline item index! event_meta.timeline_item_index = Some(timeline_item_index); - // Shift all timeline item index to the right after `timeline_item_index`! + // Shift all timeline item indices to the right after `timeline_item_index`! for event_meta in &mut self.meta.all_events { if let Some(index) = event_meta.timeline_item_index.as_mut() { if *index >= timeline_item_index { diff --git a/crates/matrix-sdk-ui/src/timeline/inner/mod.rs b/crates/matrix-sdk-ui/src/timeline/inner/mod.rs index 58f87478ea2..e51b866ad6e 100644 --- a/crates/matrix-sdk-ui/src/timeline/inner/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/inner/mod.rs @@ -633,7 +633,7 @@ impl TimelineInner

{ .await } - diff => todo!("Unsupported `VectorDiff` {diff:?}"), + diff => unimplemented!("Unsupported `VectorDiff` {diff:?}"), }; } diff --git a/crates/matrix-sdk-ui/src/timeline/pagination.rs b/crates/matrix-sdk-ui/src/timeline/pagination.rs index 55750b33311..c7601ad2160 100644 --- a/crates/matrix-sdk-ui/src/timeline/pagination.rs +++ b/crates/matrix-sdk-ui/src/timeline/pagination.rs @@ -26,7 +26,6 @@ use matrix_sdk::event_cache::{ use tracing::{instrument, trace, warn}; use super::Error; -use crate::timeline::{event_item::RemoteEventOrigin, inner::TimelineNewItemPosition}; impl super::Timeline { /// Add more events to the start of the timeline. @@ -77,13 +76,6 @@ impl super::Timeline { let num_events = events.len(); trace!("Back-pagination succeeded with {num_events} events"); - // TODO(hywan): Remove, and let spread events via - // `matrix_sdk::event_cache::RoomEventCacheUpdate` from - // `matrix_sdk::event_cache::RoomPagination::run_backwards`. - self.inner - .add_events_at(events, TimelineNewItemPosition::Start { origin: RemoteEventOrigin::Pagination }) - .await; - if num_events == 0 && !reached_start { // As an exceptional contract: if there were no events in the response, // and we've not hit the start of the timeline, retry until we get diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/pagination.rs b/crates/matrix-sdk-ui/tests/integration/timeline/pagination.rs index ca8fb08dc31..4e9333d7348 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/pagination.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/pagination.rs @@ -36,7 +36,7 @@ use ruma::{ room_id, }; use serde_json::{json, Value as JsonValue}; -use stream_assert::{assert_next_eq, assert_next_matches}; +use stream_assert::{assert_next_eq, assert_pending}; use tokio::{ spawn, time::{sleep, timeout}, @@ -84,26 +84,7 @@ async fn test_back_pagination() { }; join(paginate, observe_paginating).await; - let message = assert_next_matches!( - timeline_stream, - VectorDiff::PushBack { value } => value - ); - assert_let!(TimelineItemContent::Message(msg) = message.as_event().unwrap().content()); - assert_let!(MessageType::Text(text) = msg.msgtype()); - assert_eq!(text.body, "hello world"); - - let message = assert_next_matches!( - timeline_stream, - VectorDiff::PushFront { value } => value - ); - assert_let!(TimelineItemContent::Message(msg) = message.as_event().unwrap().content()); - assert_let!(MessageType::Text(text) = msg.msgtype()); - assert_eq!(text.body, "the world is big"); - - let message = assert_next_matches!( - timeline_stream, - VectorDiff::PushFront { value } => value - ); + assert_let!(Some(VectorDiff::PushBack { value: message }) = timeline_stream.next().await); assert_let!(TimelineItemContent::OtherState(state) = message.as_event().unwrap().content()); assert_eq!(state.state_key(), ""); assert_let!( @@ -115,12 +96,25 @@ async fn test_back_pagination() { assert_eq!(content.name, "New room name"); assert_eq!(prev_content.as_ref().unwrap().name.as_ref().unwrap(), "Old room name"); - let day_divider = assert_next_matches!( - timeline_stream, - VectorDiff::PushFront { value } => value - ); + // Implicit read receipt. + assert_let!(Some(VectorDiff::Set { index: 0, value: message }) = timeline_stream.next().await); + assert_let!(TimelineItemContent::OtherState(_) = message.as_event().unwrap().content()); + + assert_let!(Some(VectorDiff::PushBack { value: message }) = timeline_stream.next().await); + assert_let!(TimelineItemContent::Message(msg) = message.as_event().unwrap().content()); + assert_let!(MessageType::Text(text) = msg.msgtype()); + assert_eq!(text.body, "the world is big"); + + assert_let!(Some(VectorDiff::PushBack { value: message }) = timeline_stream.next().await); + assert_let!(TimelineItemContent::Message(msg) = message.as_event().unwrap().content()); + assert_let!(MessageType::Text(text) = msg.msgtype()); + assert_eq!(text.body, "hello world"); + + assert_let!(Some(VectorDiff::PushFront { value: day_divider }) = timeline_stream.next().await); assert!(day_divider.is_day_divider()); + assert_pending!(timeline_stream); + Mock::given(method("GET")) .and(path_regex(r"^/_matrix/client/r0/rooms/.*/messages$")) .and(header("authorization", "Bearer 1234")) @@ -141,6 +135,9 @@ async fn test_back_pagination() { back_pagination_status, LiveBackPaginationStatus::Idle { hit_start_of_timeline: true } ); + + assert_pending!(timeline_stream); + assert_pending!(back_pagination_status); } #[async_test] @@ -207,27 +204,20 @@ async fn test_back_pagination_highlighted() { timeline.live_paginate_backwards(10).await.unwrap(); server.reset().await; - let first = assert_next_matches!( - timeline_stream, - VectorDiff::PushBack { value } => value - ); - let remote_event = first.as_event().unwrap(); - // Own events don't trigger push rules. - assert!(!remote_event.is_highlighted()); - - let second = assert_next_matches!( - timeline_stream, - VectorDiff::PushFront { value } => value - ); - let remote_event = second.as_event().unwrap(); + assert_let!(Some(VectorDiff::PushBack { value: event }) = timeline_stream.next().await); + let remote_event = event.as_event().unwrap(); // `m.room.tombstone` should be highlighted by default. assert!(remote_event.is_highlighted()); - let day_divider = assert_next_matches!( - timeline_stream, - VectorDiff::PushFront { value } => value - ); + assert_let!(Some(VectorDiff::PushBack { value: event }) = timeline_stream.next().await); + let remote_event = event.as_event().unwrap(); + // Own events don't trigger push rules. + assert!(!remote_event.is_highlighted()); + + assert_let!(Some(VectorDiff::PushFront { value: day_divider }) = timeline_stream.next().await); assert!(day_divider.is_day_divider()); + + assert_pending!(timeline_stream); } #[async_test] @@ -543,7 +533,6 @@ pub static ROOM_MESSAGES_BATCH_2: Lazy = Lazy::new(|| { "type": "m.room.message" }, ], - "start": "t54392-516_47314_0_7_1_1_1_11444_1", "start": "t59392-516_47314_0_7_1_1_1_11444_1", }) }); @@ -600,26 +589,7 @@ async fn test_empty_chunk() { }; join(paginate, observe_paginating).await; - let message = assert_next_matches!( - timeline_stream, - VectorDiff::PushBack { value } => value - ); - assert_let!(TimelineItemContent::Message(msg) = message.as_event().unwrap().content()); - assert_let!(MessageType::Text(text) = msg.msgtype()); - assert_eq!(text.body, "hello world"); - - let message = assert_next_matches!( - timeline_stream, - VectorDiff::PushFront { value } => value - ); - assert_let!(TimelineItemContent::Message(msg) = message.as_event().unwrap().content()); - assert_let!(MessageType::Text(text) = msg.msgtype()); - assert_eq!(text.body, "the world is big"); - - let message = assert_next_matches!( - timeline_stream, - VectorDiff::PushFront { value } => value - ); + assert_let!(Some(VectorDiff::PushBack { value: message }) = timeline_stream.next().await); assert_let!(TimelineItemContent::OtherState(state) = message.as_event().unwrap().content()); assert_eq!(state.state_key(), ""); assert_let!( @@ -631,11 +601,30 @@ async fn test_empty_chunk() { assert_eq!(content.name, "New room name"); assert_eq!(prev_content.as_ref().unwrap().name.as_ref().unwrap(), "Old room name"); - let day_divider = assert_next_matches!( - timeline_stream, - VectorDiff::PushFront { value } => value - ); + // Implicit read receipt. + assert_let!(Some(VectorDiff::Set { index: 0, value: message }) = timeline_stream.next().await); + assert_let!(TimelineItemContent::OtherState(_) = message.as_event().unwrap().content()); + + assert_let!(Some(VectorDiff::PushBack { value: message }) = timeline_stream.next().await); + assert_let!(TimelineItemContent::Message(msg) = message.as_event().unwrap().content()); + assert_let!(MessageType::Text(text) = msg.msgtype()); + assert_eq!(text.body, "the world is big"); + + assert_let!(Some(VectorDiff::PushBack { value: message }) = timeline_stream.next().await); + assert_let!(TimelineItemContent::Message(msg) = message.as_event().unwrap().content()); + assert_let!(MessageType::Text(text) = msg.msgtype()); + assert_eq!(text.body, "hello world"); + + assert_let!(Some(VectorDiff::PushFront { value: day_divider }) = timeline_stream.next().await); assert!(day_divider.is_day_divider()); + + assert_eq!( + back_pagination_status.next().await, + Some(LiveBackPaginationStatus::Idle { hit_start_of_timeline: false }) + ); + + assert_pending!(timeline_stream); + assert_pending!(back_pagination_status); } #[async_test] @@ -698,26 +687,7 @@ async fn test_until_num_items_with_empty_chunk() { }; join(paginate, observe_paginating).await; - let message = assert_next_matches!( - timeline_stream, - VectorDiff::PushBack { value } => value - ); - assert_let!(TimelineItemContent::Message(msg) = message.as_event().unwrap().content()); - assert_let!(MessageType::Text(text) = msg.msgtype()); - assert_eq!(text.body, "hello world"); - - let message = assert_next_matches!( - timeline_stream, - VectorDiff::PushFront { value } => value - ); - assert_let!(TimelineItemContent::Message(msg) = message.as_event().unwrap().content()); - assert_let!(MessageType::Text(text) = msg.msgtype()); - assert_eq!(text.body, "the world is big"); - - let message = assert_next_matches!( - timeline_stream, - VectorDiff::PushFront { value } => value - ); + assert_let!(Some(VectorDiff::PushBack { value: message }) = timeline_stream.next().await); assert_let!(TimelineItemContent::OtherState(state) = message.as_event().unwrap().content()); assert_eq!(state.state_key(), ""); assert_let!( @@ -729,28 +699,43 @@ async fn test_until_num_items_with_empty_chunk() { assert_eq!(content.name, "New room name"); assert_eq!(prev_content.as_ref().unwrap().name.as_ref().unwrap(), "Old room name"); - let day_divider = assert_next_matches!( - timeline_stream, - VectorDiff::PushFront { value } => value - ); + // Implicit read receipt. + assert_let!(Some(VectorDiff::Set { index: 0, value: message }) = timeline_stream.next().await); + assert_let!(TimelineItemContent::OtherState(_) = message.as_event().unwrap().content()); + + assert_let!(Some(VectorDiff::PushBack { value: message }) = timeline_stream.next().await); + assert_let!(TimelineItemContent::Message(msg) = message.as_event().unwrap().content()); + assert_let!(MessageType::Text(text) = msg.msgtype()); + assert_eq!(text.body, "the world is big"); + + assert_let!(Some(VectorDiff::PushBack { value: message }) = timeline_stream.next().await); + assert_let!(TimelineItemContent::Message(msg) = message.as_event().unwrap().content()); + assert_let!(MessageType::Text(text) = msg.msgtype()); + assert_eq!(text.body, "hello world"); + + assert_let!(Some(VectorDiff::PushFront { value: day_divider }) = timeline_stream.next().await); assert!(day_divider.is_day_divider()); - timeline.live_paginate_backwards(10).await.unwrap(); + let paginate = async { + timeline.live_paginate_backwards(10).await.unwrap(); + }; + let observe_paginating = async { + assert_eq!( + back_pagination_status.next().await, + Some(LiveBackPaginationStatus::Idle { hit_start_of_timeline: true }) + ); + }; + join(paginate, observe_paginating).await; - let message = assert_next_matches!( - timeline_stream, - VectorDiff::PushFront { value } => value + assert_let!( + Some(VectorDiff::Insert { index: 3, value: message }) = timeline_stream.next().await ); assert_let!(TimelineItemContent::Message(msg) = message.as_event().unwrap().content()); assert_let!(MessageType::Text(text) = msg.msgtype()); assert_eq!(text.body, "hello room then"); - let day_divider = assert_next_matches!( - timeline_stream, - VectorDiff::PushFront { value } => value - ); - assert!(day_divider.is_day_divider()); - assert_next_matches!(timeline_stream, VectorDiff::Remove { index: 2 }); + assert_pending!(timeline_stream); + assert_pending!(back_pagination_status); } #[async_test] diff --git a/crates/matrix-sdk/src/event_cache/pagination.rs b/crates/matrix-sdk/src/event_cache/pagination.rs index a24af594094..022559ec1a2 100644 --- a/crates/matrix-sdk/src/event_cache/pagination.rs +++ b/crates/matrix-sdk/src/event_cache/pagination.rs @@ -25,11 +25,13 @@ use tokio::{ use tracing::{debug, instrument, trace}; use super::{ + super::event_cache::{ + linked_chunk::ChunkContent, store::RoomEvents, EventsOrigin, RoomEventCacheUpdate, + }, paginator::{PaginationResult, Paginator, PaginatorState}, store::Gap, BackPaginationOutcome, Result, RoomEventCacheInner, }; -use crate::event_cache::{linked_chunk::ChunkContent, store::RoomEvents}; #[derive(Debug)] pub(super) struct RoomPaginationData { @@ -206,7 +208,17 @@ impl RoomPagination { trace!("replaced gap with new events from backpagination"); // TODO: implement smarter reconciliation later - //let _ = self.sender.send(RoomEventCacheUpdate::Prepend { events }); + // Send the `VectorDiff`s from the `RoomEvents`. + { + let sync_timeline_event_diffs = room_events.updates_as_vector_diffs(); + + if !sync_timeline_event_diffs.is_empty() { + let _ = self.inner.sender.send(RoomEventCacheUpdate::AddTimelineEvents { + events: sync_timeline_event_diffs, + origin: EventsOrigin::Pagination, + }); + } + } return Ok(Some(BackPaginationOutcome { events, reached_start })); } @@ -245,6 +257,18 @@ impl RoomPagination { } } + // Send the `VectorDiff`s from the `RoomEvents`. + { + let sync_timeline_event_diffs = room_events.updates_as_vector_diffs(); + + if !sync_timeline_event_diffs.is_empty() { + let _ = self.inner.sender.send(RoomEventCacheUpdate::AddTimelineEvents { + events: sync_timeline_event_diffs, + origin: EventsOrigin::Pagination, + }); + } + } + Ok(Some(BackPaginationOutcome { events, reached_start })) } diff --git a/crates/matrix-sdk/tests/integration/event_cache.rs b/crates/matrix-sdk/tests/integration/event_cache.rs index 8cafafbb69e..a6eb87ea600 100644 --- a/crates/matrix-sdk/tests/integration/event_cache.rs +++ b/crates/matrix-sdk/tests/integration/event_cache.rs @@ -390,6 +390,19 @@ async fn test_backpaginate_once() { assert_event_matches_msg(&events[1], "hello"); assert_eq!(events.len(), 2); + // I'll get all the previous events, as `VectorDiff`. + assert_matches!( + room_stream.recv().await.expect("failed to read `room_stream`"), + RoomEventCacheUpdate::AddTimelineEvents { events, .. } + ); + assert_eq!(events.len(), 2); + + assert_matches!(&events[0], VectorDiff::Insert { index: 0, value: event }); + assert_event_matches_msg(event, "hello"); + + assert_matches!(&events[1], VectorDiff::Insert { index: 1, value: event }); + assert_event_matches_msg(event, "world"); + assert!(room_stream.is_empty()); } @@ -497,8 +510,34 @@ async fn test_backpaginate_many_times_with_many_iterations() { assert_event_matches_msg(&global_events[2], "oh well"); assert_eq!(global_events.len(), 3); + // I'll get all the previous events, as `VectorDiff`. + // … First batch. + assert_matches!( + room_stream.recv().await.expect("failed to read `room_stream`"), + RoomEventCacheUpdate::AddTimelineEvents { events, .. } + ); + assert_eq!(events.len(), 2); + + assert_matches!(&events[0], VectorDiff::Insert { index: 0, value: event }); + assert_event_matches_msg(event, "hello"); + + assert_matches!(&events[1], VectorDiff::Insert { index: 1, value: event }); + assert_event_matches_msg(event, "world"); + + // … Second batch. + assert_matches!( + room_stream.recv().await.expect("failed to read `room_stream`"), + RoomEventCacheUpdate::AddTimelineEvents { events, .. } + ); + assert_eq!(events.len(), 1); + + assert_matches!(&events[0], VectorDiff::Insert { index: 0, value: event }); + assert_event_matches_msg(event, "oh well"); + + assert!(room_stream.is_empty()); + // And next time I'll open the room, I'll get the events in the right order. - let (events, _receiver) = room_event_cache.subscribe().await.unwrap(); + let (events, room_stream) = room_event_cache.subscribe().await.unwrap(); assert_event_matches_msg(&events[0], "oh well"); assert_event_matches_msg(&events[1], "hello"); @@ -506,6 +545,7 @@ async fn test_backpaginate_many_times_with_many_iterations() { assert_event_matches_msg(&events[3], "heyo"); assert_eq!(events.len(), 4); + // And I'll get zero update from the stream. assert!(room_stream.is_empty()); } @@ -617,8 +657,34 @@ async fn test_backpaginate_many_times_with_one_iteration() { assert_event_matches_msg(&global_events[2], "oh well"); assert_eq!(global_events.len(), 3); + // I'll get all the previous events, as `VectorDiff`. + // … First batch. + assert_matches!( + room_stream.recv().await.expect("failed to read `room_stream`"), + RoomEventCacheUpdate::AddTimelineEvents { events, .. } + ); + assert_eq!(events.len(), 2); + + assert_matches!(&events[0], VectorDiff::Insert { index: 0, value: event }); + assert_event_matches_msg(event, "hello"); + + assert_matches!(&events[1], VectorDiff::Insert { index: 1, value: event }); + assert_event_matches_msg(event, "world"); + + // … Second batch. + assert_matches!( + room_stream.recv().await.expect("failed to read `room_stream`"), + RoomEventCacheUpdate::AddTimelineEvents { events, .. } + ); + assert_eq!(events.len(), 1); + + assert_matches!(&events[0], VectorDiff::Insert { index: 0, value: event }); + assert_event_matches_msg(event, "oh well"); + + assert!(room_stream.is_empty()); + // And next time I'll open the room, I'll get the events in the right order. - let (events, _receiver) = room_event_cache.subscribe().await.unwrap(); + let (events, room_stream) = room_event_cache.subscribe().await.unwrap(); assert_event_matches_msg(&events[0], "oh well"); assert_event_matches_msg(&events[1], "hello"); @@ -626,6 +692,7 @@ async fn test_backpaginate_many_times_with_one_iteration() { assert_event_matches_msg(&events[3], "heyo"); assert_eq!(events.len(), 4); + // And I'll get zero update from the stream. assert!(room_stream.is_empty()); } @@ -792,7 +859,7 @@ async fn test_backpaginating_without_token() { let (room_event_cache, _drop_handles) = client.get_room(room_id).unwrap().event_cache().await.unwrap(); - let (events, room_stream) = room_event_cache.subscribe().await.unwrap(); + let (events, mut room_stream) = room_event_cache.subscribe().await.unwrap(); assert!(events.is_empty()); assert!(room_stream.is_empty()); @@ -823,5 +890,16 @@ async fn test_backpaginating_without_token() { assert_event_matches_msg(&events[0], "hi"); assert_eq!(events.len(), 1); + // I'll get all the previous events, as `VectorDiff`. + // … First batch. + assert_matches!( + room_stream.recv().await.expect("failed to read `room_stream`"), + RoomEventCacheUpdate::AddTimelineEvents { events, .. } + ); + assert_eq!(events.len(), 1); + + assert_matches!(&events[0], VectorDiff::Append { values: sub_events }); + assert_event_matches_msg(&sub_events[0], "hi"); + assert!(room_stream.is_empty()); }