diff --git a/packages/opal-server/opal_server/config.py b/packages/opal-server/opal_server/config.py
index 05f4f923..b272915a 100644
--- a/packages/opal-server/opal_server/config.py
+++ b/packages/opal-server/opal_server/config.py
@@ -177,6 +177,16 @@ class OpalServerConfig(Confi):
         "__opal_stats_state_sync",
         description="The topic other servers with statistics provide their state to a waking-up server",
     )
+    STATISTICS_SERVER_KEEPALIVE_CHANNEL = confi.str(
+        "STATISTICS_SERVER_KEEPALIVE_CHANNEL",
+        "__opal_stats_server_keepalive",
+        description="The topic workers use to signal they exist and are alive",
+    )
+    STATISTICS_SERVER_KEEPALIVE_TIMEOUT = confi.int(
+        "STATISTICS_SERVER_KEEPALIVE_TIMEOUT",
+        20,
+        description="Timeout (in seconds) for forgetting a server from which a keep-alive hasn't been seen (keep-alives are sent at half this interval)",
+    )
 
     # Data updates
     ALL_DATA_TOPIC = confi.str(
diff --git a/packages/opal-server/opal_server/server.py b/packages/opal-server/opal_server/server.py
index b286fd79..6a946a8c 100644
--- a/packages/opal-server/opal_server/server.py
+++ b/packages/opal-server/opal_server/server.py
@@ -390,6 +390,8 @@ async def stop_server_background_tasks(self):
             tasks.append(asyncio.create_task(self.publisher.stop()))
             if self.broadcast_keepalive is not None:
                 tasks.append(asyncio.create_task(self.broadcast_keepalive.stop()))
+        if self.opal_statistics is not None:
+            tasks.append(asyncio.create_task(self.opal_statistics.stop()))
 
         try:
             await asyncio.gather(*tasks)
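A note on the two new config entries: like every other `confi` value they can be overridden from the environment with the `OPAL_` prefix (the same convention the error message in the new `/stats` route references). The timeout doubles as the send interval, which a quick sketch makes explicit (a minimal sketch in plain Python; the values are just the defaults from the hunk above):

    # Keepalive timing sanity check (sketch, default values assumed)
    TIMEOUT = 20               # OPAL_STATISTICS_SERVER_KEEPALIVE_TIMEOUT, in seconds
    SEND_EVERY = TIMEOUT / 2   # each worker publishes a keepalive this often
    assert TIMEOUT / SEND_EVERY == 2   # a replica is forgotten after ~2 missed beats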
get_logger("opal.statistics") # time to wait before sending statistics @@ -57,27 +78,80 @@ class OpalStatistics: def __init__(self, endpoint): self._endpoint: PubSubEndpoint = endpoint self._uptime = datetime.utcnow() + self._workers_count = (lambda envar: int(envar) if envar.isdigit() else 1)( + os.environ.get("UVICORN_NUM_WORKERS", "1") + ) + + # helps us realize when another server already responded to a sync request + self._worker_id = uuid4().hex # state: Dict[str, List[ChannelStats]] # The state is built in this way so it will be easy to understand how much OPAL clients (vs. rpc clients) # you have connected to your OPAL server and to help merge client lists between servers. # The state is keyed by unique client id (A unique id that each opal client can set in env var `OPAL_CLIENT_STAT_ID`) - self._state: ServerStats = ServerStats(uptime=self._uptime, clients={}) + self._state: ServerStats = ServerStats( + uptime=self._uptime, + clients={}, + servers={self._worker_id}, + version=module_version(opal_server.__name__), + ) # rpc_id_to_client_id: # dict to help us get client id without another loop self._rpc_id_to_client_id: Dict[str, str] = {} self._lock = asyncio.Lock() - - # helps us realize when another server already responded to a sync request - self._worker_id = uuid4().hex self._synced_after_wakeup = asyncio.Event() self._received_sync_messages: Set[str] = set() + self._publish_tasks = TasksPool() + self._seen_servers: Dict[str, datetime] = {} + self._periodic_keepalive_task: asyncio.Task | None = None @property def state(self) -> ServerStats: return self._state + @property + def state_brief(self) -> ServerStatsBrief: + return ServerStatsBrief( + uptime=self._state.uptime, + version=self._state.version, + client_count=len(self._state.clients), + server_count=len(self._state.servers) / self._workers_count, + ) + + async def _expire_old_servers(self): + async with self._lock: + now = datetime.utcnow() + still_alive = {} + for server_id, last_seen in self._seen_servers.items(): + if ( + now - last_seen + ).total_seconds() < opal_server_config.STATISTICS_SERVER_KEEPALIVE_TIMEOUT: + still_alive[server_id] = last_seen + self._seen_servers = still_alive + self._state.servers = {self._worker_id} | set(self._seen_servers.keys()) + + async def _periodic_server_keepalive(self): + while True: + try: + await self._expire_old_servers() + self._publish( + opal_server_config.STATISTICS_SERVER_KEEPALIVE_CHANNEL, + ServerKeepalive(worker_id=self._worker_id).dict(), + ) + await asyncio.sleep( + opal_server_config.STATISTICS_SERVER_KEEPALIVE_TIMEOUT / 2 + ) + except asyncio.CancelledError: + logger.debug("Statistics: periodic server keepalive cancelled") + return + except Exception as e: + logger.exception("Statistics: periodic server keepalive failed") + logger.exception("Statistics: periodic server keepalive failed") + + def _publish(self, channel: str, message: Any): + self._publish_tasks.add_task(self._endpoint.publish([channel], message)) + async def run(self): """subscribe to two channels to be able to sync add and delete of clients.""" @@ -89,6 +163,10 @@ async def run(self): [opal_server_config.STATISTICS_STATE_SYNC_CHANNEL], self._receive_other_worker_synced_state, ) + await self._endpoint.subscribe( + [opal_server_config.STATISTICS_SERVER_KEEPALIVE_CHANNEL], + self._receive_other_worker_keepalive_message, + ) await self._endpoint.subscribe( [opal_common_config.STATISTICS_ADD_CLIENT_CHANNEL], self._add_client ) @@ -101,17 +179,24 @@ async def run(self): # counting on the broadcaster to 
@@ -89,6 +163,10 @@ async def run(self):
             [opal_server_config.STATISTICS_STATE_SYNC_CHANNEL],
             self._receive_other_worker_synced_state,
         )
+        await self._endpoint.subscribe(
+            [opal_server_config.STATISTICS_SERVER_KEEPALIVE_CHANNEL],
+            self._receive_other_worker_keepalive_message,
+        )
         await self._endpoint.subscribe(
             [opal_common_config.STATISTICS_ADD_CLIENT_CHANNEL], self._add_client
         )
@@ -101,17 +179,24 @@
         # counting on the broadcaster to listen and to replicate the message
         # to the other workers / server nodes in the networks.
         # However, since broadcaster is using asyncio.create_task(), there is a
-        # race condition that is mitigate by this asyncio.sleep() call.
+        # race condition that is mitigated by this asyncio.sleep() call.
         await asyncio.sleep(SLEEP_TIME_FOR_BROADCASTER_READER_TO_START)
         # Let all the other opal servers know that new opal server started
         logger.info(f"sending stats wakeup message: {self._worker_id}")
-        asyncio.create_task(
-            self._endpoint.publish(
-                [opal_server_config.STATISTICS_WAKEUP_CHANNEL],
-                SyncRequest(requesting_worker_id=self._worker_id).dict(),
-            )
+        self._publish(
+            opal_server_config.STATISTICS_WAKEUP_CHANNEL,
+            SyncRequest(requesting_worker_id=self._worker_id).dict(),
+        )
+        self._periodic_keepalive_task = asyncio.create_task(
+            self._periodic_server_keepalive()
         )
 
+    async def stop(self):
+        if self._periodic_keepalive_task:
+            self._periodic_keepalive_task.cancel()
+            await self._periodic_keepalive_task
+            self._periodic_keepalive_task = None
+
     async def _sync_remove_client(self, subscription: Subscription, rpc_id: str):
         """helper function to recall remove client in all servers.
@@ -128,7 +213,9 @@ async def _receive_other_worker_wakeup_message(
         """Callback when new server wakes up and requests our statistics state.
 
         Sends state only if we have state of our own and another
-        response to that request was not already received.
+        response to that request was not already received. Always reply
+        with a hello message so that the other workers can refresh their
+        "servers" state.
         """
         try:
             request = SyncRequest(**sync_request)
@@ -146,23 +233,22 @@
             return
 
         logger.debug(f"received stats wakeup message: {request.requesting_worker_id}")
+
         if len(self._state.clients):
             # wait random time in order to reduce the number of messages sent by all the other opal servers
             await asyncio.sleep(uniform(MIN_TIME_TO_WAIT, MAX_TIME_TO_WAIT))
-            # if didn't got any other message it means that this server is the first one to pass the sleep
-            if not request.requesting_worker_id in self._received_sync_messages:
+            # if we didn't get any other message, this server is the first one to pass the sleep
+            if request.requesting_worker_id not in self._received_sync_messages:
                 logger.info(
                     f"[{request.requesting_worker_id}] respond with my own stats"
                 )
-                asyncio.create_task(
-                    self._endpoint.publish(
-                        [opal_server_config.STATISTICS_STATE_SYNC_CHANNEL],
-                        SyncResponse(
-                            requesting_worker_id=request.requesting_worker_id,
-                            clients=self._state.clients,
-                            rpc_id_to_client_id=self._rpc_id_to_client_id,
-                        ).dict(),
-                    )
+                self._publish(
+                    opal_server_config.STATISTICS_STATE_SYNC_CHANNEL,
+                    SyncResponse(
+                        requesting_worker_id=request.requesting_worker_id,
+                        clients=self._state.clients,
+                        rpc_id_to_client_id=self._rpc_id_to_client_id,
+                    ).dict(),
                 )
 
     async def _receive_other_worker_synced_state(
@@ -193,6 +279,13 @@ async def _receive_other_worker_synced_state(
             self._rpc_id_to_client_id = response.rpc_id_to_client_id
             self._synced_after_wakeup.set()
 
+    async def _receive_other_worker_keepalive_message(
+        self, subscription: Subscription, keepalive_message: dict
+    ):
+        async with self._lock:
+            self._seen_servers[keepalive_message["worker_id"]] = datetime.utcnow()
+            self._state.servers.add(keepalive_message["worker_id"])
+
     async def _add_client(self, subscription: Subscription, stats_message: dict):
         """add client record to statistics state.
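One subtlety worth annotating: when a wakeup request fans out to N replicas, only one of them should answer, and the patch keeps the original randomized-backoff trick for that. Below is a loose single-process analogy of the first-responder-wins behavior (all names are ours; a shared set stands in for `_received_sync_messages`, and the sleep bounds stand in for MIN/MAX_TIME_TO_WAIT):

    import asyncio
    from random import uniform

    async def maybe_respond(answered: set, request_id: str, worker: int):
        # every replica sleeps a random interval; the first to wake marks
        # the request as answered, so the rest stay quiet
        await asyncio.sleep(uniform(0.5, 1.5))
        if request_id not in answered:
            answered.add(request_id)
            print(f"worker {worker} responds with its own stats")

    async def main():
        answered = set()
        await asyncio.gather(*(maybe_respond(answered, "req-1", i) for i in range(3)))

    asyncio.run(main())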
@@ -270,11 +363,9 @@ async def remove_client(self, rpc_id: str, topics: TopicList, publish=True):
                 "Publish rpc_id={rpc_id} to be removed from statistics",
                 rpc_id=rpc_id,
             )
-            asyncio.create_task(
-                self._endpoint.publish(
-                    [opal_common_config.STATISTICS_REMOVE_CLIENT_CHANNEL],
-                    rpc_id,
-                )
+            self._publish(
+                opal_common_config.STATISTICS_REMOVE_CLIENT_CHANNEL,
+                rpc_id,
             )
 
@@ -302,4 +393,18 @@ async def get_statistics():
         logger.info("Serving statistics")
         return stats.state
 
+    @router.get("/stats", response_model=ServerStatsBrief)
+    async def get_stat_counts():
+        """Route to serve only server and client instance counts."""
+        if stats is None:
+            raise HTTPException(
+                status_code=status.HTTP_501_NOT_IMPLEMENTED,
+                detail={
+                    "error": "This OPAL server does not have statistics turned on."
+                    + " To turn on, set this config var: OPAL_STATISTICS_ENABLED=true"
+                },
+            )
+        logger.info("Serving brief statistics info")
+        return stats.state_brief
+
     return router
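For a quick smoke test of the new route (a sketch assuming statistics are enabled, no auth token is required, and the server listens on its default port 7002; the response shown is only illustrative):

    import requests

    # existing verbose endpoint: full per-client breakdown
    print(requests.get("http://localhost:7002/statistics").json())

    # new lightweight endpoint: counts only
    print(requests.get("http://localhost:7002/stats").json())
    # e.g. {"uptime": "...", "version": "...", "client_count": 2, "server_count": 1}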