Skip to content

Commit

Permalink
timeline: add support for local reactions to local echoes
Browse files Browse the repository at this point in the history
  • Loading branch information
bnjbvr committed Aug 26, 2024
1 parent e261d6a commit 4835265
Show file tree
Hide file tree
Showing 10 changed files with 396 additions and 123 deletions.
6 changes: 2 additions & 4 deletions bindings/matrix-sdk-ffi/src/timeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ use ruma::{
},
},
receipt::ReceiptThread,
relation::Annotation,
room::message::{
ForwardThread, LocationMessageEventContent, MessageType,
RoomMessageEventContentWithoutRelation,
Expand Down Expand Up @@ -538,9 +537,8 @@ impl Timeline {
let _ = self.send(Arc::new(room_message_event_content)).await;
}

pub async fn toggle_reaction(&self, event_id: String, key: String) -> Result<(), ClientError> {
let event_id = EventId::parse(event_id)?;
self.inner.toggle_reaction(&Annotation::new(event_id, key)).await?;
pub async fn toggle_reaction(&self, unique_id: &str, key: &str) -> Result<(), ClientError> {
self.inner.toggle_reaction(unique_id, key).await?;
Ok(())
}

Expand Down
6 changes: 5 additions & 1 deletion crates/matrix-sdk-ui/src/timeline/event_item/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use as_variant::as_variant;
use indexmap::IndexMap;
use matrix_sdk::{
deserialized_responses::{EncryptionInfo, ShieldState},
send_queue::SendHandle,
send_queue::{SendHandle, SendReactionHandle},
Client, Error,
};
use matrix_sdk_base::{
Expand Down Expand Up @@ -625,6 +625,10 @@ pub enum EventItemOrigin {
/// What's the status of a reaction?
#[derive(Clone, Debug)]
pub enum ReactionStatus {
/// It's a local reaction to a local echo.
///
/// The handle is missing only in testing contexts.
LocalToLocal(Option<SendReactionHandle>),
/// It's a local reaction to a remote event.
///
/// The handle is missing only in testing contexts.
Expand Down
176 changes: 141 additions & 35 deletions crates/matrix-sdk-ui/src/timeline/inner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ use matrix_sdk::crypto::OlmMachine;
use matrix_sdk::{
deserialized_responses::SyncTimelineEvent,
event_cache::{paginator::Paginator, RoomEventCache},
send_queue::{LocalEcho, LocalEchoContent, RoomSendQueueUpdate, SendHandle},
send_queue::{
LocalEcho, LocalEchoContent, RoomSendQueueUpdate, SendHandle, SendReactionHandle,
},
Result, Room,
};
#[cfg(test)]
Expand All @@ -45,7 +47,8 @@ use ruma::{
AnySyncTimelineEvent, MessageLikeEventType,
},
serde::Raw,
EventId, OwnedEventId, OwnedTransactionId, RoomVersionId, TransactionId, UserId,
EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, RoomVersionId,
TransactionId, UserId,
};
use tokio::sync::{RwLock, RwLockWriteGuard};
use tracing::{debug, error, field::debug, info, instrument, trace, warn};
Expand All @@ -64,14 +67,17 @@ use super::{
traits::RoomDataProvider,
util::{rfind_event_by_id, rfind_event_item, RelativePosition},
Error, EventSendState, EventTimelineItem, InReplyToDetails, Message, PaginationError, Profile,
RepliedToEvent, TimelineDetails, TimelineEventItemId, TimelineFocus, TimelineItem,
TimelineItemContent, TimelineItemKind,
ReactionInfo, RepliedToEvent, TimelineDetails, TimelineEventItemId, TimelineFocus,
TimelineItem, TimelineItemContent, TimelineItemKind,
};
use crate::{
timeline::{
day_dividers::DayDividerAdjuster,
event_handler::LiveTimelineUpdatesAllowed,
event_item::EventTimelineItemKind,
pinned_events_loader::{PinnedEventsLoader, PinnedEventsLoaderError},
reactions::FullReactionKey,
util::rfind_event_by_uid,
TimelineEventFilterFn,
},
unable_to_decrypt_hook::UtdHookManager,
Expand Down Expand Up @@ -416,7 +422,7 @@ impl<P: RoomDataProvider> TimelineInner<P> {

pub(super) async fn subscribe(
&self,
) -> (Vector<Arc<TimelineItem>>, impl Stream<Item = VectorDiff<Arc<TimelineItem>>>) {
) -> (Vector<Arc<TimelineItem>>, impl Stream<Item = VectorDiff<Arc<TimelineItem>>> + Send) {
trace!("Creating timeline items signal");
let state = self.state.read().await;
(state.items.clone(), state.items.subscribe().into_stream())
Expand Down Expand Up @@ -448,49 +454,79 @@ impl<P: RoomDataProvider> TimelineInner<P> {
#[instrument(skip_all)]
pub(super) async fn toggle_reaction_local(
&self,
annotation: &Annotation,
unique_id: &str,
key: &str,
) -> Result<bool, Error> {
let mut state = self.state.write().await;

let user_id = self.room_data_provider.own_user_id();

let Some((item_pos, item)) = rfind_event_by_id(&state.items, &annotation.event_id) else {
let Some((item_pos, item)) = rfind_event_by_uid(&state.items, unique_id) else {
warn!("Timeline item not found, can't add reaction");
return Err(Error::FailedToToggleReaction);
};

let user_id = self.room_data_provider.own_user_id();
let prev_status = item
.reactions()
.get(&annotation.key)
.get(key)
.and_then(|group| group.get(user_id))
.map(|reaction_info| reaction_info.status.clone());

let Some(prev_status) = prev_status else {
// Add a reaction through the room data provider.
// No need to reflect the effect locally, since the local echo handling will
// take care of it.
trace!("adding a new reaction");
self.room_data_provider
.send(ReactionEventContent::from(annotation.clone()).into())
.await?;
return Ok(true);
match &item.inner.kind {
EventTimelineItemKind::Local(local) => {
if let Some(send_handle) = local.send_handle.clone() {
if send_handle
.react(key.to_owned())
.await
.map_err(|err| Error::SendQueueError(err.into()))?
.is_some()
{
trace!("adding a reaction to a local echo");
return Ok(true);
}

warn!("couldn't toggle reaction for local echo");
return Ok(false);
}

warn!("missing send handle for local echo; is this a test?");
return Ok(false);
}

EventTimelineItemKind::Remote(remote) => {
// Add a reaction through the room data provider.
// No need to reflect the effect locally, since the local echo handling will
// take care of it.
trace!("adding a reaction to a remote echo");
let annotation = Annotation::new(remote.event_id.to_owned(), key.to_owned());
self.room_data_provider
.send(ReactionEventContent::from(annotation).into())
.await?;
return Ok(true);
}
}
};

trace!("removing a previous reaction");
match prev_status {
ReactionStatus::LocalToRemote(send_handle) => {
// No need to keep the lock.
drop(state);
ReactionStatus::LocalToLocal(send_reaction_handle) => {
if let Some(handle) = send_reaction_handle {
if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
// Impossible state: the reaction has moved from local to echo under our
// feet, but the timeline was supposed to be locked!
warn!("unexpectedly unable to abort sending of local reaction");
}
} else {
warn!("no send reaction handle (this should only happen in testing contexts)");
}
}

ReactionStatus::LocalToRemote(send_handle) => {
// No need to reflect the change ourselves, since handling the discard of the
// local echo will take care of it.
trace!("aborting send of the previous reaction that was a local echo");
if let Some(send_handle) = send_handle {
if !send_handle
.abort()
.await
.map_err(|err| Error::SendQueueError(err.into()))?
{
if let Some(handle) = send_handle {
if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
// Impossible state: the reaction has moved from local to echo under our
// feet, but the timeline was supposed to be locked!
warn!("unexpectedly unable to abort sending of local reaction");
Expand All @@ -502,8 +538,15 @@ impl<P: RoomDataProvider> TimelineInner<P> {

ReactionStatus::RemoteToRemote(event_id) => {
// Assume the redaction will work; we'll re-add the reaction if it didn't.
let Some(annotated_event_id) =
item.as_remote().map(|event_item| event_item.event_id.clone())
else {
warn!("remote reaction to remote event, but the associated item isn't remote");
return Ok(false);
};

let mut reactions = item.reactions().clone();
let reaction_info = reactions.remove_reaction(user_id, &annotation.key);
let reaction_info = reactions.remove_reaction(user_id, key);

if reaction_info.is_some() {
let new_item = item.with_reactions(reactions);
Expand All @@ -522,12 +565,12 @@ impl<P: RoomDataProvider> TimelineInner<P> {

let mut state = self.state.write().await;
if let Some((item_pos, item)) =
rfind_event_by_id(&state.items, &annotation.event_id)
rfind_event_by_id(&state.items, &annotated_event_id)
{
// Re-add the reaction to the mapping.
let mut reactions = item.reactions().clone();
reactions
.entry(annotation.key.to_owned())
.entry(key.to_owned())
.or_default()
.insert(user_id.to_owned(), reaction_info);
let new_item = item.with_reactions(reactions);
Expand Down Expand Up @@ -756,6 +799,27 @@ impl<P: RoomDataProvider> TimelineInner<P> {
error!(?existing_event_id, ?new_event_id, "Local echo already marked as sent");
}

// If the event had local reactions, upgrade the mapping from reaction to
// events, to indicate that the event is now remote.
if let Some(new_event_id) = new_event_id {
let reactions = item.reactions();
for (_key, by_user) in reactions.iter() {
for (_user_id, info) in by_user.iter() {
if let ReactionStatus::LocalToLocal(Some(reaction_handle)) = &info.status {
let reaction_txn_id = reaction_handle.transaction_id().to_owned();
if let Some(found) = txn
.meta
.reactions
.map
.get_mut(&TimelineEventItemId::TransactionId(reaction_txn_id))
{
found.item = TimelineEventItemId::EventId(new_event_id.to_owned());
}
}
}
}
}

let new_item = item.with_inner_kind(local_item.with_send_state(send_state));
txn.items.set(idx, new_item);

Expand Down Expand Up @@ -790,10 +854,8 @@ impl<P: RoomDataProvider> TimelineInner<P> {
state.meta.reactions.map.remove(&TimelineEventItemId::TransactionId(txn_id.to_owned()))
{
let item = match &full_key.item {
TimelineEventItemId::TransactionId(_) => {
// TODO(bnjbvr): reactions on local echoes
warn!("reactions on local echoes are NYI");
return false;
TimelineEventItemId::TransactionId(txn_id) => {
rfind_event_item(&state.items, |item| item.transaction_id() == Some(txn_id))
}
TimelineEventItemId::EventId(event_id) => rfind_event_by_id(&state.items, event_id),
};
Expand Down Expand Up @@ -1181,11 +1243,55 @@ impl<P: RoomDataProvider> TimelineInner<P> {
}

LocalEchoContent::React { key, send_handle, applies_to } => {
todo!();
self.handle_local_reaction(key, send_handle, applies_to).await;
}
}
}

/// Adds a reaction (local echo) to a local echo.
#[instrument(skip(self, send_handle))]
async fn handle_local_reaction(
&self,
reaction_key: String,
send_handle: SendReactionHandle,
applies_to: OwnedTransactionId,
) {
let mut state = self.state.write().await;

let Some((item_pos, item)) =
rfind_event_item(&state.items, |item| item.transaction_id() == Some(&applies_to))
else {
warn!("Local item not found anymore.");
return;
};

let user_id = self.room_data_provider.own_user_id();

let reaction_txn_id = send_handle.transaction_id().to_owned();
let reaction_info = ReactionInfo {
timestamp: MilliSecondsSinceUnixEpoch::now(),
status: ReactionStatus::LocalToLocal(Some(send_handle)),
};

let mut reactions = item.reactions().clone();
let by_user = reactions.entry(reaction_key.clone()).or_default();
by_user.insert(user_id.to_owned(), reaction_info);

trace!("Adding local reaction to local echo");
let new_item = item.with_reactions(reactions);
state.items.set(item_pos, new_item);

// Add it to the reaction map, so we can discard it later if needs be.
state.meta.reactions.map.insert(
TimelineEventItemId::TransactionId(reaction_txn_id),
FullReactionKey {
item: TimelineEventItemId::TransactionId(applies_to),
key: reaction_key,
sender: user_id.to_owned(),
},
);
}

/// Handle a single room send queue update.
pub(crate) async fn handle_room_send_queue_update(&self, update: RoomSendQueueUpdate) {
match update {
Expand Down
9 changes: 4 additions & 5 deletions crates/matrix-sdk-ui/src/timeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ use ruma::{
UnstablePollStartEventContent,
},
receipt::{Receipt, ReceiptThread},
relation::Annotation,
room::{
message::{
AddMentions, ForwardThread, OriginalRoomMessageEvent, RoomMessageEventContent,
Expand Down Expand Up @@ -548,17 +547,17 @@ impl Timeline {
Ok(())
}

/// Toggle a reaction on an event
/// Toggle a reaction on an event.
///
/// Adds or redacts a reaction based on the state of the reaction at the
/// time it is called.
///
/// When redacting an event, the redaction reason is not sent.
/// When redacting a previous reactino, the redaction reason is not set.
///
/// Ensures that only one reaction is sent at a time to avoid race
/// conditions and spamming the homeserver with requests.
pub async fn toggle_reaction(&self, annotation: &Annotation) -> Result<(), Error> {
self.inner.toggle_reaction_local(annotation).await?;
pub async fn toggle_reaction(&self, unique_id: &str, reaction_key: &str) -> Result<(), Error> {
self.inner.toggle_reaction_local(unique_id, reaction_key).await?;
Ok(())
}

Expand Down
22 changes: 18 additions & 4 deletions crates/matrix-sdk-ui/src/timeline/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,10 +257,24 @@ impl TestTimeline {
self.inner.handle_read_receipts(ev_content).await;
}

async fn toggle_reaction_local(&self, annotation: &Annotation) -> Result<(), super::Error> {
if self.inner.toggle_reaction_local(annotation).await? {
// Fake a local echo, for new reactions.
self.handle_local_event(ReactionEventContent::new(annotation.clone()).into()).await;
async fn toggle_reaction_local(&self, unique_id: &str, key: &str) -> Result<(), super::Error> {
if self.inner.toggle_reaction_local(unique_id, key).await? {
// TODO(bnjbvr): hacky?
if let Some(event_id) = self
.inner
.items()
.await
.iter()
.rfind(|item| item.unique_id() == unique_id)
.and_then(|item| item.as_event()?.as_remote())
.map(|event_item| event_item.event_id.clone())
{
// Fake a local echo, for new reactions.
self.handle_local_event(
ReactionEventContent::new(Annotation::new(event_id, key.to_owned())).into(),
)
.await;
}
}
Ok(())
}
Expand Down
Loading

0 comments on commit 4835265

Please sign in to comment.