Skip to content

Commit

Permalink
crypto: Expose new stream about room_key withheld messages (#3660)
Browse files Browse the repository at this point in the history
Part of the fix to element-hq/element-web#27653.
  • Loading branch information
richvdh authored Jul 9, 2024
1 parent 1bc0443 commit 2d3e2da
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 34 deletions.
4 changes: 4 additions & 0 deletions crates/matrix-sdk-crypto/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ Deprecations:

Additions:

- Expose new method `OlmMachine::room_keys_withheld_received_stream`, to allow
applications to receive notifications about received `m.room_key.withheld`
events.
([#3660](https://github.com/matrix-org/matrix-rust-sdk/pull/3660))

- Expose new method `OlmMachine::clear_crypto_cache()`, with FFI bindings
([#3462](https://github.com/matrix-org/matrix-rust-sdk/pull/3462))
Expand Down
24 changes: 22 additions & 2 deletions crates/matrix-sdk-crypto/src/machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2372,7 +2372,7 @@ pub(crate) mod tests {
};

use assert_matches2::{assert_let, assert_matches};
use futures_util::{FutureExt, StreamExt};
use futures_util::{pin_mut, FutureExt, StreamExt};
use itertools::Itertools;
use matrix_sdk_common::deserialized_responses::{
DeviceLinkProblem, ShieldState, UnableToDecryptInfo, UnsignedDecryptionResult,
Expand Down Expand Up @@ -2428,7 +2428,9 @@ pub(crate) mod tests {
types::{
events::{
room::encrypted::{EncryptedToDeviceEvent, ToDeviceEncryptedEventContent},
room_key_withheld::{RoomKeyWithheldContent, WithheldCode},
room_key_withheld::{
MegolmV1AesSha2WithheldContent, RoomKeyWithheldContent, WithheldCode,
},
ToDeviceEvent,
},
CrossSigningKey, DeviceKeys, EventEncryptionAlgorithm, SignedKey, SigningKeys,
Expand Down Expand Up @@ -3241,6 +3243,9 @@ pub(crate) mod tests {
get_machine_pair_with_setup_sessions_test_helper(alice_id(), user_id(), false).await;
let room_id = room_id!("!test:example.org");

let room_keys_withheld_received_stream = bob.store().room_keys_withheld_received_stream();
pin_mut!(room_keys_withheld_received_stream);

let encryption_settings = EncryptionSettings::default();
let encryption_settings = EncryptionSettings {
sharing_strategy: CollectStrategy::new_device_based(true),
Expand Down Expand Up @@ -3280,6 +3285,21 @@ pub(crate) mod tests {
.await
.unwrap();

// We should receive a notification on the room_keys_withheld_received_stream
let withheld_received = room_keys_withheld_received_stream
.next()
.now_or_never()
.flatten()
.expect("We should have received a notification of room key being withheld");
assert_eq!(withheld_received.len(), 1);
assert_matches!(
&withheld_received[0].content,
RoomKeyWithheldContent::MegolmV1AesSha2(MegolmV1AesSha2WithheldContent::Unverified(
unverified_withheld_content
))
);
assert_eq!(unverified_withheld_content.room_id, room_id);

let plaintext = "You shouldn't be able to decrypt that message";

let content = RoomMessageEventContent::text_plain(plaintext);
Expand Down
86 changes: 54 additions & 32 deletions crates/matrix-sdk-crypto/src/store/crypto_store_wrapper.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{ops::Deref, sync::Arc};
use std::{future, ops::Deref, sync::Arc};

use futures_core::Stream;
use futures_util::StreamExt;
Expand All @@ -13,6 +13,7 @@ use crate::{
olm::InboundGroupSession,
store,
store::{Changes, DynCryptoStore, IntoCryptoStore, RoomKeyInfo},
types::events::room_key_withheld::RoomKeyWithheldEvent,
GossippedSecret, ReadOnlyOwnUserIdentity,
};

Expand All @@ -29,6 +30,10 @@ pub(crate) struct CryptoStoreWrapper {
/// an update to an inbound group session.
room_keys_received_sender: broadcast::Sender<Vec<RoomKeyInfo>>,

/// The sender side of a broadcast stream that is notified whenever we
/// receive an `m.room_key.withheld` message.
room_keys_withheld_received_sender: broadcast::Sender<Vec<RoomKeyWithheldEvent>>,

/// The sender side of a broadcast channel which sends out secrets we
/// received as a `m.secret.send` event.
secrets_broadcaster: broadcast::Sender<GossippedSecret>,
Expand All @@ -42,6 +47,7 @@ pub(crate) struct CryptoStoreWrapper {
impl CryptoStoreWrapper {
pub(crate) fn new(user_id: &UserId, store: impl IntoCryptoStore) -> Self {
let room_keys_received_sender = broadcast::Sender::new(10);
let room_keys_withheld_received_sender = broadcast::Sender::new(10);
let secrets_broadcaster = broadcast::Sender::new(10);
// The identities broadcaster is responsible for user identities as well as
// devices, that's why we increase the capacity here.
Expand All @@ -51,6 +57,7 @@ impl CryptoStoreWrapper {
user_id: user_id.to_owned(),
store: store.into_crypto_store(),
room_keys_received_sender,
room_keys_withheld_received_sender,
secrets_broadcaster,
identities_broadcaster,
}
Expand All @@ -68,6 +75,12 @@ impl CryptoStoreWrapper {
let room_key_updates: Vec<_> =
changes.inbound_group_sessions.iter().map(RoomKeyInfo::from).collect();

let withheld_session_updates: Vec<_> = changes
.withheld_session_info
.values()
.flat_map(|session_map| session_map.values().cloned())
.collect();

let secrets = changes.secrets.to_owned();
let devices = changes.devices.to_owned();
let identities = changes.identities.to_owned();
Expand All @@ -79,6 +92,10 @@ impl CryptoStoreWrapper {
let _ = self.room_keys_received_sender.send(room_key_updates);
}

if !withheld_session_updates.is_empty() {
let _ = self.room_keys_withheld_received_sender.send(withheld_session_updates);
}

for secret in secrets {
let _ = self.secrets_broadcaster.send(secret);
}
Expand Down Expand Up @@ -131,38 +148,29 @@ impl CryptoStoreWrapper {
/// logged and items will be dropped.
pub fn room_keys_received_stream(&self) -> impl Stream<Item = Vec<RoomKeyInfo>> {
let stream = BroadcastStream::new(self.room_keys_received_sender.subscribe());
Self::filter_errors_out_of_stream(stream, "room_keys_received_stream")
}

// the raw BroadcastStream gives us Results which can fail with
// BroadcastStreamRecvError if the reader falls behind. That's annoying to work
// with, so here we just drop the errors.
stream.filter_map(|result| async move {
match result {
Ok(r) => Some(r),
Err(BroadcastStreamRecvError::Lagged(lag)) => {
warn!("room_keys_received_stream missed {lag} updates");
None
}
}
})
/// Receive notifications of received `m.room_key.withheld` messages.
///
/// Each time an `m.room_key.withheld` is received and stored, an update
/// will be sent to the stream. Updates that happen at the same time are
/// batched into a [`Vec`].
///
/// If the reader of the stream lags too far behind, a warning will be
/// logged and items will be dropped.
pub fn room_keys_withheld_received_stream(
&self,
) -> impl Stream<Item = Vec<RoomKeyWithheldEvent>> {
let stream = BroadcastStream::new(self.room_keys_withheld_received_sender.subscribe());
Self::filter_errors_out_of_stream(stream, "room_keys_withheld_received_stream")
}

/// Receive notifications of gossipped secrets being received and stored in
/// the secret inbox as a [`Stream`].
pub fn secrets_stream(&self) -> impl Stream<Item = GossippedSecret> {
let stream = BroadcastStream::new(self.secrets_broadcaster.subscribe());

// the raw BroadcastStream gives us Results which can fail with
// BroadcastStreamRecvError if the reader falls behind. That's annoying to work
// with, so here we just drop the errors.
stream.filter_map(|result| async move {
match result {
Ok(r) => Some(r),
Err(BroadcastStreamRecvError::Lagged(lag)) => {
warn!("secrets_stream missed {lag} updates");
None
}
}
})
Self::filter_errors_out_of_stream(stream, "secrets_stream")
}

/// Returns a stream of newly created or updated cryptographic identities.
Expand All @@ -173,17 +181,31 @@ impl CryptoStoreWrapper {
&self,
) -> impl Stream<Item = (Option<ReadOnlyOwnUserIdentity>, IdentityChanges, DeviceChanges)> {
let stream = BroadcastStream::new(self.identities_broadcaster.subscribe());
Self::filter_errors_out_of_stream(stream, "identities_stream")
}

// See the comment in the [`Store::room_keys_received_stream()`] on why we're
// ignoring the lagged error.
stream.filter_map(|result| async move {
match result {
/// Helper for *_stream functions: filters errors out of the stream,
/// creating a new Stream.
///
/// `BroadcastStream`s gives us `Result`s which can fail with
/// `BroadcastStreamRecvError` if the reader falls behind. That's annoying
/// to work with, so here we just emit a warning and drop the errors.
fn filter_errors_out_of_stream<ItemType>(
stream: BroadcastStream<ItemType>,
stream_name: &str,
) -> impl Stream<Item = ItemType>
where
ItemType: 'static + Clone + Send,
{
let stream_name = stream_name.to_owned();
stream.filter_map(move |result| {
future::ready(match result {
Ok(r) => Some(r),
Err(BroadcastStreamRecvError::Lagged(lag)) => {
warn!("devices_stream missed {lag} updates");
warn!("{stream_name} missed {lag} updates");
None
}
}
})
})
}

Expand Down
14 changes: 14 additions & 0 deletions crates/matrix-sdk-crypto/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1463,6 +1463,20 @@ impl Store {
self.inner.store.room_keys_received_stream()
}

/// Receive notifications of received `m.room_key.withheld` messages.
///
/// Each time an `m.room_key.withheld` is received and stored, an update
/// will be sent to the stream. Updates that happen at the same time are
/// batched into a [`Vec`].
///
/// If the reader of the stream lags too far behind, a warning will be
/// logged and items will be dropped.
pub fn room_keys_withheld_received_stream(
&self,
) -> impl Stream<Item = Vec<RoomKeyWithheldEvent>> {
self.inner.store.room_keys_withheld_received_stream()
}

/// Returns a stream of user identity updates, allowing users to listen for
/// notifications about new or changed user identities.
///
Expand Down

0 comments on commit 2d3e2da

Please sign in to comment.