Skip to content

Commit

Permalink
feat(room): allow subscribing to requests to join a room
Browse files Browse the repository at this point in the history
This subscription will combine 3 streams: one notifying the members in the room have changed, another notifying the seen join requests have changed, and finally a third one notifying when the room members are no longer synced.

With this info we can track when we need to generate a new list of join requests to be emitted so the client can always have an up to date list.
  • Loading branch information
jmartinesp committed Dec 10, 2024
1 parent 1e35d52 commit 1b9e69f
Show file tree
Hide file tree
Showing 4 changed files with 326 additions and 5 deletions.
5 changes: 5 additions & 0 deletions crates/matrix-sdk-base/src/rooms/normal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1302,6 +1302,11 @@ impl RoomInfo {
self.members_synced = false;
}

/// Returns whether the room members are synced.
pub fn are_members_synced(&self) -> bool {
self.members_synced
}

/// Mark this Room as still missing some state information.
pub fn mark_state_partially_synced(&mut self) {
self.sync_info = SyncInfo::PartiallySynced;
Expand Down
148 changes: 147 additions & 1 deletion crates/matrix-sdk/src/room/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::{
time::Duration,
};

use async_stream::stream;
#[cfg(all(feature = "e2e-encryption", not(target_arch = "wasm32")))]
use async_trait::async_trait;
use eyeball::SharedObservable;
Expand Down Expand Up @@ -85,6 +86,7 @@ use ruma::{
avatar::{self, RoomAvatarEventContent},
encryption::RoomEncryptionEventContent,
history_visibility::HistoryVisibility,
member::{MembershipChange, SyncRoomMemberEvent},
message::{
AudioInfo, AudioMessageEventContent, FileInfo, FileMessageEventContent,
FormattedBody, ImageMessageEventContent, MessageType, RoomMessageEventContent,
Expand Down Expand Up @@ -116,6 +118,7 @@ use ruma::{
use serde::de::DeserializeOwned;
use thiserror::Error;
use tokio::sync::broadcast;
use tokio_stream::StreamExt;
use tracing::{debug, info, instrument, warn};

use self::futures::{SendAttachment, SendMessageLikeEvent, SendRawMessageLikeEvent};
Expand All @@ -135,7 +138,10 @@ use crate::{
live_location_share::ObservableLiveLocation,
media::{MediaFormat, MediaRequestParameters},
notification_settings::{IsEncrypted, IsOneToOne, RoomNotificationMode},
room::power_levels::{RoomPowerLevelChanges, RoomPowerLevelsExt},
room::{
power_levels::{RoomPowerLevelChanges, RoomPowerLevelsExt},
request_to_join::JoinRequest,
},
sync::RoomUpdate,
utils::{IntoRawMessageLikeEventContent, IntoRawStateEventContent},
BaseRoom, Client, Error, HttpResult, Result, RoomState, TransmissionProgress,
Expand Down Expand Up @@ -3195,6 +3201,135 @@ impl Room {
ObservableLiveLocation::new(&self.client, self.room_id())
}

/// Helper to requests to join this `Room`.
///
/// The current requests to join the room will be emitted immediately
/// when subscribing. When a new membership event is received, a request is
/// marked as seen or there is a limited sync, a new set of requests
/// will be emitted.
pub async fn subscribe_to_join_requests(&self) -> Result<impl Stream<Item = Vec<JoinRequest>>> {
let this = Arc::new(self.clone());

let requests_observable =
this.client.observe_room_events::<SyncRoomMemberEvent, (Client, Room)>(this.room_id());

let (current_seen_ids, mut seen_request_ids_stream) =
this.subscribe_to_seen_join_request_ids().await?;

let mut room_info_stream = this.subscribe_info();

let combined_stream = stream! {
// Emit current requests to join
match this.clone().get_current_join_requests(&current_seen_ids).await {
Ok(initial_requests) => yield initial_requests,
Err(e) => warn!("Failed to get initial requests to join: {e:?}")
}

let mut requests_stream = requests_observable.subscribe();

let mut new_event: Option<SyncRoomMemberEvent> = None;
let mut seen_ids = current_seen_ids.clone();
let mut prev_seen_ids = current_seen_ids;
let mut prev_missing_room_members: bool = false;
let mut missing_room_members: bool = false;

loop {
// This is equivalent to a combine stream operation, triggering a new emission
// when any of the branches changes
tokio::select! {
Some((next, _)) = requests_stream.next() => { new_event = Some(next); }
Some(next) = seen_request_ids_stream.next() => { seen_ids = next; }
Some(next) = room_info_stream.next() => {
missing_room_members = !next.are_members_synced()
}
else => break,
}

// We need to emit new items when we may have missing room members:
// this usually happens after a gappy (limited) sync
let has_missing_room_members = prev_missing_room_members != missing_room_members;
if has_missing_room_members {
prev_missing_room_members = missing_room_members;
}

// We need to emit new items if the seen join request ids have changed
let has_new_seen_ids = prev_seen_ids != seen_ids;
if has_new_seen_ids {
prev_seen_ids = seen_ids.clone();
}

if let Some(SyncStateEvent::Original(event)) = new_event.clone() {
// Reset the new event value so we can check this again in the next loop
new_event = None;

// If we can calculate the membership change, try to emit only when needed
if event.prev_content().is_some() {
match event.membership_change() {
MembershipChange::Banned |
MembershipChange::Knocked |
MembershipChange::KnockAccepted |
MembershipChange::KnockDenied |
MembershipChange::KnockRetracted => {
match this.clone().get_current_join_requests(&seen_ids).await {
Ok(requests) => yield requests,
Err(e) => {
warn!("Failed to get updated requests to join on membership change: {e:?}")
}
}
}
_ => (),
}
} else {
// If we can't calculate the membership change, assume we need to
// emit updated values
match this.clone().get_current_join_requests(&seen_ids).await {
Ok(requests) => yield requests,
Err(e) => {
warn!("Failed to get updated requests to join on new member event: {e:?}")
}
}
}
} else if has_new_seen_ids || has_missing_room_members {
// If seen requests have changed or we have missing room members,
// we need to recalculate all the requests to join
match this.clone().get_current_join_requests(&seen_ids).await {
Ok(requests) => yield requests,
Err(e) => {
warn!("Failed to get updated requests to join on seen ids changed: {e:?}")
}
}
}
}
};

Ok(combined_stream)
}

async fn get_current_join_requests(
&self,
seen_request_ids: &HashSet<OwnedEventId>,
) -> Result<Vec<JoinRequest>> {
Ok(self
.members(RoomMemberships::KNOCK)
.await?
.into_iter()
.filter_map(|member| {
if let Some(event_id) = member.event().event_id() {
let event_id = event_id.to_owned();
Some(JoinRequest::new(
self,
&event_id,
member.event().timestamp(),
member.into(),
seen_request_ids.contains(&event_id),
))
} else {
None
}
})
.collect())
}

/// Mark a list of requests to join the room as seen, given their state
/// event ids.
pub async fn mark_join_requests_as_seen(&self, event_ids: &[OwnedEventId]) -> Result<()> {
Expand Down Expand Up @@ -3236,6 +3371,17 @@ impl Room {
};
Ok(current_join_request_ids)
}

/// Subscribes to the set of requests to join that have been marked as
/// 'seen'.
pub async fn subscribe_to_seen_join_request_ids(
&self,
) -> Result<(HashSet<OwnedEventId>, impl Stream<Item = HashSet<OwnedEventId>>)> {
let current = self.get_seen_join_request_ids().await?;
let subscriber =
self.seen_join_request_ids.subscribe().map(|values| values.unwrap_or_default());
Ok((current, subscriber))
}
}

#[cfg(all(feature = "e2e-encryption", not(target_arch = "wasm32")))]
Expand Down
62 changes: 60 additions & 2 deletions crates/matrix-sdk/src/test_utils/mocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ use matrix_sdk_test::{
};
use ruma::{
directory::PublicRoomsChunk,
events::{AnyStateEvent, AnyTimelineEvent, MessageLikeEventType, StateEventType},
events::{
room::member::RoomMemberEvent, AnyStateEvent, AnyTimelineEvent, MessageLikeEventType,
StateEventType,
},
serde::Raw,
time::Duration,
MxcUri, OwnedEventId, OwnedRoomId, RoomId, ServerName,
Expand Down Expand Up @@ -608,6 +611,49 @@ impl MatrixMockServer {
MockEndpoint { mock, server: &self.server, endpoint: DeleteRoomKeysVersionEndpoint }
}

/// Create a prebuilt mock for getting the room members in a room.
///
/// # Examples
///
/// ``` #
/// tokio_test::block_on(async {
/// use matrix_sdk_base::RoomMemberships;
/// use ruma::events::room::member::MembershipState;
/// use ruma::events::room::member::RoomMemberEventContent;
/// use ruma::user_id;
/// use matrix_sdk_test::event_factory::EventFactory;
/// use matrix_sdk::{
/// ruma::{event_id, room_id},
/// test_utils::mocks::MatrixMockServer,
/// };
/// let mock_server = MatrixMockServer::new().await;
/// let client = mock_server.client_builder().build().await;
/// let event_id = event_id!("$id");
/// let room_id = room_id!("!room_id:localhost");
///
/// let f = EventFactory::new().room(room_id);
/// let alice_user_id = user_id!("@alice:b.c");
/// let alice_knock_event = f
/// .event(RoomMemberEventContent::new(MembershipState::Knock))
/// .event_id(event_id)
/// .sender(alice_user_id)
/// .state_key(alice_user_id)
/// .into_raw_timeline()
/// .cast();
///
/// mock_server.mock_get_members().ok(vec![alice_knock_event]).mock_once().mount().await;
/// let room = mock_server.sync_joined_room(&client, room_id).await;
///
/// let members = room.members(RoomMemberships::all()).await.unwrap();
/// assert_eq!(members.len(), 1);
/// # });
/// ```
pub fn mock_get_members(&self) -> MockEndpoint<'_, GetRoomMembersEndpoint> {
let mock =
Mock::given(method("GET")).and(path_regex(r"^/_matrix/client/v3/rooms/.*/members$"));
MockEndpoint { mock, server: &self.server, endpoint: GetRoomMembersEndpoint }
}

/// Creates a prebuilt mock for inviting a user to a room by its id.
///
/// # Examples
Expand Down Expand Up @@ -1112,7 +1158,7 @@ impl<'a> MockEndpoint<'a, RoomSendEndpoint> {
///
/// let response = room.client().send(r, None).await.unwrap();
/// // The delayed `m.room.message` event type should be mocked by the server.
/// assert_eq!("$some_id", response.delay_id);
/// assert_eq!("$some_id", response.delay_id);
/// # anyhow::Ok(()) });
/// ```
pub fn with_delay(self, delay: Duration) -> Self {
Expand Down Expand Up @@ -1851,6 +1897,18 @@ impl<'a> MockEndpoint<'a, DeleteRoomKeysVersionEndpoint> {
}
}

/// A prebuilt mock for `GET /members` request.
pub struct GetRoomMembersEndpoint;

impl<'a> MockEndpoint<'a, GetRoomMembersEndpoint> {
/// Returns a successful get members request with a list of members.
pub fn ok(self, members: Vec<Raw<RoomMemberEvent>>) -> MatrixMock<'a> {
let mock = self.mock.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"chunk": members,
})));
MatrixMock { server: self.server, mock }
}
}

/// A prebuilt mock for `POST /invite` request.
pub struct InviteUserByIdEndpoint;
Expand Down
Loading

0 comments on commit 1b9e69f

Please sign in to comment.