Skip to content

Commit

Permalink
Speed up /keys/changes (#17548)
Browse files Browse the repository at this point in the history
Follow on from #17537.

This is just adding a batched lookup function (you might want to hide
whitespace in the diff).
  • Loading branch information
erikjohnston authored Aug 16, 2024
1 parent 9ce489b commit f162c92
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 20 deletions.
1 change: 1 addition & 0 deletions changelog.d/17548.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix performance of device lists in `/key/changes` and sliding sync.
32 changes: 14 additions & 18 deletions synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,31 +267,27 @@ async def get_user_ids_changed(
newly_left_rooms.add(change.room_id)

# We now work out if any other users have since joined or left the rooms
# the user is currently in. First we filter out rooms that we know
# haven't changed recently.
rooms_changed = self.store.get_rooms_that_changed(
joined_room_ids, from_token.room_key
)
# the user is currently in.

# List of membership changes per room
room_to_deltas: Dict[str, List[StateDelta]] = {}
# The set of event IDs of membership events (so we can fetch their
# associated membership).
memberships_to_fetch: Set[str] = set()
for room_id in rooms_changed:
# TODO: Only pull out membership events?
state_changes = await self.store.get_current_state_deltas_for_room(
room_id, from_token=from_token.room_key, to_token=now_token.room_key
)
for delta in state_changes:
if delta.event_type != EventTypes.Member:
continue

room_to_deltas.setdefault(room_id, []).append(delta)
if delta.event_id:
memberships_to_fetch.add(delta.event_id)
if delta.prev_event_id:
memberships_to_fetch.add(delta.prev_event_id)
# TODO: Only pull out membership events?
state_changes = await self.store.get_current_state_deltas_for_rooms(
joined_room_ids, from_token=from_token.room_key, to_token=now_token.room_key
)
for delta in state_changes:
if delta.event_type != EventTypes.Member:
continue

room_to_deltas.setdefault(delta.room_id, []).append(delta)
if delta.event_id:
memberships_to_fetch.add(delta.event_id)
if delta.prev_event_id:
memberships_to_fetch.add(delta.prev_event_id)

# Fetch all the memberships for the membership events
event_id_to_memberships = await self.store.get_membership_from_event_ids(
Expand Down
64 changes: 62 additions & 2 deletions synapse/storage/databases/main/state_deltas.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@

from synapse.logging.opentracing import trace
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import LoggingTransaction
from synapse.storage.database import LoggingTransaction, make_in_list_sql_clause
from synapse.storage.databases.main.stream import _filter_results_by_stream
from synapse.types import RoomStreamToken
from synapse.types import RoomStreamToken, StrCollection
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.iterutils import batch_iter

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -200,3 +201,62 @@ def get_current_state_deltas_for_room_txn(
return await self.db_pool.runInteraction(
"get_current_state_deltas_for_room", get_current_state_deltas_for_room_txn
)

@trace
async def get_current_state_deltas_for_rooms(
self,
room_ids: StrCollection,
from_token: RoomStreamToken,
to_token: RoomStreamToken,
) -> List[StateDelta]:
"""Get the state deltas between two tokens for the set of rooms."""

room_ids = self._curr_state_delta_stream_cache.get_entities_changed(
room_ids, from_token.stream
)
if not room_ids:
return []

def get_current_state_deltas_for_rooms_txn(
txn: LoggingTransaction,
room_ids: StrCollection,
) -> List[StateDelta]:
clause, args = make_in_list_sql_clause(
self.database_engine, "room_id", room_ids
)

sql = f"""
SELECT instance_name, stream_id, room_id, type, state_key, event_id, prev_event_id
FROM current_state_delta_stream
WHERE {clause} AND ? < stream_id AND stream_id <= ?
ORDER BY stream_id ASC
"""
args.append(from_token.stream)
args.append(to_token.get_max_stream_pos())

txn.execute(sql, args)

return [
StateDelta(
stream_id=row[1],
room_id=row[2],
event_type=row[3],
state_key=row[4],
event_id=row[5],
prev_event_id=row[6],
)
for row in txn
if _filter_results_by_stream(from_token, to_token, row[0], row[1])
]

results = []
for batch in batch_iter(room_ids, 1000):
deltas = await self.db_pool.runInteraction(
"get_current_state_deltas_for_rooms",
get_current_state_deltas_for_rooms_txn,
batch,
)

results.extend(deltas)

return results

0 comments on commit f162c92

Please sign in to comment.