From b729acf480a85422e64a87198f490d258b368b0a Mon Sep 17 00:00:00 2001 From: roekatz Date: Thu, 16 May 2024 16:44:55 +0300 Subject: [PATCH 1/5] Statistics: Use task pool --- .../opal-server/opal_server/statistics.py | 49 ++++++++++--------- 1 file changed, 26 insertions(+), 23 deletions(-) diff --git a/packages/opal-server/opal_server/statistics.py b/packages/opal-server/opal_server/statistics.py index 045d13845..9741d58ec 100644 --- a/packages/opal-server/opal_server/statistics.py +++ b/packages/opal-server/opal_server/statistics.py @@ -1,13 +1,14 @@ import asyncio from datetime import datetime from random import uniform -from typing import Dict, List, Optional, Set +from typing import Any, Dict, List, Optional, Set from uuid import uuid4 import pydantic from fastapi import APIRouter, HTTPException, status from fastapi_websocket_pubsub.event_notifier import Subscription, TopicList from fastapi_websocket_pubsub.pub_sub_server import PubSubEndpoint +from opal_common.async_utils import TasksPool from opal_common.config import opal_common_config from opal_common.logger import get_logger from opal_server.config import opal_server_config @@ -73,11 +74,17 @@ def __init__(self, endpoint): self._worker_id = uuid4().hex self._synced_after_wakeup = asyncio.Event() self._received_sync_messages: Set[str] = set() + self._publish_tasks = TasksPool() @property def state(self) -> ServerStats: return self._state + def _publish(self, channel: str, message: Any): + self._publish_tasks.add_task( + asyncio.create_task(self._endpoint.publish([channel], message)) + ) + async def run(self): """subscribe to two channels to be able to sync add and delete of clients.""" @@ -101,15 +108,13 @@ async def run(self): # counting on the broadcaster to listen and to replicate the message # to the other workers / server nodes in the networks. # However, since broadcaster is using asyncio.create_task(), there is a - # race condition that is mitigate by this asyncio.sleep() call. + # race condition that is mitigated by this asyncio.sleep() call. await asyncio.sleep(SLEEP_TIME_FOR_BROADCASTER_READER_TO_START) # Let all the other opal servers know that new opal server started logger.info(f"sending stats wakeup message: {self._worker_id}") - asyncio.create_task( - self._endpoint.publish( - [opal_server_config.STATISTICS_WAKEUP_CHANNEL], - SyncRequest(requesting_worker_id=self._worker_id).dict(), - ) + self._publish( + opal_server_config.STATISTICS_WAKEUP_CHANNEL, + SyncRequest(requesting_worker_id=self._worker_id).dict(), ) async def _sync_remove_client(self, subscription: Subscription, rpc_id: str): @@ -128,7 +133,9 @@ async def _receive_other_worker_wakeup_message( """Callback when new server wakes up and requests our statistics state. Sends state only if we have state of our own and another - response to that request was not already received. + response to that request was not already received. Always reply + with hello message to refresh the "workers" state of other + servers. """ try: request = SyncRequest(**sync_request) @@ -150,19 +157,17 @@ async def _receive_other_worker_wakeup_message( # wait random time in order to reduce the number of messages sent by all the other opal servers await asyncio.sleep(uniform(MIN_TIME_TO_WAIT, MAX_TIME_TO_WAIT)) # if didn't got any other message it means that this server is the first one to pass the sleep - if not request.requesting_worker_id in self._received_sync_messages: + if request.requesting_worker_id not in self._received_sync_messages: logger.info( f"[{request.requesting_worker_id}] respond with my own stats" ) - asyncio.create_task( - self._endpoint.publish( - [opal_server_config.STATISTICS_STATE_SYNC_CHANNEL], - SyncResponse( - requesting_worker_id=request.requesting_worker_id, - clients=self._state.clients, - rpc_id_to_client_id=self._rpc_id_to_client_id, - ).dict(), - ) + self._publish( + opal_server_config.STATISTICS_STATE_SYNC_CHANNEL, + SyncResponse( + requesting_worker_id=request.requesting_worker_id, + clients=self._state.clients, + rpc_id_to_client_id=self._rpc_id_to_client_id, + ).dict(), ) async def _receive_other_worker_synced_state( @@ -270,11 +275,9 @@ async def remove_client(self, rpc_id: str, topics: TopicList, publish=True): "Publish rpc_id={rpc_id} to be removed from statistics", rpc_id=rpc_id, ) - asyncio.create_task( - self._endpoint.publish( - [opal_common_config.STATISTICS_REMOVE_CLIENT_CHANNEL], - rpc_id, - ) + self._publish( + opal_common_config.STATISTICS_REMOVE_CLIENT_CHANNEL, + rpc_id, ) From 50d237d96c0f907e602f2f453b10575329254206 Mon Sep 17 00:00:00 2001 From: roekatz Date: Thu, 23 May 2024 18:11:55 +0300 Subject: [PATCH 2/5] Statistics: Count servers and clients --- packages/opal-server/opal_server/config.py | 5 ++ .../opal-server/opal_server/statistics.py | 67 ++++++++++++++++--- 2 files changed, 64 insertions(+), 8 deletions(-) diff --git a/packages/opal-server/opal_server/config.py b/packages/opal-server/opal_server/config.py index 05f4f9230..aad1b9e8b 100644 --- a/packages/opal-server/opal_server/config.py +++ b/packages/opal-server/opal_server/config.py @@ -177,6 +177,11 @@ class OpalServerConfig(Confi): "__opal_stats_state_sync", description="The topic other servers with statistics provide their state to a waking-up server", ) + STATISTICS_SERVER_HELLO_CHANNEL = confi.str( + "STATISTICS_SERVER_HELLO_CHANNEL", + "__opal_stats_server_hello", + description="The topic workers use to signal they exist and are alive", + ) # Data updates ALL_DATA_TOPIC = confi.str( diff --git a/packages/opal-server/opal_server/statistics.py b/packages/opal-server/opal_server/statistics.py index 9741d58ec..0a2f1a65e 100644 --- a/packages/opal-server/opal_server/statistics.py +++ b/packages/opal-server/opal_server/statistics.py @@ -1,4 +1,5 @@ import asyncio +import os from datetime import datetime from random import uniform from typing import Any, Dict, List, Optional, Set @@ -27,6 +28,15 @@ class ServerStats(BaseModel): ..., description="connected opal clients, each client can have multiple subscriptions", ) + servers: Set[str] = Field( + ..., + description="list of all connected opal server replicas", + ) + + +class StatCounts(BaseModel): + clients: int + servers: int class SyncRequest(BaseModel): @@ -39,6 +49,10 @@ class SyncResponse(BaseModel): rpc_id_to_client_id: Dict[str, str] +class ServerHello(BaseModel): + worker_id: str + + logger = get_logger("opal.statistics") # time to wait before sending statistics @@ -59,19 +73,21 @@ def __init__(self, endpoint): self._endpoint: PubSubEndpoint = endpoint self._uptime = datetime.utcnow() + # helps us realize when another server already responded to a sync request + self._worker_id = uuid4().hex + # state: Dict[str, List[ChannelStats]] # The state is built in this way so it will be easy to understand how much OPAL clients (vs. rpc clients) # you have connected to your OPAL server and to help merge client lists between servers. # The state is keyed by unique client id (A unique id that each opal client can set in env var `OPAL_CLIENT_STAT_ID`) - self._state: ServerStats = ServerStats(uptime=self._uptime, clients={}) + self._state: ServerStats = ServerStats( + uptime=self._uptime, clients={}, servers={self._worker_id} + ) # rpc_id_to_client_id: # dict to help us get client id without another loop self._rpc_id_to_client_id: Dict[str, str] = {} self._lock = asyncio.Lock() - - # helps us realize when another server already responded to a sync request - self._worker_id = uuid4().hex self._synced_after_wakeup = asyncio.Event() self._received_sync_messages: Set[str] = set() self._publish_tasks = TasksPool() @@ -80,11 +96,15 @@ def __init__(self, endpoint): def state(self) -> ServerStats: return self._state - def _publish(self, channel: str, message: Any): - self._publish_tasks.add_task( - asyncio.create_task(self._endpoint.publish([channel], message)) + @property + def stat_counts(self) -> StatCounts: + return StatCounts( + clients=len(self._state.clients), servers=len(self._state.servers) ) + def _publish(self, channel: str, message: Any): + self._publish_tasks.add_task(self._endpoint.publish([channel], message)) + async def run(self): """subscribe to two channels to be able to sync add and delete of clients.""" @@ -96,6 +116,10 @@ async def run(self): [opal_server_config.STATISTICS_STATE_SYNC_CHANNEL], self._receive_other_worker_synced_state, ) + await self._endpoint.subscribe( + [opal_server_config.STATISTICS_SERVER_HELLO_CHANNEL], + self._receive_other_worker_hello_message, + ) await self._endpoint.subscribe( [opal_common_config.STATISTICS_ADD_CLIENT_CHANNEL], self._add_client ) @@ -153,10 +177,18 @@ async def _receive_other_worker_wakeup_message( return logger.debug(f"received stats wakeup message: {request.requesting_worker_id}") + + # Use worker wakeup to reset everyone's "servers" state + self._state.servers = {self._worker_id, request.requesting_worker_id} + self._publish( + opal_server_config.STATISTICS_SERVER_HELLO_CHANNEL, + ServerHello(worker_id=self._worker_id).dict(), + ) + if len(self._state.clients): # wait random time in order to reduce the number of messages sent by all the other opal servers await asyncio.sleep(uniform(MIN_TIME_TO_WAIT, MAX_TIME_TO_WAIT)) - # if didn't got any other message it means that this server is the first one to pass the sleep + # if didn't get any other message it means that this server is the first one to pass the sleep if request.requesting_worker_id not in self._received_sync_messages: logger.info( f"[{request.requesting_worker_id}] respond with my own stats" @@ -198,6 +230,11 @@ async def _receive_other_worker_synced_state( self._rpc_id_to_client_id = response.rpc_id_to_client_id self._synced_after_wakeup.set() + async def _receive_other_worker_hello_message( + self, subscription: Subscription, hello_message: dict + ): + self._state.servers.add(hello_message["worker_id"]) + async def _add_client(self, subscription: Subscription, stats_message: dict): """add client record to statistics state. @@ -305,4 +342,18 @@ async def get_statistics(): logger.info("Serving statistics") return stats.state + @router.get("/stat_counts", response_model=StatCounts) + async def get_stat_counts(): + """Route to serve only server and client instanace counts.""" + if stats is None: + raise HTTPException( + status_code=status.HTTP_501_NOT_IMPLEMENTED, + detail={ + "error": "This OPAL server does not have statistics turned on." + + " To turn on, set this config var: OPAL_STATISTICS_ENABLED=true" + }, + ) + logger.info("Serving stat counts") + return stats.stat_counts + return router From 74b305067e040d03949f224bd1ac4feba6f15139 Mon Sep 17 00:00:00 2001 From: roekatz Date: Sun, 26 May 2024 18:34:04 +0300 Subject: [PATCH 3/5] fixup! Statistics: Count servers and clients --- packages/opal-server/opal_server/config.py | 9 ++- packages/opal-server/opal_server/server.py | 2 + .../opal-server/opal_server/statistics.py | 72 +++++++++++++++---- 3 files changed, 67 insertions(+), 16 deletions(-) diff --git a/packages/opal-server/opal_server/config.py b/packages/opal-server/opal_server/config.py index aad1b9e8b..7ab638675 100644 --- a/packages/opal-server/opal_server/config.py +++ b/packages/opal-server/opal_server/config.py @@ -177,11 +177,16 @@ class OpalServerConfig(Confi): "__opal_stats_state_sync", description="The topic other servers with statistics provide their state to a waking-up server", ) - STATISTICS_SERVER_HELLO_CHANNEL = confi.str( - "STATISTICS_SERVER_HELLO_CHANNEL", + STATISTICS_SERVER_KEEPALIVE_CHANNEL = confi.str( + "STATISTICS_SERVER_KEEPALIVE_CHANNEL", "__opal_stats_server_hello", description="The topic workers use to signal they exist and are alive", ) + STATISTICS_SERVER_KEEPALIVE_TIMEOUT = confi.str( + "STATISTICS_SERVER_KEEPALIVE_TIMEOUT", + 20, + description="Timeout for forgetting a server from which a keep-alive haven't been seen (keep-alive frequency would be half of this value)", + ) # Data updates ALL_DATA_TOPIC = confi.str( diff --git a/packages/opal-server/opal_server/server.py b/packages/opal-server/opal_server/server.py index b286fd796..6a946a8c0 100644 --- a/packages/opal-server/opal_server/server.py +++ b/packages/opal-server/opal_server/server.py @@ -390,6 +390,8 @@ async def stop_server_background_tasks(self): tasks.append(asyncio.create_task(self.publisher.stop())) if self.broadcast_keepalive is not None: tasks.append(asyncio.create_task(self.broadcast_keepalive.stop())) + if self.opal_statistics is not None: + tasks.append(asyncio.create_task(self.opal_statistics.stop())) try: await asyncio.gather(*tasks) diff --git a/packages/opal-server/opal_server/statistics.py b/packages/opal-server/opal_server/statistics.py index 0a2f1a65e..8530fd94d 100644 --- a/packages/opal-server/opal_server/statistics.py +++ b/packages/opal-server/opal_server/statistics.py @@ -1,10 +1,12 @@ import asyncio import os from datetime import datetime +from importlib.metadata import version as module_version from random import uniform from typing import Any, Dict, List, Optional, Set from uuid import uuid4 +import opal_server import pydantic from fastapi import APIRouter, HTTPException, status from fastapi_websocket_pubsub.event_notifier import Subscription, TopicList @@ -12,6 +14,7 @@ from opal_common.async_utils import TasksPool from opal_common.config import opal_common_config from opal_common.logger import get_logger +from opal_common.topics.publisher import PeriodicPublisher from opal_server.config import opal_server_config from pydantic import BaseModel, Field @@ -49,7 +52,7 @@ class SyncResponse(BaseModel): rpc_id_to_client_id: Dict[str, str] -class ServerHello(BaseModel): +class ServerKeepalive(BaseModel): worker_id: str @@ -72,6 +75,10 @@ class OpalStatistics: def __init__(self, endpoint): self._endpoint: PubSubEndpoint = endpoint self._uptime = datetime.utcnow() + self._opal_version = module_version(opal_server.__name__) + self._workers_count = (lambda envar: int(envar) if envar.isdigit() else 1)( + os.environ.get("UVICORN_NUM_WORKERS", "1") + ) # helps us realize when another server already responded to a sync request self._worker_id = uuid4().hex @@ -91,6 +98,8 @@ def __init__(self, endpoint): self._synced_after_wakeup = asyncio.Event() self._received_sync_messages: Set[str] = set() self._publish_tasks = TasksPool() + self._seen_servers: Dict[str, datetime] = {} + self._periodic_keepalive_task: asyncio.Task | None = None @property def state(self) -> ServerStats: @@ -99,9 +108,40 @@ def state(self) -> ServerStats: @property def stat_counts(self) -> StatCounts: return StatCounts( - clients=len(self._state.clients), servers=len(self._state.servers) + clients=len(self._state.clients), + servers=len(self._state.servers) / self._workers_count, ) + async def _expire_old_servers(self): + async with self._lock: + now = datetime.utcnow() + still_alive = {} + for server_id, last_seen in self._seen_servers.items(): + if ( + now - last_seen + ).total_seconds() < opal_server_config.STATISTICS_SERVER_KEEPALIVE_TIMEOUT: + still_alive[server_id] = last_seen + self._seen_servers = still_alive + self._state.servers = {self._worker_id} | set(self._seen_servers.keys()) + + async def _periodic_server_keepalive(self): + while True: + try: + await self._expire_old_servers() + self._publish( + opal_server_config.STATISTICS_SERVER_KEEPALIVE_CHANNEL, + ServerKeepalive(worker_id=self._worker_id).dict(), + ) + await asyncio.sleep( + opal_server_config.STATISTICS_SERVER_KEEPALIVE_TIMEOUT / 2 + ) + except asyncio.CancelledError: + logger.debug("Statistics: periodic server keepalive cancelled") + return + except Exception as e: + logger.exception("Statistics: periodic server keepalive failed") + logger.exception("Statistics: periodic server keepalive failed") + def _publish(self, channel: str, message: Any): self._publish_tasks.add_task(self._endpoint.publish([channel], message)) @@ -117,8 +157,8 @@ async def run(self): self._receive_other_worker_synced_state, ) await self._endpoint.subscribe( - [opal_server_config.STATISTICS_SERVER_HELLO_CHANNEL], - self._receive_other_worker_hello_message, + [opal_server_config.STATISTICS_SERVER_KEEPALIVE_CHANNEL], + self._receive_other_worker_keepalive_message, ) await self._endpoint.subscribe( [opal_common_config.STATISTICS_ADD_CLIENT_CHANNEL], self._add_client @@ -140,6 +180,15 @@ async def run(self): opal_server_config.STATISTICS_WAKEUP_CHANNEL, SyncRequest(requesting_worker_id=self._worker_id).dict(), ) + self._periodic_keepalive_task = asyncio.create_task( + self._periodic_server_keepalive() + ) + + async def stop(self): + if self._periodic_keepalive_task: + self._periodic_keepalive_task.cancel() + await self._periodic_keepalive_task + self._periodic_keepalive_task = None async def _sync_remove_client(self, subscription: Subscription, rpc_id: str): """helper function to recall remove client in all servers. @@ -178,13 +227,6 @@ async def _receive_other_worker_wakeup_message( logger.debug(f"received stats wakeup message: {request.requesting_worker_id}") - # Use worker wakeup to reset everyone's "servers" state - self._state.servers = {self._worker_id, request.requesting_worker_id} - self._publish( - opal_server_config.STATISTICS_SERVER_HELLO_CHANNEL, - ServerHello(worker_id=self._worker_id).dict(), - ) - if len(self._state.clients): # wait random time in order to reduce the number of messages sent by all the other opal servers await asyncio.sleep(uniform(MIN_TIME_TO_WAIT, MAX_TIME_TO_WAIT)) @@ -230,10 +272,12 @@ async def _receive_other_worker_synced_state( self._rpc_id_to_client_id = response.rpc_id_to_client_id self._synced_after_wakeup.set() - async def _receive_other_worker_hello_message( - self, subscription: Subscription, hello_message: dict + async def _receive_other_worker_keepalive_message( + self, subscription: Subscription, keepalive_message: dict ): - self._state.servers.add(hello_message["worker_id"]) + async with self._lock: + self._seen_servers[keepalive_message["worker_id"]] = datetime.now() + self._state.servers.add(keepalive_message["worker_id"]) async def _add_client(self, subscription: Subscription, stats_message: dict): """add client record to statistics state. From 0b72cf93abb580e7b42bd70daa859e5e839e9330 Mon Sep 17 00:00:00 2001 From: roekatz Date: Sun, 26 May 2024 19:01:25 +0300 Subject: [PATCH 4/5] Statistics: Add opal version --- .../opal-server/opal_server/statistics.py | 31 ++++++++++++------- 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/packages/opal-server/opal_server/statistics.py b/packages/opal-server/opal_server/statistics.py index 8530fd94d..6ae5ad619 100644 --- a/packages/opal-server/opal_server/statistics.py +++ b/packages/opal-server/opal_server/statistics.py @@ -27,6 +27,7 @@ class ChannelStats(BaseModel): class ServerStats(BaseModel): uptime: datetime = Field(..., description="uptime for this opal server worker") + version: str = Field(..., description="opal server version") clients: Dict[str, List[ChannelStats]] = Field( ..., description="connected opal clients, each client can have multiple subscriptions", @@ -37,9 +38,11 @@ class ServerStats(BaseModel): ) -class StatCounts(BaseModel): - clients: int - servers: int +class ServerStatsBrief(BaseModel): + uptime: datetime = Field(..., description="uptime for this opal server worker") + version: str = Field(..., description="opal server version") + client_count: int = Field(..., description="number of connected opal clients") + server_count: int = Field(..., description="number of opal server replicas") class SyncRequest(BaseModel): @@ -75,7 +78,6 @@ class OpalStatistics: def __init__(self, endpoint): self._endpoint: PubSubEndpoint = endpoint self._uptime = datetime.utcnow() - self._opal_version = module_version(opal_server.__name__) self._workers_count = (lambda envar: int(envar) if envar.isdigit() else 1)( os.environ.get("UVICORN_NUM_WORKERS", "1") ) @@ -88,7 +90,10 @@ def __init__(self, endpoint): # you have connected to your OPAL server and to help merge client lists between servers. # The state is keyed by unique client id (A unique id that each opal client can set in env var `OPAL_CLIENT_STAT_ID`) self._state: ServerStats = ServerStats( - uptime=self._uptime, clients={}, servers={self._worker_id} + uptime=self._uptime, + clients={}, + servers={self._worker_id}, + version=module_version(opal_server.__name__), ) # rpc_id_to_client_id: @@ -106,10 +111,12 @@ def state(self) -> ServerStats: return self._state @property - def stat_counts(self) -> StatCounts: - return StatCounts( - clients=len(self._state.clients), - servers=len(self._state.servers) / self._workers_count, + def state_brief(self) -> ServerStatsBrief: + return ServerStatsBrief( + uptime=self._state.uptime, + version=self._state.version, + client_count=len(self._state.clients), + server_count=len(self._state.servers) / self._workers_count, ) async def _expire_old_servers(self): @@ -386,7 +393,7 @@ async def get_statistics(): logger.info("Serving statistics") return stats.state - @router.get("/stat_counts", response_model=StatCounts) + @router.get("/stats", response_model=ServerStatsBrief) async def get_stat_counts(): """Route to serve only server and client instanace counts.""" if stats is None: @@ -397,7 +404,7 @@ async def get_stat_counts(): + " To turn on, set this config var: OPAL_STATISTICS_ENABLED=true" }, ) - logger.info("Serving stat counts") - return stats.stat_counts + logger.info("Serving brief statistics info") + return stats.state_brief return router From 731b10137c9a77569c91ef7699f66c81c926f5ba Mon Sep 17 00:00:00 2001 From: roekatz Date: Sun, 26 May 2024 19:25:47 +0300 Subject: [PATCH 5/5] fixup! Statistics: Count servers and clients --- packages/opal-server/opal_server/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/opal-server/opal_server/config.py b/packages/opal-server/opal_server/config.py index 7ab638675..b272915ad 100644 --- a/packages/opal-server/opal_server/config.py +++ b/packages/opal-server/opal_server/config.py @@ -179,7 +179,7 @@ class OpalServerConfig(Confi): ) STATISTICS_SERVER_KEEPALIVE_CHANNEL = confi.str( "STATISTICS_SERVER_KEEPALIVE_CHANNEL", - "__opal_stats_server_hello", + "__opal_stats_server_keepalive", description="The topic workers use to signal they exist and are alive", ) STATISTICS_SERVER_KEEPALIVE_TIMEOUT = confi.str(