diff --git a/bindings/matrix-sdk-ffi/src/timeline/mod.rs b/bindings/matrix-sdk-ffi/src/timeline/mod.rs index 08f15ba076f..50ff1972726 100644 --- a/bindings/matrix-sdk-ffi/src/timeline/mod.rs +++ b/bindings/matrix-sdk-ffi/src/timeline/mod.rs @@ -45,7 +45,6 @@ use ruma::{ }, }, receipt::ReceiptThread, - relation::Annotation, room::message::{ ForwardThread, LocationMessageEventContent, MessageType, RoomMessageEventContentWithoutRelation, @@ -538,9 +537,21 @@ impl Timeline { let _ = self.send(Arc::new(room_message_event_content)).await; } - pub async fn toggle_reaction(&self, event_id: String, key: String) -> Result<(), ClientError> { - let event_id = EventId::parse(event_id)?; - self.inner.toggle_reaction(&Annotation::new(event_id, key)).await?; + /// Toggle a reaction on an event. + /// + /// The `unique_id` parameter is a string returned by + /// the `TimelineItem::unique_id()` method. As such, this method works both + /// on local echoes and remote items. + /// + /// Adds or redacts a reaction based on the state of the reaction at the + /// time it is called. + /// + /// When redacting a previous reaction, the redaction reason is not set. + /// + /// Ensures that only one reaction is sent at a time to avoid race + /// conditions and spamming the homeserver with requests. + pub async fn toggle_reaction(&self, unique_id: String, key: String) -> Result<(), ClientError> { + self.inner.toggle_reaction(&unique_id, &key).await?; Ok(()) } diff --git a/crates/matrix-sdk-base/Cargo.toml b/crates/matrix-sdk-base/Cargo.toml index 4243806ff55..7373797a9e3 100644 --- a/crates/matrix-sdk-base/Cargo.toml +++ b/crates/matrix-sdk-base/Cargo.toml @@ -27,6 +27,11 @@ experimental-sliding-sync = [ ] uniffi = ["dep:uniffi", "matrix-sdk-crypto?/uniffi", "matrix-sdk-common/uniffi"] +# Private feature, see +# https://github.com/matrix-org/matrix-rust-sdk/pull/3749#issuecomment-2312939823 for the gory +# details. +test-send-sync = [] + # "message-ids" feature doesn't do anything and is deprecated. message-ids = [] diff --git a/crates/matrix-sdk-base/src/rooms/normal.rs b/crates/matrix-sdk-base/src/rooms/normal.rs index 7028e88d01d..0d08370cd8c 100644 --- a/crates/matrix-sdk-base/src/rooms/normal.rs +++ b/crates/matrix-sdk-base/src/rooms/normal.rs @@ -992,6 +992,23 @@ impl Room { } } +// See https://github.com/matrix-org/matrix-rust-sdk/pull/3749#issuecomment-2312939823. +#[cfg(not(feature = "test-send-sync"))] +unsafe impl Send for Room {} + +// See https://github.com/matrix-org/matrix-rust-sdk/pull/3749#issuecomment-2312939823. +#[cfg(not(feature = "test-send-sync"))] +unsafe impl Sync for Room {} + +#[cfg(feature = "test-send-sync")] +#[test] +// See https://github.com/matrix-org/matrix-rust-sdk/pull/3749#issuecomment-2312939823. +fn test_send_sync_for_room() { + fn assert_send_sync() {} + + assert_send_sync::(); +} + /// The underlying pure data structure for joined and left rooms. /// /// Holds all the info needed to persist a room into the state store. diff --git a/crates/matrix-sdk-base/src/store/traits.rs b/crates/matrix-sdk-base/src/store/traits.rs index 906ffdc271a..1a6e3105efc 100644 --- a/crates/matrix-sdk-base/src/store/traits.rs +++ b/crates/matrix-sdk-base/src/store/traits.rs @@ -1201,6 +1201,12 @@ pub enum DependentQueuedEventKind { /// The event should be redacted/aborted/removed. Redact, + + /// The event should be reacted to, with the given key. + React { + /// Key used for the reaction. + key: String, + }, } /// A transaction id identifying a [`DependentQueuedEvent`] rather than its diff --git a/crates/matrix-sdk-ui/src/timeline/day_dividers.rs b/crates/matrix-sdk-ui/src/timeline/day_dividers.rs index 8fd295f4174..f8e1e748502 100644 --- a/crates/matrix-sdk-ui/src/timeline/day_dividers.rs +++ b/crates/matrix-sdk-ui/src/timeline/day_dividers.rs @@ -623,7 +623,6 @@ mod tests { let event_kind = EventTimelineItemKind::Remote(RemoteEventTimelineItem { event_id: owned_event_id!("$1"), transaction_id: None, - reactions: Default::default(), read_receipts: Default::default(), is_own: false, is_highlighted: false, @@ -638,6 +637,7 @@ mod tests { timestamp, TimelineItemContent::RedactedMessage, event_kind, + Default::default(), false, ) } diff --git a/crates/matrix-sdk-ui/src/timeline/event_handler.rs b/crates/matrix-sdk-ui/src/timeline/event_handler.rs index 3f0d2bde840..a3d71d8ac5d 100644 --- a/crates/matrix-sdk-ui/src/timeline/event_handler.rs +++ b/crates/matrix-sdk-ui/src/timeline/event_handler.rs @@ -572,11 +572,6 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> { }; if let Some((idx, event_item)) = rfind_event_by_id(self.items, reacted_to_event_id) { - let Some(remote_event_item) = event_item.as_remote() else { - error!("received reaction to a local echo"); - return; - }; - // Ignore reactions on redacted events. if let TimelineItemContent::RedactedMessage = event_item.content() { debug!("Ignoring reaction on redacted event"); @@ -586,7 +581,7 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> { trace!("Added reaction"); // Add the reaction to the event item's bundled reactions. - let mut reactions = remote_event_item.reactions.clone(); + let mut reactions = event_item.reactions.clone(); reactions.entry(c.relates_to.key.clone()).or_default().insert( self.ctx.sender.clone(), @@ -839,16 +834,10 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> { return false; }; - let Some(remote_event_item) = item.as_remote() else { - error!("inconsistent state: redaction received on a non-remote event item"); - return false; - }; - - let mut reactions = remote_event_item.reactions.clone(); + let mut reactions = item.reactions.clone(); if reactions.remove_reaction(&sender, &key).is_some() { trace!("Removing reaction"); - let new_item = item.with_kind(remote_event_item.with_reactions(reactions)); - self.items.set(item_pos, TimelineItem::new(new_item, item.internal_id.to_owned())); + self.items.set(item_pos, item.with_reactions(reactions)); self.result.items_updated += 1; return true; } @@ -894,7 +883,6 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> { RemoteEventTimelineItem { event_id: event_id.clone(), transaction_id: txn_id.clone(), - reactions, read_receipts: self.ctx.read_receipts.clone(), is_own: self.ctx.is_own_event, is_highlighted: self.ctx.is_highlighted, @@ -915,6 +903,7 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> { timestamp, content, kind, + reactions, is_room_encrypted, ); @@ -967,10 +956,7 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> { if old_item.content.is_redacted() && !item.content.is_redacted() { warn!("Got original form of an event that was previously redacted"); item.content = item.content.redact(&self.meta.room_version); - item.as_remote_mut() - .expect("Can't have a local item when flow == Remote") - .reactions - .clear(); + item.reactions.clear(); } } diff --git a/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs b/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs index 192d34da86c..3a8e7033d10 100644 --- a/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs @@ -21,7 +21,7 @@ use as_variant::as_variant; use indexmap::IndexMap; use matrix_sdk::{ deserialized_responses::{EncryptionInfo, ShieldState}, - send_queue::SendHandle, + send_queue::{SendHandle, SendReactionHandle}, Client, Error, }; use matrix_sdk_base::{ @@ -66,6 +66,8 @@ pub struct EventTimelineItem { pub(super) sender: OwnedUserId, /// The sender's profile of the event. pub(super) sender_profile: TimelineDetails, + /// All bundled reactions about the event. + pub(super) reactions: ReactionsByKeyBySender, /// The timestamp of the event. pub(super) timestamp: MilliSecondsSinceUnixEpoch, /// The content of the event. @@ -114,10 +116,11 @@ impl EventTimelineItem { timestamp: MilliSecondsSinceUnixEpoch, content: TimelineItemContent, kind: EventTimelineItemKind, + reactions: ReactionsByKeyBySender, is_room_encrypted: bool, ) -> Self { let is_room_encrypted = Some(is_room_encrypted); - Self { sender, sender_profile, timestamp, content, kind, is_room_encrypted } + Self { sender, sender_profile, timestamp, content, reactions, kind, is_room_encrypted } } /// If the supplied low-level `SyncTimelineEvent` is suitable for use as the @@ -175,7 +178,6 @@ impl EventTimelineItem { let kind = RemoteEventTimelineItem { event_id, transaction_id: None, - reactions, read_receipts, is_own, is_highlighted, @@ -199,9 +201,16 @@ impl EventTimelineItem { } else { TimelineDetails::Unavailable }; - let is_room_encrypted = None; - Some(Self { sender, sender_profile, timestamp, content, kind, is_room_encrypted }) + Some(Self { + sender, + sender_profile, + timestamp, + content, + kind, + reactions, + is_room_encrypted: None, + }) } /// Check whether this item is a local echo. @@ -291,12 +300,7 @@ impl EventTimelineItem { /// Get the reactions of this item. pub fn reactions(&self) -> &ReactionsByKeyBySender { - // There's not much of a point in allowing reactions to local echoes. - static EMPTY_REACTIONS: Lazy = Lazy::new(Default::default); - match &self.kind { - EventTimelineItemKind::Local(_) => &EMPTY_REACTIONS, - EventTimelineItemKind::Remote(remote_event) => &remote_event.reactions, - } + &self.reactions } /// Get the read receipts of this item. @@ -453,6 +457,11 @@ impl EventTimelineItem { Self { kind: kind.into(), ..self.clone() } } + /// Clone the current event item, and update its `reactions`. + pub fn with_reactions(&self, reactions: ReactionsByKeyBySender) -> Self { + Self { reactions, ..self.clone() } + } + /// Clone the current event item, and update its content. /// /// Optionally update `latest_edit_json` if the update is an edit received @@ -490,6 +499,7 @@ impl EventTimelineItem { content, kind, is_room_encrypted: self.is_room_encrypted, + reactions: ReactionsByKeyBySender::default(), } } @@ -615,6 +625,10 @@ pub enum EventItemOrigin { /// What's the status of a reaction? #[derive(Clone, Debug)] pub enum ReactionStatus { + /// It's a local reaction to a local echo. + /// + /// The handle is missing only in testing contexts. + LocalToLocal(Option), /// It's a local reaction to a remote event. /// /// The handle is missing only in testing contexts. diff --git a/crates/matrix-sdk-ui/src/timeline/event_item/remote.rs b/crates/matrix-sdk-ui/src/timeline/event_item/remote.rs index c86e6a53925..4510910c47a 100644 --- a/crates/matrix-sdk-ui/src/timeline/event_item/remote.rs +++ b/crates/matrix-sdk-ui/src/timeline/event_item/remote.rs @@ -22,8 +22,6 @@ use ruma::{ OwnedEventId, OwnedTransactionId, OwnedUserId, }; -use super::ReactionsByKeyBySender; - /// An item for an event that was received from the homeserver. #[derive(Clone)] pub(in crate::timeline) struct RemoteEventTimelineItem { @@ -33,9 +31,6 @@ pub(in crate::timeline) struct RemoteEventTimelineItem { /// If available, the transaction id we've used to send this event. pub transaction_id: Option, - /// All bundled reactions about the event. - pub reactions: ReactionsByKeyBySender, - /// All read receipts for the event. /// /// The key is the ID of a room member and the value are details about the @@ -78,20 +73,9 @@ impl RemoteEventTimelineItem { Self { encryption_info, ..self.clone() } } - /// Clone the current event item, and update its `reactions`. - pub fn with_reactions(&self, reactions: ReactionsByKeyBySender) -> Self { - Self { reactions, ..self.clone() } - } - - /// Clone the current event item, and clear its `reactions` as well as the - /// JSON representation fields. + /// Clone the current event item, and redacts its fields. pub fn redact(&self) -> Self { - Self { - reactions: ReactionsByKeyBySender::default(), - original_json: None, - latest_edit_json: None, - ..self.clone() - } + Self { original_json: None, latest_edit_json: None, ..self.clone() } } } @@ -116,7 +100,6 @@ impl fmt::Debug for RemoteEventTimelineItem { let Self { event_id, transaction_id, - reactions, read_receipts, is_own, encryption_info, @@ -129,7 +112,6 @@ impl fmt::Debug for RemoteEventTimelineItem { f.debug_struct("RemoteEventTimelineItem") .field("event_id", event_id) .field("transaction_id", transaction_id) - .field("reactions", reactions) .field("read_receipts", read_receipts) .field("is_own", is_own) .field("is_highlighted", is_highlighted) diff --git a/crates/matrix-sdk-ui/src/timeline/inner/mod.rs b/crates/matrix-sdk-ui/src/timeline/inner/mod.rs index 999e0768685..adf46997b22 100644 --- a/crates/matrix-sdk-ui/src/timeline/inner/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/inner/mod.rs @@ -26,7 +26,9 @@ use matrix_sdk::crypto::OlmMachine; use matrix_sdk::{ deserialized_responses::SyncTimelineEvent, event_cache::{paginator::Paginator, RoomEventCache}, - send_queue::{LocalEcho, RoomSendQueueUpdate, SendHandle}, + send_queue::{ + LocalEcho, LocalEchoContent, RoomSendQueueUpdate, SendHandle, SendReactionHandle, + }, Result, Room, }; #[cfg(test)] @@ -45,7 +47,8 @@ use ruma::{ AnySyncTimelineEvent, MessageLikeEventType, }, serde::Raw, - EventId, OwnedEventId, OwnedTransactionId, RoomVersionId, TransactionId, UserId, + EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, RoomVersionId, + TransactionId, UserId, }; use tokio::sync::{RwLock, RwLockWriteGuard}; use tracing::{debug, error, field::debug, info, instrument, trace, warn}; @@ -64,14 +67,17 @@ use super::{ traits::RoomDataProvider, util::{rfind_event_by_id, rfind_event_item, RelativePosition}, Error, EventSendState, EventTimelineItem, InReplyToDetails, Message, PaginationError, Profile, - RepliedToEvent, TimelineDetails, TimelineEventItemId, TimelineFocus, TimelineItem, - TimelineItemContent, TimelineItemKind, + ReactionInfo, RepliedToEvent, TimelineDetails, TimelineEventItemId, TimelineFocus, + TimelineItem, TimelineItemContent, TimelineItemKind, }; use crate::{ timeline::{ day_dividers::DayDividerAdjuster, event_handler::LiveTimelineUpdatesAllowed, + event_item::EventTimelineItemKind, pinned_events_loader::{PinnedEventsLoader, PinnedEventsLoaderError}, + reactions::FullReactionKey, + util::rfind_event_by_uid, TimelineEventFilterFn, }, unable_to_decrypt_hook::UtdHookManager, @@ -416,7 +422,7 @@ impl TimelineInner

{ pub(super) async fn subscribe( &self, - ) -> (Vector>, impl Stream>>) { + ) -> (Vector>, impl Stream>> + Send) { trace!("Creating timeline items signal"); let state = self.state.read().await; (state.items.clone(), state.items.subscribe().into_stream()) @@ -448,49 +454,79 @@ impl TimelineInner

{ #[instrument(skip_all)] pub(super) async fn toggle_reaction_local( &self, - annotation: &Annotation, + unique_id: &str, + key: &str, ) -> Result { let mut state = self.state.write().await; - let user_id = self.room_data_provider.own_user_id(); - - let Some((item_pos, item)) = rfind_event_by_id(&state.items, &annotation.event_id) else { + let Some((item_pos, item)) = rfind_event_by_uid(&state.items, unique_id) else { warn!("Timeline item not found, can't add reaction"); return Err(Error::FailedToToggleReaction); }; + let user_id = self.room_data_provider.own_user_id(); let prev_status = item .reactions() - .get(&annotation.key) + .get(key) .and_then(|group| group.get(user_id)) .map(|reaction_info| reaction_info.status.clone()); let Some(prev_status) = prev_status else { - // Add a reaction through the room data provider. - // No need to reflect the effect locally, since the local echo handling will - // take care of it. - trace!("adding a new reaction"); - self.room_data_provider - .send(ReactionEventContent::from(annotation.clone()).into()) - .await?; - return Ok(true); + match &item.inner.kind { + EventTimelineItemKind::Local(local) => { + if let Some(send_handle) = local.send_handle.clone() { + if send_handle + .react(key.to_owned()) + .await + .map_err(|err| Error::SendQueueError(err.into()))? + .is_some() + { + trace!("adding a reaction to a local echo"); + return Ok(true); + } + + warn!("couldn't toggle reaction for local echo"); + return Ok(false); + } + + warn!("missing send handle for local echo; is this a test?"); + return Ok(false); + } + + EventTimelineItemKind::Remote(remote) => { + // Add a reaction through the room data provider. + // No need to reflect the effect locally, since the local echo handling will + // take care of it. + trace!("adding a reaction to a remote echo"); + let annotation = Annotation::new(remote.event_id.to_owned(), key.to_owned()); + self.room_data_provider + .send(ReactionEventContent::from(annotation).into()) + .await?; + return Ok(true); + } + } }; trace!("removing a previous reaction"); match prev_status { - ReactionStatus::LocalToRemote(send_handle) => { - // No need to keep the lock. - drop(state); + ReactionStatus::LocalToLocal(send_reaction_handle) => { + if let Some(handle) = send_reaction_handle { + if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? { + // Impossible state: the reaction has moved from local to echo under our + // feet, but the timeline was supposed to be locked! + warn!("unexpectedly unable to abort sending of local reaction"); + } + } else { + warn!("no send reaction handle (this should only happen in testing contexts)"); + } + } + ReactionStatus::LocalToRemote(send_handle) => { // No need to reflect the change ourselves, since handling the discard of the // local echo will take care of it. trace!("aborting send of the previous reaction that was a local echo"); - if let Some(send_handle) = send_handle { - if !send_handle - .abort() - .await - .map_err(|err| Error::SendQueueError(err.into()))? - { + if let Some(handle) = send_handle { + if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? { // Impossible state: the reaction has moved from local to echo under our // feet, but the timeline was supposed to be locked! warn!("unexpectedly unable to abort sending of local reaction"); @@ -502,8 +538,15 @@ impl TimelineInner

{ ReactionStatus::RemoteToRemote(event_id) => { // Assume the redaction will work; we'll re-add the reaction if it didn't. + let Some(annotated_event_id) = + item.as_remote().map(|event_item| event_item.event_id.clone()) + else { + warn!("remote reaction to remote event, but the associated item isn't remote"); + return Ok(false); + }; + let mut reactions = item.reactions().clone(); - let reaction_info = reactions.remove_reaction(user_id, &annotation.key); + let reaction_info = reactions.remove_reaction(user_id, key); if reaction_info.is_some() { let new_item = item.with_reactions(reactions); @@ -522,12 +565,12 @@ impl TimelineInner

{ let mut state = self.state.write().await; if let Some((item_pos, item)) = - rfind_event_by_id(&state.items, &annotation.event_id) + rfind_event_by_id(&state.items, &annotated_event_id) { // Re-add the reaction to the mapping. let mut reactions = item.reactions().clone(); reactions - .entry(annotation.key.to_owned()) + .entry(key.to_owned()) .or_default() .insert(user_id.to_owned(), reaction_info); let new_item = item.with_reactions(reactions); @@ -756,6 +799,27 @@ impl TimelineInner

{ error!(?existing_event_id, ?new_event_id, "Local echo already marked as sent"); } + // If the event had local reactions, upgrade the mapping from reaction to + // events, to indicate that the event is now remote. + if let Some(new_event_id) = new_event_id { + let reactions = item.reactions(); + for (_key, by_user) in reactions.iter() { + for (_user_id, info) in by_user.iter() { + if let ReactionStatus::LocalToLocal(Some(reaction_handle)) = &info.status { + let reaction_txn_id = reaction_handle.transaction_id().to_owned(); + if let Some(found) = txn + .meta + .reactions + .map + .get_mut(&TimelineEventItemId::TransactionId(reaction_txn_id)) + { + found.item = TimelineEventItemId::EventId(new_event_id.to_owned()); + } + } + } + } + } + let new_item = item.with_inner_kind(local_item.with_send_state(send_state)); txn.items.set(idx, new_item); @@ -790,10 +854,8 @@ impl TimelineInner

{ state.meta.reactions.map.remove(&TimelineEventItemId::TransactionId(txn_id.to_owned())) { let item = match &full_key.item { - TimelineEventItemId::TransactionId(_) => { - // TODO(bnjbvr): reactions on local echoes - warn!("reactions on local echoes are NYI"); - return false; + TimelineEventItemId::TransactionId(txn_id) => { + rfind_event_item(&state.items, |item| item.transaction_id() == Some(txn_id)) } TimelineEventItemId::EventId(event_id) => rfind_event_by_id(&state.items, event_id), }; @@ -1147,37 +1209,89 @@ impl TimelineInner

{ /// Handle a room send update that's a new local echo. pub(crate) async fn handle_local_echo(&self, echo: LocalEcho) { - let content = match echo.serialized_event.deserialize() { - Ok(d) => d, - Err(err) => { - warn!("error deserializing local echo: {err}"); - return; + match echo.content { + LocalEchoContent::Event { serialized_event, send_handle, is_wedged } => { + let content = match serialized_event.deserialize() { + Ok(d) => d, + Err(err) => { + warn!("error deserializing local echo: {err}"); + return; + } + }; + + self.handle_local_event( + echo.transaction_id.clone(), + TimelineEventKind::Message { content, relations: Default::default() }, + Some(send_handle), + ) + .await; + + if is_wedged { + self.update_event_send_state( + &echo.transaction_id, + EventSendState::SendingFailed { + // Put a dummy error in this case, since we're not persisting the errors + // that occurred in previous sessions. + error: Arc::new(matrix_sdk::Error::UnknownError(Box::new( + MissingLocalEchoFailError, + ))), + is_recoverable: false, + }, + ) + .await; + } } - }; - self.handle_local_event( - echo.transaction_id.clone(), - TimelineEventKind::Message { content, relations: Default::default() }, - Some(echo.send_handle), - ) - .await; - - if echo.is_wedged { - self.update_event_send_state( - &echo.transaction_id, - EventSendState::SendingFailed { - // Put a dummy error in this case, since we're not persisting the errors that - // occurred in previous sessions. - error: Arc::new(matrix_sdk::Error::UnknownError(Box::new( - MissingLocalEchoFailError, - ))), - is_recoverable: false, - }, - ) - .await; + LocalEchoContent::React { key, send_handle, applies_to } => { + self.handle_local_reaction(key, send_handle, applies_to).await; + } } } + /// Adds a reaction (local echo) to a local echo. + #[instrument(skip(self, send_handle))] + async fn handle_local_reaction( + &self, + reaction_key: String, + send_handle: SendReactionHandle, + applies_to: OwnedTransactionId, + ) { + let mut state = self.state.write().await; + + let Some((item_pos, item)) = + rfind_event_item(&state.items, |item| item.transaction_id() == Some(&applies_to)) + else { + warn!("Local item not found anymore."); + return; + }; + + let user_id = self.room_data_provider.own_user_id(); + + let reaction_txn_id = send_handle.transaction_id().to_owned(); + let reaction_info = ReactionInfo { + timestamp: MilliSecondsSinceUnixEpoch::now(), + status: ReactionStatus::LocalToLocal(Some(send_handle)), + }; + + let mut reactions = item.reactions().clone(); + let by_user = reactions.entry(reaction_key.clone()).or_default(); + by_user.insert(user_id.to_owned(), reaction_info); + + trace!("Adding local reaction to local echo"); + let new_item = item.with_reactions(reactions); + state.items.set(item_pos, new_item); + + // Add it to the reaction map, so we can discard it later if needs be. + state.meta.reactions.map.insert( + TimelineEventItemId::TransactionId(reaction_txn_id), + FullReactionKey { + item: TimelineEventItemId::TransactionId(applies_to), + key: reaction_key, + sender: user_id.to_owned(), + }, + ); + } + /// Handle a single room send queue update. pub(crate) async fn handle_room_send_queue_update(&self, update: RoomSendQueueUpdate) { match update { diff --git a/crates/matrix-sdk-ui/src/timeline/mod.rs b/crates/matrix-sdk-ui/src/timeline/mod.rs index b4472e1560a..f1459946ad9 100644 --- a/crates/matrix-sdk-ui/src/timeline/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/mod.rs @@ -41,7 +41,6 @@ use ruma::{ UnstablePollStartEventContent, }, receipt::{Receipt, ReceiptThread}, - relation::Annotation, room::{ message::{ AddMentions, ForwardThread, OriginalRoomMessageEvent, RoomMessageEventContent, @@ -548,17 +547,20 @@ impl Timeline { Ok(()) } - /// Toggle a reaction on an event + /// Toggle a reaction on an event. + /// + /// The `unique_id` parameter is a string returned by + /// [`TimelineItem::unique_id()`]. /// /// Adds or redacts a reaction based on the state of the reaction at the /// time it is called. /// - /// When redacting an event, the redaction reason is not sent. + /// When redacting a previous reaction, the redaction reason is not set. /// /// Ensures that only one reaction is sent at a time to avoid race /// conditions and spamming the homeserver with requests. - pub async fn toggle_reaction(&self, annotation: &Annotation) -> Result<(), Error> { - self.inner.toggle_reaction_local(annotation).await?; + pub async fn toggle_reaction(&self, unique_id: &str, reaction_key: &str) -> Result<(), Error> { + self.inner.toggle_reaction_local(unique_id, reaction_key).await?; Ok(()) } diff --git a/crates/matrix-sdk-ui/src/timeline/tests/mod.rs b/crates/matrix-sdk-ui/src/timeline/tests/mod.rs index d38efaf93c8..971e0c98063 100644 --- a/crates/matrix-sdk-ui/src/timeline/tests/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/tests/mod.rs @@ -257,10 +257,24 @@ impl TestTimeline { self.inner.handle_read_receipts(ev_content).await; } - async fn toggle_reaction_local(&self, annotation: &Annotation) -> Result<(), super::Error> { - if self.inner.toggle_reaction_local(annotation).await? { - // Fake a local echo, for new reactions. - self.handle_local_event(ReactionEventContent::new(annotation.clone()).into()).await; + async fn toggle_reaction_local(&self, unique_id: &str, key: &str) -> Result<(), super::Error> { + if self.inner.toggle_reaction_local(unique_id, key).await? { + // TODO(bnjbvr): hacky? + if let Some(event_id) = self + .inner + .items() + .await + .iter() + .rfind(|item| item.unique_id() == unique_id) + .and_then(|item| item.as_event()?.as_remote()) + .map(|event_item| event_item.event_id.clone()) + { + // Fake a local echo, for new reactions. + self.handle_local_event( + ReactionEventContent::new(Annotation::new(event_id, key.to_owned())).into(), + ) + .await; + } } Ok(()) } diff --git a/crates/matrix-sdk-ui/src/timeline/tests/reactions.rs b/crates/matrix-sdk-ui/src/timeline/tests/reactions.rs index 1a6d4e4de60..2c2f514dfcb 100644 --- a/crates/matrix-sdk-ui/src/timeline/tests/reactions.rs +++ b/crates/matrix-sdk-ui/src/timeline/tests/reactions.rs @@ -21,9 +21,8 @@ use futures_util::{FutureExt as _, StreamExt as _}; use matrix_sdk::{deserialized_responses::SyncTimelineEvent, test_utils::events::EventFactory}; use matrix_sdk_test::{async_test, sync_timeline_event, ALICE, BOB}; use ruma::{ - event_id, - events::{relation::Annotation, AnyMessageLikeEventContent}, - server_name, uint, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, + event_id, events::AnyMessageLikeEventContent, server_name, uint, EventId, + MilliSecondsSinceUnixEpoch, OwnedEventId, }; use stream_assert::assert_next_matches; use tokio::time::timeout; @@ -70,7 +69,9 @@ macro_rules! assert_reaction_is_updated { let reactions = event.reactions().get(&REACTION_KEY.to_owned()).unwrap(); let reaction = reactions.get(*ALICE).unwrap(); match reaction.status { - ReactionStatus::LocalToRemote(_) => assert!(!$is_remote_echo), + ReactionStatus::LocalToRemote(_) | ReactionStatus::LocalToLocal(_) => { + assert!(!$is_remote_echo) + } ReactionStatus::RemoteToRemote(_) => assert!($is_remote_echo), }; event @@ -82,10 +83,7 @@ async fn test_add_reaction_on_non_existent_event() { let timeline = TestTimeline::new(); let mut stream = timeline.subscribe().await; - let event_id = EventId::new(server_name!("example.org")); // non existent event - let reaction = create_annotation(&event_id); - - timeline.toggle_reaction_local(&reaction).await.unwrap_err(); + timeline.toggle_reaction_local("nonexisting_unique_id", REACTION_KEY).await.unwrap_err(); assert!(stream.next().now_or_never().is_none()); } @@ -94,11 +92,10 @@ async fn test_add_reaction_on_non_existent_event() { async fn test_add_reaction_success() { let timeline = TestTimeline::new(); let mut stream = timeline.subscribe().await; - let (event_id, item_pos) = send_first_message(&timeline, &mut stream).await; - let reaction = create_annotation(&event_id); + let (msg_uid, event_id, item_pos) = send_first_message(&timeline, &mut stream).await; // If I toggle a reaction on an event which didn't have any… - timeline.toggle_reaction_local(&reaction).await.unwrap(); + timeline.toggle_reaction_local(&msg_uid, REACTION_KEY).await.unwrap(); // The timeline item is updated, with a local echo for the reaction. assert_reaction_is_updated!(stream, &event_id, item_pos, false); @@ -126,22 +123,22 @@ async fn test_redact_reaction_success() { let f = &timeline.factory; let mut stream = timeline.subscribe().await; - let (msg_id, item_pos) = send_first_message(&timeline, &mut stream).await; - let reaction = create_annotation(&msg_id); + let (msg_uid, event_id, item_pos) = send_first_message(&timeline, &mut stream).await; // A reaction is added by sync. let reaction_id = event_id!("$reaction_id"); timeline .handle_live_event( - f.reaction(&msg_id, REACTION_KEY.to_owned()).sender(&ALICE).event_id(reaction_id), + f.reaction(&event_id, REACTION_KEY.to_owned()).sender(&ALICE).event_id(reaction_id), ) .await; - assert_reaction_is_updated!(stream, &msg_id, item_pos, true); + assert_reaction_is_updated!(stream, &event_id, item_pos, true); // Toggling the reaction locally… - timeline.toggle_reaction_local(&reaction).await.unwrap(); + timeline.toggle_reaction_local(&msg_uid, REACTION_KEY).await.unwrap(); + // Will immediately redact it on the item. - let event = assert_item_update!(stream, &msg_id, item_pos); + let event = assert_item_update!(stream, &event_id, item_pos); assert!(event.reactions().get(&REACTION_KEY.to_owned()).is_none()); // And send a redaction request for that reaction. { @@ -169,15 +166,14 @@ async fn test_redact_reaction_success() { async fn test_reactions_store_timestamp() { let timeline = TestTimeline::new(); let mut stream = timeline.subscribe().await; - let (msg_id, msg_pos) = send_first_message(&timeline, &mut stream).await; - let reaction = create_annotation(&msg_id); + let (msg_uid, event_id, msg_pos) = send_first_message(&timeline, &mut stream).await; // Creating a reaction adds a valid timestamp. let timestamp_before = MilliSecondsSinceUnixEpoch::now(); - timeline.toggle_reaction_local(&reaction).await.unwrap(); + timeline.toggle_reaction_local(&msg_uid, REACTION_KEY).await.unwrap(); - let event = assert_reaction_is_updated!(stream, &msg_id, msg_pos, false); + let event = assert_reaction_is_updated!(stream, &event_id, msg_pos, false); let reactions = event.reactions().get(&REACTION_KEY.to_owned()).unwrap(); let timestamp = reactions.values().next().unwrap().timestamp; @@ -216,25 +212,19 @@ async fn test_initial_reaction_timestamp_is_stored() { assert_eq!(reaction_timestamp, entry.values().next().unwrap().timestamp); } -fn create_annotation(related_message_id: &EventId) -> Annotation { - let reaction_key = REACTION_KEY.to_owned(); - let msg_id = related_message_id.to_owned(); - Annotation::new(msg_id, reaction_key) -} - -/// Returns the event id and position of the message. +/// Returns the unique item id, the event id, and position of the message. async fn send_first_message( timeline: &TestTimeline, stream: &mut (impl Stream>> + Unpin), -) -> (OwnedEventId, usize) { +) -> (String, OwnedEventId, usize) { timeline.handle_live_event(timeline.factory.text_msg("I want you to react").sender(&BOB)).await; let item = assert_next_matches!(*stream, VectorDiff::PushBack { value } => value); - let event_id = item.as_event().unwrap().clone().event_id().unwrap().to_owned(); + let event_id = item.as_event().unwrap().as_remote().unwrap().event_id.clone(); let position = timeline.len().await - 1; let day_divider = assert_next_matches!(*stream, VectorDiff::PushFront { value } => value); assert!(day_divider.is_day_divider()); - (event_id, position) + (item.unique_id().to_owned(), event_id, position) } diff --git a/crates/matrix-sdk-ui/src/timeline/util.rs b/crates/matrix-sdk-ui/src/timeline/util.rs index 7d1c648be6a..af9e316cb42 100644 --- a/crates/matrix-sdk-ui/src/timeline/util.rs +++ b/crates/matrix-sdk-ui/src/timeline/util.rs @@ -39,12 +39,7 @@ impl<'a> EventTimelineItemWithId<'a> { /// Create a clone of the underlying [`TimelineItem`] with the given /// reactions. pub fn with_reactions(&self, reactions: ReactionsByKeyBySender) -> Arc { - let remote_item = self - .inner - .as_remote() - .expect("should only be remote at the moment (TODO: local)") - .with_reactions(reactions); - let event_item = self.inner.with_kind(remote_item); + let event_item = self.inner.with_reactions(reactions); TimelineItem::new(event_item, self.internal_id.to_owned()) } } @@ -57,13 +52,10 @@ impl Deref for EventTimelineItemWithId<'_> { } } -/// Finds an item in the vector of `items` given a predicate `f`. -/// -/// WARNING/FIXME: this does a linear scan of the items, so this can be slow if -/// there are many items in the timeline. -pub(super) fn rfind_event_item( +#[inline(always)] +fn rfind_event_item_internal( items: &Vector>, - mut f: impl FnMut(&EventTimelineItem) -> bool, + mut f: impl FnMut(&EventTimelineItemWithId<'_>) -> bool, ) -> Option<(usize, EventTimelineItemWithId<'_>)> { items .iter() @@ -74,10 +66,35 @@ pub(super) fn rfind_event_item( EventTimelineItemWithId { inner: item.as_event()?, internal_id: &item.internal_id }, )) }) - .rfind(|(_, it)| f(it.inner)) + .rfind(|(_, it)| f(it)) +} + +/// Finds an item in the vector of `items` given a predicate `f`. +/// +/// WARNING/FIXME: this does a linear scan of the items, so this can be slow if +/// there are many items in the timeline. +pub(super) fn rfind_event_item( + items: &Vector>, + mut f: impl FnMut(&EventTimelineItem) -> bool, +) -> Option<(usize, EventTimelineItemWithId<'_>)> { + rfind_event_item_internal(items, |item_with_id| f(item_with_id.inner)) +} + +/// Find the timeline item that matches the given internal id, if any. +/// +/// WARNING: Linear scan of the items, see documentation of +/// [`rfind_event_item`]. +pub(super) fn rfind_event_by_uid<'a>( + items: &'a Vector>, + internal_id: &'a str, +) -> Option<(usize, EventTimelineItemWithId<'a>)> { + rfind_event_item_internal(items, |item_with_id| item_with_id.internal_id == internal_id) } /// Find the timeline item that matches the given event id, if any. +/// +/// WARNING: Linear scan of the items, see documentation of +/// [`rfind_event_item`]. pub(super) fn rfind_event_by_id<'a>( items: &'a Vector>, event_id: &EventId, diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/reactions.rs b/crates/matrix-sdk-ui/tests/integration/timeline/reactions.rs index cb68d904f29..8202af31d40 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/reactions.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/reactions.rs @@ -12,19 +12,22 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::time::Duration; +use std::{sync::Mutex, time::Duration}; use assert_matches2::{assert_let, assert_matches}; use eyeball_im::VectorDiff; use futures_util::{FutureExt as _, StreamExt as _}; -use matrix_sdk::test_utils::{events::EventFactory, logged_in_client_with_server}; +use matrix_sdk::{ + assert_next_matches_with_timeout, + test_utils::{events::EventFactory, logged_in_client_with_server}, +}; use matrix_sdk_test::{ async_test, mocks::{mock_encryption_state, mock_redaction}, JoinedRoomBuilder, SyncResponseBuilder, ALICE, }; use matrix_sdk_ui::timeline::{ReactionStatus, RoomExt as _}; -use ruma::{event_id, events::relation::Annotation, room_id}; +use ruma::{event_id, events::room::message::RoomMessageEventContent, room_id}; use serde_json::json; use wiremock::{ matchers::{header, method, path_regex}, @@ -72,6 +75,7 @@ async fn test_abort_before_being_sent() { assert_let!(Some(VectorDiff::PushBack { value: first }) = stream.next().await); let item = first.as_event().unwrap(); + let unique_id = first.unique_id(); assert_eq!(item.content().as_message().unwrap().body(), "hello"); assert_let!(Some(VectorDiff::PushFront { value: day_divider }) = stream.next().await); @@ -98,7 +102,7 @@ async fn test_abort_before_being_sent() { mock_redaction(event_id!("$3")).mount(&server).await; // We add the reaction… - timeline.toggle_reaction(&Annotation::new(event_id.to_owned(), "👍".to_owned())).await.unwrap(); + timeline.toggle_reaction(unique_id, "👍").await.unwrap(); // First toggle (local echo). { @@ -115,7 +119,7 @@ async fn test_abort_before_being_sent() { } // We toggle another reaction at the same time… - timeline.toggle_reaction(&Annotation::new(event_id.to_owned(), "🥰".to_owned())).await.unwrap(); + timeline.toggle_reaction(unique_id, "🥰").await.unwrap(); { assert_let!(Some(VectorDiff::Set { index: 1, value: item }) = stream.next().await); @@ -136,7 +140,7 @@ async fn test_abort_before_being_sent() { // Then we remove the first one; because it was being sent, it should lead to a // redaction event. - timeline.toggle_reaction(&Annotation::new(event_id.to_owned(), "👍".to_owned())).await.unwrap(); + timeline.toggle_reaction(unique_id, "👍").await.unwrap(); { assert_let!(Some(VectorDiff::Set { index: 1, value: item }) = stream.next().await); @@ -153,7 +157,7 @@ async fn test_abort_before_being_sent() { // But because the first one was being sent, this one won't and the local echo // could be discarded. - timeline.toggle_reaction(&Annotation::new(event_id.to_owned(), "🥰".to_owned())).await.unwrap(); + timeline.toggle_reaction(unique_id, "🥰").await.unwrap(); { assert_let!(Some(VectorDiff::Set { index: 1, value: item }) = stream.next().await); @@ -210,16 +214,21 @@ async fn test_redact_failed() { let _response = client.sync_once(Default::default()).await.unwrap(); server.reset().await; - assert_let!(Some(VectorDiff::PushBack { value: item }) = stream.next().await); - let item = item.as_event().unwrap(); - assert_eq!(item.content().as_message().unwrap().body(), "hello"); - assert!(item.reactions().is_empty()); + let unique_id = assert_next_matches_with_timeout!(stream, VectorDiff::PushBack { value: item } => { + let unique_id = item.unique_id().to_owned(); + let item = item.as_event().unwrap(); + assert_eq!(item.content().as_message().unwrap().body(), "hello"); + assert!(item.reactions().is_empty()); + unique_id + }); - assert_let!(Some(VectorDiff::Set { index: 0, value: item }) = stream.next().await); - assert_eq!(item.as_event().unwrap().reactions().len(), 1); + assert_next_matches_with_timeout!(stream, VectorDiff::Set { index: 0, value: item } => { + assert_eq!(item.as_event().unwrap().reactions().len(), 1); + }); - assert_let!(Some(VectorDiff::PushFront { value: day_divider }) = stream.next().await); - assert!(day_divider.is_day_divider()); + assert_next_matches_with_timeout!(stream, VectorDiff::PushFront { value: day_divider } => { + assert!(day_divider.is_day_divider()); + }); // Now, redact the annotation we previously added. @@ -233,19 +242,141 @@ async fn test_redact_failed() { .await; // We toggle the reaction, which fails with an error. - timeline - .toggle_reaction(&Annotation::new(event_id.to_owned(), "😆".to_owned())) - .await - .unwrap_err(); + timeline.toggle_reaction(&unique_id, "😆").await.unwrap_err(); // The local echo is removed (assuming the redaction works)… - assert_let!(Some(VectorDiff::Set { index: 1, value: item }) = stream.next().await); - assert!(item.as_event().unwrap().reactions().is_empty()); + assert_next_matches_with_timeout!(stream, VectorDiff::Set { index: 1, value: item } => { + assert!(item.as_event().unwrap().reactions().is_empty()); + }); // …then added back, after redaction failed. - assert_let!(Some(VectorDiff::Set { index: 1, value: item }) = stream.next().await); - assert_eq!(item.as_event().unwrap().reactions().len(), 1); + assert_next_matches_with_timeout!(stream, VectorDiff::Set { index: 1, value: item } => { + assert_eq!(item.as_event().unwrap().reactions().len(), 1); + }); + + tokio::time::sleep(Duration::from_millis(150)).await; + assert!(stream.next().now_or_never().is_none()); +} + +#[async_test] +async fn test_local_reaction_to_local_echo() { + // This test checks that if a reaction redaction failed, then we re-insert the + // reaction after displaying it was removed. + + let room_id = room_id!("!a98sd12bjh:example.org"); + let (client, server) = logged_in_client_with_server().await; + let user_id = client.user_id().unwrap(); + + // Make the test aware of the room. + let mut sync_builder = SyncResponseBuilder::new(); + sync_builder.add_joined_room(JoinedRoomBuilder::new(room_id)); + + mock_sync(&server, sync_builder.build_json_sync_response(), None).await; + let _response = client.sync_once(Default::default()).await.unwrap(); + server.reset().await; + + mock_encryption_state(&server, false).await; + + let room = client.get_room(room_id).unwrap(); + let timeline = room.timeline().await.unwrap(); + let (initial_items, mut stream) = timeline.subscribe().await; + + assert!(initial_items.is_empty()); + + // Add a duration to the response, so we can check other things in the + // meanwhile. + let next_event_id = Mutex::new(0); + Mock::given(method("PUT")) + .and(path_regex(r"^/_matrix/client/r0/rooms/.*/send/.*")) + .and(header("authorization", "Bearer 1234")) + .respond_with(move |_req: &wiremock::Request| { + let mut next_event_id = next_event_id.lock().unwrap(); + let event_id = *next_event_id; + *next_event_id += 1; + let mut tmp = ResponseTemplate::new(200).set_body_json(json!({ + "event_id": format!("${event_id}"), + })); + + if event_id == 0 { + tmp = tmp.set_delay(Duration::from_secs(1)); + } + + tmp + }) + .mount(&server) + .await; + + // Send a local event. + let _ = timeline.send(RoomMessageEventContent::text_plain("lol").into()).await.unwrap(); + + // Receive a local echo. + let unique_id = assert_next_matches_with_timeout!(stream, VectorDiff::PushBack { value: item } => { + let unique_id = item.unique_id().to_owned(); + let item = item.as_event().unwrap(); + assert!(item.is_local_echo()); + assert_eq!(item.content().as_message().unwrap().body(), "lol"); + assert!(item.reactions().is_empty()); + unique_id + }); + + // Good ol' day divider. + assert_next_matches_with_timeout!(stream, VectorDiff::PushFront { value: day_divider } => { + assert!(day_divider.is_day_divider()); + }); + + // Add a reaction before the remote echo comes back. + let key1 = "🤣"; + timeline.toggle_reaction(&unique_id, key1).await.unwrap(); + + // The reaction is added to the local echo. + assert_next_matches_with_timeout!(stream, VectorDiff::Set { index: 1, value: item } => { + let reactions = item.as_event().unwrap().reactions(); + assert_eq!(reactions.len(), 1); + let reaction_info = reactions.get(key1).unwrap().get(user_id).unwrap(); + assert_matches!(&reaction_info.status, ReactionStatus::LocalToLocal(..)); + }); + + // Add another reaction. + let key2 = "😈"; + timeline.toggle_reaction(&unique_id, key2).await.unwrap(); + + // Also comes as a local echo. + assert_next_matches_with_timeout!(stream, VectorDiff::Set { index: 1, value: item } => { + let reactions = item.as_event().unwrap().reactions(); + assert_eq!(reactions.len(), 2); + let reaction_info = reactions.get(key2).unwrap().get(user_id).unwrap(); + assert_matches!(&reaction_info.status, ReactionStatus::LocalToLocal(..)); + }); + + // Remove second reaction. It's immediately removed, since it was a local echo, + // and it wasn't being sent. + timeline.toggle_reaction(&unique_id, key2).await.unwrap(); + + assert_next_matches_with_timeout!(stream, VectorDiff::Set { index: 1, value: item } => { + let reactions = item.as_event().unwrap().reactions(); + assert_eq!(reactions.len(), 1); + let reaction_info = reactions.get(key1).unwrap().get(user_id).unwrap(); + assert_matches!(&reaction_info.status, ReactionStatus::LocalToLocal(..)); + }); + + // Now, wait for the remote echo for the message itself. + assert_next_matches_with_timeout!(stream, 2000, VectorDiff::Set { index: 1, value: item } => { + let reactions = item.as_event().unwrap().reactions(); + assert_eq!(reactions.len(), 1); + let reaction_info = reactions.get(key1).unwrap().get(user_id).unwrap(); + // TODO(bnjbvr): why not LocalToRemote here? + assert_matches!(&reaction_info.status, ReactionStatus::LocalToLocal(..)); + }); + + // And then the remote echo for the reaction itself. + assert_next_matches_with_timeout!(stream, VectorDiff::Set { index: 1, value: item } => { + let reactions = item.as_event().unwrap().reactions(); + assert_eq!(reactions.len(), 1); + let reaction_info = reactions.get(key1).unwrap().get(user_id).unwrap(); + assert_matches!(&reaction_info.status, ReactionStatus::RemoteToRemote(..)); + }); + // And we're done. tokio::time::sleep(Duration::from_millis(150)).await; assert!(stream.next().now_or_never().is_none()); } diff --git a/crates/matrix-sdk/src/send_queue.rs b/crates/matrix-sdk/src/send_queue.rs index 14712c57256..5ae92fffb28 100644 --- a/crates/matrix-sdk/src/send_queue.rs +++ b/crates/matrix-sdk/src/send_queue.rs @@ -61,6 +61,7 @@ use matrix_sdk_base::{ use matrix_sdk_common::executor::{spawn, JoinHandle}; use ruma::{ events::{ + reaction::ReactionEventContent, relation::Annotation, room::message::RoomMessageEventContentWithoutRelation, AnyMessageLikeEventContent, EventContent as _, }, @@ -265,6 +266,7 @@ pub struct RoomSendQueue { inner: Arc, } +#[cfg(not(tarpaulin_include))] impl std::fmt::Debug for RoomSendQueue { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("RoomSendQueue").finish_non_exhaustive() @@ -343,9 +345,14 @@ impl RoomSendQueue { let _ = self.inner.updates.send(RoomSendQueueUpdate::NewLocalEvent(LocalEcho { transaction_id: transaction_id.clone(), - serialized_event: content, - send_handle: SendHandle { room: self.clone(), transaction_id: transaction_id.clone() }, - is_wedged: false, + content: LocalEchoContent::Event { + serialized_event: content, + send_handle: SendHandle { + room: self.clone(), + transaction_id: transaction_id.clone(), + }, + is_wedged: false, + }, })); Ok(SendHandle { transaction_id, room: self.clone() }) @@ -381,22 +388,7 @@ impl RoomSendQueue { &self, ) -> Result<(Vec, broadcast::Receiver), RoomSendQueueError> { - let local_echoes = self - .inner - .queue - .local_echoes() - .await? - .into_iter() - .map(|queued| LocalEcho { - transaction_id: queued.transaction_id.clone(), - serialized_event: queued.event, - send_handle: SendHandle { - room: self.clone(), - transaction_id: queued.transaction_id, - }, - is_wedged: queued.is_wedged, - }) - .collect(); + let local_echoes = self.inner.queue.local_echoes(self).await?; Ok((local_echoes, self.inner.updates.subscribe())) } @@ -785,22 +777,95 @@ impl QueueStorage { Ok(edited) } + #[instrument(skip(self))] + async fn react( + &self, + transaction_id: &TransactionId, + key: String, + ) -> Result, RoomSendQueueStorageError> { + let client = self.client()?; + let store = client.store(); + + let queued_events = store.load_send_queue_events(&self.room_id).await?; + + // If the event has been already sent, abort immediately. + if !queued_events.iter().any(|item| item.transaction_id == transaction_id) { + return Ok(None); + } + + // Record the dependent event. + let reaction_txn_id = ChildTransactionId::new(); + store + .save_dependent_send_queue_event( + &self.room_id, + transaction_id, + reaction_txn_id.clone(), + DependentQueuedEventKind::React { key }, + ) + .await?; + + Ok(Some(reaction_txn_id)) + } + /// Returns a list of the local echoes, that is, all the events that we're /// about to send but that haven't been sent yet (or are being sent). - async fn local_echoes(&self) -> Result, RoomSendQueueStorageError> { - Ok(self.client()?.store().load_send_queue_events(&self.room_id).await?) + async fn local_echoes( + &self, + room: &RoomSendQueue, + ) -> Result, RoomSendQueueStorageError> { + let client = self.client()?; + let store = client.store(); + + let local_events = + store.load_send_queue_events(&self.room_id).await?.into_iter().map(|queued| { + LocalEcho { + transaction_id: queued.transaction_id.clone(), + content: LocalEchoContent::Event { + serialized_event: queued.event, + send_handle: SendHandle { + room: room.clone(), + transaction_id: queued.transaction_id, + }, + is_wedged: queued.is_wedged, + }, + } + }); + + let local_reactions = store + .list_dependent_send_queue_events(&self.room_id) + .await? + .into_iter() + .filter_map(|dep| match dep.kind { + DependentQueuedEventKind::Edit { .. } | DependentQueuedEventKind::Redact => None, + DependentQueuedEventKind::React { key } => Some(LocalEcho { + transaction_id: dep.own_transaction_id.clone().into(), + content: LocalEchoContent::React { + key, + send_handle: SendReactionHandle { + room: room.clone(), + transaction_id: dep.own_transaction_id, + }, + applies_to: dep.parent_transaction_id, + }, + }), + }); + + Ok(local_events.chain(local_reactions).collect()) } /// Try to apply a single dependent event, whether it's local or remote. /// /// This swallows errors that would retrigger every time if we retried /// applying the dependent event: invalid edit content, etc. + /// + /// Returns true if the dependent event has been sent (or should not be + /// retried later). #[instrument(skip_all)] async fn try_apply_single_dependent_event( &self, client: &Client, de: DependentQueuedEvent, - ) -> Result<(), RoomSendQueueError> { + ) -> Result { let store = client.store(); match de.kind { @@ -818,7 +883,7 @@ impl QueueStorage { Ok(c) => c, Err(err) => { warn!("unable to deserialize: {err}"); - return Ok(()); + return Ok(true); } }; @@ -826,7 +891,7 @@ impl QueueStorage { let AnyMessageLikeEventContent::RoomMessage(room_message_content) = content else { warn!("trying to send an edit event for a non-room message: aborting"); - return Ok(()); + return Ok(true); }; // Assume no relation. @@ -840,7 +905,7 @@ impl QueueStorage { Ok(e) => e, Err(err) => { warn!("couldn't create edited event: {err}"); - return Ok(()); + return Ok(true); } }; @@ -895,6 +960,7 @@ impl QueueStorage { room.redact(&event_id, None, Some(de.own_transaction_id.into())).await { warn!("error when sending a redact for {event_id}: {err}"); + return Ok(false); } } else { // The parent event is still local (sending must have failed); redact the local @@ -905,13 +971,38 @@ impl QueueStorage { .map_err(RoomSendQueueStorageError::StorageError)?; if !removed { - warn!("missing local echo upon dependent redact??"); + warn!("missing local echo upon dependent redact"); } } } + + DependentQueuedEventKind::React { key } => { + if let Some(event_id) = de.event_id { + // Queue the reaction event in the send queue 🧠. + let react_event = + ReactionEventContent::new(Annotation::new(event_id, key)).into(); + let serializable = SerializableEventContent::from_raw( + Raw::new(&react_event) + .map_err(RoomSendQueueStorageError::JsonSerialization)?, + react_event.event_type().to_string(), + ); + + store + .save_send_queue_event( + &self.room_id, + de.own_transaction_id.into(), + serializable, + ) + .await + .map_err(RoomSendQueueStorageError::StorageError)?; + } else { + // Not applied yet, we should retry later => false. + return Ok(false); + } + } } - Ok(()) + Ok(true) } #[instrument(skip(self))] @@ -928,6 +1019,10 @@ impl QueueStorage { .map_err(RoomSendQueueStorageError::StorageError)?; let num_initial_dependent_events = dependent_events.len(); + if num_initial_dependent_events == 0 { + // Returning early here avoids a bit of useless logging. + return Ok(()); + } let canonicalized_dependent_events = canonicalize_dependent_events(&dependent_events); @@ -955,14 +1050,16 @@ impl QueueStorage { let dependent_id = dependent.own_transaction_id.clone(); match self.try_apply_single_dependent_event(&client, dependent).await { - Ok(()) => { - // The dependent event has been successfully applied, forget about it. - store - .remove_dependent_send_queue_event(&self.room_id, &dependent_id) - .await - .map_err(RoomSendQueueStorageError::StorageError)?; - - num_dependent_events -= 1; + Ok(should_remove) => { + if should_remove { + // The dependent event has been successfully applied, forget about it. + store + .remove_dependent_send_queue_event(&self.room_id, &dependent_id) + .await + .map_err(RoomSendQueueStorageError::StorageError)?; + + num_dependent_events -= 1; + } } Err(err) => { @@ -978,6 +1075,47 @@ impl QueueStorage { Ok(()) } + + /// Remove a single dependent event from storage. + async fn remove_dependent_send_queue_event( + &self, + dependent_event_id: &ChildTransactionId, + ) -> Result { + // Keep the lock until we're done touching the storage. + let _being_sent = self.being_sent.read().await; + + Ok(self + .client()? + .store() + .remove_dependent_send_queue_event(&self.room_id, dependent_event_id) + .await?) + } +} + +/// The content of a local echo. +#[derive(Clone, Debug)] +pub enum LocalEchoContent { + /// The local echo contains an actual event ready to display. + Event { + /// Content of the event itself (along with its type) that we are about + /// to send. + serialized_event: SerializableEventContent, + /// A handle to manipulate the sending of the associated event. + send_handle: SendHandle, + /// Whether trying to send this local echo failed in the past with an + /// unrecoverable error (see [`SendQueueRoomError::is_recoverable`]). + is_wedged: bool, + }, + + /// A local echo has been reacted to. + React { + /// The key with which the local echo has been reacted to. + key: String, + /// A handle to manipulate the sending of the reaction. + send_handle: SendReactionHandle, + /// The local echo which has been reacted to. + applies_to: OwnedTransactionId, + }, } /// An event that has been locally queued for sending, but hasn't been sent yet. @@ -985,14 +1123,8 @@ impl QueueStorage { pub struct LocalEcho { /// Transaction id used to identify this event. pub transaction_id: OwnedTransactionId, - /// Content of the event itself (along with its type) that we are about to - /// send. - pub serialized_event: SerializableEventContent, - /// A handle to manipulate the sending of the associated event. - pub send_handle: SendHandle, - /// Whether trying to send this local echo failed in the past with an - /// unrecoverable error (see [`SendQueueRoomError::is_recoverable`]). - pub is_wedged: bool, + /// The content for the local echo. + pub content: LocalEchoContent, } /// An update to a room send queue, observable with @@ -1159,6 +1291,91 @@ impl SendHandle { ) .await } + + /// Send a reaction to the event as soon as it's sent. + /// + /// If returning `Ok(None)`; this means the reaction couldn't be sent + /// because the event is already a remote one. + #[instrument(skip(self), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))] + pub async fn react( + &self, + key: String, + ) -> Result, RoomSendQueueStorageError> { + trace!("received an intent to react"); + + if let Some(reaction_txn_id) = + self.room.inner.queue.react(&self.transaction_id, key.clone()).await? + { + trace!("successfully queued react"); + + // Wake up the queue, in case the room was asleep before the sending. + self.room.inner.notifier.notify_one(); + + // Propagate a new local event. + let send_handle = SendReactionHandle { + room: self.room.clone(), + transaction_id: reaction_txn_id.clone(), + }; + + let _ = self.room.inner.updates.send(RoomSendQueueUpdate::NewLocalEvent(LocalEcho { + // Note: we do want to use the txn_id we're going to use for the reaction, not the + // one for the event we're reacting to. + transaction_id: reaction_txn_id.into(), + content: LocalEchoContent::React { + key, + send_handle: send_handle.clone(), + applies_to: self.transaction_id.clone(), + }, + })); + + Ok(Some(send_handle)) + } else { + debug!("local echo doesn't exist anymore, can't react"); + Ok(None) + } + } +} + +/// A handle to execute actions on the sending of a reaction. +#[derive(Clone, Debug)] +pub struct SendReactionHandle { + /// Reference to the send queue for the room where this reaction was sent. + room: RoomSendQueue, + /// The own transaction id for the reaction. + transaction_id: ChildTransactionId, +} + +impl SendReactionHandle { + /// Abort the sending of the reaction. + /// + /// Will return true if the reaction could be aborted, false if it's been + /// sent (and there's no matching local echo anymore). + pub async fn abort(&self) -> Result { + if self.room.inner.queue.remove_dependent_send_queue_event(&self.transaction_id).await? { + // Simple case: the reaction was found in the dependent event list. + + // Propagate a cancelled update too. + let _ = self.room.inner.updates.send(RoomSendQueueUpdate::CancelledLocalEvent { + transaction_id: self.transaction_id.clone().into(), + }); + + return Ok(true); + } + + // The reaction has already been queued for sending, try to abort it using a + // regular abort. + let handle = SendHandle { + room: self.room.clone(), + transaction_id: self.transaction_id.clone().into(), + }; + + handle.abort().await + } + + /// The transaction id that will be used to send this reaction later. + pub fn transaction_id(&self) -> &TransactionId { + &self.transaction_id + } } /// From a given source of [`DependentQueuedEvent`], return only the most @@ -1185,10 +1402,14 @@ fn canonicalize_dependent_events(dependent: &[DependentQueuedEvent]) -> Vec { + prevs.push(d); + } + DependentQueuedEventKind::Redact => { // Remove every other dependent action. prevs.clear(); @@ -1413,4 +1634,37 @@ mod tests { } } } + + #[test] + fn test_canonicalize_reactions_after_edits() { + // Sending reactions should happen after edits to a given event. + let txn = TransactionId::new(); + + let react_id = ChildTransactionId::new(); + let react = DependentQueuedEvent { + own_transaction_id: react_id.clone(), + kind: DependentQueuedEventKind::React { key: "🧠".to_owned() }, + parent_transaction_id: txn.clone(), + event_id: None, + }; + + let edit_id = ChildTransactionId::new(); + let edit = DependentQueuedEvent { + own_transaction_id: edit_id.clone(), + kind: DependentQueuedEventKind::Edit { + new_content: SerializableEventContent::new( + &RoomMessageEventContent::text_plain("edit").into(), + ) + .unwrap(), + }, + parent_transaction_id: txn, + event_id: None, + }; + + let res = canonicalize_dependent_events(&[react, edit]); + + assert_eq!(res.len(), 2); + assert_eq!(res[0].own_transaction_id, edit_id); + assert_eq!(res[1].own_transaction_id, react_id); + } } diff --git a/crates/matrix-sdk/tests/integration/send_queue.rs b/crates/matrix-sdk/tests/integration/send_queue.rs index 703419b3d54..53494791f11 100644 --- a/crates/matrix-sdk/tests/integration/send_queue.rs +++ b/crates/matrix-sdk/tests/integration/send_queue.rs @@ -10,7 +10,7 @@ use std::{ use assert_matches2::{assert_let, assert_matches}; use matrix_sdk::{ config::{RequestConfig, StoreConfig}, - send_queue::{LocalEcho, RoomSendQueueError, RoomSendQueueUpdate}, + send_queue::{LocalEcho, LocalEchoContent, RoomSendQueueError, RoomSendQueueUpdate}, test_utils::{ events::EventFactory, logged_in_client, logged_in_client_with_server, set_client_session, }, @@ -68,11 +68,13 @@ macro_rules! assert_update { ($watch:ident => local echo { body = $body:expr }) => {{ assert_let!( Ok(Ok(RoomSendQueueUpdate::NewLocalEvent(LocalEcho { - serialized_event, + content: LocalEchoContent::Event { + serialized_event, + send_handle, + // New local echoes should always start as not wedged. + is_wedged: false, + }, transaction_id: txn, - send_handle, - // New local echoes should always start as not wedged. - is_wedged: false, }))) = timeout(Duration::from_secs(1), $watch.recv()).await ); @@ -83,6 +85,26 @@ macro_rules! assert_update { (txn, send_handle) }}; + // Check the next stream event is a local echo for a reaction with the content $key which + // applies to the local echo with transaction id $parent. + ($watch:ident => local reaction { key = $key:expr, parent = $parent_txn_id:expr }) => {{ + assert_let!( + Ok(Ok(RoomSendQueueUpdate::NewLocalEvent(LocalEcho { + content: LocalEchoContent::React { + key, + applies_to, + send_handle: _, + }, + transaction_id: txn, + }))) = timeout(Duration::from_secs(1), $watch.recv()).await + ); + + assert_eq!(key, $key); + assert_eq!(applies_to, $parent_txn_id); + + txn + }}; + // Check the next stream event is an edit for a local echo with the content $body, and that the // transaction id is the one we expect. ($watch:ident => edit { body = $body:expr, txn = $transaction_id:expr }) => {{ @@ -338,9 +360,8 @@ async fn test_smoke_raw() { assert_let!( Ok(Ok(RoomSendQueueUpdate::NewLocalEvent(LocalEcho { - serialized_event, + content: LocalEchoContent::Event { serialized_event, .. }, transaction_id: txn1, - .. }))) = timeout(Duration::from_secs(1), watch.recv()).await ); @@ -758,7 +779,8 @@ async fn test_cancellation() { // Let the background task start now. tokio::task::yield_now().await; - // While the first item is being sent, the system records the intent to edit it. + // While the first item is being sent, the system records the intent to abort + // it. assert!(handle1.abort().await.unwrap()); assert_update!(watch => cancelled { txn = txn1 }); assert!(watch.is_empty()); @@ -785,8 +807,9 @@ async fn test_cancellation() { let local_echo4 = local_echoes.remove(1); assert_eq!(local_echo4.transaction_id, txn4, "local echoes: {local_echoes:?}"); - let handle4 = local_echo4.send_handle; - + let LocalEchoContent::Event { send_handle: handle4, .. } = local_echo4.content else { + panic!("unexpected local echo content"); + }; assert!(handle4.abort().await.unwrap()); assert_update!(watch => cancelled { txn = txn4 }); assert!(watch.is_empty()); @@ -1014,7 +1037,11 @@ async fn test_edit_while_being_sent_and_fails() { assert_eq!(local_echoes.len(), 1); assert_eq!(local_echoes[0].transaction_id, txn1); - let event = local_echoes[0].serialized_event.deserialize().unwrap(); + let LocalEchoContent::Event { serialized_event, .. } = &local_echoes[0].content else { + panic!("unexpected local echo content") + }; + let event = serialized_event.deserialize().unwrap(); + assert_let!(AnyMessageLikeEventContent::RoomMessage(msg) = event); assert_eq!(msg.body(), "it's never too late!"); } @@ -1197,6 +1224,8 @@ async fn test_abort_or_edit_after_send() { .not()); // Neither will aborting. assert!(handle.abort().await.unwrap().not()); + // Or sending a reaction. + assert!(handle.react("😊".to_owned()).await.unwrap().is_none()); assert!(watch.is_empty()); } @@ -1535,3 +1564,142 @@ async fn test_reloading_rooms_with_unsent_events() { // The real assertion is on the expect(2) on the above Mock. server.verify().await; } + +#[async_test] +async fn test_reactions() { + let (client, server) = logged_in_client_with_server().await; + + // Mark the room as joined. + let room_id = room_id!("!a:b.c"); + + let room = mock_sync_with_new_room( + |builder| { + builder.add_joined_room(JoinedRoomBuilder::new(room_id)); + }, + &client, + &server, + room_id, + ) + .await; + + let q = room.send_queue(); + + let (local_echoes, mut watch) = q.subscribe().await.unwrap(); + assert!(local_echoes.is_empty()); + assert!(watch.is_empty()); + + let lock = Arc::new(Mutex::new(0)); + let lock_guard = lock.lock().await; + + let mock_lock = lock.clone(); + + mock_encryption_state(&server, false).await; + + Mock::given(method("PUT")) + .and(path_regex(r"^/_matrix/client/r0/rooms/.*/send/.*")) + .and(header("authorization", "Bearer 1234")) + .respond_with(move |_req: &Request| { + // Wait for the signal from the main thread that we can process this query. + let mock_lock = mock_lock.clone(); + let event_id = std::thread::spawn(move || { + tokio::runtime::Runtime::new().unwrap().block_on(async { + let mut event_id = mock_lock.lock().await; + let ret = *event_id; + *event_id += 1; + ret + }) + }) + .join() + .unwrap(); + + ResponseTemplate::new(200).set_body_json(json!({ + "event_id": format!("${event_id}"), + })) + }) + .expect(3) + .mount(&server) + .await; + + // Sending of the second emoji has started; abort it, it will result in a redact + // request. + Mock::given(method("PUT")) + .and(path_regex(r"^/_matrix/client/r0/rooms/.*/redact/.*?/.*?")) + .and(header("authorization", "Bearer 1234")) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({"event_id": "$3"}))) + .expect(1) + .mount(&server) + .await; + + // Send a message. + let msg_handle = + room.send_queue().send(RoomMessageEventContent::text_plain("1").into()).await.unwrap(); + + // React to it a few times. + let emoji_handle = + msg_handle.react("💯".to_owned()).await.unwrap().expect("first emoji was queued"); + let emoji_handle2 = + msg_handle.react("🍭".to_owned()).await.unwrap().expect("second emoji was queued"); + let emoji_handle3 = + msg_handle.react("👍".to_owned()).await.unwrap().expect("fourth emoji was queued"); + + let (txn1, _) = assert_update!(watch => local echo { body = "1" }); + let emoji1_txn = assert_update!(watch => local reaction { key = "💯", parent = txn1 }); + let emoji2_txn = assert_update!(watch => local reaction { key = "🍭", parent = txn1 }); + let emoji3_txn = assert_update!(watch => local reaction { key = "👍", parent = txn1 }); + + { + let (local_echoes, _) = q.subscribe().await.unwrap(); + + assert_eq!(local_echoes.len(), 4); + assert_eq!(local_echoes[0].transaction_id, txn1); + + assert_eq!(local_echoes[1].transaction_id, emoji1_txn); + assert_let!(LocalEchoContent::React { key, applies_to, .. } = &local_echoes[1].content); + assert_eq!(key, "💯"); + assert_eq!(*applies_to, txn1); + + assert_eq!(local_echoes[2].transaction_id, emoji2_txn); + assert_let!(LocalEchoContent::React { key, applies_to, .. } = &local_echoes[2].content); + assert_eq!(key, "🍭"); + assert_eq!(*applies_to, txn1); + + assert_eq!(local_echoes[3].transaction_id, emoji3_txn); + assert_let!(LocalEchoContent::React { key, applies_to, .. } = &local_echoes[3].content); + assert_eq!(key, "👍"); + assert_eq!(*applies_to, txn1); + } + + // Cancel the first reaction before the original event is sent. + let aborted = emoji_handle.abort().await.unwrap(); + assert!(aborted); + assert_update!(watch => cancelled { txn = emoji1_txn }); + assert!(watch.is_empty()); + + // Let the original event be sent, and re-take the lock immediately so no + // reactions aren't sent (since the lock is fair). + drop(lock_guard); + assert_update!(watch => sent { txn = txn1, event_id = event_id!("$0") }); + let lock_guard = lock.lock().await; + assert!(watch.is_empty()); + + // Abort sending of the second emoji. It was being sent, so it's first cancelled + // *then* sent and redacted. + let aborted = emoji_handle2.abort().await.unwrap(); + assert!(aborted); + assert_update!(watch => cancelled { txn = emoji2_txn }); + assert!(watch.is_empty()); + + // Drop the guard to let the mock server process events. + drop(lock_guard); + + // Previous emoji has been sent; it will be redacted later. + assert_update!(watch => sent { txn = emoji2_txn, event_id = event_id!("$1") }); + + // The final emoji is sent. + assert_update!(watch => sent { txn = emoji3_txn, event_id = event_id!("$2") }); + + // Cancelling sending of the third emoji fails because it's been sent already. + assert!(emoji_handle3.abort().await.unwrap().not()); + + assert!(watch.is_empty()); +} diff --git a/testing/matrix-sdk-integration-testing/src/tests/timeline.rs b/testing/matrix-sdk-integration-testing/src/tests/timeline.rs index fc9c8129a6a..80ebce3b2db 100644 --- a/testing/matrix-sdk-integration-testing/src/tests/timeline.rs +++ b/testing/matrix-sdk-integration-testing/src/tests/timeline.rs @@ -22,8 +22,7 @@ use eyeball_im::{Vector, VectorDiff}; use futures_util::{FutureExt, StreamExt}; use matrix_sdk::ruma::{ api::client::room::create_room::v3::Request as CreateRoomRequest, - events::{relation::Annotation, room::message::RoomMessageEventContent}, - MilliSecondsSinceUnixEpoch, + events::room::message::RoomMessageEventContent, MilliSecondsSinceUnixEpoch, }; use matrix_sdk_ui::timeline::{EventSendState, ReactionStatus, RoomExt, TimelineItem}; use tokio::{ @@ -85,15 +84,17 @@ async fn test_toggling_reaction() -> Result<()> { items.iter().find_map(|item| { let event = item.as_event()?; if !event.is_local_echo() && event.content().as_message()?.body().trim() == "hi!" { - event.event_id().map(|event_id| event_id.to_owned()) + event + .event_id() + .map(|event_id| (item.unique_id().to_owned(), event_id.to_owned())) } else { None } }) }; - if let Some(event_id) = find_event_id(&items) { - return Ok(event_id); + if let Some(pair) = find_event_id(&items) { + return Ok(pair); } warn!(?items, "Waiting for updates…"); @@ -101,8 +102,8 @@ async fn test_toggling_reaction() -> Result<()> { while let Some(diff) = stream.next().await { warn!(?diff, "received a diff"); diff.apply(&mut items); - if let Some(event_id) = find_event_id(&items) { - return Ok(event_id); + if let Some(pair) = find_event_id(&items) { + return Ok(pair); } } @@ -117,7 +118,7 @@ async fn test_toggling_reaction() -> Result<()> { debug!("Sending initial message…"); timeline.send(RoomMessageEventContent::text_plain("hi!").into()).await.unwrap(); - let event_id = timeout(Duration::from_secs(10), event_id_task) + let (msg_uid, event_id) = timeout(Duration::from_secs(10), event_id_task) .await .expect("timeout") .expect("failed to join tokio task") @@ -143,19 +144,19 @@ async fn test_toggling_reaction() -> Result<()> { .find_map(|(i, item)| (item.as_event()?.event_id()? == event_id).then_some(i)) .expect("couldn't find the final position for the event id"); - let reaction = Annotation::new(event_id.clone(), "👍".to_owned()); + let reaction_key = "👍".to_owned(); // Toggle reaction multiple times. for _ in 0..3 { debug!("Starting the toggle reaction tests…"); // Add the reaction. - timeline.toggle_reaction(&reaction).await.expect("toggling reaction"); + timeline.toggle_reaction(&msg_uid, &reaction_key).await.expect("toggling reaction"); // Local echo is added. { let event = assert_event_is_updated!(stream, event_id, message_position); - let reactions = event.reactions().get(&reaction.key).unwrap(); + let reactions = event.reactions().get(&reaction_key).unwrap(); let reaction = reactions.get(&user_id).unwrap(); assert_matches!(reaction.status, ReactionStatus::LocalToRemote(..)); } @@ -164,7 +165,7 @@ async fn test_toggling_reaction() -> Result<()> { { let event = assert_event_is_updated!(stream, event_id, message_position); - let reactions = event.reactions().get(&reaction.key).unwrap(); + let reactions = event.reactions().get(&reaction_key).unwrap(); assert_eq!(reactions.keys().count(), 1); let reaction = reactions.get(&user_id).unwrap(); @@ -178,7 +179,10 @@ async fn test_toggling_reaction() -> Result<()> { } // Redact the reaction. - timeline.toggle_reaction(&reaction).await.expect("toggling reaction the second time"); + timeline + .toggle_reaction(&msg_uid, &reaction_key) + .await + .expect("toggling reaction the second time"); // The reaction is removed. let event = assert_event_is_updated!(stream, event_id, message_position); diff --git a/xtask/src/ci.rs b/xtask/src/ci.rs index 56ac0f9a679..4507baf44c1 100644 --- a/xtask/src/ci.rs +++ b/xtask/src/ci.rs @@ -293,7 +293,7 @@ fn run_wasm_checks(cmd: Option) -> Result<()> { WasmFeatureSet::MatrixSdkNoDefault, "-p matrix-sdk --no-default-features --features js,rustls-tls", ), - (WasmFeatureSet::MatrixSdkBase, "-p matrix-sdk-base --features js"), + (WasmFeatureSet::MatrixSdkBase, "-p matrix-sdk-base --features js,test-send-sync"), (WasmFeatureSet::MatrixSdkCommon, "-p matrix-sdk-common --features js"), ( WasmFeatureSet::MatrixSdkIndexeddbStoresNoCrypto,