Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send device list update EDUs on room join #16875

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/16875.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed an issue where device list updates were not sent eagerly to remote servers on room join.
154 changes: 152 additions & 2 deletions synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@
import logging
from typing import TYPE_CHECKING, Dict, Iterable, List, Mapping, Optional, Set, Tuple

import synapse.metrics
from synapse.api import errors
from synapse.api.constants import EduTypes, EventTypes
from synapse.api.constants import EduTypes, EventTypes, Membership
from synapse.api.errors import (
Codes,
FederationDeniedError,
Expand All @@ -38,6 +39,7 @@
wrap_as_background_process,
)
from synapse.storage.databases.main.client_ips import DeviceLastConnectionInfo
from synapse.storage.databases.main.state_deltas import StateDelta
from synapse.types import (
JsonDict,
JsonMapping,
Expand All @@ -54,7 +56,7 @@
from synapse.util.async_helpers import Linearizer
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.cancellation import cancellable
from synapse.util.metrics import measure_func
from synapse.util.metrics import Measure, measure_func
from synapse.util.retryutils import (
NotRetryingDestination,
filter_destinations_by_retry_limiter,
Expand Down Expand Up @@ -428,6 +430,7 @@ def __init__(self, hs: "HomeServer"):
self._account_data_handler = hs.get_account_data_handler()
self._storage_controllers = hs.get_storage_controllers()
self.db_pool = hs.get_datastores().main.db_pool
self._is_processing = False

self.device_list_updater = DeviceListUpdater(hs, self)

Expand Down Expand Up @@ -461,6 +464,153 @@ def __init__(self, hs: "HomeServer"):
self._delete_stale_devices,
)

# Listen for state delta updates. We do this so we can send device list updates on room join
# to remote servers. We do not remember where we got up to before, as we only need to send
# these updates on a best-effort basis, as they quickly heal due to /keys/query requests.
# We want to send device list updates eagerly to improve our robustness on unreliable
# networks.
# See https://github.com/element-hq/synapse/issues/11374#issuecomment-1908396300
self._event_pos = self.store.get_room_max_stream_ordering()
self._event_processing = False
self.notifier.add_replication_callback(self.notify_new_event)

def notify_new_event(self) -> None:
"""Called when there may be more deltas to process"""
if self._event_processing:
return

self._event_processing = True

async def process() -> None:
try:
await self._unsafe_process()
finally:
self._event_processing = False

run_as_background_process("device.notify_new_event", process)

async def _unsafe_process(self) -> None:
# Loop round handling deltas until we're up to date
while True:
with Measure(self.clock, "device_list_delta"):
room_max_stream_ordering = self.store.get_room_max_stream_ordering()
if self._event_pos == room_max_stream_ordering:
return

logger.debug(
"Processing device list stats %s->%s",
self._event_pos,
room_max_stream_ordering,
)
(
max_pos,
deltas,
) = await self._storage_controllers.state.get_current_state_deltas(
self._event_pos, room_max_stream_ordering
)

# We may get multiple deltas for different rooms, but we want to
# handle them on a room by room basis, so we batch them up by
# room.
deltas_by_room: Dict[str, List[StateDelta]] = {}
for delta in deltas:
deltas_by_room.setdefault(delta.room_id, []).append(delta)

for room_id, deltas_for_room in deltas_by_room.items():
newly_joined_local_users = await self._get_newly_joined_local_users(
room_id, deltas_for_room
)
if not newly_joined_local_users:
continue
# if a local user newly joins a room, we want to broadcast their device lists to
# federated servers in that room, if we haven't already.
hosts = await self.store.get_current_hosts_in_room(room_id)
# filter out ourselves
other_hosts = [
h for h in hosts if not self.hs.is_mine_server_name(h)
]
if len(other_hosts) == 0:
continue
# broadcast device lists for these users in the room
num_pokes = 0
for user_id in newly_joined_local_users:
# the join is for the user, we need to send device list updates for all
# their devices.
device_ids = await self.store.get_devices_by_user(user_id)
for device_id in device_ids.keys():
num_pokes += 1
await self.store.add_device_list_outbound_pokes(
user_id=user_id,
device_id=device_id,
room_id=room_id,
hosts=other_hosts,
context=None,
)
logger.info(
"Found %d hosts to send device list updates to for a new room join, "
+ "added %s device_list_outbound_pokes",
len(other_hosts),
num_pokes,
)

# Notify things that device lists need to be sent out.
self.notifier.notify_replication()
await self.federation_sender.send_device_messages(
other_hosts, immediate=False
)

self._event_pos = max_pos

# Expose current event processing position to prometheus
synapse.metrics.event_processing_positions.labels("device").set(max_pos)

async def _get_newly_joined_local_users(
self, room_id: str, deltas: List[StateDelta]
) -> Optional[Set[str]]:
"""Process current state deltas for the room to find new joins that need
to be handled.
"""
newly_joined_local_users = set()

for delta in deltas:
assert room_id == delta.room_id
logger.debug(
"device.handling: %r %r, %s",
delta.event_type,
delta.state_key,
delta.event_id,
)
# Drop any event that isn't a membership join
if delta.event_type != EventTypes.Member:
continue
if delta.event_id is None:
# state has been deleted, so this is not a join. We only care about joins.
continue
# Drop any event that is for a non-local user
membership_change_user = UserID.from_string(delta.state_key)
if not self.hs.is_mine(membership_change_user):
continue
event = await self.store.get_event(delta.event_id, allow_none=True)
if not event or event.content.get("membership") != Membership.JOIN:
# We only care about joins
continue
if delta.prev_event_id:
prev_event = await self.store.get_event(
delta.prev_event_id, allow_none=True
)
if (
prev_event
and prev_event.content.get("membership") == Membership.JOIN
):
# Ignore changes to join events.
continue
newly_joined_local_users.add(delta.state_key)

if not newly_joined_local_users:
# If nobody has joined then there's nothing to do.
return None
return newly_joined_local_users

def _check_device_name_length(self, name: Optional[str]) -> None:
"""
Checks whether a device name is longer than the maximum allowed length.
Expand Down
Loading