From 174191ecc44fd9c436ee39490d1052b78de7812b Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Tue, 12 Nov 2024 18:01:49 +0100 Subject: [PATCH] feat(sdk): Implement `Client::observe_events` and `Client::observe_room_events`. Changelog: This patch introduces a mechanism similar to `Client::add_event_handler` and `Client::add_room_event_handler` but with a reactive programming pattern. This patch adds `Client::observe_events` and `Client::observe_room_events`. ```rust // Get an observer. let observer = client.observe_events::)>(); // Subscribe to the observer. let mut subscriber = observer.subscribe(); // Use the subscriber as a `Stream`. let (message_event, (room, push_actions)) = subscriber.next().await.unwrap(); ``` When calling `observe_events`, one has to specify the type of event (in the example, `SyncRoomMessageEvent`) and a context (in the example, `(Room, Vec)`, respectively for the room and the push actions). --- Cargo.lock | 1 + crates/matrix-sdk/Cargo.toml | 1 + crates/matrix-sdk/src/client/mod.rs | 89 ++++++- crates/matrix-sdk/src/event_handler/mod.rs | 282 ++++++++++++++++++++- 4 files changed, 370 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index be3ee36dc0b..3eb9db568af 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2915,6 +2915,7 @@ dependencies = [ "mime2ext", "once_cell", "openidconnect", + "pin-project-lite", "proptest", "rand", "reqwest", diff --git a/crates/matrix-sdk/Cargo.toml b/crates/matrix-sdk/Cargo.toml index 27f91ed7ef4..d761ca73949 100644 --- a/crates/matrix-sdk/Cargo.toml +++ b/crates/matrix-sdk/Cargo.toml @@ -97,6 +97,7 @@ matrix-sdk-sqlite = { workspace = true, optional = true } matrix-sdk-test = { workspace = true, optional = true } mime = "0.3.16" mime2ext = "0.1.52" +pin-project-lite = { workspace = true } rand = { workspace = true , optional = true } ruma = { workspace = true, features = ["rand", "unstable-msc2448", "unstable-msc2965", "unstable-msc3930", "unstable-msc3245-v1-compat", "unstable-msc2867"] } serde = { workspace = true } diff --git a/crates/matrix-sdk/src/client/mod.rs b/crates/matrix-sdk/src/client/mod.rs index ca0e34c3b42..cb40365ba7d 100644 --- a/crates/matrix-sdk/src/client/mod.rs +++ b/crates/matrix-sdk/src/client/mod.rs @@ -17,7 +17,7 @@ use std::{ collections::{btree_map, BTreeMap}, fmt::{self, Debug}, - future::Future, + future::{ready, Future}, pin::Pin, sync::{Arc, Mutex as StdMutex, RwLock as StdRwLock, Weak}, }; @@ -88,7 +88,8 @@ use crate::{ error::{HttpError, HttpResult}, event_cache::EventCache, event_handler::{ - EventHandler, EventHandlerDropGuard, EventHandlerHandle, EventHandlerStore, SyncEvent, + EventHandler, EventHandlerContext, EventHandlerDropGuard, EventHandlerHandle, + EventHandlerStore, ObservableEventHandler, SyncEvent, }, http_client::HttpClient, matrix_auth::MatrixAuth, @@ -804,6 +805,90 @@ impl Client { self.add_event_handler_impl(handler, Some(room_id.to_owned())) } + /// Observe a specific event type. + /// + /// `Ev` represents the kind of event that will be observed. `Ctx` + /// represents the context that will come with the event. It relies on the + /// same mechanism as [`Self::add_event_handler`]. The main difference is + /// that it returns a [`ObservableEventHandler`] and doesn't require a + /// user-defined closure. It is possible to subscribe to the + /// [`ObservableEventHandler`] to get a [`EventHandlerSubscriber`], which + /// implements a [`Stream`]. The `Stream::Item` will be of type `(Ev, + /// Ctx)`. + /// + /// # Example + /// + /// ``` + /// use futures_util::StreamExt as _; + /// use matrix_sdk::{ + /// ruma::{events::room::message::SyncRoomMessageEvent, push::Action}, + /// Client, Room, + /// }; + /// + /// # async fn example(client: Client) { + /// let observer = + /// client.observe_events::)>(); + /// + /// let mut subscriber = observer.subscribe(); + /// + /// let (message_event, (room, push_actions)) = + /// subscriber.next().await.unwrap(); + /// # } + /// ``` + /// + /// [`EventHandlerSubscriber`]: crate::event_handler::EventHandlerSubscriber + pub fn observe_events(&self) -> ObservableEventHandler<(Ev, Ctx)> + where + Ev: SyncEvent + DeserializeOwned + Send + Sync + 'static, + Ctx: EventHandlerContext + Send + Sync + 'static, + { + self.observe_room_events_impl(None) + } + + /// Observe a specific room, and event type. + /// + /// This method works the same way as + /// [`observe_events`][Self::observe_events], except that the observability + /// will be only be applied for events in the room with the specified ID. + /// See that method for more details. + pub fn observe_room_events( + &self, + room_id: &RoomId, + ) -> ObservableEventHandler<(Ev, Ctx)> + where + Ev: SyncEvent + DeserializeOwned + Send + Sync + 'static, + Ctx: EventHandlerContext + Send + Sync + 'static, + { + self.observe_room_events_impl(Some(room_id.to_owned())) + } + + /// Shared implementation for `Self::observe_events` and + /// `Self::observe_room_events`. + fn observe_room_events_impl( + &self, + room_id: Option, + ) -> ObservableEventHandler<(Ev, Ctx)> + where + Ev: SyncEvent + DeserializeOwned + Send + Sync + 'static, + Ctx: EventHandlerContext + Send + Sync + 'static, + { + // The default value is `None`. It becomes `Some((Ev, Ctx))` once it has a + // new value. + let shared_observable = SharedObservable::new(None); + + ObservableEventHandler::new( + shared_observable.clone(), + self.event_handler_drop_guard(self.add_event_handler_impl( + move |event: Ev, context: Ctx| { + shared_observable.set(Some((event, context))); + + ready(()) + }, + room_id, + )), + ) + } + /// Remove the event handler associated with the handle. /// /// Note that you **must not** call `remove_event_handler` from the diff --git a/crates/matrix-sdk/src/event_handler/mod.rs b/crates/matrix-sdk/src/event_handler/mod.rs index 0c63f33ee21..440e8389fff 100644 --- a/crates/matrix-sdk/src/event_handler/mod.rs +++ b/crates/matrix-sdk/src/event_handler/mod.rs @@ -40,16 +40,20 @@ use std::{ pin::Pin, sync::{ atomic::{AtomicU64, Ordering::SeqCst}, - RwLock, + Arc, RwLock, Weak, }, + task::{Context, Poll}, }; use anymap2::any::CloneAnySendSync; +use eyeball::{SharedObservable, Subscriber}; +use futures_core::Stream; use futures_util::stream::{FuturesUnordered, StreamExt}; use matrix_sdk_base::{ deserialized_responses::{EncryptionInfo, SyncTimelineEvent}, SendOutsideWasm, SyncOutsideWasm, }; +use pin_project_lite::pin_project; use ruma::{events::AnySyncStateEvent, push::Action, serde::Raw, OwnedRoomId}; use serde::{de::DeserializeOwned, Deserialize}; use serde_json::value::RawValue as RawJsonValue; @@ -535,11 +539,139 @@ impl_event_handler!(A, B, C, D, E, F); impl_event_handler!(A, B, C, D, E, F, G); impl_event_handler!(A, B, C, D, E, F, G, H); +/// An observer of events (may be tailored to a room). +/// +/// To create such observer, use [`Client::observe_events`] or +/// [`Client::observe_room_events`]. +#[derive(Debug)] +pub struct ObservableEventHandler { + /// This type is actually nothing more than a thin glue layer between the + /// [`EventHandler`] mechanism and the reactive programming types from + /// [`eyeball`]. Here, we use a [`SharedObservable`] that is updated by the + /// [`EventHandler`]. + shared_observable: SharedObservable>, + + /// This type owns the [`EventHandlerDropGuard`]. As soon as this type goes + /// out of scope, the event handler is unregistered/removed. + /// + /// [`EventHandlerSubscriber`] holds a weak, non-owning reference, to this + /// guard. It is useful to detect when to close the [`Stream`]: as soon as + /// this type goes out of scope, the subscriber will close itself on poll. + event_handler_guard: Arc, +} + +impl ObservableEventHandler { + pub(crate) fn new( + shared_observable: SharedObservable>, + event_handler_guard: EventHandlerDropGuard, + ) -> Self { + Self { shared_observable, event_handler_guard: Arc::new(event_handler_guard) } + } + + /// Subscribe to this observe. + /// + /// It returns a [`EventHandlerSubscriber`], which implements [`Stream`]. + /// See its documentation to learn more. + pub fn subscribe(&self) -> EventHandlerSubscriber { + EventHandlerSubscriber::new( + self.shared_observable.subscribe(), + // The subscriber holds a weak non-owning reference to the event handler guard, so that + // it can detect when this observer is dropped, and can close the subscriber's stream. + Arc::downgrade(&self.event_handler_guard), + ) + } +} + +pin_project! { + /// The subscriber of an [`ObservableEventHandler`]. + /// + /// To create such subscriber, use [`ObservableEventHandler::subscribe`]. + /// + /// This type implements [`Stream`], which means it is possible to poll the + /// next value asynchronously. In other terms, polling this type will return + /// the new event as soon as they are synced. See [`Client::observe_events`] + /// to learn more. + #[derive(Debug)] + pub struct EventHandlerSubscriber { + // The `Subscriber` associated to the `SharedObservable` inside + // `ObservableEventHandle`. + // + // Keep in mind all this API is just a thin glue between `EventHandle` + // and `SharedObservable`, that's… maagiic! + #[pin] + subscriber: Subscriber>, + + // A weak non-owning reference to the event handler guard from + // `ObservableEventHandler`. When this type is polled (via its `Stream` + // implementation), it is possible to detect whether the observable has + // been dropped by upgrading this weak reference, and close the `Stream` + // if it needs to. + event_handler_guard: Weak, + } +} + +impl EventHandlerSubscriber { + fn new( + subscriber: Subscriber>, + event_handler_handle: Weak, + ) -> Self { + Self { subscriber, event_handler_guard: event_handler_handle } + } +} + +impl Stream for EventHandlerSubscriber +where + T: Clone, +{ + type Item = T; + + fn poll_next(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + + let Some(_) = this.event_handler_guard.upgrade() else { + // The `EventHandlerHandle` has been dropped via `EventHandlerDropGuard`. It + // means the `ObservableEventHandler` has been dropped. It's time to + // close this stream. + return Poll::Ready(None); + }; + + // First off, the subscriber is of type `Subscriber>` because the + // `SharedObservable` starts with a `None` value to indicate it has no yet + // received any update. We want the `Stream` to return `T`, not `Option`. We + // then filter out all `None` value. + // + // Second, when a `None` value is met, we want to poll again (hence the `loop`). + // At best, there is a new value to return. At worst, the subscriber will return + // `Poll::Pending` and will register the wakers accordingly. + + loop { + match this.subscriber.as_mut().poll_next(context) { + // Stream has been closed somehow. + Poll::Ready(None) => return Poll::Ready(None), + + // The initial value (of the `SharedObservable` behind `self.subscriber`) has been + // polled. We want to filter it out. + Poll::Ready(Some(None)) => { + // Loop over. + continue; + } + + // We have a new value! + Poll::Ready(Some(Some(value))) => return Poll::Ready(Some(value)), + + // Classical pending. + Poll::Pending => return Poll::Pending, + } + } + } +} + #[cfg(test)] mod tests { use matrix_sdk_test::{ async_test, InvitedRoomBuilder, JoinedRoomBuilder, DEFAULT_TEST_ROOM_ID, }; + use stream_assert::{assert_closed, assert_pending, assert_ready}; #[cfg(target_arch = "wasm32")] wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser); use std::{ @@ -884,4 +1016,152 @@ mod tests { assert_eq!(counter.load(SeqCst), 1); Ok(()) } + + #[async_test] + #[allow(dependency_on_unit_never_type_fallback)] + async fn test_observe_events() -> crate::Result<()> { + let client = logged_in_client(None).await; + + let room_id_0 = room_id!("!r0.matrix.org"); + let room_id_1 = room_id!("!r1.matrix.org"); + + let observable = client.observe_events::(); + + let mut subscriber = observable.subscribe(); + + assert_pending!(subscriber); + + let mut response_builder = SyncResponseBuilder::new(); + let response = response_builder + .add_joined_room(JoinedRoomBuilder::new(room_id_0).add_state_event( + StateTestEvent::Custom(json!({ + "content": { + "name": "Name 0" + }, + "event_id": "$ev0", + "origin_server_ts": 1, + "sender": "@mnt_io:matrix.org", + "state_key": "", + "type": "m.room.name", + "unsigned": { + "age": 1, + } + })), + )) + .build_sync_response(); + client.process_sync(response).await?; + + let (room_name, room) = assert_ready!(subscriber); + + assert_eq!(room_name.event_id.as_str(), "$ev0"); + assert_eq!(room.room_id(), room_id_0); + assert_eq!(room.name().unwrap(), "Name 0"); + + assert_pending!(subscriber); + + let response = response_builder + .add_joined_room(JoinedRoomBuilder::new(room_id_1).add_state_event( + StateTestEvent::Custom(json!({ + "content": { + "name": "Name 1" + }, + "event_id": "$ev1", + "origin_server_ts": 2, + "sender": "@mnt_io:matrix.org", + "state_key": "", + "type": "m.room.name", + "unsigned": { + "age": 2, + } + })), + )) + .build_sync_response(); + client.process_sync(response).await?; + + let (room_name, room) = assert_ready!(subscriber); + + assert_eq!(room_name.event_id.as_str(), "$ev1"); + assert_eq!(room.room_id(), room_id_1); + assert_eq!(room.name().unwrap(), "Name 1"); + + assert_pending!(subscriber); + + drop(observable); + assert_closed!(subscriber); + + Ok(()) + } + + #[async_test] + #[allow(dependency_on_unit_never_type_fallback)] + async fn test_observe_room_events() -> crate::Result<()> { + let client = logged_in_client(None).await; + + let room_id = room_id!("!r0.matrix.org"); + + let observable_for_room = + client.observe_room_events::(room_id); + + let mut subscriber_for_room = observable_for_room.subscribe(); + + assert_pending!(subscriber_for_room); + + let mut response_builder = SyncResponseBuilder::new(); + let response = response_builder + .add_joined_room(JoinedRoomBuilder::new(room_id).add_state_event( + StateTestEvent::Custom(json!({ + "content": { + "name": "Name 0" + }, + "event_id": "$ev0", + "origin_server_ts": 1, + "sender": "@mnt_io:matrix.org", + "state_key": "", + "type": "m.room.name", + "unsigned": { + "age": 1, + } + })), + )) + .build_sync_response(); + client.process_sync(response).await?; + + let (room_name, (room, _client)) = assert_ready!(subscriber_for_room); + + assert_eq!(room_name.event_id.as_str(), "$ev0"); + assert_eq!(room.name().unwrap(), "Name 0"); + + assert_pending!(subscriber_for_room); + + let response = response_builder + .add_joined_room(JoinedRoomBuilder::new(room_id).add_state_event( + StateTestEvent::Custom(json!({ + "content": { + "name": "Name 1" + }, + "event_id": "$ev1", + "origin_server_ts": 2, + "sender": "@mnt_io:matrix.org", + "state_key": "", + "type": "m.room.name", + "unsigned": { + "age": 2, + } + })), + )) + .build_sync_response(); + client.process_sync(response).await?; + + let (room_name, (room, _client)) = assert_ready!(subscriber_for_room); + + assert_eq!(room_name.event_id.as_str(), "$ev1"); + assert_eq!(room.name().unwrap(), "Name 1"); + + assert_pending!(subscriber_for_room); + + drop(observable_for_room); + assert_closed!(subscriber_for_room); + + Ok(()) + } }