Skip to content

Commit

Permalink
Merge pull request #589 from permitio/rk/improve-statistics
Browse files Browse the repository at this point in the history
Improve Statistics and introduce server (and client) counts
  • Loading branch information
roekatz committed Jun 13, 2024
2 parents 45a4e7f + 731b101 commit f83b373
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 28 deletions.
10 changes: 10 additions & 0 deletions packages/opal-server/opal_server/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,16 @@ class OpalServerConfig(Confi):
"__opal_stats_state_sync",
description="The topic other servers with statistics provide their state to a waking-up server",
)
# Topic on which every statistics worker periodically announces itself.
STATISTICS_SERVER_KEEPALIVE_CHANNEL = confi.str(
    "STATISTICS_SERVER_KEEPALIVE_CHANNEL",
    "__opal_stats_server_keepalive",
    description="The topic workers use to signal they exist and are alive",
)
# Number of seconds. Declared as an int entry (not str): the statistics code
# compares this value with elapsed seconds and divides it by two to get the
# keep-alive interval, so a string read from the environment would break it.
STATISTICS_SERVER_KEEPALIVE_TIMEOUT = confi.int(
    "STATISTICS_SERVER_KEEPALIVE_TIMEOUT",
    20,
    description="Timeout for forgetting a server from which a keep-alive haven't been seen (keep-alive frequency would be half of this value)",
)

