Skip to content

Commit

Permalink
feat(sdk): Implement Client::observe_events and `Client::observe_ro…
Browse files Browse the repository at this point in the history
…om_events`.

Changelog: This patch introduces a mechanism similar to
 `Client::add_event_handler` and `Client::add_room_event_handler`
 but with a reactive programming pattern. This patch adds
 `Client::observe_events` and `Client::observe_room_events`.

 ```rust
 // Get an observer.
 let observer =
     client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();

 // Subscribe to the observer.
 let mut subscriber = observer.subscribe();

 // Use the subscriber as a `Stream`.
 let (message_event, (room, push_actions)) = subscriber.next().await.unwrap();
 ```

 When calling `observe_events`, one has to specify the type of event
 (in the example, `SyncRoomMessageEvent`) and a context (in the example,
 `(Room, Vec<Action>)`, respectively for the room and the push actions).
  • Loading branch information
Hywan committed Nov 12, 2024
1 parent c1a45e4 commit e817a85
Show file tree
Hide file tree
Showing 4 changed files with 370 additions and 3 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/matrix-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ matrix-sdk-sqlite = { workspace = true, optional = true }
matrix-sdk-test = { workspace = true, optional = true }
mime = "0.3.16"
mime2ext = "0.1.52"
pin-project-lite = { workspace = true }
rand = { workspace = true , optional = true }
ruma = { workspace = true, features = ["rand", "unstable-msc2448", "unstable-msc2965", "unstable-msc3930", "unstable-msc3245-v1-compat", "unstable-msc2867"] }
serde = { workspace = true }
Expand Down
89 changes: 87 additions & 2 deletions crates/matrix-sdk/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
use std::{
collections::{btree_map, BTreeMap},
fmt::{self, Debug},
future::Future,
future::{ready, Future},
pin::Pin,
sync::{Arc, Mutex as StdMutex, RwLock as StdRwLock, Weak},
};
Expand Down Expand Up @@ -88,7 +88,8 @@ use crate::{
error::{HttpError, HttpResult},
event_cache::EventCache,
event_handler::{
EventHandler, EventHandlerDropGuard, EventHandlerHandle, EventHandlerStore, SyncEvent,
EventHandler, EventHandlerContext, EventHandlerDropGuard, EventHandlerHandle,
EventHandlerStore, ObservableEventHandler, SyncEvent,
},
http_client::HttpClient,
matrix_auth::MatrixAuth,
Expand Down Expand Up @@ -804,6 +805,90 @@ impl Client {
self.add_event_handler_impl(handler, Some(room_id.to_owned()))
}

/// Observe a specific event type.
///
/// `Ev` represents the kind of event that will be observed. `Ctx`
/// represents the context that will come with the event. It relies on the
/// same mechanism as [`Self::add_event_handler`]. The main difference is
/// that it returns a [`ObservableEventHandler`] and doesn't require a
/// user-defined closure. It is possible to subscribe to the
/// [`ObservableEventHandler`] to get a [`EventHandlerSubscriber`], which
/// implements a [`Stream`]. The `Stream::Item` will be of type `(Ev,
/// Ctx)`.
///
/// # Example
///
/// ```
/// use futures_util::StreamExt as _;
/// use matrix_sdk::{
/// ruma::{events::room::message::SyncRoomMessageEvent, push::Action},
/// Client, Room,
/// };
///
/// # async fn example(client: Client) {
/// let observer =
/// client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
///
/// let mut subscriber = observer.subscribe();
///
/// let (message_event, (room, push_actions)) =
/// subscriber.next().await.unwrap();
/// # }
/// ```
///
/// [`EventHandlerSubscriber`]: crate::event_handler::EventHandlerSubscriber
pub fn observe_events<Ev, Ctx>(&self) -> ObservableEventHandler<(Ev, Ctx)>
where
Ev: SyncEvent + DeserializeOwned + Send + Sync + 'static,
Ctx: EventHandlerContext + Send + Sync + 'static,
{
self.observe_room_events_impl(None)
}

/// Observe a specific room, and event type.
///
/// This method works the same way as
/// [`observe_events`][Self::observe_events], except that the observability
/// will be only be applied for events in the room with the specified ID.
/// See that method for more details.
pub fn observe_room_events<Ev, Ctx>(
&self,
room_id: &RoomId,
) -> ObservableEventHandler<(Ev, Ctx)>
where
Ev: SyncEvent + DeserializeOwned + Send + Sync + 'static,
Ctx: EventHandlerContext + Send + Sync + 'static,
{
self.observe_room_events_impl(Some(room_id.to_owned()))
}

/// Shared implementation for `Self::observe_events` and
/// `Self::observe_room_events`.
fn observe_room_events_impl<Ev, Ctx>(
&self,
room_id: Option<OwnedRoomId>,
) -> ObservableEventHandler<(Ev, Ctx)>
where
Ev: SyncEvent + DeserializeOwned + Send + Sync + 'static,
Ctx: EventHandlerContext + Send + Sync + 'static,
{
// The default value is `None`. It becomes `Some((Ev, Ctx))` once it has a
// new value.
let shared_observable = SharedObservable::new(None);

ObservableEventHandler::new(
shared_observable.clone(),
self.event_handler_drop_guard(self.add_event_handler_impl(
move |event: Ev, context: Ctx| {
shared_observable.set(Some((event, context)));

ready(())
},
room_id,
)),
)
}

/// Remove the event handler associated with the handle.
///
/// Note that you **must not** call `remove_event_handler` from the
Expand Down
Loading

0 comments on commit e817a85

Please sign in to comment.