From 66d6f494ea343e4fbe59c038cb0e414d410ec4b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Thu, 28 Nov 2024 13:03:22 +0100 Subject: [PATCH] feat(timeline): Listen to the room keys stream to retry decryptions --- crates/matrix-sdk-ui/src/timeline/builder.rs | 40 ++++++++++++++++++++ crates/matrix-sdk-ui/src/timeline/mod.rs | 2 + 2 files changed, 42 insertions(+) diff --git a/crates/matrix-sdk-ui/src/timeline/builder.rs b/crates/matrix-sdk-ui/src/timeline/builder.rs index 4de77f57862..6f794d5b433 100644 --- a/crates/matrix-sdk-ui/src/timeline/builder.rs +++ b/crates/matrix-sdk-ui/src/timeline/builder.rs @@ -23,6 +23,7 @@ use matrix_sdk::{ }; use ruma::{events::AnySyncTimelineEvent, RoomVersionId}; use tokio::sync::broadcast::error::RecvError; +use tokio_stream::wrappers::errors::BroadcastStreamRecvError; use tracing::{info, info_span, trace, warn, Instrument, Span}; use super::{ @@ -426,6 +427,44 @@ impl TimelineBuilder { }) }; + // TODO: Technically this should™ be the only stream we need to listen to get + // notified when we should retry to decrypt an event. We sadly can't do that, + // since the cross-process support kills the `OlmMachine` which then in + // turn kills this stream. Once this is solved remove all the other ways we + // listen for room keys. + let room_keys_received_join_handle = { + let inner = controller.clone(); + let stream = client.encryption().room_keys_received_stream().await.expect(""); + + spawn(async move { + pin_mut!(stream); + + while let Some(room_keys) = stream.next().await { + let session_ids = match room_keys { + Ok(room_keys) => { + let session_ids: BTreeSet = room_keys + .into_iter() + .filter(|info| info.room_id == inner.room().room_id()) + .map(|info| info.session_id) + .collect(); + + Some(session_ids) + } + Err(BroadcastStreamRecvError::Lagged(missed_updates)) => { + // We lagged, let's retry to decrypt anything we have, maybe something + // was received. + warn!(missed_updates, "The room keys stream has lagged, retrying to decrypt the whole timeline"); + + None + } + }; + + let room = inner.room(); + inner.retry_event_decryption(room, session_ids).await; + } + }) + }; + let timeline = Timeline { controller, event_cache: room_event_cache, @@ -436,6 +475,7 @@ impl TimelineBuilder { pinned_events_join_handle, room_key_from_backups_join_handle, room_key_backup_enabled_join_handle, + room_keys_received_join_handle, local_echo_listener_handle, _event_cache_drop_handle: event_cache_drop, encryption_changes_handle, diff --git a/crates/matrix-sdk-ui/src/timeline/mod.rs b/crates/matrix-sdk-ui/src/timeline/mod.rs index da5dcbfef78..57e0a8c103a 100644 --- a/crates/matrix-sdk-ui/src/timeline/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/mod.rs @@ -816,6 +816,7 @@ struct TimelineDropHandle { room_update_join_handle: JoinHandle<()>, pinned_events_join_handle: Option>, room_key_from_backups_join_handle: JoinHandle<()>, + room_keys_received_join_handle: JoinHandle<()>, room_key_backup_enabled_join_handle: JoinHandle<()>, local_echo_listener_handle: JoinHandle<()>, _event_cache_drop_handle: Arc, @@ -836,6 +837,7 @@ impl Drop for TimelineDropHandle { self.room_update_join_handle.abort(); self.room_key_from_backups_join_handle.abort(); self.room_key_backup_enabled_join_handle.abort(); + self.room_keys_received_join_handle.abort(); self.encryption_changes_handle.abort(); } }