Skip to content

Commit

Permalink
api.main: add endpoint to get existing subscriptions
Browse files Browse the repository at this point in the history
Introduce GET `/stats/subscriptions` endpoint to get
details of existing subscriptions i.e. subscription ID,
channel, owner, connection creation timestamp and last
poll timestamp.
Only admin users are allowed to access the endpoint.

Signed-off-by: Jeny Sadadia <[email protected]>
  • Loading branch information
Jeny Sadadia authored and nuclearcat committed Dec 22, 2023
1 parent 7643078 commit 2adeddc
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 3 deletions.
8 changes: 7 additions & 1 deletion api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
get_model_from_kind
)
from .paginator_models import PageModel
from .pubsub import PubSub, Subscription
from .pubsub import PubSub, Subscription, SubscriptionStats
from .user_manager import get_user_manager, create_user_manager
from .user_models import (
User,
Expand Down Expand Up @@ -692,6 +692,12 @@ async def pop(list_name: str, user: User = Depends(get_current_user)):
return await pubsub.pop(list_name)


@app.get('/stats/subscriptions', response_model=List[SubscriptionStats])
async def stats(user: User = Depends(get_current_superuser)):
"""Get details of all existing subscriptions"""
return await pubsub.subscription_stats()


# -----------------------------------------------------------------------------
# Regression

Expand Down
34 changes: 32 additions & 2 deletions api/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import asyncio

import json
from datetime import datetime
from typing import Optional
from redis import asyncio as aioredis
from cloudevents.http import CloudEvent, to_json
from pydantic import BaseModel, Field
Expand All @@ -28,6 +30,16 @@ class Subscription(BaseModel):
)


class SubscriptionStats(Subscription):
"""Pub/Sub subscription statistics object model"""
created: datetime = Field(
description='Timestamp of connection creation'
)
last_poll: Optional[datetime] = Field(
description='Timestamp when connection last polled for data'
)


class PubSub:
"""Pub/Sub implementation class
Expand Down Expand Up @@ -100,9 +112,11 @@ async def subscribe(self, channel, user):
async with self._lock:
redis_sub = self._redis.pubsub()
sub = Subscription(id=sub_id, channel=channel, user=user)
self._subscriptions[sub_id] = {'redis_sub': redis_sub,
'sub': sub}
await redis_sub.subscribe(channel)
self._subscriptions[sub_id] = {'redis_sub': redis_sub,
'sub': sub,
'created': datetime.utcnow(),
'last_poll': None}
self._update_channels()
self._start_keep_alive_timer()
return sub
Expand Down Expand Up @@ -141,6 +155,7 @@ async def listen(self, sub_id, user=None):
raise RuntimeError(f"Subscription {sub_id} "
f"not owned by {user}")
while True:
self._subscriptions[sub_id]['last_poll'] = datetime.utcnow()
msg = await sub['redis_sub'].get_message(
ignore_subscribe_messages=True, timeout=1.0
)
Expand Down Expand Up @@ -210,3 +225,18 @@ async def push_cloudevent(self, list_name, data, attributes=None):
}
event = CloudEvent(attributes=attributes, data=data)
await self.push(list_name, to_json(event))

async def subscription_stats(self):
"""Get existing subscription details"""
subscriptions = []
for _, subscription in self._subscriptions.items():
sub = subscription['sub']
stats = SubscriptionStats(
id=sub.id,
channel=sub.channel,
user=sub.user,
created=subscription['created'],
last_poll=subscription['last_poll']
)
subscriptions.append(stats)
return subscriptions

0 comments on commit 2adeddc

Please sign in to comment.