Skip to content

Commit

Permalink
Merge branch 'develop' into msc4098-scim
Browse files Browse the repository at this point in the history
  • Loading branch information
azmeuk authored Sep 13, 2024
2 parents a548e44 + 4ac7835 commit 76afb5d
Show file tree
Hide file tree
Showing 13 changed files with 2,307 additions and 1,799 deletions.
1 change: 1 addition & 0 deletions changelog.d/17662.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add support for the `tags` and `not_tags` filters for simplified sliding sync.
1 change: 1 addition & 0 deletions changelog.d/17696.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Speed up sliding sync requests a bit where there are many room changes.
1 change: 1 addition & 0 deletions changelog.d/17703.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Refactor sliding sync filter unit tests so the sliding sync API has better test coverage.
2 changes: 1 addition & 1 deletion synapse/handlers/sliding_sync/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ async def handle_room(room_id: str) -> None:

if relevant_rooms_to_send_map:
with start_active_span("sliding_sync.generate_room_entries"):
await concurrently_execute(handle_room, relevant_rooms_to_send_map, 10)
await concurrently_execute(handle_room, relevant_rooms_to_send_map, 20)

extensions = await self.extensions.get_extensions_response(
sync_config=sync_config,
Expand Down
10 changes: 9 additions & 1 deletion synapse/handlers/sliding_sync/extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
SlidingSyncConfig,
SlidingSyncResult,
)
from synapse.util.async_helpers import concurrently_execute