# Data updates
ALL_DATA_TOPIC = confi.str(
Expand Down
2 changes: 2 additions & 0 deletions packages/opal-server/opal_server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,8 @@ async def stop_server_background_tasks(self):
tasks.append(asyncio.create_task(self.publisher.stop()))
if self.broadcast_keepalive is not None:
tasks.append(asyncio.create_task(self.broadcast_keepalive.stop()))
if self.opal_statistics is not None:
tasks.append(asyncio.create_task(self.opal_statistics.stop()))

try:
await asyncio.gather(*tasks)
Expand Down
161 changes: 133 additions & 28 deletions packages/opal-server/opal_server/statistics.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
import asyncio
import os
from datetime import datetime
from importlib.metadata import version as module_version
from random import uniform
from typing import Dict, List, Optional, Set
from typing import Any, Dict, List, Optional, Set
from uuid import uuid4

import opal_server
import pydantic
from fastapi import APIRouter, HTTPException, status
from fastapi_websocket_pubsub.event_notifier import Subscription, TopicList
from fastapi_websocket_pubsub.pub_sub_server import PubSubEndpoint
from opal_common.async_utils import TasksPool
from opal_common.config import opal_common_config
from opal_common.logger import get_logger
from opal_common.topics.publisher import PeriodicPublisher
from opal_server.config import opal_server_config
from pydantic import BaseModel, Field

Expand All @@ -22,10 +27,22 @@ class ChannelStats(BaseModel):

# Full statistics state held by one OPAL server worker (see OpalStatistics).
class ServerStats(BaseModel):
    # OpalStatistics fills this with datetime.utcnow() taken at construction,
    # so it is the worker's start timestamp rather than an elapsed duration.
    uptime: datetime = Field(..., description="uptime for this opal server worker")
    version: str = Field(..., description="opal server version")
    # Keyed by the unique client id; each value lists that client's channels.
    clients: Dict[str, List[ChannelStats]] = Field(
        ...,
        description="connected opal clients, each client can have multiple subscriptions",
    )
    # A set of worker ids: this worker's own id plus the ids of the workers
    # whose keep-alive messages were seen.
    servers: Set[str] = Field(
        ...,
        description="list of all connected opal server replicas",
    )


# Condensed view of the statistics state: counts instead of full listings.
# Built by OpalStatistics.state_brief from the full ServerStats state.
class ServerStatsBrief(BaseModel):
    uptime: datetime = Field(..., description="uptime for this opal server worker")
    version: str = Field(..., description="opal server version")
    # Number of distinct client ids currently in the statistics state.
    client_count: int = Field(..., description="number of connected opal clients")
    # Derived from the number of known worker ids and the per-server worker count.
    server_count: int = Field(..., description="number of opal server replicas")


class SyncRequest(BaseModel):
Expand All @@ -38,6 +55,10 @@ class SyncResponse(BaseModel):
rpc_id_to_client_id: Dict[str, str]


# Payload published on the server keep-alive channel by each statistics worker.
class ServerKeepalive(BaseModel):
    # Id of the worker announcing itself (OpalStatistics uses a uuid4 hex string).
    worker_id: str


logger = get_logger("opal.statistics")

# time to wait before sending statistics
Expand All @@ -57,27 +78,80 @@ class OpalStatistics:
def __init__(self, endpoint):
    """Set up the in-memory statistics state for this server worker.

    Args:
        endpoint: the pub/sub endpoint used to publish to, and subscribe
            on, the statistics channels.
    """
    self._endpoint: PubSubEndpoint = endpoint
    self._uptime = datetime.utcnow()

    # Divisor used by `state_brief` to turn the number of known worker ids
    # into a number of servers. Falls back to 1 when UVICORN_NUM_WORKERS is
    # unset or not a plain number, and is clamped to at least 1 so that a
    # value of "0" cannot cause a division by zero.
    workers_envar = os.environ.get("UVICORN_NUM_WORKERS", "1")
    self._workers_count = (
        max(1, int(workers_envar)) if workers_envar.isdigit() else 1
    )

    # helps us realize when another server already responded to a sync request.
    # Generated exactly once: the same id is stored in `state.servers` below
    # and is sent in wakeup / keep-alive messages.
    self._worker_id = uuid4().hex

    # state: Dict[str, List[ChannelStats]]
    # The state is built in this way so it will be easy to understand how many OPAL clients (vs. rpc clients)
    # you have connected to your OPAL server and to help merge client lists between servers.
    # The state is keyed by unique client id (A unique id that each opal client can set in env var `OPAL_CLIENT_STAT_ID`)
    self._state: ServerStats = ServerStats(
        uptime=self._uptime,
        clients={},
        servers={self._worker_id},
        version=module_version(opal_server.__name__),
    )

    # rpc_id_to_client_id:
    # dict to help us get client id without another loop
    self._rpc_id_to_client_id: Dict[str, str] = {}
    self._lock = asyncio.Lock()

    self._synced_after_wakeup = asyncio.Event()
    self._received_sync_messages: Set[str] = set()
    self._publish_tasks = TasksPool()
    # worker id -> time its last keep-alive message was received
    self._seen_servers: Dict[str, datetime] = {}
    # background task created by `run()` and cancelled by `stop()`
    self._periodic_keepalive_task: Optional[asyncio.Task] = None

@property
def state(self) -> ServerStats:
    """Return the full statistics state object held by this worker.

    The live object is returned (not a copy).
    """
    return self._state

@property
def state_brief(self) -> ServerStatsBrief:
    """Return a condensed summary (counts only) of the statistics state.

    `server_count` is the number of known worker ids divided by the
    configured number of workers per server. Integer division is used
    because the field is declared as `int`: true division would hand the
    model a float (possibly fractional, e.g. 3 / 2).
    """
    known_workers = len(self._state.servers)
    # guard against a zero divisor in case the worker count was set to 0
    workers_per_server = max(1, self._workers_count)
    return ServerStatsBrief(
        uptime=self._state.uptime,
        version=self._state.version,
        client_count=len(self._state.clients),
        server_count=known_workers // workers_per_server,
    )

async def _expire_old_servers(self):
    """Forget workers whose last keep-alive is older than the timeout.

    Rebuilds `_seen_servers` keeping only the entries seen less than
    STATISTICS_SERVER_KEEPALIVE_TIMEOUT seconds ago, then refreshes
    `state.servers` to this worker's id plus the surviving ids.
    """
    async with self._lock:
        reference_time = datetime.utcnow()
        self._seen_servers = {
            worker_id: seen_at
            for worker_id, seen_at in self._seen_servers.items()
            if (reference_time - seen_at).total_seconds()
            < opal_server_config.STATISTICS_SERVER_KEEPALIVE_TIMEOUT
        }
        # this worker is always part of the set, regardless of keep-alives
        self._state.servers = {self._worker_id, *self._seen_servers}

async def _periodic_server_keepalive(self):
    """Run forever: expire stale workers, announce this worker, sleep.

    Each cycle drops workers whose keep-alive timed out and publishes this
    worker's own keep-alive message. The pause between cycles is half of
    STATISTICS_SERVER_KEEPALIVE_TIMEOUT.

    A failing cycle is logged once and the loop still sleeps before the
    next attempt, so a persistent error cannot turn into a busy loop.
    Cancelling the task ends the loop quietly.
    """
    try:
        while True:
            try:
                await self._expire_old_servers()
                self._publish(
                    opal_server_config.STATISTICS_SERVER_KEEPALIVE_CHANNEL,
                    ServerKeepalive(worker_id=self._worker_id).dict(),
                )
            except asyncio.CancelledError:
                # never treat cancellation as a failed cycle
                raise
            except Exception:
                logger.exception("Statistics: periodic server keepalive failed")
            # Sleep outside of the guarded section so it also happens after
            # a failed cycle. float() tolerates a numeric string value.
            await asyncio.sleep(
                float(opal_server_config.STATISTICS_SERVER_KEEPALIVE_TIMEOUT) / 2
            )
    except asyncio.CancelledError:
        logger.debug("Statistics: periodic server keepalive cancelled")
        return

def _publish(self, channel: str, message: Any):
    """Publish `message` on a single channel without waiting for delivery.

    The publish call of the endpoint is handed to the tasks pool instead
    of being awaited here.
    """
    pending_publish = self._endpoint.publish([channel], message)
    self._publish_tasks.add_task(pending_publish)

async def run(self):
"""subscribe to two channels to be able to sync add and delete of
clients."""
Expand All @@ -89,6 +163,10 @@ async def run(self):
[opal_server_config.STATISTICS_STATE_SYNC_CHANNEL],
self._receive_other_worker_synced_state,
)
await self._endpoint.subscribe(
[opal_server_config.STATISTICS_SERVER_KEEPALIVE_CHANNEL],
self._receive_other_worker_keepalive_message,
)
await self._endpoint.subscribe(
[opal_common_config.STATISTICS_ADD_CLIENT_CHANNEL], self._add_client
)
Expand All @@ -101,17 +179,24 @@ async def run(self):
# counting on the broadcaster to listen and to replicate the message
# to the other workers / server nodes in the networks.
# However, since broadcaster is using asyncio.create_task(), there is a
# race condition that is mitigate by this asyncio.sleep() call.
# race condition that is mitigated by this asyncio.sleep() call.
await asyncio.sleep(SLEEP_TIME_FOR_BROADCASTER_READER_TO_START)
# Let all the other opal servers know that new opal server started
logger.info(f"sending stats wakeup message: {self._worker_id}")
asyncio.create_task(
self._endpoint.publish(
[opal_server_config.STATISTICS_WAKEUP_CHANNEL],
SyncRequest(requesting_worker_id=self._worker_id).dict(),
)
self._publish(
opal_server_config.STATISTICS_WAKEUP_CHANNEL,
SyncRequest(requesting_worker_id=self._worker_id).dict(),
)
self._periodic_keepalive_task = asyncio.create_task(
self._periodic_server_keepalive()
)

async def stop(self):
    """Cancel the periodic keep-alive task, if any, and wait for it to end.

    Safe to call when the task was never started, and safe to call more
    than once.
    """
    task = self._periodic_keepalive_task
    if task is None:
        return
    # drop the reference first so a repeated call becomes a no-op
    self._periodic_keepalive_task = None
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        # Expected: a task cancelled before its first step never reaches
        # its own CancelledError handler, so awaiting it raises here.
        pass

async def _sync_remove_client(self, subscription: Subscription, rpc_id: str):
"""helper function to recall remove client in all servers.
Expand All @@ -128,7 +213,9 @@ async def _receive_other_worker_wakeup_message(
"""Callback when new server wakes up and requests our statistics state.
Sends state only if we have state of our own and another
response to that request was not already received.
response to that request was not already received. Always reply
with hello message to refresh the "workers" state of other
servers.
"""
try:
request = SyncRequest(**sync_request)
Expand All @@ -146,23 +233,22 @@ async def _receive_other_worker_wakeup_message(
return

logger.debug(f"received stats wakeup message: {request.requesting_worker_id}")

if len(self._state.clients):
# wait random time in order to reduce the number of messages sent by all the other opal servers
await asyncio.sleep(uniform(MIN_TIME_TO_WAIT, MAX_TIME_TO_WAIT))
# if didn't got any other message it means that this server is the first one to pass the sleep
if not request.requesting_worker_id in self._received_sync_messages:
# if didn't get any other message it means that this server is the first one to pass the sleep
if request.requesting_worker_id not in self._received_sync_messages:
logger.info(
f"[{request.requesting_worker_id}] respond with my own stats"
)
asyncio.create_task(
self._endpoint.publish(
[opal_server_config.STATISTICS_STATE_SYNC_CHANNEL],
SyncResponse(
requesting_worker_id=request.requesting_worker_id,
clients=self._state.clients,
rpc_id_to_client_id=self._rpc_id_to_client_id,
).dict(),
)
self._publish(
opal_server_config.STATISTICS_STATE_SYNC_CHANNEL,
SyncResponse(
requesting_worker_id=request.requesting_worker_id,
clients=self._state.clients,
rpc_id_to_client_id=self._rpc_id_to_client_id,
).dict(),
)

async def _receive_other_worker_synced_state(
Expand Down Expand Up @@ -193,6 +279,13 @@ async def _receive_other_worker_synced_state(
self._rpc_id_to_client_id = response.rpc_id_to_client_id
self._synced_after_wakeup.set()

async def _receive_other_worker_keepalive_message(
    self, subscription: Subscription, keepalive_message: dict
):
    """Callback for a keep-alive message published by a server worker.

    Records when the sending worker was last seen and adds it to the set
    of known servers.

    Args:
        subscription: the subscription that triggered the callback (unused).
        keepalive_message: message payload; must contain a "worker_id" key.
    """
    worker_id = keepalive_message["worker_id"]
    async with self._lock:
        # Must be UTC: the expiry check compares these timestamps against
        # datetime.utcnow(); local time would shift the age by the UTC offset.
        self._seen_servers[worker_id] = datetime.utcnow()
        self._state.servers.add(worker_id)

async def _add_client(self, subscription: Subscription, stats_message: dict):
"""add client record to statistics state.
Expand Down Expand Up @@ -270,11 +363,9 @@ async def remove_client(self, rpc_id: str, topics: TopicList, publish=True):
"Publish rpc_id={rpc_id} to be removed from statistics",
rpc_id=rpc_id,
)
asyncio.create_task(
self._endpoint.publish(
[opal_common_config.STATISTICS_REMOVE_CLIENT_CHANNEL],
rpc_id,
)
self._publish(
opal_common_config.STATISTICS_REMOVE_CLIENT_CHANNEL,
rpc_id,
)


Expand Down Expand Up @@ -302,4 +393,18 @@ async def get_statistics():
logger.info("Serving statistics")
return stats.state

@router.get("/stats", response_model=ServerStatsBrief)
async def get_stat_counts():
    """Route to serve only server and client instance counts."""
    # statistics are optional; without them there is nothing to report
    if stats is None:
        raise HTTPException(
            status_code=status.HTTP_501_NOT_IMPLEMENTED,
            detail={
                "error": "This OPAL server does not have statistics turned on."
                + " To turn on, set this config var: OPAL_STATISTICS_ENABLED=true"
            },
        )
    logger.info("Serving brief statistics info")
    return stats.state_brief

return router

0 comments on commit f83b373

Please sign in to comment.