Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry decryptions based on the room key stream of the OlmMachine #4348

Merged
merged 5 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 18 additions & 16 deletions crates/matrix-sdk-crypto/src/machine/tests/megolm_sender_data.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
/*
Copyright 2024 The Matrix.org Foundation C.I.C.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Copyright 2024 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{fmt::Debug, iter, pin::Pin};

Expand All @@ -23,6 +21,7 @@ use matrix_sdk_test::async_test;
use ruma::{room_id, user_id, RoomId, TransactionId, UserId};
use serde::Serialize;
use serde_json::json;
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;

use crate::{
machine::{
Expand Down Expand Up @@ -305,13 +304,16 @@ where
/// Given the `room_keys_received_stream`, check that there is a pending update,
/// and pop it.
fn get_room_key_received_update(
room_keys_received_stream: &mut Pin<Box<impl Stream<Item = Vec<RoomKeyInfo>>>>,
room_keys_received_stream: &mut Pin<
Box<impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>>>,
>,
) -> RoomKeyInfo {
room_keys_received_stream
.next()
.now_or_never()
.flatten()
.expect("We should have received an update of room key infos")
.unwrap()
.pop()
.expect("Received an empty room key info update")
}
Expand Down
3 changes: 2 additions & 1 deletion crates/matrix-sdk-crypto/src/machine/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,8 @@ async fn test_megolm_encryption() {
.next()
.now_or_never()
.flatten()
.expect("We should have received an update of room key infos");
.expect("We should have received an update of room key infos")
.unwrap();
assert_eq!(room_keys.len(), 1);
assert_eq!(room_keys[0].session_id, group_session.session_id());

Expand Down
16 changes: 10 additions & 6 deletions crates/matrix-sdk-crypto/src/store/crypto_store_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ use futures_core::Stream;
use futures_util::StreamExt;
use matrix_sdk_common::store_locks::CrossProcessStoreLock;
use ruma::{DeviceId, OwnedDeviceId, OwnedUserId, UserId};
use tokio::sync::{broadcast, Mutex};
use tokio::sync::{
broadcast::{self},
Mutex,
};
use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream};
use tracing::{debug, trace, warn};

Expand Down Expand Up @@ -292,11 +295,12 @@ impl CryptoStoreWrapper {
/// the stream. Updates that happen at the same time are batched into a
/// [`Vec`].
///
/// If the reader of the stream lags too far behind, a warning will be
/// logged and items will be dropped.
pub fn room_keys_received_stream(&self) -> impl Stream<Item = Vec<RoomKeyInfo>> {
let stream = BroadcastStream::new(self.room_keys_received_sender.subscribe());
Self::filter_errors_out_of_stream(stream, "room_keys_received_stream")
/// If the reader of the stream lags too far behind an error will be sent to
/// the reader.
pub fn room_keys_received_stream(
&self,
) -> impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>> {
BroadcastStream::new(self.room_keys_received_sender.subscribe())
}

/// Receive notifications of received `m.room_key.withheld` messages.
Expand Down
12 changes: 8 additions & 4 deletions crates/matrix-sdk-crypto/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ use ruma::{
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use thiserror::Error;
use tokio::sync::{Mutex, MutexGuard, Notify, OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock};
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
use tracing::{info, warn};
use vodozemac::{base64_encode, megolm::SessionOrdering, Curve25519PublicKey};
use zeroize::{Zeroize, ZeroizeOnDrop};
Expand Down Expand Up @@ -1593,12 +1594,14 @@ impl Store {
/// the stream. Updates that happen at the same time are batched into a
/// [`Vec`].
///
/// If the reader of the stream lags too far behind, a warning will be
/// logged and items will be dropped.
/// If the reader of the stream lags too far behind an error will be sent to
/// the reader.
///
/// The stream will terminate once all references to the underlying
/// `CryptoStoreWrapper` are dropped.
pub fn room_keys_received_stream(&self) -> impl Stream<Item = Vec<RoomKeyInfo>> {
pub fn room_keys_received_stream(
&self,
) -> impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>> {
self.inner.store.room_keys_received_stream()
}

Expand Down Expand Up @@ -2043,7 +2046,8 @@ mod tests {
.next()
.now_or_never()
.flatten()
.expect("We should have received an update of room key infos");
.expect("We should have received an update of room key infos")
.unwrap();
assert_eq!(room_keys.len(), 1);
assert_eq!(room_keys[0].room_id, "!room1:localhost");
}
Expand Down
2 changes: 1 addition & 1 deletion crates/matrix-sdk-ui/src/room_list_service/room_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ fn merge_stream_and_receiver(
/// When a [`RoomList`] is displayed to the user, it can be in various states.
/// This enum tries to represent those states with a correct level of
/// abstraction.
#[derive(Clone, Debug)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RoomListLoadingState {
/// The [`RoomList`] has not been loaded yet, i.e. a sync might run
/// or not run at all, there is nothing to show in this `RoomList` yet.
Expand Down
43 changes: 43 additions & 0 deletions crates/matrix-sdk-ui/src/timeline/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use matrix_sdk::{
};
use ruma::{events::AnySyncTimelineEvent, RoomVersionId};
use tokio::sync::broadcast::error::RecvError;
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
use tracing::{info, info_span, trace, warn, Instrument, Span};

use super::{
Expand Down Expand Up @@ -433,6 +434,47 @@ impl TimelineBuilder {
})
};

// TODO: Technically, this should be the only stream we need to listen to get
// notified when we should retry to decrypt an event. We sadly can't do that,
// since the cross-process support kills the `OlmMachine` which then in
// turn kills this stream. Once this is solved remove all the other ways we
// listen for room keys.
let room_keys_received_join_handle = {
let inner = controller.clone();
let stream = client.encryption().room_keys_received_stream().await.expect(
"We should be logged in by now, so we should have access to an OlmMachine \
to be able to listen to this stream",
);

spawn(async move {
pin_mut!(stream);

while let Some(room_keys) = stream.next().await {
let session_ids = match room_keys {
Ok(room_keys) => {
let session_ids: BTreeSet<String> = room_keys
.into_iter()
.filter(|info| info.room_id == inner.room().room_id())
.map(|info| info.session_id)
.collect();

Some(session_ids)
}
Err(BroadcastStreamRecvError::Lagged(missed_updates)) => {
// We lagged, let's retry to decrypt anything we have, maybe something
// was received.
warn!(missed_updates, "The room keys stream has lagged, retrying to decrypt the whole timeline");

None
}
};

let room = inner.room();
inner.retry_event_decryption(room, session_ids).await;
}
})
};

let timeline = Timeline {
controller,
event_cache: room_event_cache,
Expand All @@ -443,6 +485,7 @@ impl TimelineBuilder {
pinned_events_join_handle,
room_key_from_backups_join_handle,
room_key_backup_enabled_join_handle,
room_keys_received_join_handle,
local_echo_listener_handle,
_event_cache_drop_handle: event_cache_drop,
encryption_changes_handle,
Expand Down
2 changes: 2 additions & 0 deletions crates/matrix-sdk-ui/src/timeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -832,6 +832,7 @@ struct TimelineDropHandle {
room_update_join_handle: JoinHandle<()>,
pinned_events_join_handle: Option<JoinHandle<()>>,
room_key_from_backups_join_handle: JoinHandle<()>,
room_keys_received_join_handle: JoinHandle<()>,
room_key_backup_enabled_join_handle: JoinHandle<()>,
local_echo_listener_handle: JoinHandle<()>,
_event_cache_drop_handle: Arc<EventCacheDropHandles>,
Expand All @@ -852,6 +853,7 @@ impl Drop for TimelineDropHandle {
self.room_update_join_handle.abort();
self.room_key_from_backups_join_handle.abort();
self.room_key_backup_enabled_join_handle.abort();
self.room_keys_received_join_handle.abort();
self.encryption_changes_handle.abort();
}
}
Expand Down
41 changes: 41 additions & 0 deletions crates/matrix-sdk/src/encryption/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use futures_util::{
stream::{self, StreamExt},
};
use matrix_sdk_base::crypto::{
store::RoomKeyInfo,
types::requests::{
OutgoingRequest, OutgoingVerificationRequest, RoomMessageRequest, ToDeviceRequest,
},
Expand Down Expand Up @@ -58,6 +59,7 @@ use ruma::{
};
use serde::Deserialize;
use tokio::sync::{Mutex, RwLockReadGuard};
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
use tracing::{debug, error, instrument, trace, warn};
use url::Url;
use vodozemac::Curve25519PublicKey;
Expand Down Expand Up @@ -1444,6 +1446,45 @@ impl Encryption {
Ok(ret)
}

/// Receive notifications of room keys being received as a [`Stream`].
///
/// Each time a room key is updated in any way, an update will be sent to
/// the stream. Updates that happen at the same time are batched into a
/// [`Vec`].
///
/// If the reader of the stream lags too far behind, an error is broadcast
/// containing the number of skipped items.
///
/// # Examples
///
/// ```no_run
/// # use matrix_sdk::Client;
/// # use url::Url;
/// # async {
/// # let homeserver = Url::parse("http://example.com")?;
/// # let client = Client::new(homeserver).await?;
/// use futures_util::StreamExt;
///
/// let Some(mut room_keys_stream) =
/// client.encryption().room_keys_received_stream().await
/// else {
/// return Ok(());
/// };
///
/// while let Some(update) = room_keys_stream.next().await {
/// println!("Received room keys {update:?}");
/// }
/// # anyhow::Ok(()) };
/// ```
pub async fn room_keys_received_stream(
&self,
) -> Option<impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>>> {
let olm = self.client.olm_machine().await;
let olm = olm.as_ref()?;

Some(olm.store().room_keys_received_stream())
}

/// Get the secret storage manager of the client.
pub fn secret_storage(&self) -> SecretStorage {
SecretStorage { client: self.client.to_owned() }
Expand Down
Loading
Loading