Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
abf658c
WIP MSC4354; stub out soft-failure code for now
kegsay Sep 18, 2025
8699534
Persist sticky events in sticky_events table
kegsay Sep 18, 2025
7801e68
Use multi-writer streams for sticky events
kegsay Sep 19, 2025
e01a22b
Hook up replication receiver, add sticky events to sync tokens
kegsay Sep 19, 2025
3e7a5a6
Insert sticky events into /sync responses
kegsay Sep 19, 2025
7af7429
Don't include expired sticky events in /sync responses
kegsay Sep 22, 2025
0cfdd0d
Delete from sticky_events periodically
kegsay Sep 22, 2025
7c8daf4
Get sticky events working with Simplified Sliding Sync
kegsay Sep 23, 2025
ac0f8c2
Support MSC4140 Delayed Events with sticky events
kegsay Sep 23, 2025
1e812e4
Fix sqlite
kegsay Sep 23, 2025
2728b21
Re-evaluate soft-failure on sticky events
kegsay Sep 24, 2025
666e94b
Don't re-evaluate spam
kegsay Sep 24, 2025
771692a
Rejig when we persist sticky events
kegsay Sep 24, 2025
ad6a2b9
Update docs to not lie
kegsay Sep 24, 2025
33d80be
Send sticky events when catching up over federation
kegsay Sep 26, 2025
148caef
Fix trial tests
kegsay Sep 26, 2025
de3e9b4
Add msc4354_sticky_duration_ttl_ms support
kegsay Sep 29, 2025
105d2cd
Merge branch 'develop' into kegan/sticky-events
kegsay Sep 30, 2025
651e829
Use standard unstable identifiers
kegsay Oct 1, 2025
4acc98d
Don't persist sticky outliers
kegsay Oct 1, 2025
78c4097
SQLite specific soft-failure update code
kegsay Oct 2, 2025
15453d4
JSON false not str
kegsay Oct 2, 2025
aa45bf7
Add msc4354 to /versions response
kegsay Oct 2, 2025
888ab79
Add NewServerJoined replication command
kegsay Oct 2, 2025
aac3c84
Use a tri-state for soft failed to communicate when we need to cache …
kegsay Oct 2, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docker/complement/conf/workers-shared-extra.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ experimental_features:
msc4155_enabled: true
# Thread Subscriptions
msc4306_enabled: true
# Sticky Events
msc4354_enabled: true

server_notices:
system_mxid_localpart: _server
Expand Down
1 change: 1 addition & 0 deletions synapse/_scripts/synapse_port_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@
"has_known_state",
"is_encrypted",
],
"sticky_events": ["soft_failed"],
"thread_subscriptions": ["subscribed", "automatic"],
"users": ["shadow_banned", "approved", "locked", "suspended"],
"un_partial_stated_event_stream": ["rejection_status_changed"],
Expand Down
21 changes: 20 additions & 1 deletion synapse/api/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
"""Contains constants from the specification."""

import enum
from typing import Final
from typing import Final, TypedDict

# the max size of a (canonical-json-encoded) event
MAX_PDU_SIZE = 65536
Expand Down Expand Up @@ -279,6 +279,8 @@ class EventUnsignedContentFields:
# Requesting user's membership, per MSC4115
MEMBERSHIP: Final = "membership"

STICKY_TTL: Final = "msc4354_sticky_duration_ttl_ms"


class MTextFields:
"""Fields found inside m.text content blocks."""
Expand Down Expand Up @@ -360,3 +362,20 @@ class Direction(enum.Enum):
class ProfileFields:
DISPLAYNAME: Final = "displayname"
AVATAR_URL: Final = "avatar_url"


class StickyEventField(TypedDict):
duration_ms: int


class StickyEvent:
QUERY_PARAM_NAME: Final = "org.matrix.msc4354.sticky_duration_ms"
FIELD_NAME: Final = "msc4354_sticky"
MAX_DURATION_MS: Final = 3600000 # 1 hour


