diff --git a/crates/matrix-sdk-base/src/rooms/normal.rs b/crates/matrix-sdk-base/src/rooms/normal.rs index ed61badf8d8..d0d69b26796 100644 --- a/crates/matrix-sdk-base/src/rooms/normal.rs +++ b/crates/matrix-sdk-base/src/rooms/normal.rs @@ -1302,6 +1302,11 @@ impl RoomInfo { self.members_synced = false; } + /// Returns whether the room members are synced. + pub fn are_members_synced(&self) -> bool { + self.members_synced + } + /// Mark this Room as still missing some state information. pub fn mark_state_partially_synced(&mut self) { self.sync_info = SyncInfo::PartiallySynced; diff --git a/crates/matrix-sdk/src/room/mod.rs b/crates/matrix-sdk/src/room/mod.rs index a257c4fe3c9..25ba1098bb5 100644 --- a/crates/matrix-sdk/src/room/mod.rs +++ b/crates/matrix-sdk/src/room/mod.rs @@ -22,6 +22,7 @@ use std::{ time::Duration, }; +use async_stream::stream; #[cfg(all(feature = "e2e-encryption", not(target_arch = "wasm32")))] use async_trait::async_trait; use eyeball::SharedObservable; @@ -85,6 +86,7 @@ use ruma::{ avatar::{self, RoomAvatarEventContent}, encryption::RoomEncryptionEventContent, history_visibility::HistoryVisibility, + member::{MembershipChange, SyncRoomMemberEvent}, message::{ AudioInfo, AudioMessageEventContent, FileInfo, FileMessageEventContent, FormattedBody, ImageMessageEventContent, MessageType, RoomMessageEventContent, @@ -116,6 +118,7 @@ use ruma::{ use serde::de::DeserializeOwned; use thiserror::Error; use tokio::sync::broadcast; +use tokio_stream::StreamExt; use tracing::{debug, info, instrument, warn}; use self::futures::{SendAttachment, SendMessageLikeEvent, SendRawMessageLikeEvent}; @@ -135,7 +138,10 @@ use crate::{ live_location_share::ObservableLiveLocation, media::{MediaFormat, MediaRequestParameters}, notification_settings::{IsEncrypted, IsOneToOne, RoomNotificationMode}, - room::power_levels::{RoomPowerLevelChanges, RoomPowerLevelsExt}, + room::{ + power_levels::{RoomPowerLevelChanges, RoomPowerLevelsExt}, + request_to_join::JoinRequest, + }, sync::RoomUpdate, utils::{IntoRawMessageLikeEventContent, IntoRawStateEventContent}, BaseRoom, Client, Error, HttpResult, Result, RoomState, TransmissionProgress, @@ -3195,6 +3201,135 @@ impl Room { ObservableLiveLocation::new(&self.client, self.room_id()) } + /// Helper to requests to join this `Room`. + /// + /// The current requests to join the room will be emitted immediately + /// when subscribing. When a new membership event is received, a request is + /// marked as seen or there is a limited sync, a new set of requests + /// will be emitted. + pub async fn subscribe_to_join_requests(&self) -> Result>> { + let this = Arc::new(self.clone()); + + let requests_observable = + this.client.observe_room_events::(this.room_id()); + + let (current_seen_ids, mut seen_request_ids_stream) = + this.subscribe_to_seen_join_request_ids().await?; + + let mut room_info_stream = this.subscribe_info(); + + let combined_stream = stream! { + // Emit current requests to join + match this.clone().get_current_join_requests(¤t_seen_ids).await { + Ok(initial_requests) => yield initial_requests, + Err(e) => warn!("Failed to get initial requests to join: {e:?}") + } + + let mut requests_stream = requests_observable.subscribe(); + + let mut new_event: Option = None; + let mut seen_ids = current_seen_ids.clone(); + let mut prev_seen_ids = current_seen_ids; + let mut prev_missing_room_members: bool = false; + let mut missing_room_members: bool = false; + + loop { + // This is equivalent to a combine stream operation, triggering a new emission + // when any of the branches changes + tokio::select! { + Some((next, _)) = requests_stream.next() => { new_event = Some(next); } + Some(next) = seen_request_ids_stream.next() => { seen_ids = next; } + Some(next) = room_info_stream.next() => { + missing_room_members = !next.are_members_synced() + } + else => break, + } + + // We need to emit new items when we may have missing room members: + // this usually happens after a gappy (limited) sync + let has_missing_room_members = prev_missing_room_members != missing_room_members; + if has_missing_room_members { + prev_missing_room_members = missing_room_members; + } + + // We need to emit new items if the seen join request ids have changed + let has_new_seen_ids = prev_seen_ids != seen_ids; + if has_new_seen_ids { + prev_seen_ids = seen_ids.clone(); + } + + if let Some(SyncStateEvent::Original(event)) = new_event.clone() { + // Reset the new event value so we can check this again in the next loop + new_event = None; + + // If we can calculate the membership change, try to emit only when needed + if event.prev_content().is_some() { + match event.membership_change() { + MembershipChange::Banned | + MembershipChange::Knocked | + MembershipChange::KnockAccepted | + MembershipChange::KnockDenied | + MembershipChange::KnockRetracted => { + match this.clone().get_current_join_requests(&seen_ids).await { + Ok(requests) => yield requests, + Err(e) => { + warn!("Failed to get updated requests to join on membership change: {e:?}") + } + } + } + _ => (), + } + } else { + // If we can't calculate the membership change, assume we need to + // emit updated values + match this.clone().get_current_join_requests(&seen_ids).await { + Ok(requests) => yield requests, + Err(e) => { + warn!("Failed to get updated requests to join on new member event: {e:?}") + } + } + } + } else if has_new_seen_ids || has_missing_room_members { + // If seen requests have changed or we have missing room members, + // we need to recalculate all the requests to join + match this.clone().get_current_join_requests(&seen_ids).await { + Ok(requests) => yield requests, + Err(e) => { + warn!("Failed to get updated requests to join on seen ids changed: {e:?}") + } + } + } + } + }; + + Ok(combined_stream) + } + + async fn get_current_join_requests( + &self, + seen_request_ids: &HashSet, + ) -> Result> { + Ok(self + .members(RoomMemberships::KNOCK) + .await? + .into_iter() + .filter_map(|member| { + if let Some(event_id) = member.event().event_id() { + let event_id = event_id.to_owned(); + Some(JoinRequest::new( + self, + &event_id, + member.event().timestamp(), + member.into(), + seen_request_ids.contains(&event_id), + )) + } else { + None + } + }) + .collect()) + } + /// Mark a list of requests to join the room as seen, given their state /// event ids. pub async fn mark_join_requests_as_seen(&self, event_ids: &[OwnedEventId]) -> Result<()> { @@ -3236,6 +3371,17 @@ impl Room { }; Ok(current_join_request_ids) } + + /// Subscribes to the set of requests to join that have been marked as + /// 'seen'. + pub async fn subscribe_to_seen_join_request_ids( + &self, + ) -> Result<(HashSet, impl Stream>)> { + let current = self.get_seen_join_request_ids().await?; + let subscriber = + self.seen_join_request_ids.subscribe().map(|values| values.unwrap_or_default()); + Ok((current, subscriber)) + } } #[cfg(all(feature = "e2e-encryption", not(target_arch = "wasm32")))] diff --git a/crates/matrix-sdk/src/test_utils/mocks.rs b/crates/matrix-sdk/src/test_utils/mocks.rs index 28fbf9b91e7..19ffee007ea 100644 --- a/crates/matrix-sdk/src/test_utils/mocks.rs +++ b/crates/matrix-sdk/src/test_utils/mocks.rs @@ -29,7 +29,10 @@ use matrix_sdk_test::{ }; use ruma::{ directory::PublicRoomsChunk, - events::{AnyStateEvent, AnyTimelineEvent, MessageLikeEventType, StateEventType}, + events::{ + room::member::RoomMemberEvent, AnyStateEvent, AnyTimelineEvent, MessageLikeEventType, + StateEventType, + }, serde::Raw, time::Duration, MxcUri, OwnedEventId, OwnedRoomId, RoomId, ServerName, @@ -608,6 +611,49 @@ impl MatrixMockServer { MockEndpoint { mock, server: &self.server, endpoint: DeleteRoomKeysVersionEndpoint } } + /// Create a prebuilt mock for getting the room members in a room. + /// + /// # Examples + /// + /// ``` # + /// tokio_test::block_on(async { + /// use matrix_sdk_base::RoomMemberships; + /// use ruma::events::room::member::MembershipState; + /// use ruma::events::room::member::RoomMemberEventContent; + /// use ruma::user_id; + /// use matrix_sdk_test::event_factory::EventFactory; + /// use matrix_sdk::{ + /// ruma::{event_id, room_id}, + /// test_utils::mocks::MatrixMockServer, + /// }; + /// let mock_server = MatrixMockServer::new().await; + /// let client = mock_server.client_builder().build().await; + /// let event_id = event_id!("$id"); + /// let room_id = room_id!("!room_id:localhost"); + /// + /// let f = EventFactory::new().room(room_id); + /// let alice_user_id = user_id!("@alice:b.c"); + /// let alice_knock_event = f + /// .event(RoomMemberEventContent::new(MembershipState::Knock)) + /// .event_id(event_id) + /// .sender(alice_user_id) + /// .state_key(alice_user_id) + /// .into_raw_timeline() + /// .cast(); + /// + /// mock_server.mock_get_members().ok(vec![alice_knock_event]).mock_once().mount().await; + /// let room = mock_server.sync_joined_room(&client, room_id).await; + /// + /// let members = room.members(RoomMemberships::all()).await.unwrap(); + /// assert_eq!(members.len(), 1); + /// # }); + /// ``` + pub fn mock_get_members(&self) -> MockEndpoint<'_, GetRoomMembersEndpoint> { + let mock = + Mock::given(method("GET")).and(path_regex(r"^/_matrix/client/v3/rooms/.*/members$")); + MockEndpoint { mock, server: &self.server, endpoint: GetRoomMembersEndpoint } + } + /// Creates a prebuilt mock for inviting a user to a room by its id. /// /// # Examples @@ -1112,7 +1158,7 @@ impl<'a> MockEndpoint<'a, RoomSendEndpoint> { /// /// let response = room.client().send(r, None).await.unwrap(); /// // The delayed `m.room.message` event type should be mocked by the server. - /// assert_eq!("$some_id", response.delay_id); + /// assert_eq!("$some_id", response.delay_id); /// # anyhow::Ok(()) }); /// ``` pub fn with_delay(self, delay: Duration) -> Self { @@ -1851,6 +1897,18 @@ impl<'a> MockEndpoint<'a, DeleteRoomKeysVersionEndpoint> { } } +/// A prebuilt mock for `GET /members` request. +pub struct GetRoomMembersEndpoint; + +impl<'a> MockEndpoint<'a, GetRoomMembersEndpoint> { + /// Returns a successful get members request with a list of members. + pub fn ok(self, members: Vec>) -> MatrixMock<'a> { + let mock = self.mock.respond_with(ResponseTemplate::new(200).set_body_json(json!({ + "chunk": members, + }))); + MatrixMock { server: self.server, mock } + } +} /// A prebuilt mock for `POST /invite` request. pub struct InviteUserByIdEndpoint; diff --git a/crates/matrix-sdk/tests/integration/room/joined.rs b/crates/matrix-sdk/tests/integration/room/joined.rs index fa0b3f66af2..a6b5f38949e 100644 --- a/crates/matrix-sdk/tests/integration/room/joined.rs +++ b/crates/matrix-sdk/tests/integration/room/joined.rs @@ -3,8 +3,9 @@ use std::{ time::Duration, }; -use futures_util::future::join_all; +use futures_util::{future::join_all, pin_mut}; use matrix_sdk::{ + assert_next_with_timeout, config::SyncSettings, room::{edit::EditedContent, Receipts, ReportedContentScore, RoomMemberRole}, test_utils::mocks::MatrixMockServer, @@ -24,7 +25,10 @@ use ruma::{ events::{ direct::DirectUserIdentifier, receipt::ReceiptThread, - room::message::{RoomMessageEventContent, RoomMessageEventContentWithoutRelation}, + room::{ + member::{MembershipState, RoomMemberEventContent}, + message::{RoomMessageEventContent, RoomMessageEventContentWithoutRelation}, + }, TimelineEventType, }, int, mxc_uri, owned_event_id, room_id, thirdparty, user_id, OwnedUserId, TransactionId, @@ -833,3 +837,111 @@ async fn test_enable_encryption_doesnt_stay_unencrypted() { assert!(room.is_encrypted().await.unwrap()); } + +#[async_test] +async fn test_subscribe_to_requests_to_join() { + let server = MatrixMockServer::new().await; + let client = server.client_builder().build().await; + + server.mock_room_state_encryption().plain().mount().await; + + let room_id = room_id!("!a:b.c"); + let f = EventFactory::new().room(room_id); + + let alice_user_id = user_id!("@alice:b.c"); + let alice_knock_event_id = event_id!("$alice-knock:b.c"); + let alice_knock_event = f + .event(RoomMemberEventContent::new(MembershipState::Knock)) + .event_id(alice_knock_event_id) + .sender(alice_user_id) + .state_key(alice_user_id) + .into_raw_timeline() + .cast(); + + server.mock_get_members().ok(vec![alice_knock_event]).mock_once().mount().await; + + let room = server.sync_joined_room(&client, room_id).await; + let stream = room.subscribe_to_join_requests().await.unwrap(); + + pin_mut!(stream); + + // We receive an initial request to join from Alice + let initial = assert_next_with_timeout!(stream, 100); + assert!(!initial.is_empty()); + + let alices_request_to_join = &initial[0]; + assert_eq!(alices_request_to_join.event_id, alice_knock_event_id); + assert!(!alices_request_to_join.is_seen); + + // We then mark the request to join as seen + room.mark_join_requests_as_seen(&[alice_knock_event_id.to_owned()]).await.unwrap(); + + // Now it's received again as seen + let seen = assert_next_with_timeout!(stream, 100); + assert!(!seen.is_empty()); + let alices_seen_request_to_join = &seen[0]; + assert_eq!(alices_seen_request_to_join.event_id, alice_knock_event_id); + assert!(alices_seen_request_to_join.is_seen); + + // If we then receive a new member event for Alice that's not 'knock' + let alice_join_event_id = event_id!("$alice-join:b.c"); + let joined_room_builder = JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f + .event(RoomMemberEventContent::new(MembershipState::Invite)) + .event_id(alice_join_event_id) + .sender(alice_user_id) + .state_key(alice_user_id) + .into_raw_timeline() + .cast()]); + server.sync_room(&client, joined_room_builder).await; + + // The requests to join are now empty + let updated_requests = assert_next_with_timeout!(stream, 100); + assert!(updated_requests.is_empty()); +} + +#[async_test] +async fn test_subscribe_to_requests_to_join_reloads_members_on_limited_sync() { + let server = MatrixMockServer::new().await; + let client = server.client_builder().build().await; + + server.mock_room_state_encryption().plain().mount().await; + + let room_id = room_id!("!a:b.c"); + let f = EventFactory::new().room(room_id); + + let alice_user_id = user_id!("@alice:b.c"); + let alice_knock_event_id = event_id!("$alice-knock:b.c"); + let alice_knock_event = f + .event(RoomMemberEventContent::new(MembershipState::Knock)) + .event_id(alice_knock_event_id) + .sender(alice_user_id) + .state_key(alice_user_id) + .into_raw_timeline() + .cast(); + + server + .mock_get_members() + .ok(vec![alice_knock_event]) + // The endpoint will be called twice: + // 1. For the initial loading of room members. + // 2. When a gappy (limited) sync is received. + .expect(2) + .mount() + .await; + + let room = server.sync_joined_room(&client, room_id).await; + let stream = room.subscribe_to_join_requests().await.unwrap(); + + pin_mut!(stream); + + // We receive an initial request to join from Alice + let initial = assert_next_with_timeout!(stream, 500); + assert!(!initial.is_empty()); + + // This limited sync should trigger a new emission of join requests, with a + // reloading of the room members + server.sync_room(&client, JoinedRoomBuilder::new(room_id).set_timeline_limited()).await; + + // We should receive a new list of join requests + assert_next_with_timeout!(stream, 500); +}