From 4289d0df011c63396bf1c92f104b8ec503e7627f Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Mon, 26 Aug 2024 12:20:58 +0200 Subject: [PATCH] timeline: add support for local reactions to local echoes --- bindings/matrix-sdk-ffi/src/timeline/mod.rs | 6 +- .../src/timeline/event_item/mod.rs | 6 +- .../matrix-sdk-ui/src/timeline/inner/mod.rs | 176 +++++++++++++---- crates/matrix-sdk-ui/src/timeline/mod.rs | 9 +- .../matrix-sdk-ui/src/timeline/tests/mod.rs | 22 ++- .../src/timeline/tests/reactions.rs | 52 +++-- crates/matrix-sdk-ui/src/timeline/util.rs | 36 +++- .../tests/integration/timeline/reactions.rs | 177 +++++++++++++++--- crates/matrix-sdk/src/send_queue.rs | 5 + .../src/tests/timeline.rs | 30 +-- 10 files changed, 396 insertions(+), 123 deletions(-) diff --git a/bindings/matrix-sdk-ffi/src/timeline/mod.rs b/bindings/matrix-sdk-ffi/src/timeline/mod.rs index d1def3c444c..a1da8772a6f 100644 --- a/bindings/matrix-sdk-ffi/src/timeline/mod.rs +++ b/bindings/matrix-sdk-ffi/src/timeline/mod.rs @@ -45,7 +45,6 @@ use ruma::{ }, }, receipt::ReceiptThread, - relation::Annotation, room::message::{ ForwardThread, LocationMessageEventContent, MessageType, RoomMessageEventContentWithoutRelation, @@ -538,9 +537,8 @@ impl Timeline { let _ = self.send(Arc::new(room_message_event_content)).await; } - pub async fn toggle_reaction(&self, event_id: String, key: String) -> Result<(), ClientError> { - let event_id = EventId::parse(event_id)?; - self.inner.toggle_reaction(&Annotation::new(event_id, key)).await?; + pub async fn toggle_reaction(&self, unique_id: String, key: String) -> Result<(), ClientError> { + self.inner.toggle_reaction(&unique_id, &key).await?; Ok(()) } diff --git a/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs b/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs index 52abb9766e0..3a8e7033d10 100644 --- a/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs @@ -21,7 +21,7 @@ use as_variant::as_variant; use indexmap::IndexMap; use matrix_sdk::{ deserialized_responses::{EncryptionInfo, ShieldState}, - send_queue::SendHandle, + send_queue::{SendHandle, SendReactionHandle}, Client, Error, }; use matrix_sdk_base::{ @@ -625,6 +625,10 @@ pub enum EventItemOrigin { /// What's the status of a reaction? #[derive(Clone, Debug)] pub enum ReactionStatus { + /// It's a local reaction to a local echo. + /// + /// The handle is missing only in testing contexts. + LocalToLocal(Option), /// It's a local reaction to a remote event. /// /// The handle is missing only in testing contexts. diff --git a/crates/matrix-sdk-ui/src/timeline/inner/mod.rs b/crates/matrix-sdk-ui/src/timeline/inner/mod.rs index 1b1e81262d5..adf46997b22 100644 --- a/crates/matrix-sdk-ui/src/timeline/inner/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/inner/mod.rs @@ -26,7 +26,9 @@ use matrix_sdk::crypto::OlmMachine; use matrix_sdk::{ deserialized_responses::SyncTimelineEvent, event_cache::{paginator::Paginator, RoomEventCache}, - send_queue::{LocalEcho, LocalEchoContent, RoomSendQueueUpdate, SendHandle}, + send_queue::{ + LocalEcho, LocalEchoContent, RoomSendQueueUpdate, SendHandle, SendReactionHandle, + }, Result, Room, }; #[cfg(test)] @@ -45,7 +47,8 @@ use ruma::{ AnySyncTimelineEvent, MessageLikeEventType, }, serde::Raw, - EventId, OwnedEventId, OwnedTransactionId, RoomVersionId, TransactionId, UserId, + EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, RoomVersionId, + TransactionId, UserId, }; use tokio::sync::{RwLock, RwLockWriteGuard}; use tracing::{debug, error, field::debug, info, instrument, trace, warn}; @@ -64,14 +67,17 @@ use super::{ traits::RoomDataProvider, util::{rfind_event_by_id, rfind_event_item, RelativePosition}, Error, EventSendState, EventTimelineItem, InReplyToDetails, Message, PaginationError, Profile, - RepliedToEvent, TimelineDetails, TimelineEventItemId, TimelineFocus, TimelineItem, - TimelineItemContent, TimelineItemKind, + ReactionInfo, RepliedToEvent, TimelineDetails, TimelineEventItemId, TimelineFocus, + TimelineItem, TimelineItemContent, TimelineItemKind, }; use crate::{ timeline::{ day_dividers::DayDividerAdjuster, event_handler::LiveTimelineUpdatesAllowed, + event_item::EventTimelineItemKind, pinned_events_loader::{PinnedEventsLoader, PinnedEventsLoaderError}, + reactions::FullReactionKey, + util::rfind_event_by_uid, TimelineEventFilterFn, }, unable_to_decrypt_hook::UtdHookManager, @@ -416,7 +422,7 @@ impl TimelineInner

{ pub(super) async fn subscribe( &self, - ) -> (Vector>, impl Stream>>) { + ) -> (Vector>, impl Stream>> + Send) { trace!("Creating timeline items signal"); let state = self.state.read().await; (state.items.clone(), state.items.subscribe().into_stream()) @@ -448,49 +454,79 @@ impl TimelineInner

{ #[instrument(skip_all)] pub(super) async fn toggle_reaction_local( &self, - annotation: &Annotation, + unique_id: &str, + key: &str, ) -> Result { let mut state = self.state.write().await; - let user_id = self.room_data_provider.own_user_id(); - - let Some((item_pos, item)) = rfind_event_by_id(&state.items, &annotation.event_id) else { + let Some((item_pos, item)) = rfind_event_by_uid(&state.items, unique_id) else { warn!("Timeline item not found, can't add reaction"); return Err(Error::FailedToToggleReaction); }; + let user_id = self.room_data_provider.own_user_id(); let prev_status = item .reactions() - .get(&annotation.key) + .get(key) .and_then(|group| group.get(user_id)) .map(|reaction_info| reaction_info.status.clone()); let Some(prev_status) = prev_status else { - // Add a reaction through the room data provider. - // No need to reflect the effect locally, since the local echo handling will - // take care of it. - trace!("adding a new reaction"); - self.room_data_provider - .send(ReactionEventContent::from(annotation.clone()).into()) - .await?; - return Ok(true); + match &item.inner.kind { + EventTimelineItemKind::Local(local) => { + if let Some(send_handle) = local.send_handle.clone() { + if send_handle + .react(key.to_owned()) + .await + .map_err(|err| Error::SendQueueError(err.into()))? + .is_some() + { + trace!("adding a reaction to a local echo"); + return Ok(true); + } + + warn!("couldn't toggle reaction for local echo"); + return Ok(false); + } + + warn!("missing send handle for local echo; is this a test?"); + return Ok(false); + } + + EventTimelineItemKind::Remote(remote) => { + // Add a reaction through the room data provider. + // No need to reflect the effect locally, since the local echo handling will + // take care of it. + trace!("adding a reaction to a remote echo"); + let annotation = Annotation::new(remote.event_id.to_owned(), key.to_owned()); + self.room_data_provider + .send(ReactionEventContent::from(annotation).into()) + .await?; + return Ok(true); + } + } }; trace!("removing a previous reaction"); match prev_status { - ReactionStatus::LocalToRemote(send_handle) => { - // No need to keep the lock. - drop(state); + ReactionStatus::LocalToLocal(send_reaction_handle) => { + if let Some(handle) = send_reaction_handle { + if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? { + // Impossible state: the reaction has moved from local to echo under our + // feet, but the timeline was supposed to be locked! + warn!("unexpectedly unable to abort sending of local reaction"); + } + } else { + warn!("no send reaction handle (this should only happen in testing contexts)"); + } + } + ReactionStatus::LocalToRemote(send_handle) => { // No need to reflect the change ourselves, since handling the discard of the // local echo will take care of it. trace!("aborting send of the previous reaction that was a local echo"); - if let Some(send_handle) = send_handle { - if !send_handle - .abort() - .await - .map_err(|err| Error::SendQueueError(err.into()))? - { + if let Some(handle) = send_handle { + if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? { // Impossible state: the reaction has moved from local to echo under our // feet, but the timeline was supposed to be locked! warn!("unexpectedly unable to abort sending of local reaction"); @@ -502,8 +538,15 @@ impl TimelineInner

{ ReactionStatus::RemoteToRemote(event_id) => { // Assume the redaction will work; we'll re-add the reaction if it didn't. + let Some(annotated_event_id) = + item.as_remote().map(|event_item| event_item.event_id.clone()) + else { + warn!("remote reaction to remote event, but the associated item isn't remote"); + return Ok(false); + }; + let mut reactions = item.reactions().clone(); - let reaction_info = reactions.remove_reaction(user_id, &annotation.key); + let reaction_info = reactions.remove_reaction(user_id, key); if reaction_info.is_some() { let new_item = item.with_reactions(reactions); @@ -522,12 +565,12 @@ impl TimelineInner

{ let mut state = self.state.write().await; if let Some((item_pos, item)) = - rfind_event_by_id(&state.items, &annotation.event_id) + rfind_event_by_id(&state.items, &annotated_event_id) { // Re-add the reaction to the mapping. let mut reactions = item.reactions().clone(); reactions - .entry(annotation.key.to_owned()) + .entry(key.to_owned()) .or_default() .insert(user_id.to_owned(), reaction_info); let new_item = item.with_reactions(reactions); @@ -756,6 +799,27 @@ impl TimelineInner

{ error!(?existing_event_id, ?new_event_id, "Local echo already marked as sent"); } + // If the event had local reactions, upgrade the mapping from reaction to + // events, to indicate that the event is now remote. + if let Some(new_event_id) = new_event_id { + let reactions = item.reactions(); + for (_key, by_user) in reactions.iter() { + for (_user_id, info) in by_user.iter() { + if let ReactionStatus::LocalToLocal(Some(reaction_handle)) = &info.status { + let reaction_txn_id = reaction_handle.transaction_id().to_owned(); + if let Some(found) = txn + .meta + .reactions + .map + .get_mut(&TimelineEventItemId::TransactionId(reaction_txn_id)) + { + found.item = TimelineEventItemId::EventId(new_event_id.to_owned()); + } + } + } + } + } + let new_item = item.with_inner_kind(local_item.with_send_state(send_state)); txn.items.set(idx, new_item); @@ -790,10 +854,8 @@ impl TimelineInner

{ state.meta.reactions.map.remove(&TimelineEventItemId::TransactionId(txn_id.to_owned())) { let item = match &full_key.item { - TimelineEventItemId::TransactionId(_) => { - // TODO(bnjbvr): reactions on local echoes - warn!("reactions on local echoes are NYI"); - return false; + TimelineEventItemId::TransactionId(txn_id) => { + rfind_event_item(&state.items, |item| item.transaction_id() == Some(txn_id)) } TimelineEventItemId::EventId(event_id) => rfind_event_by_id(&state.items, event_id), }; @@ -1181,11 +1243,55 @@ impl TimelineInner

{ } LocalEchoContent::React { key, send_handle, applies_to } => { - todo!(); + self.handle_local_reaction(key, send_handle, applies_to).await; } } } + /// Adds a reaction (local echo) to a local echo. + #[instrument(skip(self, send_handle))] + async fn handle_local_reaction( + &self, + reaction_key: String, + send_handle: SendReactionHandle, + applies_to: OwnedTransactionId, + ) { + let mut state = self.state.write().await; + + let Some((item_pos, item)) = + rfind_event_item(&state.items, |item| item.transaction_id() == Some(&applies_to)) + else { + warn!("Local item not found anymore."); + return; + }; + + let user_id = self.room_data_provider.own_user_id(); + + let reaction_txn_id = send_handle.transaction_id().to_owned(); + let reaction_info = ReactionInfo { + timestamp: MilliSecondsSinceUnixEpoch::now(), + status: ReactionStatus::LocalToLocal(Some(send_handle)), + }; + + let mut reactions = item.reactions().clone(); + let by_user = reactions.entry(reaction_key.clone()).or_default(); + by_user.insert(user_id.to_owned(), reaction_info); + + trace!("Adding local reaction to local echo"); + let new_item = item.with_reactions(reactions); + state.items.set(item_pos, new_item); + + // Add it to the reaction map, so we can discard it later if needs be. + state.meta.reactions.map.insert( + TimelineEventItemId::TransactionId(reaction_txn_id), + FullReactionKey { + item: TimelineEventItemId::TransactionId(applies_to), + key: reaction_key, + sender: user_id.to_owned(), + }, + ); + } + /// Handle a single room send queue update. pub(crate) async fn handle_room_send_queue_update(&self, update: RoomSendQueueUpdate) { match update { diff --git a/crates/matrix-sdk-ui/src/timeline/mod.rs b/crates/matrix-sdk-ui/src/timeline/mod.rs index b4472e1560a..df5d0e6b2f6 100644 --- a/crates/matrix-sdk-ui/src/timeline/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/mod.rs @@ -41,7 +41,6 @@ use ruma::{ UnstablePollStartEventContent, }, receipt::{Receipt, ReceiptThread}, - relation::Annotation, room::{ message::{ AddMentions, ForwardThread, OriginalRoomMessageEvent, RoomMessageEventContent, @@ -548,17 +547,17 @@ impl Timeline { Ok(()) } - /// Toggle a reaction on an event + /// Toggle a reaction on an event. /// /// Adds or redacts a reaction based on the state of the reaction at the /// time it is called. /// - /// When redacting an event, the redaction reason is not sent. + /// When redacting a previous reactino, the redaction reason is not set. /// /// Ensures that only one reaction is sent at a time to avoid race /// conditions and spamming the homeserver with requests. - pub async fn toggle_reaction(&self, annotation: &Annotation) -> Result<(), Error> { - self.inner.toggle_reaction_local(annotation).await?; + pub async fn toggle_reaction(&self, unique_id: &str, reaction_key: &str) -> Result<(), Error> { + self.inner.toggle_reaction_local(unique_id, reaction_key).await?; Ok(()) } diff --git a/crates/matrix-sdk-ui/src/timeline/tests/mod.rs b/crates/matrix-sdk-ui/src/timeline/tests/mod.rs index d38efaf93c8..971e0c98063 100644 --- a/crates/matrix-sdk-ui/src/timeline/tests/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/tests/mod.rs @@ -257,10 +257,24 @@ impl TestTimeline { self.inner.handle_read_receipts(ev_content).await; } - async fn toggle_reaction_local(&self, annotation: &Annotation) -> Result<(), super::Error> { - if self.inner.toggle_reaction_local(annotation).await? { - // Fake a local echo, for new reactions. - self.handle_local_event(ReactionEventContent::new(annotation.clone()).into()).await; + async fn toggle_reaction_local(&self, unique_id: &str, key: &str) -> Result<(), super::Error> { + if self.inner.toggle_reaction_local(unique_id, key).await? { + // TODO(bnjbvr): hacky? + if let Some(event_id) = self + .inner + .items() + .await + .iter() + .rfind(|item| item.unique_id() == unique_id) + .and_then(|item| item.as_event()?.as_remote()) + .map(|event_item| event_item.event_id.clone()) + { + // Fake a local echo, for new reactions. + self.handle_local_event( + ReactionEventContent::new(Annotation::new(event_id, key.to_owned())).into(), + ) + .await; + } } Ok(()) } diff --git a/crates/matrix-sdk-ui/src/timeline/tests/reactions.rs b/crates/matrix-sdk-ui/src/timeline/tests/reactions.rs index 1a6d4e4de60..2c2f514dfcb 100644 --- a/crates/matrix-sdk-ui/src/timeline/tests/reactions.rs +++ b/crates/matrix-sdk-ui/src/timeline/tests/reactions.rs @@ -21,9 +21,8 @@ use futures_util::{FutureExt as _, StreamExt as _}; use matrix_sdk::{deserialized_responses::SyncTimelineEvent, test_utils::events::EventFactory}; use matrix_sdk_test::{async_test, sync_timeline_event, ALICE, BOB}; use ruma::{ - event_id, - events::{relation::Annotation, AnyMessageLikeEventContent}, - server_name, uint, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, + event_id, events::AnyMessageLikeEventContent, server_name, uint, EventId, + MilliSecondsSinceUnixEpoch, OwnedEventId, }; use stream_assert::assert_next_matches; use tokio::time::timeout; @@ -70,7 +69,9 @@ macro_rules! assert_reaction_is_updated { let reactions = event.reactions().get(&REACTION_KEY.to_owned()).unwrap(); let reaction = reactions.get(*ALICE).unwrap(); match reaction.status { - ReactionStatus::LocalToRemote(_) => assert!(!$is_remote_echo), + ReactionStatus::LocalToRemote(_) | ReactionStatus::LocalToLocal(_) => { + assert!(!$is_remote_echo) + } ReactionStatus::RemoteToRemote(_) => assert!($is_remote_echo), }; event @@ -82,10 +83,7 @@ async fn test_add_reaction_on_non_existent_event() { let timeline = TestTimeline::new(); let mut stream = timeline.subscribe().await; - let event_id = EventId::new(server_name!("example.org")); // non existent event - let reaction = create_annotation(&event_id); - - timeline.toggle_reaction_local(&reaction).await.unwrap_err(); + timeline.toggle_reaction_local("nonexisting_unique_id", REACTION_KEY).await.unwrap_err(); assert!(stream.next().now_or_never().is_none()); } @@ -94,11 +92,10 @@ async fn test_add_reaction_on_non_existent_event() { async fn test_add_reaction_success() { let timeline = TestTimeline::new(); let mut stream = timeline.subscribe().await; - let (event_id, item_pos) = send_first_message(&timeline, &mut stream).await; - let reaction = create_annotation(&event_id); + let (msg_uid, event_id, item_pos) = send_first_message(&timeline, &mut stream).await; // If I toggle a reaction on an event which didn't have any… - timeline.toggle_reaction_local(&reaction).await.unwrap(); + timeline.toggle_reaction_local(&msg_uid, REACTION_KEY).await.unwrap(); // The timeline item is updated, with a local echo for the reaction. assert_reaction_is_updated!(stream, &event_id, item_pos, false); @@ -126,22 +123,22 @@ async fn test_redact_reaction_success() { let f = &timeline.factory; let mut stream = timeline.subscribe().await; - let (msg_id, item_pos) = send_first_message(&timeline, &mut stream).await; - let reaction = create_annotation(&msg_id); + let (msg_uid, event_id, item_pos) = send_first_message(&timeline, &mut stream).await; // A reaction is added by sync. let reaction_id = event_id!("$reaction_id"); timeline .handle_live_event( - f.reaction(&msg_id, REACTION_KEY.to_owned()).sender(&ALICE).event_id(reaction_id), + f.reaction(&event_id, REACTION_KEY.to_owned()).sender(&ALICE).event_id(reaction_id), ) .await; - assert_reaction_is_updated!(stream, &msg_id, item_pos, true); + assert_reaction_is_updated!(stream, &event_id, item_pos, true); // Toggling the reaction locally… - timeline.toggle_reaction_local(&reaction).await.unwrap(); + timeline.toggle_reaction_local(&msg_uid, REACTION_KEY).await.unwrap(); + // Will immediately redact it on the item. - let event = assert_item_update!(stream, &msg_id, item_pos); + let event = assert_item_update!(stream, &event_id, item_pos); assert!(event.reactions().get(&REACTION_KEY.to_owned()).is_none()); // And send a redaction request for that reaction. { @@ -169,15 +166,14 @@ async fn test_redact_reaction_success() { async fn test_reactions_store_timestamp() { let timeline = TestTimeline::new(); let mut stream = timeline.subscribe().await; - let (msg_id, msg_pos) = send_first_message(&timeline, &mut stream).await; - let reaction = create_annotation(&msg_id); + let (msg_uid, event_id, msg_pos) = send_first_message(&timeline, &mut stream).await; // Creating a reaction adds a valid timestamp. let timestamp_before = MilliSecondsSinceUnixEpoch::now(); - timeline.toggle_reaction_local(&reaction).await.unwrap(); + timeline.toggle_reaction_local(&msg_uid, REACTION_KEY).await.unwrap(); - let event = assert_reaction_is_updated!(stream, &msg_id, msg_pos, false); + let event = assert_reaction_is_updated!(stream, &event_id, msg_pos, false); let reactions = event.reactions().get(&REACTION_KEY.to_owned()).unwrap(); let timestamp = reactions.values().next().unwrap().timestamp; @@ -216,25 +212,19 @@ async fn test_initial_reaction_timestamp_is_stored() { assert_eq!(reaction_timestamp, entry.values().next().unwrap().timestamp); } -fn create_annotation(related_message_id: &EventId) -> Annotation { - let reaction_key = REACTION_KEY.to_owned(); - let msg_id = related_message_id.to_owned(); - Annotation::new(msg_id, reaction_key) -} - -/// Returns the event id and position of the message. +/// Returns the unique item id, the event id, and position of the message. async fn send_first_message( timeline: &TestTimeline, stream: &mut (impl Stream>> + Unpin), -) -> (OwnedEventId, usize) { +) -> (String, OwnedEventId, usize) { timeline.handle_live_event(timeline.factory.text_msg("I want you to react").sender(&BOB)).await; let item = assert_next_matches!(*stream, VectorDiff::PushBack { value } => value); - let event_id = item.as_event().unwrap().clone().event_id().unwrap().to_owned(); + let event_id = item.as_event().unwrap().as_remote().unwrap().event_id.clone(); let position = timeline.len().await - 1; let day_divider = assert_next_matches!(*stream, VectorDiff::PushFront { value } => value); assert!(day_divider.is_day_divider()); - (event_id, position) + (item.unique_id().to_owned(), event_id, position) } diff --git a/crates/matrix-sdk-ui/src/timeline/util.rs b/crates/matrix-sdk-ui/src/timeline/util.rs index f88428af725..af9e316cb42 100644 --- a/crates/matrix-sdk-ui/src/timeline/util.rs +++ b/crates/matrix-sdk-ui/src/timeline/util.rs @@ -52,13 +52,10 @@ impl Deref for EventTimelineItemWithId<'_> { } } -/// Finds an item in the vector of `items` given a predicate `f`. -/// -/// WARNING/FIXME: this does a linear scan of the items, so this can be slow if -/// there are many items in the timeline. -pub(super) fn rfind_event_item( +#[inline(always)] +fn rfind_event_item_internal( items: &Vector>, - mut f: impl FnMut(&EventTimelineItem) -> bool, + mut f: impl FnMut(&EventTimelineItemWithId<'_>) -> bool, ) -> Option<(usize, EventTimelineItemWithId<'_>)> { items .iter() @@ -69,10 +66,35 @@ pub(super) fn rfind_event_item( EventTimelineItemWithId { inner: item.as_event()?, internal_id: &item.internal_id }, )) }) - .rfind(|(_, it)| f(it.inner)) + .rfind(|(_, it)| f(it)) +} + +/// Finds an item in the vector of `items` given a predicate `f`. +/// +/// WARNING/FIXME: this does a linear scan of the items, so this can be slow if +/// there are many items in the timeline. +pub(super) fn rfind_event_item( + items: &Vector>, + mut f: impl FnMut(&EventTimelineItem) -> bool, +) -> Option<(usize, EventTimelineItemWithId<'_>)> { + rfind_event_item_internal(items, |item_with_id| f(item_with_id.inner)) +} + +/// Find the timeline item that matches the given internal id, if any. +/// +/// WARNING: Linear scan of the items, see documentation of +/// [`rfind_event_item`]. +pub(super) fn rfind_event_by_uid<'a>( + items: &'a Vector>, + internal_id: &'a str, +) -> Option<(usize, EventTimelineItemWithId<'a>)> { + rfind_event_item_internal(items, |item_with_id| item_with_id.internal_id == internal_id) } /// Find the timeline item that matches the given event id, if any. +/// +/// WARNING: Linear scan of the items, see documentation of +/// [`rfind_event_item`]. pub(super) fn rfind_event_by_id<'a>( items: &'a Vector>, event_id: &EventId, diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/reactions.rs b/crates/matrix-sdk-ui/tests/integration/timeline/reactions.rs index cb68d904f29..8202af31d40 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/reactions.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/reactions.rs @@ -12,19 +12,22 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::time::Duration; +use std::{sync::Mutex, time::Duration}; use assert_matches2::{assert_let, assert_matches}; use eyeball_im::VectorDiff; use futures_util::{FutureExt as _, StreamExt as _}; -use matrix_sdk::test_utils::{events::EventFactory, logged_in_client_with_server}; +use matrix_sdk::{ + assert_next_matches_with_timeout, + test_utils::{events::EventFactory, logged_in_client_with_server}, +}; use matrix_sdk_test::{ async_test, mocks::{mock_encryption_state, mock_redaction}, JoinedRoomBuilder, SyncResponseBuilder, ALICE, }; use matrix_sdk_ui::timeline::{ReactionStatus, RoomExt as _}; -use ruma::{event_id, events::relation::Annotation, room_id}; +use ruma::{event_id, events::room::message::RoomMessageEventContent, room_id}; use serde_json::json; use wiremock::{ matchers::{header, method, path_regex}, @@ -72,6 +75,7 @@ async fn test_abort_before_being_sent() { assert_let!(Some(VectorDiff::PushBack { value: first }) = stream.next().await); let item = first.as_event().unwrap(); + let unique_id = first.unique_id(); assert_eq!(item.content().as_message().unwrap().body(), "hello"); assert_let!(Some(VectorDiff::PushFront { value: day_divider }) = stream.next().await); @@ -98,7 +102,7 @@ async fn test_abort_before_being_sent() { mock_redaction(event_id!("$3")).mount(&server).await; // We add the reaction… - timeline.toggle_reaction(&Annotation::new(event_id.to_owned(), "👍".to_owned())).await.unwrap(); + timeline.toggle_reaction(unique_id, "👍").await.unwrap(); // First toggle (local echo). { @@ -115,7 +119,7 @@ async fn test_abort_before_being_sent() { } // We toggle another reaction at the same time… - timeline.toggle_reaction(&Annotation::new(event_id.to_owned(), "🥰".to_owned())).await.unwrap(); + timeline.toggle_reaction(unique_id, "🥰").await.unwrap(); { assert_let!(Some(VectorDiff::Set { index: 1, value: item }) = stream.next().await); @@ -136,7 +140,7 @@ async fn test_abort_before_being_sent() { // Then we remove the first one; because it was being sent, it should lead to a // redaction event. - timeline.toggle_reaction(&Annotation::new(event_id.to_owned(), "👍".to_owned())).await.unwrap(); + timeline.toggle_reaction(unique_id, "👍").await.unwrap(); { assert_let!(Some(VectorDiff::Set { index: 1, value: item }) = stream.next().await); @@ -153,7 +157,7 @@ async fn test_abort_before_being_sent() { // But because the first one was being sent, this one won't and the local echo // could be discarded. - timeline.toggle_reaction(&Annotation::new(event_id.to_owned(), "🥰".to_owned())).await.unwrap(); + timeline.toggle_reaction(unique_id, "🥰").await.unwrap(); { assert_let!(Some(VectorDiff::Set { index: 1, value: item }) = stream.next().await); @@ -210,16 +214,21 @@ async fn test_redact_failed() { let _response = client.sync_once(Default::default()).await.unwrap(); server.reset().await; - assert_let!(Some(VectorDiff::PushBack { value: item }) = stream.next().await); - let item = item.as_event().unwrap(); - assert_eq!(item.content().as_message().unwrap().body(), "hello"); - assert!(item.reactions().is_empty()); + let unique_id = assert_next_matches_with_timeout!(stream, VectorDiff::PushBack { value: item } => { + let unique_id = item.unique_id().to_owned(); + let item = item.as_event().unwrap(); + assert_eq!(item.content().as_message().unwrap().body(), "hello"); + assert!(item.reactions().is_empty()); + unique_id + }); - assert_let!(Some(VectorDiff::Set { index: 0, value: item }) = stream.next().await); - assert_eq!(item.as_event().unwrap().reactions().len(), 1); + assert_next_matches_with_timeout!(stream, VectorDiff::Set { index: 0, value: item } => { + assert_eq!(item.as_event().unwrap().reactions().len(), 1); + }); - assert_let!(Some(VectorDiff::PushFront { value: day_divider }) = stream.next().await); - assert!(day_divider.is_day_divider()); + assert_next_matches_with_timeout!(stream, VectorDiff::PushFront { value: day_divider } => { + assert!(day_divider.is_day_divider()); + }); // Now, redact the annotation we previously added. @@ -233,19 +242,141 @@ async fn test_redact_failed() { .await; // We toggle the reaction, which fails with an error. - timeline - .toggle_reaction(&Annotation::new(event_id.to_owned(), "😆".to_owned())) - .await - .unwrap_err(); + timeline.toggle_reaction(&unique_id, "😆").await.unwrap_err(); // The local echo is removed (assuming the redaction works)… - assert_let!(Some(VectorDiff::Set { index: 1, value: item }) = stream.next().await); - assert!(item.as_event().unwrap().reactions().is_empty()); + assert_next_matches_with_timeout!(stream, VectorDiff::Set { index: 1, value: item } => { + assert!(item.as_event().unwrap().reactions().is_empty()); + }); // …then added back, after redaction failed. - assert_let!(Some(VectorDiff::Set { index: 1, value: item }) = stream.next().await); - assert_eq!(item.as_event().unwrap().reactions().len(), 1); + assert_next_matches_with_timeout!(stream, VectorDiff::Set { index: 1, value: item } => { + assert_eq!(item.as_event().unwrap().reactions().len(), 1); + }); + + tokio::time::sleep(Duration::from_millis(150)).await; + assert!(stream.next().now_or_never().is_none()); +} + +#[async_test] +async fn test_local_reaction_to_local_echo() { + // This test checks that if a reaction redaction failed, then we re-insert the + // reaction after displaying it was removed. + + let room_id = room_id!("!a98sd12bjh:example.org"); + let (client, server) = logged_in_client_with_server().await; + let user_id = client.user_id().unwrap(); + + // Make the test aware of the room. + let mut sync_builder = SyncResponseBuilder::new(); + sync_builder.add_joined_room(JoinedRoomBuilder::new(room_id)); + + mock_sync(&server, sync_builder.build_json_sync_response(), None).await; + let _response = client.sync_once(Default::default()).await.unwrap(); + server.reset().await; + + mock_encryption_state(&server, false).await; + + let room = client.get_room(room_id).unwrap(); + let timeline = room.timeline().await.unwrap(); + let (initial_items, mut stream) = timeline.subscribe().await; + + assert!(initial_items.is_empty()); + + // Add a duration to the response, so we can check other things in the + // meanwhile. + let next_event_id = Mutex::new(0); + Mock::given(method("PUT")) + .and(path_regex(r"^/_matrix/client/r0/rooms/.*/send/.*")) + .and(header("authorization", "Bearer 1234")) + .respond_with(move |_req: &wiremock::Request| { + let mut next_event_id = next_event_id.lock().unwrap(); + let event_id = *next_event_id; + *next_event_id += 1; + let mut tmp = ResponseTemplate::new(200).set_body_json(json!({ + "event_id": format!("${event_id}"), + })); + + if event_id == 0 { + tmp = tmp.set_delay(Duration::from_secs(1)); + } + + tmp + }) + .mount(&server) + .await; + + // Send a local event. + let _ = timeline.send(RoomMessageEventContent::text_plain("lol").into()).await.unwrap(); + + // Receive a local echo. + let unique_id = assert_next_matches_with_timeout!(stream, VectorDiff::PushBack { value: item } => { + let unique_id = item.unique_id().to_owned(); + let item = item.as_event().unwrap(); + assert!(item.is_local_echo()); + assert_eq!(item.content().as_message().unwrap().body(), "lol"); + assert!(item.reactions().is_empty()); + unique_id + }); + + // Good ol' day divider. + assert_next_matches_with_timeout!(stream, VectorDiff::PushFront { value: day_divider } => { + assert!(day_divider.is_day_divider()); + }); + + // Add a reaction before the remote echo comes back. + let key1 = "🤣"; + timeline.toggle_reaction(&unique_id, key1).await.unwrap(); + + // The reaction is added to the local echo. + assert_next_matches_with_timeout!(stream, VectorDiff::Set { index: 1, value: item } => { + let reactions = item.as_event().unwrap().reactions(); + assert_eq!(reactions.len(), 1); + let reaction_info = reactions.get(key1).unwrap().get(user_id).unwrap(); + assert_matches!(&reaction_info.status, ReactionStatus::LocalToLocal(..)); + }); + + // Add another reaction. + let key2 = "😈"; + timeline.toggle_reaction(&unique_id, key2).await.unwrap(); + + // Also comes as a local echo. + assert_next_matches_with_timeout!(stream, VectorDiff::Set { index: 1, value: item } => { + let reactions = item.as_event().unwrap().reactions(); + assert_eq!(reactions.len(), 2); + let reaction_info = reactions.get(key2).unwrap().get(user_id).unwrap(); + assert_matches!(&reaction_info.status, ReactionStatus::LocalToLocal(..)); + }); + + // Remove second reaction. It's immediately removed, since it was a local echo, + // and it wasn't being sent. + timeline.toggle_reaction(&unique_id, key2).await.unwrap(); + + assert_next_matches_with_timeout!(stream, VectorDiff::Set { index: 1, value: item } => { + let reactions = item.as_event().unwrap().reactions(); + assert_eq!(reactions.len(), 1); + let reaction_info = reactions.get(key1).unwrap().get(user_id).unwrap(); + assert_matches!(&reaction_info.status, ReactionStatus::LocalToLocal(..)); + }); + + // Now, wait for the remote echo for the message itself. + assert_next_matches_with_timeout!(stream, 2000, VectorDiff::Set { index: 1, value: item } => { + let reactions = item.as_event().unwrap().reactions(); + assert_eq!(reactions.len(), 1); + let reaction_info = reactions.get(key1).unwrap().get(user_id).unwrap(); + // TODO(bnjbvr): why not LocalToRemote here? + assert_matches!(&reaction_info.status, ReactionStatus::LocalToLocal(..)); + }); + + // And then the remote echo for the reaction itself. + assert_next_matches_with_timeout!(stream, VectorDiff::Set { index: 1, value: item } => { + let reactions = item.as_event().unwrap().reactions(); + assert_eq!(reactions.len(), 1); + let reaction_info = reactions.get(key1).unwrap().get(user_id).unwrap(); + assert_matches!(&reaction_info.status, ReactionStatus::RemoteToRemote(..)); + }); + // And we're done. tokio::time::sleep(Duration::from_millis(150)).await; assert!(stream.next().now_or_never().is_none()); } diff --git a/crates/matrix-sdk/src/send_queue.rs b/crates/matrix-sdk/src/send_queue.rs index 15b3458c3dd..1ac78b1c491 100644 --- a/crates/matrix-sdk/src/send_queue.rs +++ b/crates/matrix-sdk/src/send_queue.rs @@ -1370,6 +1370,11 @@ impl SendReactionHandle { handle.abort().await } + + /// The transaction id that will be used to send this reaction later. + pub fn transaction_id(&self) -> &TransactionId { + &self.transaction_id + } } /// From a given source of [`DependentQueuedEvent`], return only the most diff --git a/testing/matrix-sdk-integration-testing/src/tests/timeline.rs b/testing/matrix-sdk-integration-testing/src/tests/timeline.rs index fc9c8129a6a..80ebce3b2db 100644 --- a/testing/matrix-sdk-integration-testing/src/tests/timeline.rs +++ b/testing/matrix-sdk-integration-testing/src/tests/timeline.rs @@ -22,8 +22,7 @@ use eyeball_im::{Vector, VectorDiff}; use futures_util::{FutureExt, StreamExt}; use matrix_sdk::ruma::{ api::client::room::create_room::v3::Request as CreateRoomRequest, - events::{relation::Annotation, room::message::RoomMessageEventContent}, - MilliSecondsSinceUnixEpoch, + events::room::message::RoomMessageEventContent, MilliSecondsSinceUnixEpoch, }; use matrix_sdk_ui::timeline::{EventSendState, ReactionStatus, RoomExt, TimelineItem}; use tokio::{ @@ -85,15 +84,17 @@ async fn test_toggling_reaction() -> Result<()> { items.iter().find_map(|item| { let event = item.as_event()?; if !event.is_local_echo() && event.content().as_message()?.body().trim() == "hi!" { - event.event_id().map(|event_id| event_id.to_owned()) + event + .event_id() + .map(|event_id| (item.unique_id().to_owned(), event_id.to_owned())) } else { None } }) }; - if let Some(event_id) = find_event_id(&items) { - return Ok(event_id); + if let Some(pair) = find_event_id(&items) { + return Ok(pair); } warn!(?items, "Waiting for updates…"); @@ -101,8 +102,8 @@ async fn test_toggling_reaction() -> Result<()> { while let Some(diff) = stream.next().await { warn!(?diff, "received a diff"); diff.apply(&mut items); - if let Some(event_id) = find_event_id(&items) { - return Ok(event_id); + if let Some(pair) = find_event_id(&items) { + return Ok(pair); } } @@ -117,7 +118,7 @@ async fn test_toggling_reaction() -> Result<()> { debug!("Sending initial message…"); timeline.send(RoomMessageEventContent::text_plain("hi!").into()).await.unwrap(); - let event_id = timeout(Duration::from_secs(10), event_id_task) + let (msg_uid, event_id) = timeout(Duration::from_secs(10), event_id_task) .await .expect("timeout") .expect("failed to join tokio task") @@ -143,19 +144,19 @@ async fn test_toggling_reaction() -> Result<()> { .find_map(|(i, item)| (item.as_event()?.event_id()? == event_id).then_some(i)) .expect("couldn't find the final position for the event id"); - let reaction = Annotation::new(event_id.clone(), "👍".to_owned()); + let reaction_key = "👍".to_owned(); // Toggle reaction multiple times. for _ in 0..3 { debug!("Starting the toggle reaction tests…"); // Add the reaction. - timeline.toggle_reaction(&reaction).await.expect("toggling reaction"); + timeline.toggle_reaction(&msg_uid, &reaction_key).await.expect("toggling reaction"); // Local echo is added. { let event = assert_event_is_updated!(stream, event_id, message_position); - let reactions = event.reactions().get(&reaction.key).unwrap(); + let reactions = event.reactions().get(&reaction_key).unwrap(); let reaction = reactions.get(&user_id).unwrap(); assert_matches!(reaction.status, ReactionStatus::LocalToRemote(..)); } @@ -164,7 +165,7 @@ async fn test_toggling_reaction() -> Result<()> { { let event = assert_event_is_updated!(stream, event_id, message_position); - let reactions = event.reactions().get(&reaction.key).unwrap(); + let reactions = event.reactions().get(&reaction_key).unwrap(); assert_eq!(reactions.keys().count(), 1); let reaction = reactions.get(&user_id).unwrap(); @@ -178,7 +179,10 @@ async fn test_toggling_reaction() -> Result<()> { } // Redact the reaction. - timeline.toggle_reaction(&reaction).await.expect("toggling reaction the second time"); + timeline + .toggle_reaction(&msg_uid, &reaction_key) + .await + .expect("toggling reaction the second time"); // The reaction is removed. let event = assert_event_is_updated!(stream, event_id, message_position);