# for the database
class StickyEventSoftFailed(enum.IntEnum):
FALSE = 0
TRUE = 1
FORMER_TRUE = 2
2 changes: 2 additions & 0 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@
from synapse.storage.databases.main.sliding_sync import SlidingSyncStore
from synapse.storage.databases.main.state import StateGroupWorkerStore
from synapse.storage.databases.main.stats import StatsStore
from synapse.storage.databases.main.sticky_events import StickyEventsWorkerStore
from synapse.storage.databases.main.stream import StreamWorkerStore
from synapse.storage.databases.main.tags import TagsWorkerStore
from synapse.storage.databases.main.task_scheduler import TaskSchedulerWorkerStore
Expand Down Expand Up @@ -137,6 +138,7 @@ class GenericWorkerStore(
RoomWorkerStore,
DirectoryWorkerStore,
ThreadSubscriptionsWorkerStore,
StickyEventsWorkerStore,
PushRulesWorkerStore,
ApplicationServiceTransactionWorkerStore,
ApplicationServiceWorkerStore,
Expand Down
3 changes: 3 additions & 0 deletions synapse/config/experimental.py
Original file line number Diff line number Diff line change
Expand Up @@ -595,3 +595,6 @@ def read_config(
# MSC4306: Thread Subscriptions
# (and MSC4308: Thread Subscriptions extension to Sliding Sync)
self.msc4306_enabled: bool = experimental.get("msc4306_enabled", False)

# MSC4354: Sticky Events
self.msc4354_enabled: bool = experimental.get("msc4354_enabled", False)
2 changes: 1 addition & 1 deletion synapse/config/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ class WriterLocations:
"""Specifies the instances that write various streams.

Attributes:
events: The instances that write to the event and backfill streams.
events: The instances that write to the event, backfill and sticky events streams.
typing: The instances that write to the typing stream. Currently
can only be a single instance.
to_device: The instances that write to the to_device stream. Currently
Expand Down
21 changes: 20 additions & 1 deletion synapse/events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,12 @@
import attr
from unpaddedbase64 import encode_base64

from synapse.api.constants import EventContentFields, EventTypes, RelationTypes
from synapse.api.constants import (
EventContentFields,
EventTypes,
RelationTypes,
StickyEvent,
)
from synapse.api.room_versions import EventFormatVersions, RoomVersion, RoomVersions
from synapse.synapse_rust.events import EventInternalMetadata
from synapse.types import (
Expand Down Expand Up @@ -323,6 +328,20 @@ def freeze(self) -> None:
# this will be a no-op if the event dict is already frozen.
self._dict = freeze(self._dict)

def sticky_duration(self) -> Optional[int]:
sticky_obj = self.get_dict().get(StickyEvent.FIELD_NAME, None)
if type(sticky_obj) is not dict:
return None
sticky_duration_ms = sticky_obj.get("duration_ms", None)
# MSC: Valid values are the integer range 0-MAX_DURATION_MS
if (
type(sticky_duration_ms) is int
and sticky_duration_ms >= 0
and sticky_duration_ms <= StickyEvent.MAX_DURATION_MS
):
return sticky_duration_ms
return None

def __str__(self) -> str:
return self.__repr__()

Expand Down
7 changes: 6 additions & 1 deletion synapse/events/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import attr
from signedjson.types import SigningKey

from synapse.api.constants import MAX_DEPTH, EventTypes
from synapse.api.constants import MAX_DEPTH, EventTypes, StickyEvent, StickyEventField
from synapse.api.room_versions import (
KNOWN_EVENT_FORMAT_VERSIONS,
EventFormatVersions,
Expand Down Expand Up @@ -89,6 +89,7 @@ class EventBuilder:

content: JsonDict = attr.Factory(dict)
unsigned: JsonDict = attr.Factory(dict)
sticky: Optional[StickyEventField] = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
sticky: Optional[StickyEventField] = None
sticky: Optional[StickyEventField] = None
"""
TODO
"""

It would be good to add a docstring to indicate where this is from and what's used for.


# These only exist on a subset of events, so they raise AttributeError if
# someone tries to get them when they don't exist.
Expand Down Expand Up @@ -269,6 +270,9 @@ async def build(
if self._origin_server_ts is not None:
event_dict["origin_server_ts"] = self._origin_server_ts

if self.sticky is not None:
event_dict[StickyEvent.FIELD_NAME] = self.sticky

return create_local_event_from_event_dict(
clock=self._clock,
hostname=self._hostname,
Expand Down Expand Up @@ -318,6 +322,7 @@ def for_room_version(
unsigned=key_values.get("unsigned", {}),
redacts=key_values.get("redacts", None),
origin_server_ts=key_values.get("origin_server_ts", None),
sticky=key_values.get(StickyEvent.FIELD_NAME, None),
)


Expand Down
2 changes: 2 additions & 0 deletions synapse/federation/federation_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ async def _check_sigs_and_hash(
# using the event in prev_events).
redacted_event = prune_event(pdu)
redacted_event.internal_metadata.soft_failed = True
# Mark this as spam so we don't re-evaluate soft-failure status.
redacted_event.internal_metadata.policy_server_spammy = True
Comment on lines +198 to +199
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems unrelated to sticky events

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's related because spam is currently flagged in Synapse via soft-failure, and sometimes via policy_server_spammy. As this PR is re-evaluating soft-failed events, we want to make sure we don't re-evaluate spam, hence need to distinguish it.

return redacted_event

return pdu
Expand Down
5 changes: 5 additions & 0 deletions synapse/federation/send_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,11 @@ def notify_new_events(self, max_token: RoomStreamToken) -> None:
# This should never get called.
raise NotImplementedError()

def notify_new_server_joined(self, server: str, room_id: str) -> None:
"""As per FederationSender"""
# This should never get called.
raise NotImplementedError()

def build_and_send_edu(
self,
destination: str,
Expand Down
10 changes: 10 additions & 0 deletions synapse/federation/sender/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,13 @@ def notify_new_events(self, max_token: RoomStreamToken) -> None:
"""
raise NotImplementedError()

@abc.abstractmethod
def notify_new_server_joined(self, server: str, room_id: str) -> None:
"""This gets called when we a new server has joined a room. We might
want to send out some events to this server.
"""
raise NotImplementedError()

@abc.abstractmethod
async def send_read_receipt(self, receipt: ReadReceipt) -> None:
"""Send a RR to any other servers in the room
Expand Down Expand Up @@ -488,6 +495,9 @@ def _get_per_destination_queue(
self._per_destination_queues[destination] = queue
return queue

def notify_new_server_joined(self, server: str, room_id: str) -> None:
print(f"FEDSENDER: new server joined: server={server} room={room_id}")

def notify_new_events(self, max_token: RoomStreamToken) -> None:
"""This gets called when we have some new events we might want to
send out to other servers.
Expand Down
28 changes: 28 additions & 0 deletions synapse/federation/sender/per_destination_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ def __init__(
self._instance_name = hs.get_instance_name()
self._federation_shard_config = hs.config.worker.federation_shard_config
self._state = hs.get_state_handler()
self.msc4354_enabled = hs.config.experimental.msc4354_enabled

self._should_send_on_this_instance = True
if not self._federation_shard_config.should_handle(
Expand Down Expand Up @@ -558,6 +559,33 @@ async def _catch_up_transmission_loop(self) -> None:
# send.
extrem_events = await self._store.get_events_as_list(extrems)

if self.msc4354_enabled:
# we also want to send sticky events that are still active in this room
sticky_event_ids = (
await self._store.get_sticky_event_ids_sent_by_self(
pdu.room_id,
last_successful_stream_ordering,
)
)
# skip any that are actually the forward extremities we want to send anyway
sticky_events = await self._store.get_events_as_list(
[
event_id
for event_id in sticky_event_ids
if event_id not in extrems
]
)
if sticky_events:
# *prepend* these to the extrem list, so they are processed first.
# This ensures they will show up before the forward extrem in stream order
extrem_events = sticky_events + extrem_events
logger.info(
"Sending %d missed sticky events to %s: %r",
len(sticky_events),
self._destination,
pdu.room_id,
)

new_pdus = []
for p in extrem_events:
# We pulled this from the DB, so it'll be non-null
Expand Down
12 changes: 9 additions & 3 deletions synapse/handlers/delayed_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

from twisted.internet.interfaces import IDelayedCall

from synapse.api.constants import EventTypes
from synapse.api.constants import EventTypes, StickyEvent
from synapse.api.errors import ShadowBanError, SynapseError
from synapse.api.ratelimiting import Ratelimiter
from synapse.config.workers import MAIN_PROCESS_INSTANCE_NAME
Expand Down Expand Up @@ -333,6 +333,7 @@ async def add(
origin_server_ts: Optional[int],
content: JsonDict,
delay: int,
sticky_duration_ms: Optional[int],
) -> str:
"""
Creates a new delayed event and schedules its delivery.
Expand All @@ -346,7 +347,7 @@ async def add(
If None, the timestamp will be the actual time when the event is sent.
content: The content of the event to be sent.
delay: How long (in milliseconds) to wait before automatically sending the event.

sticky_duration_ms: The sticky duration if any, see MSC4354.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this could be expanded a bit to explain what it means without having to look at MSC4354. I have to make a lot of assumptions otherwise.

"Time since xxx that the event should appear in xxx"

Returns: The ID of the added delayed event.

Raises:
Expand Down Expand Up @@ -382,6 +383,7 @@ async def add(
origin_server_ts=origin_server_ts,
content=content,
delay=delay,
sticky_duration_ms=sticky_duration_ms,
)

if self._repl_client is not None:
Expand Down Expand Up @@ -489,6 +491,7 @@ async def send(self, requester: Requester, delay_id: str) -> None:
origin_server_ts=event.origin_server_ts,
content=event.content,
device_id=event.device_id,
sticky_duration_ms=event.sticky_duration_ms,
)
)

Expand Down Expand Up @@ -599,7 +602,10 @@ async def _send_event(

if event.state_key is not None:
event_dict["state_key"] = event.state_key

if event.sticky_duration_ms is not None:
event_dict[StickyEvent.FIELD_NAME] = {
"duration_ms": event.sticky_duration_ms,
}
(
sent_event,
_,
Expand Down
Loading
Loading