if TYPE_CHECKING:
from synapse.server import HomeServer
Expand Down Expand Up @@ -534,7 +535,10 @@ async def get_receipts_extension_response(
# For rooms we've previously sent down, but aren't up to date, we
# need to use the from token from the room status.
if previously_rooms:
for room_id, receipt_token in previously_rooms.items():
# Fetch any missing rooms concurrently.

async def handle_previously_room(room_id: str) -> None:
receipt_token = previously_rooms[room_id]
# TODO: Limit the number of receipts we're about to send down
# for the room, if its too many we should TODO
previously_receipts = (
Expand All @@ -546,6 +550,10 @@ async def get_receipts_extension_response(
)
fetched_receipts.extend(previously_receipts)

await concurrently_execute(
handle_previously_room, previously_rooms.keys(), 20
)

if initial_rooms:
# We also always send down receipts for the current user.
user_receipts = (
Expand Down
78 changes: 75 additions & 3 deletions synapse/handlers/sliding_sync/room_lists.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ async def _compute_interested_rooms_new_tables(
event_pos=change.event_pos,
room_version_id=change.room_version_id,
# We keep the current state of the room though
has_known_state=existing_room.has_known_state,
room_type=existing_room.room_type,
is_encrypted=existing_room.is_encrypted,
)
Expand All @@ -270,6 +271,7 @@ async def _compute_interested_rooms_new_tables(
event_id=change.event_id,
event_pos=change.event_pos,
room_version_id=change.room_version_id,
has_known_state=True,
room_type=room_type,
is_encrypted=is_encrypted,
)
Expand Down Expand Up @@ -305,6 +307,7 @@ async def _compute_interested_rooms_new_tables(
event_id=None,
event_pos=newly_left_room_map[room_id],
room_version_id=await self.store.get_room_version_id(room_id),
has_known_state=True,
room_type=room_type,
is_encrypted=is_encrypted,
)
Expand Down Expand Up @@ -1521,6 +1524,8 @@ async def filter_rooms(
A filtered dictionary of room IDs along with membership information in the
room at the time of `to_token`.
"""
user_id = user.to_string()

room_id_to_stripped_state_map: Dict[
str, Optional[StateMap[StrippedStateEvent]]
] = {}
Expand Down Expand Up @@ -1630,12 +1635,14 @@ async def filter_rooms(
and room_type not in filters.room_types
):
filtered_room_id_set.remove(room_id)
continue

if (
filters.not_room_types is not None
and room_type in filters.not_room_types
):
filtered_room_id_set.remove(room_id)
continue

if filters.room_name_like is not None:
with start_active_span("filters.room_name_like"):
Expand All @@ -1652,9 +1659,36 @@ async def filter_rooms(
# )
raise NotImplementedError()

# Filter by room tags according to the users account data
if filters.tags is not None or filters.not_tags is not None:
with start_active_span("filters.tags"):
raise NotImplementedError()
# Fetch the user tags for their rooms
room_tags = await self.store.get_tags_for_user(user_id)
room_id_to_tag_name_set: Dict[str, Set[str]] = {
room_id: set(tags.keys()) for room_id, tags in room_tags.items()
}

if filters.tags is not None:
tags_set = set(filters.tags)
filtered_room_id_set = {
room_id
for room_id in filtered_room_id_set
# Remove rooms that don't have one of the tags in the filter
if room_id_to_tag_name_set.get(room_id, set()).intersection(
tags_set
)
}

if filters.not_tags is not None:
not_tags_set = set(filters.not_tags)
filtered_room_id_set = {
room_id
for room_id in filtered_room_id_set
# Remove rooms if they have any of the tags in the filter
if not room_id_to_tag_name_set.get(room_id, set()).intersection(
not_tags_set
)
}

# Assemble a new sync room map but only with the `filtered_room_id_set`
return {room_id: sync_room_map[room_id] for room_id in filtered_room_id_set}
Expand All @@ -1678,6 +1712,7 @@ async def filter_rooms_using_tables(
filters: Filters to apply
to_token: We filter based on the state of the room at this token
dm_room_ids: Set of room IDs which are DMs
room_tags: Mapping of room ID to tags
Returns:
A filtered dictionary of room IDs along with membership information in the
Expand Down Expand Up @@ -1705,7 +1740,10 @@ async def filter_rooms_using_tables(
filtered_room_id_set = {
room_id
for room_id in filtered_room_id_set
if sync_room_map[room_id].is_encrypted == filters.is_encrypted
# Remove rooms if we can't figure out what the encryption status is
if sync_room_map[room_id].has_known_state
# Or remove if it doesn't match the filter
and sync_room_map[room_id].is_encrypted == filters.is_encrypted
}

# Filter for rooms that the user has been invited to
Expand Down Expand Up @@ -1734,19 +1772,26 @@ async def filter_rooms_using_tables(
# Make a copy so we don't run into an error: `Set changed size during
# iteration`, when we filter out and remove items
for room_id in filtered_room_id_set.copy():
# Remove rooms if we can't figure out what room type it is
if not sync_room_map[room_id].has_known_state:
filtered_room_id_set.remove(room_id)
continue

room_type = sync_room_map[room_id].room_type

if (
filters.room_types is not None
and room_type not in filters.room_types
):
filtered_room_id_set.remove(room_id)
continue

if (
filters.not_room_types is not None
and room_type in filters.not_room_types
):
filtered_room_id_set.remove(room_id)
continue

if filters.room_name_like is not None:
with start_active_span("filters.room_name_like"):
Expand All @@ -1763,9 +1808,36 @@ async def filter_rooms_using_tables(
# )
raise NotImplementedError()

# Filter by room tags according to the users account data
if filters.tags is not None or filters.not_tags is not None:
with start_active_span("filters.tags"):
raise NotImplementedError()
# Fetch the user tags for their rooms
room_tags = await self.store.get_tags_for_user(user_id)
room_id_to_tag_name_set: Dict[str, Set[str]] = {
room_id: set(tags.keys()) for room_id, tags in room_tags.items()
}

if filters.tags is not None:
tags_set = set(filters.tags)
filtered_room_id_set = {
room_id
for room_id in filtered_room_id_set
# Remove rooms that don't have one of the tags in the filter
if room_id_to_tag_name_set.get(room_id, set()).intersection(
tags_set
)
}

if filters.not_tags is not None:
not_tags_set = set(filters.not_tags)
filtered_room_id_set = {
room_id
for room_id in filtered_room_id_set
# Remove rooms if they have any of the tags in the filter
if not room_id_to_tag_name_set.get(room_id, set()).intersection(
not_tags_set
)
}

# Assemble a new sync room map but only with the `filtered_room_id_set`
return {room_id: sync_room_map[room_id] for room_id in filtered_room_id_set}
Expand Down
2 changes: 1 addition & 1 deletion synapse/rest/client/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -1044,7 +1044,7 @@ async def encode_rooms(
serialized_rooms[room_id]["heroes"] = serialized_heroes

# We should only include the `initial` key if it's `True` to save bandwidth.
# The absense of this flag means `False`.
# The absence of this flag means `False`.
if room_result.initial:
serialized_rooms[room_id]["initial"] = room_result.initial

Expand Down
6 changes: 4 additions & 2 deletions synapse/storage/databases/main/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -1420,6 +1420,7 @@ def get_sliding_sync_rooms_for_user_txn(
SELECT m.room_id, m.sender, m.membership, m.membership_event_id,
r.room_version,
m.event_instance_name, m.event_stream_ordering,
m.has_known_state,
COALESCE(j.room_type, m.room_type),
COALESCE(j.is_encrypted, m.is_encrypted)
FROM sliding_sync_membership_snapshots AS m
Expand All @@ -1437,8 +1438,9 @@ def get_sliding_sync_rooms_for_user_txn(
event_id=row[3],
room_version_id=row[4],
event_pos=PersistedEventPosition(row[5], row[6]),
room_type=row[7],
is_encrypted=bool(row[8]),
has_known_state=bool(row[7]),
room_type=row[8],
is_encrypted=bool(row[9]),
)
for row in txn
}
Expand Down
1 change: 1 addition & 0 deletions synapse/storage/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class RoomsForUserSlidingSync:
event_pos: PersistedEventPosition
room_version_id: str

has_known_state: bool
room_type: Optional[str]
is_encrypted: bool

Expand Down
Loading

0 comments on commit 76afb5d

Please sign in to comment.