Skip to content

Commit

Permalink
Added support for custom data in callbacks.
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaul Kremer authored and roekatz committed Feb 14, 2024
1 parent d829fe1 commit d2fa7ef
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 1 deletion.
13 changes: 12 additions & 1 deletion packages/opal-client/opal_client/callbacks/reporter.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import json
from typing import List, Optional
from typing import Any, Awaitable, Callable, Dict, List, Optional

import aiohttp
from opal_client.callbacks.register import CallbackConfig, CallbacksRegister
Expand All @@ -9,6 +9,8 @@
from opal_common.logger import logger
from opal_common.schemas.data import DataUpdateReport

GetUserDataHandler = Callable[[DataUpdateReport], Awaitable[Dict[str, Any]]]


class CallbacksReporter:
"""can send a report to callbacks registered on the callback register."""
Expand All @@ -18,13 +20,19 @@ def __init__(
) -> None:
self._register = register
self._fetcher = data_fetcher or DataFetcher()
self._get_user_data_handler: Optional[GetUserDataHandler] = None

async def start(self):
await self._fetcher.start()

async def stop(self):
await self._fetcher.stop()

def set_user_data_handler(self, handler: GetUserDataHandler):
if self._get_user_data_handler is not None:
logger.warning("set_user_data_handler called and already have a handler.")
self._get_user_data_handler = handler

async def report_update_results(
self,
report: DataUpdateReport,
Expand All @@ -33,6 +41,9 @@ async def report_update_results(
try:
# all the urls that will be eventually called by the fetcher
urls = []
if self._get_user_data_handler is not None:
report = report.copy()
report.user_data = await self._get_user_data_handler(report)
report_data = report.json()

# first we add the callback urls from the callback register
Expand Down
4 changes: 4 additions & 0 deletions packages/opal-client/opal_client/data/updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -508,3 +508,7 @@ async def _set_policy_data(
await tx.set_policy_data(data, path=path)
else:
await tx.patch_policy_data(data, path=path)

@property
def callbacks_reporter(self) -> CallbacksReporter:
return self._callbacks_reporter
8 changes: 8 additions & 0 deletions packages/opal-client/opal_client/policy/updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,3 +352,11 @@ async def handle_policy_updates(self):
break
except Exception:
logger.exception("Failed to update policy")

@property
def topics(self) -> List[str]:
return self._topics

@property
def callbacks_reporter(self) -> CallbacksReporter:
return self._callbacks_reporter
1 change: 1 addition & 0 deletions packages/opal-common/opal_common/schemas/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,3 +176,4 @@ class DataUpdateReport(BaseModel):
reports: List[DataEntryReport]
# in case this is a policy update, the new hash committed the policy store.
policy_hash: Optional[str] = None
user_data: Dict[str, Any] = {}

0 comments on commit d2fa7ef

Please sign in to comment.