Skip to content

Commit

Permalink
Webhook: Fix not listening to notifications when no broadcaster
Browse files Browse the repository at this point in the history
  • Loading branch information
roekatz committed Jun 27, 2023
1 parent d11bb2d commit 8cf1ac3
Showing 1 changed file with 19 additions and 13 deletions.
32 changes: 19 additions & 13 deletions packages/opal-server/opal_server/policy/watcher/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,27 @@ async def _on_webhook(self, topic: Topic, data: Any):
async def _listen_to_webhook_notifications(self):
# Webhook api route can be hit randomly in all workers, so it publishes a message to the webhook topic.
# This listener, running in the leader's context, would actually trigger the repo pull

async def _listen_to_webhook_internal():
logger.info(
"listening on webhook topic: '{topic}'",
topic=opal_server_config.POLICY_REPO_WEBHOOK_TOPIC,
)

await self._pubsub_endpoint.subscribe(
[opal_server_config.POLICY_REPO_WEBHOOK_TOPIC],
self._on_webhook,
)
await self._pubsub_endpoint.broadcaster.get_reader_task()

# Stop the watcher if broadcaster disconnects
self.signal_stop()

if self._pubsub_endpoint.broadcaster is not None:
async with self._pubsub_endpoint.broadcaster.get_listening_context():
logger.info(
"listening on webhook topic: '{topic}'",
topic=opal_server_config.POLICY_REPO_WEBHOOK_TOPIC,
)

await self._pubsub_endpoint.subscribe(
[opal_server_config.POLICY_REPO_WEBHOOK_TOPIC],
self._on_webhook,
)
await self._pubsub_endpoint.broadcaster.get_reader_task()

# Stop the watcher if broadcaster disconnects
self.signal_stop()
await _listen_to_webhook_internal()
else:
await _listen_to_webhook_internal()

async def start(self):
"""starts the policy watcher and registers a failure callback to
Expand Down

0 comments on commit 8cf1ac3

Please sign in to comment.