diff --git a/changelog.d/17629.misc b/changelog.d/17629.misc new file mode 100644 index 00000000000..1eb46b2c682 --- /dev/null +++ b/changelog.d/17629.misc @@ -0,0 +1 @@ +Sliding Sync: Split up `get_room_membership_for_user_at_to_token`. diff --git a/synapse/handlers/sliding_sync/room_lists.py b/synapse/handlers/sliding_sync/room_lists.py index 0e6cb285244..f41180fedab 100644 --- a/synapse/handlers/sliding_sync/room_lists.py +++ b/synapse/handlers/sliding_sync/room_lists.py @@ -25,6 +25,7 @@ Mapping, Optional, Set, + Tuple, Union, ) @@ -46,6 +47,7 @@ Sentinel as StateSentinel, ) from synapse.storage.databases.main.stream import CurrentStateDeltaMembership +from synapse.storage.roommember import RoomsForUser from synapse.types import ( MutableStateMap, PersistedEventPosition, @@ -425,68 +427,30 @@ async def compute_interested_rooms( ) @trace - async def get_room_membership_for_user_at_to_token( + async def _get_rewind_changes_to_current_membership_to_token( self, user: UserID, + rooms_for_user: Mapping[str, RoomsForUser], to_token: StreamToken, - from_token: Optional[StreamToken], - ) -> Dict[str, _RoomMembershipForUser]: + ) -> Mapping[str, Optional[RoomsForUser]]: """ - Fetch room IDs that the user has had membership in (the full room list including - long-lost left rooms that will be filtered, sorted, and sliced). - - We're looking for rooms where the user has had any sort of membership in the - token range (> `from_token` and <= `to_token`) - - In order for bans/kicks to not show up, you need to `/forget` those rooms. This - doesn't modify the event itself though and only adds the `forgotten` flag to the - `room_memberships` table in Synapse. There isn't a way to tell when a room was - forgotten at the moment so we can't factor it into the token range. + Takes the current set of rooms for a user (retrieved after the given + token), and returns the changes needed to "rewind" it to match the set of + memberships *at that token* (<= `to_token`). Args: user: User to fetch rooms for - to_token: The token to fetch rooms up to. - from_token: The point in the stream to sync from. + rooms_for_user: The set of rooms for the user after the `to_token`. + to_token: The token to rewind to Returns: - A dictionary of room IDs that the user has had membership in along with - membership information in that room at the time of `to_token`. + The changes to apply to rewind the the current memberships. """ - user_id = user.to_string() - - # First grab a current snapshot rooms for the user - # (also handles forgotten rooms) - room_for_user_list = await self.store.get_rooms_for_local_user_where_membership_is( - user_id=user_id, - # We want to fetch any kind of membership (joined and left rooms) in order - # to get the `event_pos` of the latest room membership event for the - # user. - membership_list=Membership.LIST, - excluded_rooms=self.rooms_to_exclude_globally, - ) - # If the user has never joined any rooms before, we can just return an empty list - if not room_for_user_list: + if not rooms_for_user: return {} - # Our working list of rooms that can show up in the sync response - sync_room_id_set = { - # Note: The `room_for_user` we're assigning here will need to be fixed up - # (below) because they are potentially from the current snapshot time - # instead from the time of the `to_token`. - room_for_user.room_id: _RoomMembershipForUser( - room_id=room_for_user.room_id, - event_id=room_for_user.event_id, - event_pos=room_for_user.event_pos, - membership=room_for_user.membership, - sender=room_for_user.sender, - # We will update these fields below to be accurate - newly_joined=False, - newly_left=False, - is_dm=False, - ) - for room_for_user in room_for_user_list - } + user_id = user.to_string() # Get the `RoomStreamToken` that represents the spot we queried up to when we got # our membership snapshot from `get_rooms_for_local_user_where_membership_is()`. @@ -494,7 +458,7 @@ async def get_room_membership_for_user_at_to_token( # First, we need to get the max stream_ordering of each event persister instance # that we queried events from. instance_to_max_stream_ordering_map: Dict[str, int] = {} - for room_for_user in room_for_user_list: + for room_for_user in rooms_for_user.values(): instance_name = room_for_user.event_pos.instance_name stream_ordering = room_for_user.event_pos.stream @@ -521,18 +485,14 @@ async def get_room_membership_for_user_at_to_token( ), ) - # Since we fetched the users room list at some point in time after the from/to + # Since we fetched the users room list at some point in time after the # tokens, we need to revert/rewind some membership changes to match the point in # time of the `to_token`. In particular, we need to make these fixups: # - # - 1a) Remove rooms that the user joined after the `to_token` - # - 1b) Add back rooms that the user left after the `to_token` - # - 1c) Update room membership events to the point in time of the `to_token` - # - 2) Figure out which rooms are `newly_left` rooms (> `from_token` and <= `to_token`) - # - 3) Figure out which rooms are `newly_joined` (> `from_token` and <= `to_token`) - # - 4) Figure out which rooms are DM's - - # 1) Fetch membership changes that fall in the range from `to_token` up to + # - a) Remove rooms that the user joined after the `to_token` + # - b) Update room membership events to the point in time of the `to_token` + + # Fetch membership changes that fall in the range from `to_token` up to # `membership_snapshot_token` # # If our `to_token` is already the same or ahead of the latest room membership @@ -549,7 +509,15 @@ async def get_room_membership_for_user_at_to_token( ) ) - # 1) Assemble a list of the first membership event after the `to_token` so we can + if not current_state_delta_membership_changes_after_to_token: + # There have been no membership changes, so we can early return. + return {} + + # Otherwise we're about to make changes to `rooms_for_user`, so we turn + # it into a mutable dict. + changes: Dict[str, Optional[RoomsForUser]] = {} + + # Assemble a list of the first membership event after the `to_token` so we can # step backward to the previous membership that would apply to the from/to # range. first_membership_change_by_room_id_after_to_token: Dict[ @@ -561,8 +529,6 @@ async def get_room_membership_for_user_at_to_token( membership_change.room_id, membership_change ) - # 1) Fixup - # # Since we fetched a snapshot of the users room list at some point in time after # the from/to tokens, we need to revert/rewind some membership changes to match # the point in time of the `to_token`. @@ -572,7 +538,7 @@ async def get_room_membership_for_user_at_to_token( ) in first_membership_change_by_room_id_after_to_token.items(): # 1a) Remove rooms that the user joined after the `to_token` if first_membership_change_after_to_token.prev_event_id is None: - sync_room_id_set.pop(room_id, None) + changes[room_id] = None # 1b) 1c) From the first membership event after the `to_token`, step backward to the # previous membership that would apply to the from/to range. else: @@ -583,25 +549,161 @@ async def get_room_membership_for_user_at_to_token( first_membership_change_after_to_token.prev_event_pos is not None and first_membership_change_after_to_token.prev_membership is not None + and first_membership_change_after_to_token.prev_sender is not None ): - sync_room_id_set[room_id] = _RoomMembershipForUser( + # We need to know the room version ID, which we normally we + # can get from the current membership, but if we don't have + # that then we need to query the DB. + current_membership = rooms_for_user.get(room_id) + if current_membership is not None: + room_version_id = current_membership.room_version_id + else: + room_version_id = await self.store.get_room_version_id(room_id) + + changes[room_id] = RoomsForUser( room_id=room_id, event_id=first_membership_change_after_to_token.prev_event_id, event_pos=first_membership_change_after_to_token.prev_event_pos, membership=first_membership_change_after_to_token.prev_membership, sender=first_membership_change_after_to_token.prev_sender, - # We will update these fields below to be accurate - newly_joined=False, - newly_left=False, - is_dm=False, + room_version_id=room_version_id, ) else: # If we can't find the previous membership event, we shouldn't # include the room in the sync response since we can't determine the # exact membership state and shouldn't rely on the current snapshot. - sync_room_id_set.pop(room_id, None) + changes[room_id] = None + + return changes + + @trace + async def get_room_membership_for_user_at_to_token( + self, + user: UserID, + to_token: StreamToken, + from_token: Optional[StreamToken], + ) -> Dict[str, _RoomMembershipForUser]: + """ + Fetch room IDs that the user has had membership in (the full room list including + long-lost left rooms that will be filtered, sorted, and sliced). - # 2) Fetch membership changes that fall in the range from `from_token` up to `to_token` + We're looking for rooms where the user has had any sort of membership in the + token range (> `from_token` and <= `to_token`) + + In order for bans/kicks to not show up, you need to `/forget` those rooms. This + doesn't modify the event itself though and only adds the `forgotten` flag to the + `room_memberships` table in Synapse. There isn't a way to tell when a room was + forgotten at the moment so we can't factor it into the token range. + + Args: + user: User to fetch rooms for + to_token: The token to fetch rooms up to. + from_token: The point in the stream to sync from. + + Returns: + A dictionary of room IDs that the user has had membership in along with + membership information in that room at the time of `to_token`. + """ + user_id = user.to_string() + + # First grab a current snapshot rooms for the user + # (also handles forgotten rooms) + room_for_user_list = await self.store.get_rooms_for_local_user_where_membership_is( + user_id=user_id, + # We want to fetch any kind of membership (joined and left rooms) in order + # to get the `event_pos` of the latest room membership event for the + # user. + membership_list=Membership.LIST, + excluded_rooms=self.rooms_to_exclude_globally, + ) + + # If the user has never joined any rooms before, we can just return an empty list + if not room_for_user_list: + return {} + + # Since we fetched the users room list at some point in time after the + # tokens, we need to revert/rewind some membership changes to match the point in + # time of the `to_token`. + rooms_for_user = {room.room_id: room for room in room_for_user_list} + changes = await self._get_rewind_changes_to_current_membership_to_token( + user, rooms_for_user, to_token + ) + for room_id, change_room_for_user in changes.items(): + if change_room_for_user is None: + rooms_for_user.pop(room_id, None) + else: + rooms_for_user[room_id] = change_room_for_user + + newly_joined_room_ids, newly_left_room_ids = ( + await self._get_newly_joined_and_left_rooms( + user_id, to_token=to_token, from_token=from_token + ) + ) + + dm_room_ids = await self._get_dm_rooms_for_user(user_id) + + # Our working list of rooms that can show up in the sync response + sync_room_id_set = { + room_for_user.room_id: _RoomMembershipForUser( + room_id=room_for_user.room_id, + event_id=room_for_user.event_id, + event_pos=room_for_user.event_pos, + membership=room_for_user.membership, + sender=room_for_user.sender, + newly_joined=room_id in newly_joined_room_ids, + newly_left=room_id in newly_left_room_ids, + is_dm=room_id in dm_room_ids, + ) + for room_id, room_for_user in rooms_for_user.items() + } + + # Ensure we have entries for rooms that the user has been "state reset" + # out of. These are rooms appear in the `newly_left_rooms` map but + # aren't in the `rooms_for_user` map. + for room_id, left_event_pos in newly_left_room_ids.items(): + if room_id in sync_room_id_set: + continue + + sync_room_id_set[room_id] = _RoomMembershipForUser( + room_id=room_id, + event_id=None, + event_pos=left_event_pos, + membership=Membership.LEAVE, + sender=None, + newly_joined=False, + newly_left=True, + is_dm=room_id in dm_room_ids, + ) + + return sync_room_id_set + + @trace + async def _get_newly_joined_and_left_rooms( + self, + user_id: str, + to_token: StreamToken, + from_token: Optional[StreamToken], + ) -> Tuple[StrCollection, Mapping[str, PersistedEventPosition]]: + """Fetch the sets of rooms that the user newly joined or left in the + given token range. + + Note: there may be rooms in the newly left rooms where the user was + "state reset" out of the room, and so that room would not be part of the + "current memberships" of the user. + + Returns: + A 2-tuple of newly joined room IDs and a map of newly left room + IDs to the event position the leave happened at. + """ + newly_joined_room_ids: Set[str] = set() + newly_left_room_map: Dict[str, PersistedEventPosition] = {} + + # We need to figure out the + # + # - 1) Figure out which rooms are `newly_left` rooms (> `from_token` and <= `to_token`) + # - 2) Figure out which rooms are `newly_joined` (> `from_token` and <= `to_token`) + + # 1) Fetch membership changes that fall in the range from `from_token` up to `to_token` current_state_delta_membership_changes_in_from_to_range = [] if from_token: current_state_delta_membership_changes_in_from_to_range = ( @@ -613,7 +715,7 @@ async def get_room_membership_for_user_at_to_token( ) ) - # 2) Assemble a list of the last membership events in some given ranges. Someone + # 1) Assemble a list of the last membership events in some given ranges. Someone # could have left and joined multiple times during the given range but we only # care about end-result so we grab the last one. last_membership_change_by_room_id_in_from_to_range: Dict[ @@ -646,9 +748,9 @@ async def get_room_membership_for_user_at_to_token( if membership_change.membership != Membership.JOIN: has_non_join_event_by_room_id_in_from_to_range[room_id] = True - # 2) Fixup + # 1) Fixup # - # 3) We also want to assemble a list of possibly newly joined rooms. Someone + # 2) We also want to assemble a list of possibly newly joined rooms. Someone # could have left and joined multiple times during the given range but we only # care about whether they are joined at the end of the token range so we are # working with the last membership even in the token range. @@ -658,46 +760,18 @@ async def get_room_membership_for_user_at_to_token( ) in last_membership_change_by_room_id_in_from_to_range.values(): room_id = last_membership_change_in_from_to_range.room_id - # 3) + # 2) if last_membership_change_in_from_to_range.membership == Membership.JOIN: possibly_newly_joined_room_ids.add(room_id) - # 2) Figure out newly_left rooms (> `from_token` and <= `to_token`). + # 1) Figure out newly_left rooms (> `from_token` and <= `to_token`). if last_membership_change_in_from_to_range.membership == Membership.LEAVE: - # 2) Mark this room as `newly_left` - - # If we're seeing a membership change here, we should expect to already - # have it in our snapshot but if a state reset happens, it wouldn't have - # shown up in our snapshot but appear as a change here. - existing_sync_entry = sync_room_id_set.get(room_id) - if existing_sync_entry is not None: - # Normal expected case - sync_room_id_set[room_id] = existing_sync_entry.copy_and_replace( - newly_left=True - ) - else: - # State reset! - logger.warn( - "State reset detected for room_id %s with %s who is no longer in the room", - room_id, - user_id, - ) - # Even though a state reset happened which removed the person from - # the room, we still add it the list so the user knows they left the - # room. Downstream code can check for a state reset by looking for - # `event_id=None and membership is not None`. - sync_room_id_set[room_id] = _RoomMembershipForUser( - room_id=room_id, - event_id=last_membership_change_in_from_to_range.event_id, - event_pos=last_membership_change_in_from_to_range.event_pos, - membership=last_membership_change_in_from_to_range.membership, - sender=last_membership_change_in_from_to_range.sender, - newly_joined=False, - newly_left=True, - is_dm=False, - ) + # 1) Mark this room as `newly_left` + newly_left_room_map[room_id] = ( + last_membership_change_in_from_to_range.event_pos + ) - # 3) Figure out `newly_joined` + # 2) Figure out `newly_joined` for room_id in possibly_newly_joined_room_ids: has_non_join_in_from_to_range = ( has_non_join_event_by_room_id_in_from_to_range.get(room_id, False) @@ -706,9 +780,7 @@ async def get_room_membership_for_user_at_to_token( # also some non-join in the range, we know they `newly_joined`. if has_non_join_in_from_to_range: # We found a `newly_joined` room (we left and joined within the token range) - sync_room_id_set[room_id] = sync_room_id_set[room_id].copy_and_replace( - newly_joined=True - ) + newly_joined_room_ids.add(room_id) else: prev_event_id = first_membership_change_by_room_id_in_from_to_range[ room_id @@ -720,20 +792,23 @@ async def get_room_membership_for_user_at_to_token( if prev_event_id is None: # We found a `newly_joined` room (we are joining the room for the # first time within the token range) - sync_room_id_set[room_id] = sync_room_id_set[ - room_id - ].copy_and_replace(newly_joined=True) + newly_joined_room_ids.add(room_id) # Last resort, we need to step back to the previous membership event # just before the token range to see if we're joined then or not. elif prev_membership != Membership.JOIN: # We found a `newly_joined` room (we left before the token range # and joined within the token range) - sync_room_id_set[room_id] = sync_room_id_set[ - room_id - ].copy_and_replace(newly_joined=True) + newly_joined_room_ids.add(room_id) + + return newly_joined_room_ids, newly_left_room_map + + @trace + async def _get_dm_rooms_for_user( + self, + user_id: str, + ) -> StrCollection: + """Get the set of DM rooms for the user.""" - # 4) Figure out which rooms the user considers to be direct-message (DM) rooms - # # We're using global account data (`m.direct`) instead of checking for # `is_direct` on membership events because that property only appears for # the invitee membership event (doesn't show up for the inviter). @@ -755,13 +830,7 @@ async def get_room_membership_for_user_at_to_token( if isinstance(room_id, str): dm_room_id_set.add(room_id) - # 4) Fixup - for room_id in sync_room_id_set: - sync_room_id_set[room_id] = sync_room_id_set[room_id].copy_and_replace( - is_dm=room_id in dm_room_id_set - ) - - return sync_room_id_set + return dm_room_id_set @trace async def filter_rooms_relevant_for_sync(