From c0a9cd533ff55a008472f103064969d3bf932b57 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Mon, 10 Jun 2024 14:32:10 +0200 Subject: [PATCH] chore(ui): Move the `RoomEventCacheUpdate` code block into its own function. This patch is a refactoring. It moves the code block responsible to handle the `RoomEventCacheUpdate`s into its own function for the sake of clarity. --- crates/matrix-sdk-ui/src/timeline/builder.rs | 186 ++++++++++--------- 1 file changed, 96 insertions(+), 90 deletions(-) diff --git a/crates/matrix-sdk-ui/src/timeline/builder.rs b/crates/matrix-sdk-ui/src/timeline/builder.rs index ac7849995e0..a5b710d187e 100644 --- a/crates/matrix-sdk-ui/src/timeline/builder.rs +++ b/crates/matrix-sdk-ui/src/timeline/builder.rs @@ -16,13 +16,13 @@ use std::{collections::BTreeSet, sync::Arc}; use futures_util::{pin_mut, StreamExt}; use matrix_sdk::{ - event_cache::{EventsOrigin, RoomEventCacheUpdate}, + event_cache::{EventsOrigin, RoomEventCache, RoomEventCacheUpdate}, executor::spawn, send_queue::{LocalEcho, RoomSendQueueUpdate}, Room, }; use ruma::{events::AnySyncTimelineEvent, RoomVersionId}; -use tokio::sync::broadcast::error::RecvError; +use tokio::sync::broadcast::{error::RecvError, Receiver}; use tracing::{info, info_span, trace, warn, Instrument, Span}; #[cfg(feature = "e2e-encryption")] @@ -156,7 +156,7 @@ impl TimelineBuilder { event_cache.subscribe()?; let (room_event_cache, event_cache_drop) = room.event_cache().await?; - let (_, mut event_subscriber) = room_event_cache.subscribe().await?; + let (_, event_subscriber) = room_event_cache.subscribe().await?; let is_live = matches!(focus, TimelineFocus::Live); @@ -169,98 +169,14 @@ impl TimelineBuilder { let client = room.client(); let room_update_join_handle = spawn({ - let room_event_cache = room_event_cache.clone(); - let inner = inner.clone(); - let span = info_span!(parent: Span::none(), "room_update_handler", room_id = ?room.room_id()); span.follows_from(Span::current()); - async move { - trace!("Spawned the event subscriber task."); - - loop { - trace!("Waiting for an event."); - - let update = match event_subscriber.recv().await { - Ok(up) => up, - Err(RecvError::Closed) => break, - Err(RecvError::Lagged(num_skipped)) => { - warn!( - num_skipped, - "Lagged behind event cache updates, resetting timeline" - ); - - // The updates might have lagged, but the room event cache might have - // events, so retrieve them and add them back again to the timeline, - // after clearing it. - // - // If we can't get a handle on the room cache's events, just clear the - // current timeline. - match room_event_cache.subscribe().await { - Ok((events, _)) => { - inner.replace_with_initial_remote_events(events, RemoteEventOrigin::Sync).await; - } - Err(err) => { - warn!("Error when re-inserting initial events into the timeline: {err}"); - inner.clear().await; - } - } - - continue; - } - }; - - match update { - RoomEventCacheUpdate::MoveReadMarkerTo { event_id } => { - trace!(target = %event_id, "Handling fully read marker."); - inner.handle_fully_read_marker(event_id).await; - } - - RoomEventCacheUpdate::Clear => { - if !inner.is_live().await { - // Ignore a clear for a timeline not in the live mode; the - // focused-on-event mode doesn't add any new items to the timeline - // anyways. - continue; - } - - trace!("Clearing the timeline."); - inner.clear().await; - } - - RoomEventCacheUpdate::AddTimelineEvents { events: diffs, origin } => { - trace!("Received new timeline events."); + trace!("Spawned the event subscriber task."); - inner.add_events_with_diffs( - diffs, - match origin { - EventsOrigin::Sync => RemoteEventOrigin::Sync, - EventsOrigin::Pagination => RemoteEventOrigin::Pagination, - } - ).await; - } - - RoomEventCacheUpdate::AddEphemeralEvents { events } => { - trace!("Received new ephemeral events from sync."); - - // TODO: (bnjbvr) ephemeral should be handled by the event cache. - inner.handle_ephemeral_events(events).await; - } - - RoomEventCacheUpdate::UpdateMembers { ambiguity_changes } => { - if !ambiguity_changes.is_empty() { - let member_ambiguity_changes = ambiguity_changes - .values() - .flat_map(|change| change.user_ids()) - .collect::>(); - inner.force_update_sender_profiles(&member_ambiguity_changes).await; - } - } - } - } - } - .instrument(span) + handle_room_update(inner.clone(), room_event_cache.clone(), event_subscriber) + .instrument(span) }); let local_echo_listener_handle = if is_live { @@ -423,3 +339,93 @@ impl TimelineBuilder { Ok(timeline) } } + +/// This function is responsible to handle all `RoomEventCacheUpdate` and to +/// dispatch them. +async fn handle_room_update( + timeline: TimelineInner, + room_event_cache: RoomEventCache, + mut event_subscriber: Receiver, +) { + loop { + trace!("Waiting for an event."); + + let update = match event_subscriber.recv().await { + Ok(up) => up, + Err(RecvError::Closed) => break, + Err(RecvError::Lagged(num_skipped)) => { + warn!(num_skipped, "Lagged behind event cache updates, resetting timeline"); + + // The updates might have lagged, but the room event cache might have + // events, so retrieve them and add them back again to the timeline, + // after clearing it. + // + // If we can't get a handle on the room cache's events, just clear the + // current timeline. + match room_event_cache.subscribe().await { + Ok((events, _)) => { + timeline + .replace_with_initial_remote_events(events, RemoteEventOrigin::Sync) + .await; + } + Err(err) => { + warn!("Error when re-inserting initial events into the timeline: {err}"); + timeline.clear().await; + } + } + + continue; + } + }; + + match update { + RoomEventCacheUpdate::MoveReadMarkerTo { event_id } => { + trace!(target = %event_id, "Handling fully read marker."); + timeline.handle_fully_read_marker(event_id).await; + } + + RoomEventCacheUpdate::Clear => { + if !timeline.is_live().await { + // Ignore a clear for a timeline not in the live mode; the + // focused-on-event mode doesn't add any new items to the timeline + // anyways. + continue; + } + + trace!("Clearing the timeline."); + timeline.clear().await; + } + + RoomEventCacheUpdate::AddTimelineEvents { events: diffs, origin } => { + trace!("Received new timeline events."); + + timeline + .add_events_with_diffs( + diffs, + match origin { + EventsOrigin::Sync => RemoteEventOrigin::Sync, + EventsOrigin::Pagination => RemoteEventOrigin::Pagination, + }, + ) + .await; + } + + RoomEventCacheUpdate::AddEphemeralEvents { events } => { + trace!("Received new ephemeral events from sync."); + + // TODO: (bnjbvr) ephemeral should be handled by the event cache. + timeline.handle_ephemeral_events(events).await; + } + + RoomEventCacheUpdate::UpdateMembers { ambiguity_changes } => { + if !ambiguity_changes.is_empty() { + let member_ambiguity_changes = ambiguity_changes + .values() + .flat_map(|change| change.user_ids()) + .collect::>(); + timeline.force_update_sender_profiles(&member_ambiguity_changes).await; + } + } + } + } +}