From 1cb84aaab5231532105f5a12f01387c9cca27a6a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 12 Sep 2024 22:36:16 +0100 Subject: [PATCH] Sliding Sync: Increase concurrency of sliding sync a bit (#17696) For initial requests a typical page size is 20 rooms, so we may as well do the batching as 20. This should speed up bigger syncs a little bit. --- changelog.d/17696.misc | 1 + synapse/handlers/sliding_sync/__init__.py | 2 +- synapse/handlers/sliding_sync/extensions.py | 10 +++++++++- 3 files changed, 11 insertions(+), 2 deletions(-) create mode 100644 changelog.d/17696.misc diff --git a/changelog.d/17696.misc b/changelog.d/17696.misc new file mode 100644 index 0000000000..a2f1b1f399 --- /dev/null +++ b/changelog.d/17696.misc @@ -0,0 +1 @@ +Speed up sliding sync requests a bit where there are many room changes. diff --git a/synapse/handlers/sliding_sync/__init__.py b/synapse/handlers/sliding_sync/__init__.py index 04493494a6..c3b5bbbf6f 100644 --- a/synapse/handlers/sliding_sync/__init__.py +++ b/synapse/handlers/sliding_sync/__init__.py @@ -267,7 +267,7 @@ async def handle_room(room_id: str) -> None: if relevant_rooms_to_send_map: with start_active_span("sliding_sync.generate_room_entries"): - await concurrently_execute(handle_room, relevant_rooms_to_send_map, 10) + await concurrently_execute(handle_room, relevant_rooms_to_send_map, 20) extensions = await self.extensions.get_extensions_response( sync_config=sync_config, diff --git a/synapse/handlers/sliding_sync/extensions.py b/synapse/handlers/sliding_sync/extensions.py index 6f37cc3462..7c2f8a2569 100644 --- a/synapse/handlers/sliding_sync/extensions.py +++ b/synapse/handlers/sliding_sync/extensions.py @@ -38,6 +38,7 @@ SlidingSyncConfig, SlidingSyncResult, ) +from synapse.util.async_helpers import concurrently_execute if TYPE_CHECKING: from synapse.server import HomeServer @@ -534,7 +535,10 @@ async def get_receipts_extension_response( # For rooms we've previously sent down, but aren't up to date, we # need to use the from token from the room status. if previously_rooms: - for room_id, receipt_token in previously_rooms.items(): + # Fetch any missing rooms concurrently. + + async def handle_previously_room(room_id: str) -> None: + receipt_token = previously_rooms[room_id] # TODO: Limit the number of receipts we're about to send down # for the room, if its too many we should TODO previously_receipts = ( @@ -546,6 +550,10 @@ async def get_receipts_extension_response( ) fetched_receipts.extend(previously_receipts) + await concurrently_execute( + handle_previously_room, previously_rooms.keys(), 20 + ) + if initial_rooms: # We also always send down receipts for the current user. user_receipts = (