Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sdk): Implement Client::observe_events and Client::observe_room_events #4253

Merged
merged 3 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/matrix-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ matrix-sdk-sqlite = { workspace = true, optional = true }
matrix-sdk-test = { workspace = true, optional = true }
mime = "0.3.16"
mime2ext = "0.1.52"
pin-project-lite = { workspace = true }
rand = { workspace = true , optional = true }
ruma = { workspace = true, features = ["rand", "unstable-msc2448", "unstable-msc2965", "unstable-msc3930", "unstable-msc3245-v1-compat", "unstable-msc2867"] }
serde = { workspace = true }
Expand Down
143 changes: 128 additions & 15 deletions crates/matrix-sdk/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
use std::{
collections::{btree_map, BTreeMap},
fmt::{self, Debug},
future::Future,
future::{ready, Future},
pin::Pin,
sync::{Arc, Mutex as StdMutex, RwLock as StdRwLock, Weak},
};
Expand Down Expand Up @@ -88,7 +88,8 @@ use crate::{
error::{HttpError, HttpResult},
event_cache::EventCache,
event_handler::{
EventHandler, EventHandlerDropGuard, EventHandlerHandle, EventHandlerStore, SyncEvent,
EventHandler, EventHandlerContext, EventHandlerDropGuard, EventHandlerHandle,
EventHandlerStore, ObservableEventHandler, SyncEvent,
},
http_client::HttpClient,
matrix_auth::MatrixAuth,
Expand Down Expand Up @@ -681,8 +682,6 @@ impl Client {
/// # Examples
///
/// ```no_run
/// # use url::Url;
/// # let homeserver = Url::parse("http://localhost:8080").unwrap();
/// use matrix_sdk::{
/// deserialized_responses::EncryptionInfo,
/// event_handler::Ctx,
Expand All @@ -699,14 +698,7 @@ impl Client {
/// };
/// use serde::{Deserialize, Serialize};
///
/// # futures_executor::block_on(async {
/// # let client = matrix_sdk::Client::builder()
/// # .homeserver_url(homeserver)
/// # .server_versions([ruma::api::MatrixVersion::V1_0])
/// # .build()
/// # .await
/// # .unwrap();
/// #
/// # async fn example(client: Client) {
/// client.add_event_handler(
/// |ev: SyncRoomMessageEvent, room: Room, client: Client| async move {
/// // Common usage: Room event plus room and client.
Expand Down Expand Up @@ -772,11 +764,11 @@ impl Client {
/// client.add_event_handler(move |ev: SyncRoomMessageEvent | async move {
/// println!("Calling the handler with identifier {data}");
/// });
/// # });
/// # }
/// ```
pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
where
Ev: SyncEvent + DeserializeOwned + Send + 'static,
Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
H: EventHandler<Ev, Ctx>,
{
self.add_event_handler_impl(handler, None)
Expand All @@ -798,12 +790,133 @@ impl Client {
handler: H,
) -> EventHandlerHandle
where
Ev: SyncEvent + DeserializeOwned + Send + 'static,
Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
H: EventHandler<Ev, Ctx>,
{
self.add_event_handler_impl(handler, Some(room_id.to_owned()))
}

/// Observe a specific event type.
///
/// `Ev` represents the kind of event that will be observed. `Ctx`
/// represents the context that will come with the event. It relies on the
/// same mechanism as [`Client::add_event_handler`]. The main difference is
/// that it returns an [`ObservableEventHandler`] and doesn't require a
/// user-defined closure. It is possible to subscribe to the
/// [`ObservableEventHandler`] to get an [`EventHandlerSubscriber`], which
/// implements a [`Stream`]. The `Stream::Item` will be of type `(Ev,
/// Ctx)`.
///
/// # Example
///
/// Let's see a classical usage:
///
/// ```
/// use futures_util::StreamExt as _;
/// use matrix_sdk::{
/// ruma::{events::room::message::SyncRoomMessageEvent, push::Action},
/// Client, Room,
/// };
///
/// # async fn example(client: Client) -> Option<()> {
/// let observer =
/// client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
///
/// let mut subscriber = observer.subscribe();
///
/// let (event, (room, push_actions)) = subscriber.next().await?;
/// # Some(())
/// # }
/// ```
///
/// Now let's see how to get several contexts that can be useful for you:
///
/// ```
/// use matrix_sdk::{
/// deserialized_responses::EncryptionInfo,
/// ruma::{
/// events::room::{
/// message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent,
/// },
/// push::Action,
/// },
/// Client, Room,
/// };
///
/// # async fn example(client: Client) {
/// // Observe `SyncRoomMessageEvent` and fetch `Room` + `Client`.
/// let _ = client.observe_events::<SyncRoomMessageEvent, (Room, Client)>();
///
/// // Observe `SyncRoomMessageEvent` and fetch `Room` + `EncryptionInfo`
/// // to distinguish between unencrypted events and events that were decrypted
/// // by the SDK.
/// let _ = client
/// .observe_events::<SyncRoomMessageEvent, (Room, Option<EncryptionInfo>)>(
/// );
///
/// // Observe `SyncRoomMessageEvent` and fetch `Room` + push actions.
/// // For example, an event with `Action::SetTweak(Tweak::Highlight(true))`
/// // should be highlighted in the timeline.
/// let _ =
/// client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
///
/// // Observe `SyncRoomTopicEvent` and fetch nothing else.
/// let _ = client.observe_events::<SyncRoomTopicEvent, ()>();
/// # }
Hywan marked this conversation as resolved.
Show resolved Hide resolved
/// ```
///
/// [`EventHandlerSubscriber`]: crate::event_handler::EventHandlerSubscriber
pub fn observe_events<Ev, Ctx>(&self) -> ObservableEventHandler<(Ev, Ctx)>
where
Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
{
self.observe_room_events_impl(None)
}

/// Observe a specific room, and event type.
///
/// This method works the same way as [`Client::observe_events`], except
/// that the observability will only be applied for events in the room with
/// the specified ID. See that method for more details.
pub fn observe_room_events<Ev, Ctx>(
&self,
room_id: &RoomId,
) -> ObservableEventHandler<(Ev, Ctx)>
where
Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
{
self.observe_room_events_impl(Some(room_id.to_owned()))
}

/// Shared implementation for `Client::observe_events` and
/// `Client::observe_room_events`.
fn observe_room_events_impl<Ev, Ctx>(
&self,
room_id: Option<OwnedRoomId>,
) -> ObservableEventHandler<(Ev, Ctx)>
where
Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
{
// The default value is `None`. It becomes `Some((Ev, Ctx))` once it has a
// new value.
let shared_observable = SharedObservable::new(None);

ObservableEventHandler::new(
shared_observable.clone(),
self.event_handler_drop_guard(self.add_event_handler_impl(
move |event: Ev, context: Ctx| {
shared_observable.set(Some((event, context)));

ready(())
},
room_id,
)),
)
}

/// Remove the event handler associated with the handle.
///
/// Note that you **must not** call `remove_event_handler` from the
Expand Down
35 changes: 35 additions & 0 deletions crates/matrix-sdk/src/event_handler/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,38 @@ impl<T> Deref for Ctx<T> {
&self.0
}
}

// `EventHandlerContext` for tuples.

impl EventHandlerContext for () {
fn from_data(_data: &EventHandlerData<'_>) -> Option<Self> {
Some(())
}
}

macro_rules! impl_context_for_tuple {
( $( $ty:ident ),* $(,)? ) => {
#[allow(non_snake_case)]
impl< $( $ty ),* > EventHandlerContext for ( $( $ty ),* , )
where
$( $ty : EventHandlerContext, )*
{
fn from_data(data: &EventHandlerData<'_>) -> Option<Self> {
$(
let $ty = $ty ::from_data(data)?;
)*

Some(( $( $ty ),* , ))
}
}
};
}

impl_context_for_tuple!(A);
impl_context_for_tuple!(A, B);
impl_context_for_tuple!(A, B, C);
impl_context_for_tuple!(A, B, C, D);
impl_context_for_tuple!(A, B, C, D, E);
impl_context_for_tuple!(A, B, C, D, E, F);
impl_context_for_tuple!(A, B, C, D, E, F, G);
impl_context_for_tuple!(A, B, C, D, E, F, G, H);
Loading
Loading