diff --git a/crates/matrix-sdk-base/src/store/traits.rs b/crates/matrix-sdk-base/src/store/traits.rs index c0c667c0b71..68d5445a849 100644 --- a/crates/matrix-sdk-base/src/store/traits.rs +++ b/crates/matrix-sdk-base/src/store/traits.rs @@ -1252,6 +1252,18 @@ pub enum DependentQueuedEventKind { /// The event should be redacted/aborted/removed. Redact, + + /// The event should be reacted to, with the given key. + React { + /// Key used for the reaction. + key: String, + + /// The transaction id to use for sending the reaction. + /// + /// Note: it's not the one for the local echo of the event we're + /// reacting to. + transaction_id: OwnedTransactionId, + }, } /// An event to be sent, depending on a [`QueuedEvent`] to be sent first. diff --git a/crates/matrix-sdk-ui/src/timeline/inner/mod.rs b/crates/matrix-sdk-ui/src/timeline/inner/mod.rs index 0ed92a48b30..8b5afb586e0 100644 --- a/crates/matrix-sdk-ui/src/timeline/inner/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/inner/mod.rs @@ -1161,6 +1161,10 @@ impl TimelineInner

{ .await; } } + + LocalEchoContent::React { key, send_handle } => { + todo!(); + } } } diff --git a/crates/matrix-sdk/src/send_queue.rs b/crates/matrix-sdk/src/send_queue.rs index de915f3c494..1e4fcd60218 100644 --- a/crates/matrix-sdk/src/send_queue.rs +++ b/crates/matrix-sdk/src/send_queue.rs @@ -60,6 +60,7 @@ use matrix_sdk_base::{ use matrix_sdk_common::executor::{spawn, JoinHandle}; use ruma::{ events::{ + reaction::ReactionEventContent, relation::Annotation, room::message::RoomMessageEventContentWithoutRelation, AnyMessageLikeEventContent, EventContent as _, }, @@ -386,24 +387,7 @@ impl RoomSendQueue { &self, ) -> Result<(Vec, broadcast::Receiver), RoomSendQueueError> { - let local_echoes = self - .inner - .queue - .local_echoes() - .await? - .into_iter() - .map(|queued| LocalEcho { - transaction_id: queued.transaction_id.clone(), - content: LocalEchoContent::Event { - serialized_event: queued.event, - send_handle: SendHandle { - room: self.clone(), - transaction_id: queued.transaction_id, - }, - is_wedged: queued.is_wedged, - }, - }) - .collect(); + let local_echoes = self.inner.queue.local_echoes(self).await?; Ok((local_echoes, self.inner.updates.subscribe())) } @@ -790,22 +774,94 @@ impl QueueStorage { Ok(edited) } + #[instrument(skip(self))] + async fn react( + &self, + transaction_id: &TransactionId, + key: String, + ) -> Result, RoomSendQueueStorageError> { + let client = self.client()?; + let store = client.store(); + + let queued_events = store.load_send_queue_events(&self.room_id).await?; + + // If the event has been already sent, abort immediately. + if !queued_events.iter().any(|item| item.transaction_id == transaction_id) { + return Ok(None); + } + + // Record the dependent event. + let reaction_txn_id = TransactionId::new(); + let dependent_event_id = store + .save_dependent_send_queue_event( + &self.room_id, + transaction_id, + DependentQueuedEventKind::React { key, transaction_id: reaction_txn_id.clone() }, + ) + .await?; + + Ok(Some((reaction_txn_id, dependent_event_id))) + } + /// Returns a list of the local echoes, that is, all the events that we're /// about to send but that haven't been sent yet (or are being sent). - async fn local_echoes(&self) -> Result, RoomSendQueueStorageError> { - Ok(self.client()?.store().load_send_queue_events(&self.room_id).await?) + async fn local_echoes( + &self, + room: &RoomSendQueue, + ) -> Result, RoomSendQueueStorageError> { + let client = self.client()?; + let store = client.store(); + + let local_events = + store.load_send_queue_events(&self.room_id).await?.into_iter().map(|queued| { + LocalEcho { + transaction_id: queued.transaction_id.clone(), + content: LocalEchoContent::Event { + serialized_event: queued.event, + send_handle: SendHandle { + room: room.clone(), + transaction_id: queued.transaction_id, + }, + is_wedged: queued.is_wedged, + }, + } + }); + + let local_reactions = store + .list_dependent_send_queue_events(&self.room_id) + .await? + .into_iter() + .filter_map(|dep| match dep.kind { + DependentQueuedEventKind::Edit { .. } | DependentQueuedEventKind::Redact => None, + DependentQueuedEventKind::React { key, transaction_id } => Some(LocalEcho { + transaction_id: transaction_id.clone(), + content: LocalEchoContent::React { + key, + send_handle: SendReactionHandle { + room: room.clone(), + transaction_id, + dependent_event_id: dep.id, + }, + }, + }), + }); + + Ok(local_events.chain(local_reactions).collect()) } /// Try to apply a single dependent event, whether it's local or remote. /// /// This swallows errors that would retrigger every time if we retried /// applying the dependent event: invalid edit content, etc. + /// + /// Returns true if the dependent event has been sent (or should not be + /// retried later). #[instrument(skip_all)] async fn try_apply_single_dependent_event( &self, client: &Client, de: DependentQueuedEvent, - ) -> Result<(), RoomSendQueueError> { + ) -> Result { let store = client.store(); match de.kind { @@ -823,7 +879,7 @@ impl QueueStorage { Ok(c) => c, Err(err) => { warn!("unable to deserialize: {err}"); - return Ok(()); + return Ok(true); } }; @@ -831,7 +887,7 @@ impl QueueStorage { let AnyMessageLikeEventContent::RoomMessage(room_message_content) = content else { warn!("trying to send an edit event for a non-room message: aborting"); - return Ok(()); + return Ok(true); }; // Assume no relation. @@ -845,7 +901,7 @@ impl QueueStorage { Ok(e) => e, Err(err) => { warn!("couldn't create edited event: {err}"); - return Ok(()); + return Ok(true); } }; @@ -886,6 +942,7 @@ impl QueueStorage { if let Err(err) = room.redact(&event_id, None, None).await { warn!("error when sending a redact for {event_id}: {err}"); + return Ok(false); } } else { // Still local (sending must have failed); redact the local echo. @@ -895,13 +952,34 @@ impl QueueStorage { .map_err(RoomSendQueueStorageError::StorageError)?; if !removed { - warn!("missing local echo upon dependent redact??"); + warn!("missing local echo upon dependent redact"); } } } + + DependentQueuedEventKind::React { key, transaction_id } => { + if let Some(event_id) = de.event_id { + // Queue the reaction event in the send queue 🧠. + let react_event = + ReactionEventContent::new(Annotation::new(event_id, key)).into(); + let serializable = SerializableEventContent::from_raw( + Raw::new(&react_event) + .map_err(RoomSendQueueStorageError::JsonSerialization)?, + react_event.event_type().to_string(), + ); + + store + .save_send_queue_event(&self.room_id, transaction_id, serializable) + .await + .map_err(RoomSendQueueStorageError::StorageError)?; + } else { + // Not applied yet, we should retry later => false. + return Ok(false); + } + } } - Ok(()) + Ok(true) } #[instrument(skip(self))] @@ -931,14 +1009,16 @@ impl QueueStorage { let dependent_event_id = de.id; match self.try_apply_single_dependent_event(&client, de).await { - Ok(()) => { - // The dependent event has been successfully applied, forget about it. - store - .remove_dependent_send_queue_event(&self.room_id, dependent_event_id) - .await - .map_err(RoomSendQueueStorageError::StorageError)?; - - num_dependent_events -= 1; + Ok(should_remove) => { + if should_remove { + // The dependent event has been successfully applied, forget about it. + store + .remove_dependent_send_queue_event(&self.room_id, dependent_event_id) + .await + .map_err(RoomSendQueueStorageError::StorageError)?; + + num_dependent_events -= 1; + } } Err(err) => { @@ -951,6 +1031,21 @@ impl QueueStorage { Ok(()) } + + /// Remove a single dependent event from storage. + async fn remove_dependent_send_queue_event( + &self, + dependent_event_id: usize, + ) -> Result { + // Keep the lock until we're done touching the storage. + let _being_sent = self.being_sent.read().await; + + Ok(self + .client()? + .store() + .remove_dependent_send_queue_event(&self.room_id, dependent_event_id) + .await?) + } } /// The content of a local echo. @@ -967,6 +1062,14 @@ pub enum LocalEchoContent { /// unrecoverable error (see [`SendQueueRoomError::is_recoverable`]). is_wedged: bool, }, + + /// A local echo has been reacted to. + React { + /// The key with which the local echo has been reacted to. + key: String, + /// A handle to manipulate the sending of the reaction. + send_handle: SendReactionHandle, + }, } /// An event that has been locally queued for sending, but hasn't been sent yet. @@ -1142,6 +1245,80 @@ impl SendHandle { ) .await } + + /// Send a reaction to the event as soon as it's sent. + /// + /// If returning `Ok(None)`; this means the reaction couldn't be sent + /// because the event is already a remote one. + #[instrument(skip(self), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))] + pub async fn react( + &self, + key: String, + ) -> Result, RoomSendQueueStorageError> { + trace!("received an intent to react"); + + if let Some((reaction_txn_id, dependent_event_id)) = + self.room.inner.queue.react(&self.transaction_id, key.clone()).await? + { + trace!("successfully queued react"); + + // Wake up the queue, in case the room was asleep before the sending. + self.room.inner.notifier.notify_one(); + + // Propagate a new local event. + let send_handle = SendReactionHandle { + room: self.room.clone(), + transaction_id: reaction_txn_id.clone(), + dependent_event_id, + }; + + let _ = self.room.inner.updates.send(RoomSendQueueUpdate::NewLocalEvent(LocalEcho { + // Note: we do want to use the txn_id we're going to use for the reaction, not the + // one for the event we're reacting to. + transaction_id: reaction_txn_id, + content: LocalEchoContent::React { key, send_handle: send_handle.clone() }, + })); + + Ok(Some(send_handle)) + } else { + debug!("local echo doesn't exist anymore, can't react"); + Ok(None) + } + } +} + +/// A handle to execute actions on the sending of a reaction. +#[derive(Clone, Debug)] +pub struct SendReactionHandle { + room: RoomSendQueue, + transaction_id: OwnedTransactionId, + dependent_event_id: usize, +} + +impl SendReactionHandle { + /// Abort the sending of the reaction. + /// + /// Will return true if the reaction could be aborted, false if it's been + /// sent (and there's no matching local echo anymore). + pub async fn abort(&self) -> Result { + if self.room.inner.queue.remove_dependent_send_queue_event(self.dependent_event_id).await? { + // Simple case: the reaction was found in the dependent event list. + + // Propagate a cancelled update too. + let _ = self.room.inner.updates.send(RoomSendQueueUpdate::CancelledLocalEvent { + transaction_id: self.transaction_id.clone(), + }); + + return Ok(true); + } + + // The reaction has already been queued for sending, try to abort it using a + // regular abort. + let handle = + SendHandle { room: self.room.clone(), transaction_id: self.transaction_id.clone() }; + + handle.abort().await + } } /// From a given source of [`DependentQueuedEvent`], return only the most @@ -1150,9 +1327,13 @@ impl SendHandle { fn canonicalize_dependent_events(dependent: &[DependentQueuedEvent]) -> Vec { let mut latest_edit = None; + let mut result = Vec::new(); for d in dependent { match &d.kind { DependentQueuedEventKind::Edit { .. } => latest_edit = Some(d.clone()), + DependentQueuedEventKind::React { .. } => { + result.push(d.clone()); + } DependentQueuedEventKind::Redact => { // Shortcut and return the redaction; any other action would be meaningless, // since it'll end up with a redact. @@ -1162,10 +1343,11 @@ fn canonicalize_dependent_events(dependent: &[DependentQueuedEvent]) -> Vec cancelled { txn = txn4 }); assert!(watch.is_empty()); @@ -1016,7 +1018,9 @@ async fn test_edit_while_being_sent_and_fails() { assert_eq!(local_echoes.len(), 1); assert_eq!(local_echoes[0].transaction_id, txn1); - let LocalEchoContent::Event { serialized_event, .. } = &local_echoes[0].content; + let LocalEchoContent::Event { serialized_event, .. } = &local_echoes[0].content else { + panic!("unexpected local echo content") + }; let event = serialized_event.deserialize().unwrap(); assert_let!(AnyMessageLikeEventContent::RoomMessage(msg) = event);