Skip to content

Commit

Permalink
Merge pull request #480 from permitio/rk/fix-webhook-without-broadcaster
Browse files Browse the repository at this point in the history
Webhook: Fix not listening to notifications when no broadcaster
  • Loading branch information
roekatz committed Jun 27, 2023
2 parents d11bb2d + 63e1323 commit ec75ecc
Showing 1 changed file with 15 additions and 9 deletions.
24 changes: 15 additions & 9 deletions packages/opal-server/opal_server/policy/watcher/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,27 @@ async def _on_webhook(self, topic: Topic, data: Any):
async def _listen_to_webhook_notifications(self):
# Webhook api route can be hit randomly in all workers, so it publishes a message to the webhook topic.
# This listener, running in the leader's context, would actually trigger the repo pull

async def _subscribe_internal():
logger.info(
"listening on webhook topic: '{topic}'",
topic=opal_server_config.POLICY_REPO_WEBHOOK_TOPIC,
)
await self._pubsub_endpoint.subscribe(
[opal_server_config.POLICY_REPO_WEBHOOK_TOPIC],
self._on_webhook,
)

if self._pubsub_endpoint.broadcaster is not None:
async with self._pubsub_endpoint.broadcaster.get_listening_context():
logger.info(
"listening on webhook topic: '{topic}'",
topic=opal_server_config.POLICY_REPO_WEBHOOK_TOPIC,
)

await self._pubsub_endpoint.subscribe(
[opal_server_config.POLICY_REPO_WEBHOOK_TOPIC],
self._on_webhook,
)
await _subscribe_internal()
await self._pubsub_endpoint.broadcaster.get_reader_task()

# Stop the watcher if broadcaster disconnects
self.signal_stop()
else:
# If no broadcaster is configured, just subscribe, no need to wait on anything
await _subscribe_internal()

async def start(self):
"""starts the policy watcher and registers a failure callback to
Expand Down

0 comments on commit ec75ecc

Please sign in to comment.