diff --git a/Cargo.lock b/Cargo.lock index be3ee36dc0b..3eb9db568af 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2915,6 +2915,7 @@ dependencies = [ "mime2ext", "once_cell", "openidconnect", + "pin-project-lite", "proptest", "rand", "reqwest", diff --git a/crates/matrix-sdk/Cargo.toml b/crates/matrix-sdk/Cargo.toml index 27f91ed7ef4..d761ca73949 100644 --- a/crates/matrix-sdk/Cargo.toml +++ b/crates/matrix-sdk/Cargo.toml @@ -97,6 +97,7 @@ matrix-sdk-sqlite = { workspace = true, optional = true } matrix-sdk-test = { workspace = true, optional = true } mime = "0.3.16" mime2ext = "0.1.52" +pin-project-lite = { workspace = true } rand = { workspace = true , optional = true } ruma = { workspace = true, features = ["rand", "unstable-msc2448", "unstable-msc2965", "unstable-msc3930", "unstable-msc3245-v1-compat", "unstable-msc2867"] } serde = { workspace = true } diff --git a/crates/matrix-sdk/src/client/mod.rs b/crates/matrix-sdk/src/client/mod.rs index ca0e34c3b42..b707691fb85 100644 --- a/crates/matrix-sdk/src/client/mod.rs +++ b/crates/matrix-sdk/src/client/mod.rs @@ -17,7 +17,7 @@ use std::{ collections::{btree_map, BTreeMap}, fmt::{self, Debug}, - future::Future, + future::{ready, Future}, pin::Pin, sync::{Arc, Mutex as StdMutex, RwLock as StdRwLock, Weak}, }; @@ -88,7 +88,8 @@ use crate::{ error::{HttpError, HttpResult}, event_cache::EventCache, event_handler::{ - EventHandler, EventHandlerDropGuard, EventHandlerHandle, EventHandlerStore, SyncEvent, + EventHandler, EventHandlerContext, EventHandlerDropGuard, EventHandlerHandle, + EventHandlerStore, ObservableEventHandler, SyncEvent, }, http_client::HttpClient, matrix_auth::MatrixAuth, @@ -681,8 +682,6 @@ impl Client { /// # Examples /// /// ```no_run - /// # use url::Url; - /// # let homeserver = Url::parse("http://localhost:8080").unwrap(); /// use matrix_sdk::{ /// deserialized_responses::EncryptionInfo, /// event_handler::Ctx, @@ -699,14 +698,7 @@ impl Client { /// }; /// use serde::{Deserialize, Serialize}; /// - /// # futures_executor::block_on(async { - /// # let client = matrix_sdk::Client::builder() - /// # .homeserver_url(homeserver) - /// # .server_versions([ruma::api::MatrixVersion::V1_0]) - /// # .build() - /// # .await - /// # .unwrap(); - /// # + /// # async fn example(client: Client) { /// client.add_event_handler( /// |ev: SyncRoomMessageEvent, room: Room, client: Client| async move { /// // Common usage: Room event plus room and client. @@ -772,11 +764,11 @@ impl Client { /// client.add_event_handler(move |ev: SyncRoomMessageEvent | async move { /// println!("Calling the handler with identifier {data}"); /// }); - /// # }); + /// # } /// ``` pub fn add_event_handler(&self, handler: H) -> EventHandlerHandle where - Ev: SyncEvent + DeserializeOwned + Send + 'static, + Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static, H: EventHandler, { self.add_event_handler_impl(handler, None) @@ -798,12 +790,133 @@ impl Client { handler: H, ) -> EventHandlerHandle where - Ev: SyncEvent + DeserializeOwned + Send + 'static, + Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static, H: EventHandler, { self.add_event_handler_impl(handler, Some(room_id.to_owned())) } + /// Observe a specific event type. + /// + /// `Ev` represents the kind of event that will be observed. `Ctx` + /// represents the context that will come with the event. It relies on the + /// same mechanism as [`Client::add_event_handler`]. The main difference is + /// that it returns an [`ObservableEventHandler`] and doesn't require a + /// user-defined closure. It is possible to subscribe to the + /// [`ObservableEventHandler`] to get an [`EventHandlerSubscriber`], which + /// implements a [`Stream`]. The `Stream::Item` will be of type `(Ev, + /// Ctx)`. + /// + /// # Example + /// + /// Let's see a classical usage: + /// + /// ``` + /// use futures_util::StreamExt as _; + /// use matrix_sdk::{ + /// ruma::{events::room::message::SyncRoomMessageEvent, push::Action}, + /// Client, Room, + /// }; + /// + /// # async fn example(client: Client) -> Option<()> { + /// let observer = + /// client.observe_events::)>(); + /// + /// let mut subscriber = observer.subscribe(); + /// + /// let (event, (room, push_actions)) = subscriber.next().await?; + /// # Some(()) + /// # } + /// ``` + /// + /// Now let's see how to get several contexts that can be useful for you: + /// + /// ``` + /// use matrix_sdk::{ + /// deserialized_responses::EncryptionInfo, + /// ruma::{ + /// events::room::{ + /// message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent, + /// }, + /// push::Action, + /// }, + /// Client, Room, + /// }; + /// + /// # async fn example(client: Client) { + /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `Client`. + /// let _ = client.observe_events::(); + /// + /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `EncryptionInfo` + /// // to distinguish between unencrypted events and events that were decrypted + /// // by the SDK. + /// let _ = client + /// .observe_events::)>( + /// ); + /// + /// // Observe `SyncRoomMessageEvent` and fetch `Room` + push actions. + /// // For example, an event with `Action::SetTweak(Tweak::Highlight(true))` + /// // should be highlighted in the timeline. + /// let _ = + /// client.observe_events::)>(); + /// + /// // Observe `SyncRoomTopicEvent` and fetch nothing else. + /// let _ = client.observe_events::(); + /// # } + /// ``` + /// + /// [`EventHandlerSubscriber`]: crate::event_handler::EventHandlerSubscriber + pub fn observe_events(&self) -> ObservableEventHandler<(Ev, Ctx)> + where + Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static, + Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static, + { + self.observe_room_events_impl(None) + } + + /// Observe a specific room, and event type. + /// + /// This method works the same way as [`Client::observe_events`], except + /// that the observability will only be applied for events in the room with + /// the specified ID. See that method for more details. + pub fn observe_room_events( + &self, + room_id: &RoomId, + ) -> ObservableEventHandler<(Ev, Ctx)> + where + Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static, + Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static, + { + self.observe_room_events_impl(Some(room_id.to_owned())) + } + + /// Shared implementation for `Client::observe_events` and + /// `Client::observe_room_events`. + fn observe_room_events_impl( + &self, + room_id: Option, + ) -> ObservableEventHandler<(Ev, Ctx)> + where + Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static, + Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static, + { + // The default value is `None`. It becomes `Some((Ev, Ctx))` once it has a + // new value. + let shared_observable = SharedObservable::new(None); + + ObservableEventHandler::new( + shared_observable.clone(), + self.event_handler_drop_guard(self.add_event_handler_impl( + move |event: Ev, context: Ctx| { + shared_observable.set(Some((event, context))); + + ready(()) + }, + room_id, + )), + ) + } + /// Remove the event handler associated with the handle. /// /// Note that you **must not** call `remove_event_handler` from the diff --git a/crates/matrix-sdk/src/event_handler/context.rs b/crates/matrix-sdk/src/event_handler/context.rs index 46213a0a96b..2b921f61fb1 100644 --- a/crates/matrix-sdk/src/event_handler/context.rs +++ b/crates/matrix-sdk/src/event_handler/context.rs @@ -107,3 +107,38 @@ impl Deref for Ctx { &self.0 } } + +// `EventHandlerContext` for tuples. + +impl EventHandlerContext for () { + fn from_data(_data: &EventHandlerData<'_>) -> Option { + Some(()) + } +} + +macro_rules! impl_context_for_tuple { + ( $( $ty:ident ),* $(,)? ) => { + #[allow(non_snake_case)] + impl< $( $ty ),* > EventHandlerContext for ( $( $ty ),* , ) + where + $( $ty : EventHandlerContext, )* + { + fn from_data(data: &EventHandlerData<'_>) -> Option { + $( + let $ty = $ty ::from_data(data)?; + )* + + Some(( $( $ty ),* , )) + } + } + }; +} + +impl_context_for_tuple!(A); +impl_context_for_tuple!(A, B); +impl_context_for_tuple!(A, B, C); +impl_context_for_tuple!(A, B, C, D); +impl_context_for_tuple!(A, B, C, D, E); +impl_context_for_tuple!(A, B, C, D, E, F); +impl_context_for_tuple!(A, B, C, D, E, F, G); +impl_context_for_tuple!(A, B, C, D, E, F, G, H); diff --git a/crates/matrix-sdk/src/event_handler/mod.rs b/crates/matrix-sdk/src/event_handler/mod.rs index c375619811d..83bffe2559f 100644 --- a/crates/matrix-sdk/src/event_handler/mod.rs +++ b/crates/matrix-sdk/src/event_handler/mod.rs @@ -40,16 +40,20 @@ use std::{ pin::Pin, sync::{ atomic::{AtomicU64, Ordering::SeqCst}, - RwLock, + Arc, RwLock, Weak, }, + task::{Context, Poll}, }; use anymap2::any::CloneAnySendSync; +use eyeball::{SharedObservable, Subscriber}; +use futures_core::Stream; use futures_util::stream::{FuturesUnordered, StreamExt}; use matrix_sdk_base::{ deserialized_responses::{EncryptionInfo, SyncTimelineEvent}, SendOutsideWasm, SyncOutsideWasm, }; +use pin_project_lite::pin_project; use ruma::{events::AnySyncStateEvent, push::Action, serde::Raw, OwnedRoomId}; use serde::{de::DeserializeOwned, Deserialize}; use serde_json::value::RawValue as RawJsonValue; @@ -287,7 +291,7 @@ impl Client { room_id: Option, ) -> EventHandlerHandle where - Ev: SyncEvent + DeserializeOwned + Send + 'static, + Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static, H: EventHandler, { let handler_fn: Box = Box::new(move |data| { @@ -535,11 +539,139 @@ impl_event_handler!(A, B, C, D, E, F); impl_event_handler!(A, B, C, D, E, F, G); impl_event_handler!(A, B, C, D, E, F, G, H); +/// An observer of events (may be tailored to a room). +/// +/// To create such observer, use [`Client::observe_events`] or +/// [`Client::observe_room_events`]. +#[derive(Debug)] +pub struct ObservableEventHandler { + /// This type is actually nothing more than a thin glue layer between the + /// [`EventHandler`] mechanism and the reactive programming types from + /// [`eyeball`]. Here, we use a [`SharedObservable`] that is updated by the + /// [`EventHandler`]. + shared_observable: SharedObservable>, + + /// This type owns the [`EventHandlerDropGuard`]. As soon as this type goes + /// out of scope, the event handler is unregistered/removed. + /// + /// [`EventHandlerSubscriber`] holds a weak, non-owning reference, to this + /// guard. It is useful to detect when to close the [`Stream`]: as soon as + /// this type goes out of scope, the subscriber will close itself on poll. + event_handler_guard: Arc, +} + +impl ObservableEventHandler { + pub(crate) fn new( + shared_observable: SharedObservable>, + event_handler_guard: EventHandlerDropGuard, + ) -> Self { + Self { shared_observable, event_handler_guard: Arc::new(event_handler_guard) } + } + + /// Subscribe to this observer. + /// + /// It returns an [`EventHandlerSubscriber`], which implements [`Stream`]. + /// See its documentation to learn more. + pub fn subscribe(&self) -> EventHandlerSubscriber { + EventHandlerSubscriber::new( + self.shared_observable.subscribe(), + // The subscriber holds a weak non-owning reference to the event handler guard, so that + // it can detect when this observer is dropped, and can close the subscriber's stream. + Arc::downgrade(&self.event_handler_guard), + ) + } +} + +pin_project! { + /// The subscriber of an [`ObservableEventHandler`]. + /// + /// To create such subscriber, use [`ObservableEventHandler::subscribe`]. + /// + /// This type implements [`Stream`], which means it is possible to poll the + /// next value asynchronously. In other terms, polling this type will return + /// the new event as soon as they are synced. See [`Client::observe_events`] + /// to learn more. + #[derive(Debug)] + pub struct EventHandlerSubscriber { + // The `Subscriber` associated to the `SharedObservable` inside + // `ObservableEventHandle`. + // + // Keep in mind all this API is just a thin glue layer between + // `EventHandle` and `SharedObservable`, that's… maagiic! + #[pin] + subscriber: Subscriber>, + + // A weak non-owning reference to the event handler guard from + // `ObservableEventHandler`. When this type is polled (via its `Stream` + // implementation), it is possible to detect whether the observable has + // been dropped by upgrading this weak reference, and close the `Stream` + // if it needs to. + event_handler_guard: Weak, + } +} + +impl EventHandlerSubscriber { + fn new( + subscriber: Subscriber>, + event_handler_handle: Weak, + ) -> Self { + Self { subscriber, event_handler_guard: event_handler_handle } + } +} + +impl Stream for EventHandlerSubscriber +where + T: Clone, +{ + type Item = T; + + fn poll_next(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + + let Some(_) = this.event_handler_guard.upgrade() else { + // The `EventHandlerHandle` has been dropped via `EventHandlerDropGuard`. It + // means the `ObservableEventHandler` has been dropped. It's time to + // close this stream. + return Poll::Ready(None); + }; + + // First off, the subscriber is of type `Subscriber>` because the + // `SharedObservable` starts with a `None` value to indicate it has no yet + // received any update. We want the `Stream` to return `T`, not `Option`. We + // then filter out all `None` value. + // + // Second, when a `None` value is met, we want to poll again (hence the `loop`). + // At best, there is a new value to return. At worst, the subscriber will return + // `Poll::Pending` and will register the wakers accordingly. + + loop { + match this.subscriber.as_mut().poll_next(context) { + // Stream has been closed somehow. + Poll::Ready(None) => return Poll::Ready(None), + + // The initial value (of the `SharedObservable` behind `self.subscriber`) has been + // polled. We want to filter it out. + Poll::Ready(Some(None)) => { + // Loop over. + continue; + } + + // We have a new value! + Poll::Ready(Some(Some(value))) => return Poll::Ready(Some(value)), + + // Classical pending. + Poll::Pending => return Poll::Pending, + } + } + } +} + #[cfg(test)] mod tests { use matrix_sdk_test::{ async_test, InvitedRoomBuilder, JoinedRoomBuilder, DEFAULT_TEST_ROOM_ID, }; + use stream_assert::{assert_closed, assert_pending, assert_ready}; #[cfg(target_arch = "wasm32")] wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser); use std::{ @@ -753,6 +885,20 @@ mod tests { Ok(()) } + #[async_test] + #[allow(dependency_on_unit_never_type_fallback)] + async fn test_add_event_handler_with_tuples() -> crate::Result<()> { + let client = logged_in_client(None).await; + + client.add_event_handler( + |_ev: OriginalSyncRoomMemberEvent, (_room, _client): (Room, Client)| future::ready(()), + ); + + // If it compiles, it works. No need to assert anything. + + Ok(()) + } + #[async_test] #[allow(dependency_on_unit_never_type_fallback)] async fn test_remove_event_handler() -> crate::Result<()> { @@ -870,4 +1016,152 @@ mod tests { assert_eq!(counter.load(SeqCst), 1); Ok(()) } + + #[async_test] + #[allow(dependency_on_unit_never_type_fallback)] + async fn test_observe_events() -> crate::Result<()> { + let client = logged_in_client(None).await; + + let room_id_0 = room_id!("!r0.matrix.org"); + let room_id_1 = room_id!("!r1.matrix.org"); + + let observable = client.observe_events::(); + + let mut subscriber = observable.subscribe(); + + assert_pending!(subscriber); + + let mut response_builder = SyncResponseBuilder::new(); + let response = response_builder + .add_joined_room(JoinedRoomBuilder::new(room_id_0).add_state_event( + StateTestEvent::Custom(json!({ + "content": { + "name": "Name 0" + }, + "event_id": "$ev0", + "origin_server_ts": 1, + "sender": "@mnt_io:matrix.org", + "state_key": "", + "type": "m.room.name", + "unsigned": { + "age": 1, + } + })), + )) + .build_sync_response(); + client.process_sync(response).await?; + + let (room_name, room) = assert_ready!(subscriber); + + assert_eq!(room_name.event_id.as_str(), "$ev0"); + assert_eq!(room.room_id(), room_id_0); + assert_eq!(room.name().unwrap(), "Name 0"); + + assert_pending!(subscriber); + + let response = response_builder + .add_joined_room(JoinedRoomBuilder::new(room_id_1).add_state_event( + StateTestEvent::Custom(json!({ + "content": { + "name": "Name 1" + }, + "event_id": "$ev1", + "origin_server_ts": 2, + "sender": "@mnt_io:matrix.org", + "state_key": "", + "type": "m.room.name", + "unsigned": { + "age": 2, + } + })), + )) + .build_sync_response(); + client.process_sync(response).await?; + + let (room_name, room) = assert_ready!(subscriber); + + assert_eq!(room_name.event_id.as_str(), "$ev1"); + assert_eq!(room.room_id(), room_id_1); + assert_eq!(room.name().unwrap(), "Name 1"); + + assert_pending!(subscriber); + + drop(observable); + assert_closed!(subscriber); + + Ok(()) + } + + #[async_test] + #[allow(dependency_on_unit_never_type_fallback)] + async fn test_observe_room_events() -> crate::Result<()> { + let client = logged_in_client(None).await; + + let room_id = room_id!("!r0.matrix.org"); + + let observable_for_room = + client.observe_room_events::(room_id); + + let mut subscriber_for_room = observable_for_room.subscribe(); + + assert_pending!(subscriber_for_room); + + let mut response_builder = SyncResponseBuilder::new(); + let response = response_builder + .add_joined_room(JoinedRoomBuilder::new(room_id).add_state_event( + StateTestEvent::Custom(json!({ + "content": { + "name": "Name 0" + }, + "event_id": "$ev0", + "origin_server_ts": 1, + "sender": "@mnt_io:matrix.org", + "state_key": "", + "type": "m.room.name", + "unsigned": { + "age": 1, + } + })), + )) + .build_sync_response(); + client.process_sync(response).await?; + + let (room_name, (room, _client)) = assert_ready!(subscriber_for_room); + + assert_eq!(room_name.event_id.as_str(), "$ev0"); + assert_eq!(room.name().unwrap(), "Name 0"); + + assert_pending!(subscriber_for_room); + + let response = response_builder + .add_joined_room(JoinedRoomBuilder::new(room_id).add_state_event( + StateTestEvent::Custom(json!({ + "content": { + "name": "Name 1" + }, + "event_id": "$ev1", + "origin_server_ts": 2, + "sender": "@mnt_io:matrix.org", + "state_key": "", + "type": "m.room.name", + "unsigned": { + "age": 2, + } + })), + )) + .build_sync_response(); + client.process_sync(response).await?; + + let (room_name, (room, _client)) = assert_ready!(subscriber_for_room); + + assert_eq!(room_name.event_id.as_str(), "$ev1"); + assert_eq!(room.name().unwrap(), "Name 1"); + + assert_pending!(subscriber_for_room); + + drop(observable_for_room); + assert_closed!(subscriber_for_room); + + Ok(()) + } }