Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow Admin API delete room v2 actions to be run on worker #17904

Open
wants to merge 4 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/17904.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Allow Admin API delete room v2 actions to be run on workers.
3 changes: 2 additions & 1 deletion docker/complement/conf/start_for_complement.sh
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ if [[ -n "$SYNAPSE_COMPLEMENT_USE_WORKERS" ]]; then
client_reader, \
appservice, \
pusher, \
stream_writers=account_data+presence+receipts+to_device+typing"
stream_writers=account_data+presence+receipts+to_device+typing, \
admin"

fi
log "Workers requested: $SYNAPSE_WORKER_TYPES"
Expand Down
9 changes: 9 additions & 0 deletions docker/configure_workers_and_start.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,13 @@
},
"worker_extra_conf": "enable_media_repo: true",
},
"admin": {
"app": "synapse.app.generic_worker",
"listener_resources": ["replication", "admin"],
"endpoint_patterns": ["^/_synapse/admin/v2/rooms/.*$"],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"appservice": {
"app": "synapse.app.generic_worker",
"listener_resources": [],
Expand Down Expand Up @@ -574,6 +581,7 @@ def is_sharding_allowed_for_worker_type(worker_type: str) -> bool:
"receipts",
"typing",
"to_device",
"admin",
]


Expand Down Expand Up @@ -1076,6 +1084,7 @@ def main(args: List[str], environ: MutableMapping[str, str]) -> None:
# Split type names by comma, ignoring whitespace.
worker_types = split_and_strip_string(worker_types_env, ",")
requested_worker_types = parse_worker_types(worker_types)
log(f"requested_worker_types {requested_worker_types}")

# Always regenerate all other config files
log("Generating worker config files")
Expand Down
2 changes: 2 additions & 0 deletions docs/usage/configuration/config_documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,8 @@ Valid resource names are:
for [workers](../../workers.md) and containers without listener e.g.
[application services](../../workers.md#notifying-application-services).

* `admin`: the admin API (/_synapse/admin)

Example configuration #1:
```yaml
listeners:
Expand Down
10 changes: 10 additions & 0 deletions docs/workers.md
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,16 @@ with `client` and `federation` `resources` must be configured in the
[`worker_listeners`](usage/configuration/config_documentation.md#worker_listeners)
option in the worker config.

The following admin APIs are now available to be handled by workers, with more forthcoming:

# Admin APIs
"^/_synapse/admin/v2/rooms/.*$"

Note that a [HTTP listener](usage/configuration/config_documentation.md#listeners)
with `admin` and `replication `resources` must be configured in the
[`worker_listeners`](usage/configuration/config_documentation.md#worker_listeners)
option in the worker config.

#### Load balancing

It is possible to run multiple instances of this worker app, with incoming requests
Expand Down
1 change: 1 addition & 0 deletions synapse/api/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
MEDIA_R0_PREFIX = "/_matrix/media/r0"
MEDIA_V3_PREFIX = "/_matrix/media/v3"
LEGACY_MEDIA_PREFIX = "/_matrix/media/v1"
ADMIN_PREFIX = "/_synapse/admin"


class ConsentURIBuilder:
Expand Down
8 changes: 6 additions & 2 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import synapse
import synapse.events
from synapse.api.urls import (
ADMIN_PREFIX,
CLIENT_API_PREFIX,
FEDERATION_PREFIX,
LEGACY_MEDIA_PREFIX,
Expand All @@ -52,7 +53,7 @@
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
from synapse.rest import ClientRestResource
from synapse.rest.admin import register_servlets_for_media_repo
from synapse.rest.admin import AdminRestResource, register_servlets_for_media_repo
from synapse.rest.health import HealthResource
from synapse.rest.key.v2 import KeyResource
from synapse.rest.synapse.client import build_synapse_client_resource_tree
Expand Down Expand Up @@ -207,7 +208,7 @@ def _listen_http(self, listener_config: ListenerConfig) -> None:
MEDIA_R0_PREFIX: media_repo,
MEDIA_V3_PREFIX: media_repo,
LEGACY_MEDIA_PREFIX: media_repo,
"/_synapse/admin": admin_resource,
ADMIN_PREFIX: admin_resource,
}
)

Expand Down Expand Up @@ -248,6 +249,9 @@ def _listen_http(self, listener_config: ListenerConfig) -> None:
if name == "replication":
resources[REPLICATION_PREFIX] = ReplicationRestResource(self)

if name == "admin":
resources[ADMIN_PREFIX] = AdminRestResource(self)

# Attach additional resources registered by modules.
resources.update(self._module_web_resources)
self._module_web_resources_consumed = True
Expand Down
1 change: 1 addition & 0 deletions synapse/config/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ def generate_ip_set(
"openid",
"replication",
"static",
"admin",
}


Expand Down
5 changes: 4 additions & 1 deletion synapse/rest/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,11 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
"""
Register all the admin servlets.
"""
# Admin servlets aren't registered on workers.
# Only handle certain endpoints on workers.
if hs.config.worker.worker_app is not None:
RoomRestV2Servlet(hs).register(http_server)
DeleteRoomStatusByDeleteIdRestServlet(hs).register(http_server)
DeleteRoomStatusByRoomIdRestServlet(hs).register(http_server)
return

register_servlets_for_client_rest_resource(hs, http_server)
Expand Down
52 changes: 26 additions & 26 deletions synapse/storage/databases/main/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,32 @@ def process_replication_position(
self._un_partial_stated_rooms_stream_id_gen.advance(instance_name, token)
return super().process_replication_position(stream_name, instance_name, token)

async def block_room(self, room_id: str, user_id: str) -> None:
"""Marks the room as blocked.

Can be called multiple times (though we'll only track the last user to
block this room).

Can be called on a room unknown to this homeserver.

Args:
room_id: Room to block
user_id: Who blocked it
"""
await self.db_pool.simple_upsert(
table="blocked_rooms",
keyvalues={"room_id": room_id},
values={},
insertion_values={"user_id": user_id},
desc="block_room",
)
await self.db_pool.runInteraction(
"block_room_invalidation",
self._invalidate_cache_and_stream,
self.is_room_blocked,
(room_id,),
)

async def store_room(
self,
room_id: str,
Expand Down Expand Up @@ -2493,32 +2519,6 @@ async def add_room_report(
)
return next_id

async def block_room(self, room_id: str, user_id: str) -> None:
"""Marks the room as blocked.

Can be called multiple times (though we'll only track the last user to
block this room).

Can be called on a room unknown to this homeserver.

Args:
room_id: Room to block
user_id: Who blocked it
"""
await self.db_pool.simple_upsert(
table="blocked_rooms",
keyvalues={"room_id": room_id},
values={},
insertion_values={"user_id": user_id},
desc="block_room",
)
await self.db_pool.runInteraction(
"block_room_invalidation",
self._invalidate_cache_and_stream,
self.is_room_blocked,
(room_id,),
)

async def unblock_room(self, room_id: str) -> None:
"""Remove the room from blocking list.

Expand Down
Loading