diff --git a/crates/matrix-sdk-ui/src/timeline/controller/mod.rs b/crates/matrix-sdk-ui/src/timeline/controller/mod.rs index 5f3fba30100..13dc36bc6e9 100644 --- a/crates/matrix-sdk-ui/src/timeline/controller/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/controller/mod.rs @@ -789,6 +789,7 @@ impl TimelineController

{ txn_id, send_handle, content, + &self.settings, ) .await; } diff --git a/crates/matrix-sdk-ui/src/timeline/controller/state.rs b/crates/matrix-sdk-ui/src/timeline/controller/state.rs index 7c9fec59dfb..e1d3e51ff48 100644 --- a/crates/matrix-sdk-ui/src/timeline/controller/state.rs +++ b/crates/matrix-sdk-ui/src/timeline/controller/state.rs @@ -223,6 +223,7 @@ impl TimelineState { txn_id: OwnedTransactionId, send_handle: Option, content: TimelineEventKind, + settings: &TimelineSettings, ) { let ctx = TimelineEventContext { sender: own_user_id, @@ -240,7 +241,7 @@ impl TimelineState { let mut date_divider_adjuster = DateDividerAdjuster::new(date_divider_mode); - TimelineEventHandler::new(&mut txn, ctx) + TimelineEventHandler::new(&mut txn, ctx, settings) .handle_event(&mut date_divider_adjuster, content) .await; @@ -744,14 +745,16 @@ impl TimelineStateTransaction<'_> { }; // Handle the event to create or update a timeline item. - TimelineEventHandler::new(self, ctx).handle_event(date_divider_adjuster, event_kind).await + TimelineEventHandler::new(self, ctx, settings) + .handle_event(date_divider_adjuster, event_kind) + .await } /// Remove one timeline item by its `event_index`. fn remove_timeline_item( &mut self, event_index: usize, - day_divider_adjuster: &mut DayDividerAdjuster, + day_divider_adjuster: &mut DateDividerAdjuster, ) { day_divider_adjuster.mark_used(); diff --git a/crates/matrix-sdk-ui/src/timeline/event_handler.rs b/crates/matrix-sdk-ui/src/timeline/event_handler.rs index c180f4aee6e..5639a1430f1 100644 --- a/crates/matrix-sdk-ui/src/timeline/event_handler.rs +++ b/crates/matrix-sdk-ui/src/timeline/event_handler.rs @@ -46,13 +46,14 @@ use ruma::{ }, serde::Raw, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, OwnedUserId, + TransactionId, }; use tracing::{debug, error, field::debug, info, instrument, trace, warn}; use super::{ controller::{ ObservableItemsTransaction, ObservableItemsTransactionEntry, PendingEdit, PendingEditKind, - TimelineMetadata, TimelineStateTransaction, + TimelineMetadata, TimelineSettings, TimelineStateTransaction, }, date_dividers::DateDividerAdjuster, event_item::{ @@ -337,15 +338,17 @@ pub(super) struct TimelineEventHandler<'a, 'o> { meta: &'a mut TimelineMetadata, ctx: TimelineEventContext, result: HandleEventResult, + settings: &'a TimelineSettings, } impl<'a, 'o> TimelineEventHandler<'a, 'o> { pub(super) fn new( state: &'a mut TimelineStateTransaction<'o>, ctx: TimelineEventContext, + settings: &'a TimelineSettings, ) -> Self { let TimelineStateTransaction { items, meta, .. } = state; - Self { items, meta, ctx, result: HandleEventResult::default() } + Self { items, meta, ctx, result: HandleEventResult::default(), settings } } /// Handle an event. @@ -1097,28 +1100,47 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> { Flow::Local { .. } => { trace!("Adding new local timeline item"); - let item = self.meta.new_timeline_item(item); + let item = Self::new_timeline_item(&mut self.meta, item, None); + self.items.push_back(item, None); } - Flow::Remote { position: TimelineItemPosition::Start { .. }, event_id, .. } => { - if self - .items - .iter() - .filter_map(|ev| ev.as_event()?.event_id()) - .any(|id| id == event_id) - { - trace!("Skipping back-paginated event that has already been seen"); - return; - } + Flow::Remote { + position: TimelineItemPosition::Start { .. }, event_id, txn_id, .. + } => { + let removed_duplicated_timeline_item = Self::deduplicate_local_timeline_item( + &mut self.items, + &mut item, + Some(&event_id), + txn_id.as_ref().map(AsRef::as_ref), + &self.meta, + &self.settings, + ); + let item = + Self::new_timeline_item(&mut self.meta, item, removed_duplicated_timeline_item); trace!("Adding new remote timeline item at the start"); - let item = self.meta.new_timeline_item(item); self.items.push_front(item, Some(0)); } - Flow::Remote { position: TimelineItemPosition::At { event_index, .. }, .. } => { + Flow::Remote { + position: TimelineItemPosition::At { event_index, .. }, + event_id, + txn_id, + .. + } => { + let removed_duplicated_timeline_item = Self::deduplicate_local_timeline_item( + &mut self.items, + &mut item, + Some(&event_id), + txn_id.as_ref().map(AsRef::as_ref), + &self.meta, + &self.settings, + ); + let item = + Self::new_timeline_item(&mut self.meta, item, removed_duplicated_timeline_item); + let event_index = *event_index; let timeline_item_index = self .items @@ -1163,69 +1185,22 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> { "Adding new remote timeline at specific event index" ); - let item = self.meta.new_timeline_item(item); self.items.insert(timeline_item_index, item, Some(event_index)); } Flow::Remote { - position: TimelineItemPosition::End { .. }, txn_id, event_id, .. + position: TimelineItemPosition::End { .. }, event_id, txn_id, .. } => { - // Look if we already have a corresponding item somewhere, based on the - // transaction id (if a local echo) or the event id (if a - // duplicate remote event). - let result = rfind_event_item(self.items, |it| { - txn_id.is_some() && it.transaction_id() == txn_id.as_deref() - || it.event_id() == Some(event_id) - }); - - let mut removed_event_item_id = None; - - if let Some((idx, old_item)) = result { - if old_item.as_remote().is_some() { - // Item was previously received from the server. This should be very rare - // normally, but with the sliding- sync proxy, it is actually very - // common. - // NOTE: SS proxy workaround. - trace!(?item, old_item = ?*old_item, "Received duplicate event"); - - if old_item.content.is_redacted() && !item.content.is_redacted() { - warn!("Got original form of an event that was previously redacted"); - item.content = item.content.redact(&self.meta.room_version); - item.reactions.clear(); - } - } - - // TODO: Check whether anything is different about the - // old and new item? - - transfer_details(&mut item, &old_item); - - let old_item_id = old_item.internal_id; - - if idx == self.items.len() - 1 { - // If the old item is the last one and no date divider - // changes need to happen, replace and return early. - trace!(idx, "Replacing existing event"); - self.items.replace(idx, TimelineItem::new(item, old_item_id.to_owned())); - return; - } - - // In more complex cases, remove the item before re-adding the item. - trace!("Removing local echo or duplicate timeline item"); - removed_event_item_id = Some(self.items.remove(idx).internal_id.clone()); - - // no return here, below code for adding a new event - // will run to re-add the removed item - } - - trace!("Adding new remote timeline item after all non-pending events"); - let new_item = match removed_event_item_id { - // If a previous version of the same item (usually a local - // echo) was removed and we now need to add it again, reuse - // the previous item's ID. - Some(id) => TimelineItem::new(item, id), - None => self.meta.new_timeline_item(item), - }; + let removed_duplicated_timeline_item = Self::deduplicate_local_timeline_item( + &mut self.items, + &mut item, + Some(&event_id), + txn_id.as_ref().map(AsRef::as_ref), + &self.meta, + &self.settings, + ); + let item = + Self::new_timeline_item(&mut self.meta, item, removed_duplicated_timeline_item); // Local events are always at the bottom. Let's find the latest remote event // and insert after it, otherwise, if there is no remote event, insert at 0. @@ -1263,16 +1238,16 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> { if timeline_item_index == self.items.len() { trace!("Adding new remote timeline item at the back"); - self.items.push_back(new_item, event_index); + self.items.push_back(item, event_index); } else if timeline_item_index == 0 { trace!("Adding new remote timeline item at the front"); - self.items.push_front(new_item, event_index); + self.items.push_front(item, event_index); } else { trace!( timeline_item_index, "Adding new remote timeline item at specific index" ); - self.items.insert(timeline_item_index, new_item, event_index); + self.items.insert(timeline_item_index, item, event_index); } } @@ -1297,6 +1272,128 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> { } } + /// Remove the local timeline item matching the `event_id` or the + /// `transaction_id` of `new_event_timeline_item` if it exists. + /// + /// Let's also try to deduplicate remote evevents. If `VectorDiff`s are the + /// inputs of the `Timeline`, this is not necessary, as they are + /// generated by the `EventCache`, which supports its own deduplication + /// algorithm. + // + // Note: this method doesn't take `&mut self` to avoid a borrow checker + // conflict with `TimelineEventHandler::add_item`. It's fine. + fn deduplicate_local_timeline_item( + items: &mut ObservableItemsTransaction<'_>, + new_event_timeline_item: &mut EventTimelineItem, + event_id: Option<&EventId>, + transaction_id: Option<&TransactionId>, + metadata: &TimelineMetadata, + settings: &TimelineSettings, + ) -> Option> { + /// Transfer `TimelineDetails` that weren't available on the original + /// item and have been fetched separately (only `reply_to` for + /// now) from `old_item` to `item`, given two items for an event + /// that was re-received. + /// + /// `old_item` *should* always be a local timeline item usually, but it + /// can be a remote timeline item. + fn transfer_details(new_item: &mut EventTimelineItem, old_item: &EventTimelineItem) { + let TimelineItemContent::Message(msg) = &mut new_item.content else { return }; + let TimelineItemContent::Message(old_msg) = &old_item.content else { return }; + + let Some(in_reply_to) = &mut msg.in_reply_to else { return }; + let Some(old_in_reply_to) = &old_msg.in_reply_to else { return }; + + if matches!(&in_reply_to.event, TimelineDetails::Unavailable) { + in_reply_to.event = old_in_reply_to.event.clone(); + } + } + + // Start with the canonical case: detect a local timeline item that matches + // `event_id` or `transaction_id`. + if let Some((local_timeline_item_index, local_timeline_item)) = + rfind_event_item(items, |event_timeline_item| { + if event_timeline_item.is_local_echo() { + event_id == event_timeline_item.event_id() + || (transaction_id.is_some() + && transaction_id == event_timeline_item.transaction_id()) + } else { + false + } + }) + { + trace!( + ?event_id, + ?transaction_id, + ?local_timeline_item_index, + "Removing local timeline item" + ); + + transfer_details(new_event_timeline_item, &local_timeline_item); + + // Remove the local timeline item. + return Some(items.remove(local_timeline_item_index)); + }; + + if !settings.vectordiffs_as_inputs { + if let Some((remote_timeline_item_index, remote_timeline_item)) = + rfind_event_item(items, |event_timeline_item| { + if event_timeline_item.is_remote_event() { + event_id == event_timeline_item.event_id() + } else { + false + } + }) + { + trace!( + ?event_id, + ?transaction_id, + ?remote_timeline_item_index, + "Removing remote timeline item (it is a duplicate)" + ); + + if remote_timeline_item.content.is_redacted() + && !new_event_timeline_item.content.is_redacted() + { + warn!("Got original form of an event that was previously redacted"); + new_event_timeline_item.content = + new_event_timeline_item.content.redact(&metadata.room_version); + new_event_timeline_item.reactions.clear(); + } + + transfer_details(new_event_timeline_item, &remote_timeline_item); + + // Remove the remote timeline item. + return Some(items.remove(remote_timeline_item_index)); + } + } + + None + } + + /// Create a new timeline item from an [`EventTimelineItem`]. + /// + /// It is possible that the new timeline item replaces a duplicated timeline + /// event (see [`TimelineEventHandler::deduplicate_local_timeline_item`]) in + /// case it replaces a local timeline item. + // + // Note: this method doesn't take `&mut self` to avoid a borrow checker + // conflict with `Self::add_item`. It's fine. + fn new_timeline_item( + metadata: &mut TimelineMetadata, + event_timeline_item: EventTimelineItem, + replaced_timeline_item: Option>, + ) -> Arc { + match replaced_timeline_item { + // Reuse the internal ID. + Some(to_replace_timeline_item) => { + TimelineItem::new(event_timeline_item, to_replace_timeline_item.internal_id.clone()) + } + + None => metadata.new_timeline_item(event_timeline_item), + } + } + /// After updating the timeline item `new_item` which id is /// `target_event_id`, update other items that are responses to this item. fn maybe_update_responses( @@ -1355,21 +1452,3 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> { }) } } - -/// Transfer `TimelineDetails` that weren't available on the original item and -/// have been fetched separately (only `reply_to` for now) from `old_item` to -/// `item`, given two items for an event that was re-received. -/// -/// `old_item` *should* always be a local echo usually, but with the sliding -/// sync proxy, we often re-receive remote events that aren't remote echoes. -fn transfer_details(item: &mut EventTimelineItem, old_item: &EventTimelineItem) { - let TimelineItemContent::Message(msg) = &mut item.content else { return }; - let TimelineItemContent::Message(old_msg) = &old_item.content else { return }; - - let Some(in_reply_to) = &mut msg.in_reply_to else { return }; - let Some(old_in_reply_to) = &old_msg.in_reply_to else { return }; - - if matches!(&in_reply_to.event, TimelineDetails::Unavailable) { - in_reply_to.event = old_in_reply_to.event.clone(); - } -} diff --git a/crates/matrix-sdk-ui/src/timeline/tests/echo.rs b/crates/matrix-sdk-ui/src/timeline/tests/echo.rs index 533c1c2c56d..683f643ec85 100644 --- a/crates/matrix-sdk-ui/src/timeline/tests/echo.rs +++ b/crates/matrix-sdk-ui/src/timeline/tests/echo.rs @@ -24,7 +24,7 @@ use ruma::{ events::{room::message::RoomMessageEventContent, AnyMessageLikeEventContent}, user_id, MilliSecondsSinceUnixEpoch, }; -use stream_assert::assert_next_matches; +use stream_assert::{assert_next_matches, assert_pending}; use super::TestTimeline; use crate::timeline::{ @@ -121,9 +121,18 @@ async fn test_remote_echo_full_trip() { .await; // The local echo is replaced with the remote echo. - let item = assert_next_matches!(stream, VectorDiff::Set { index: 1, value } => value); + assert_next_matches!(stream, VectorDiff::Remove { index: 1 }); + let item = assert_next_matches!(stream, VectorDiff::PushFront { value } => value); assert!(!item.as_event().unwrap().is_local_echo()); assert_eq!(*item.unique_id(), id); + + // The date divider is adjusted. + // A new date divider is inserted, and the older one is removed. + let date_divider = assert_next_matches!(stream, VectorDiff::PushFront { value } => value); + assert!(date_divider.is_date_divider()); + assert_next_matches!(stream, VectorDiff::Remove { index: 2 }); + + assert_pending!(stream); } #[async_test] @@ -168,12 +177,14 @@ async fn test_remote_echo_new_position() { .await; // … the remote echo replaces the previous event. - let item = assert_next_matches!(stream, VectorDiff::Set { index: 3, value } => value); + assert_next_matches!(stream, VectorDiff::Remove { index: 3 }); + let item = assert_next_matches!(stream, VectorDiff::Insert { index: 2, value} => value); assert!(!item.as_event().unwrap().is_local_echo()); - // … the date divider is removed (because both bob's and alice's message are - // from the same day according to server timestamps). - assert_next_matches!(stream, VectorDiff::Remove { index: 2 }); + // Date divider is updated. + assert_next_matches!(stream, VectorDiff::Remove { index: 3 }); + + assert_pending!(stream); } #[async_test] diff --git a/crates/matrix-sdk-ui/src/timeline/tests/shields.rs b/crates/matrix-sdk-ui/src/timeline/tests/shields.rs index b87177e4ed0..c7f50c26731 100644 --- a/crates/matrix-sdk-ui/src/timeline/tests/shields.rs +++ b/crates/matrix-sdk-ui/src/timeline/tests/shields.rs @@ -14,7 +14,7 @@ use ruma::{ AnyMessageLikeEventContent, }, }; -use stream_assert::assert_next_matches; +use stream_assert::{assert_next_matches, assert_pending}; use crate::timeline::{tests::TestTimeline, EventSendState}; @@ -108,7 +108,8 @@ async fn test_local_sent_in_clear_shield() { "type": "m.room.message", }))) .await; - let item = assert_next_matches!(stream, VectorDiff::Set { index: 1, value } => value); + assert_next_matches!(stream, VectorDiff::Remove { index: 1 }); + let item = assert_next_matches!(stream, VectorDiff::PushFront { value } => value); let event_item = item.as_event().unwrap(); // Then the remote echo should now be showing the shield. @@ -118,6 +119,13 @@ async fn test_local_sent_in_clear_shield() { shield, Some(ShieldState::Red { code: ShieldStateCode::SentInClear, message: "Not encrypted." }) ); + + // Date divider is adjusted. + let date_divider = assert_next_matches!(stream, VectorDiff::PushFront { value } => value); + assert!(date_divider.is_date_divider()); + assert_next_matches!(stream, VectorDiff::Remove { index: 2 }); + + assert_pending!(stream); } #[async_test] diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/echo.rs b/crates/matrix-sdk-ui/tests/integration/timeline/echo.rs index f00ad135d05..2d715963ab4 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/echo.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/echo.rs @@ -121,16 +121,18 @@ async fn test_echo() { server.reset().await; // Local echo is replaced with the remote echo. + assert_next_matches!(timeline_stream, VectorDiff::Remove { index: 1 }); let remote_echo = - assert_next_matches!(timeline_stream, VectorDiff::Set { index: 1, value } => value); + assert_next_matches!(timeline_stream, VectorDiff::PushFront { value } => value); let item = remote_echo.as_event().unwrap(); assert!(item.is_own()); assert_eq!(item.timestamp(), MilliSecondsSinceUnixEpoch(uint!(152038280))); // The date divider is also replaced. let date_divider = - assert_next_matches!(timeline_stream, VectorDiff::Set { index: 0, value } => value); + assert_next_matches!(timeline_stream, VectorDiff::PushFront { value } => value); assert!(date_divider.is_date_divider()); + assert_next_matches!(timeline_stream, VectorDiff::Remove { index: 2 }); } #[async_test] diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/queue.rs b/crates/matrix-sdk-ui/tests/integration/timeline/queue.rs index 32a11923dfc..d9d27689180 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/queue.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/queue.rs @@ -508,20 +508,21 @@ async fn test_no_duplicate_date_divider() { assert_eq!(value.event_id().unwrap(), "$PyHxV5mYzjetBUT3qZq7V95GOzxb02EP"); }); - // The second message is replaced -> [First DD Second] - assert_next_matches!(timeline_stream, VectorDiff::Set { index: 2, value } => { + // The second message is replaced -> [First Second DD] + assert_next_matches!(timeline_stream, VectorDiff::Remove { index: 2 }); + assert_next_matches!(timeline_stream, VectorDiff::Insert { index: 1, value } => { let value = value.as_event().unwrap(); assert_eq!(value.content().as_message().unwrap().body(), "Second."); assert_eq!(value.event_id().unwrap(), "$5E2kLK/Sg342bgBU9ceEIEPYpbFaqJpZ"); }); - // A new date divider is inserted -> [DD First DD Second] + // A new date divider is inserted -> [DD First Second DD] assert_next_matches!(timeline_stream, VectorDiff::PushFront { value } => { assert!(value.is_date_divider()); }); // The useless date divider is removed. -> [DD First Second] - assert_next_matches!(timeline_stream, VectorDiff::Remove { index: 2 }); + assert_next_matches!(timeline_stream, VectorDiff::Remove { index: 3 }); assert_pending!(timeline_stream); } diff --git a/testing/matrix-sdk-integration-testing/src/tests/timeline.rs b/testing/matrix-sdk-integration-testing/src/tests/timeline.rs index 531ca7a1d21..af62c8bc591 100644 --- a/testing/matrix-sdk-integration-testing/src/tests/timeline.rs +++ b/testing/matrix-sdk-integration-testing/src/tests/timeline.rs @@ -37,6 +37,7 @@ use matrix_sdk_ui::timeline::{ EventSendState, ReactionStatus, RoomExt, TimelineItem, TimelineItemContent, }; use similar_asserts::assert_eq; +use stream_assert::assert_pending; use tokio::{ spawn, task::JoinHandle, @@ -270,22 +271,42 @@ async fn test_stale_local_echo_time_abort_edit() { // - or the remote echo comes up faster. // // Handle both orderings. - while let Ok(Some(vector_diff)) = timeout(Duration::from_secs(3), stream.next()).await { - let VectorDiff::Set { index: 0, value: echo } = vector_diff else { - panic!("unexpected diff: {vector_diff:#?}"); - }; + { + let mut diffs = Vec::with_capacity(3); + + while let Ok(Some(vector_diff)) = timeout(Duration::from_secs(5), stream.next()).await { + diffs.push(vector_diff); + } + + assert!(diffs.len() >= 3); + + for diff in diffs { + match diff { + VectorDiff::Set { index: 0, value: event } + | VectorDiff::PushBack { value: event } + | VectorDiff::Insert { index: 0, value: event } => { + if event.is_local_echo() { + // If the sender profile wasn't available, we may receive an update about + // it; ignore it. + if !has_sender_profile && event.sender_profile().is_ready() { + has_sender_profile = true; + continue; + } + + assert_matches!(event.send_state(), Some(EventSendState::Sent { .. })); + } + + assert!(event.is_editable()); + assert_eq!(event.content().as_message().unwrap().body(), "hi!"); + } + + VectorDiff::Remove { index } => assert_eq!(index, 0), - if echo.is_local_echo() { - // If the sender profile wasn't available, we may receive an update about it; - // ignore it. - if !has_sender_profile && echo.sender_profile().is_ready() { - has_sender_profile = true; - continue; + diff => { + panic!("unexpected diff: {diff:?}"); + } } - assert_matches!(echo.send_state(), Some(EventSendState::Sent { .. })); } - assert!(echo.is_editable()); - assert_eq!(echo.content().as_message().unwrap().body(), "hi!"); } // Now do a crime: try to edit the local echo. @@ -306,6 +327,8 @@ async fn test_stale_local_echo_time_abort_edit() { assert_eq!(remote_echo.content().as_message().unwrap().body(), "bonjour"); alice_sync.abort(); + + assert_pending!(stream); } #[tokio::test(flavor = "multi_thread", worker_threads = 4)]