Skip to content

Commit

Permalink
feat: Add a stream to listen to room keys being inserted to the store
Browse files Browse the repository at this point in the history
  • Loading branch information
poljar committed Nov 28, 2024
1 parent f944396 commit ad77d20
Showing 1 changed file with 41 additions and 0 deletions.
41 changes: 41 additions & 0 deletions crates/matrix-sdk/src/encryption/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use futures_util::{
stream::{self, StreamExt},
};
use matrix_sdk_base::crypto::{
store::RoomKeyInfo,
types::requests::{
OutgoingRequest, OutgoingVerificationRequest, RoomMessageRequest, ToDeviceRequest,
},
Expand All @@ -55,6 +56,7 @@ use ruma::{
};
use serde::Deserialize;
use tokio::sync::{Mutex, RwLockReadGuard};
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
use tracing::{debug, error, instrument, trace, warn};
use url::Url;
use vodozemac::Curve25519PublicKey;
Expand Down Expand Up @@ -1432,6 +1434,45 @@ impl Encryption {
Ok(ret)
}

/// Receive notifications of room keys being received as a [`Stream`].
///
/// Each time a room key is updated in any way, an update will be sent to
/// the stream. Updates that happen at the same time are batched into a
/// [`Vec`].
///
/// If the reader of the stream lags too far behind, a warning will be
/// logged and items will be dropped.
///
/// # Examples
///
/// ```no_run
/// # use matrix_sdk::Client;
/// # use url::Url;
/// # async {
/// # let homeserver = Url::parse("http://example.com")?;
/// # let client = Client::new(homeserver).await?;
/// use futures_util::StreamExt;
///
/// let Some(mut room_keys_stream) =
/// client.encryption().room_keys_received_stream().await
/// else {
/// return Ok(());
/// };
///
/// while let Some(update) = room_keys_stream.next().await {
/// println!("Received room keys {update:?}");
/// }
/// # anyhow::Ok(()) };
/// ```
pub async fn room_keys_received_stream(
&self,
) -> Option<impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>>> {
let olm = self.client.olm_machine().await;
let olm = olm.as_ref()?;

Some(olm.store().room_keys_received_stream())
}

/// Get the secret storage manager of the client.
pub fn secret_storage(&self) -> SecretStorage {
SecretStorage { client: self.client.to_owned() }
Expand Down

0 comments on commit ad77d20

Please sign in to comment.