Skip to content

Commit

Permalink
Merge pull request #606 from permitio/rk/raise-broadcaster-conn-failu…
Browse files Browse the repository at this point in the history
…re-from-watcher-task

BasePolicyWatcherTask: Signal stop if broadcaster fails to connect
  • Loading branch information
roekatz committed Jul 2, 2024
2 parents 384df7e + 6d80630 commit 6fff757
Show file tree
Hide file tree
Showing 8 changed files with 17 additions and 33 deletions.
13 changes: 0 additions & 13 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -79,19 +79,6 @@ jobs:
tags: |
permitio/opal-client:test
- name: Build client cedar
id: build_client_cedar
uses: docker/build-push-action@v2
with:
file: docker/Dockerfile
push: false
target: client-cedar
cache-from: type=registry,ref=permitio/opal-client-cedar:latest
cache-to: type=inline
load: true
tags: |
permitio/opal-client-cedar:test
- name: Build server
id: build_server
uses: docker/build-push-action@v2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from opal_server.server import OpalServer

# Server settings
PORT = int(os.environ.get("PORT") or "9123")
PORT = int(os.environ.get("PORT") or "9124")
UPDATES_URL = f"ws://localhost:{PORT}/ws"
DATA_ROUTE = "/fetchable_data"
DATA_URL = f"http://localhost:{PORT}{DATA_ROUTE}"
Expand Down
9 changes: 6 additions & 3 deletions packages/opal-common/opal_common/async_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@ async def run_sync(
"""Shorthand for running a sync function in an executor within an async
context.
For example: def sync_function_that_takes_time_to_run(arg1,
arg2): time.sleep(5) async def async_function():
await run_sync(sync_function_that_takes_time_to_run, 1, arg2=5)
For example:
def sync_function_that_takes_time_to_run(arg1, arg2):
time.sleep(5)
async def async_function():
await run_sync(sync_function_that_takes_time_to_run, 1, arg2=5)
"""
return await asyncio.get_event_loop().run_in_executor(
None, partial(func, *args, **kwargs)
Expand Down
2 changes: 1 addition & 1 deletion packages/opal-common/opal_common/git/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def local_repo(tmp_path, helpers: Helpers) -> Repo:
"""
root: Path = tmp_path / "myrepo"
root.mkdir()
repo = Repo.init(root)
repo = Repo.init(root, initial_branch="master")

# create file to delete later
helpers.create_new_file_commit(repo, root / "deleted.rego")
Expand Down
9 changes: 1 addition & 8 deletions packages/opal-server/opal_server/git_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,14 +139,7 @@ def __init__(
)

async def _get_repo_lock(self):
# # This implementation works across multiple processes/threads, but is not fair (next acquiree is random)
# locks_dir = self._base_dir / ".locks"
# await aiofiles.os.makedirs(str(locks_dir), exist_ok=True)

# return NamedLock(
# locks_dir / GitPolicyFetcher.source_id(self._source), attempt_interval=0.1
# )

# Previous file based implementation worked across multiple processes/threads, but wasn't fair (next acquiree is random)
# This implementation works only within the same process/thread, but is fair (next acquiree is the earliest to enter the lock)
src_id = GitPolicyFetcher.source_id(self._source)
lock = GitPolicyFetcher.repo_locks[src_id] = GitPolicyFetcher.repo_locks.get(
Expand Down
11 changes: 6 additions & 5 deletions packages/opal-server/opal_server/policy/watcher/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,12 @@ async def _subscribe_internal():
)

if self._pubsub_endpoint.broadcaster is not None:
async with self._pubsub_endpoint.broadcaster.get_listening_context():
await _subscribe_internal()
await self._pubsub_endpoint.broadcaster.get_reader_task()

# Stop the watcher if broadcaster disconnects
try:
async with self._pubsub_endpoint.broadcaster.get_listening_context():
await _subscribe_internal()
await self._pubsub_endpoint.broadcaster.get_reader_task()
finally:
# Stop the watcher if broadcaster disconnects / fails to connect
self.signal_stop()
else:
# If no broadcaster is configured, just subscribe, no need to wait on anything
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from opal_common.utils import get_authorization_header
from opal_server.config import PolicySourceTypes, opal_server_config

PORT = int(os.environ.get("PORT") or "9123")
PORT = int(os.environ.get("PORT") or "9125")

# Basic server route config
WEBHOOK_ROUTE = "/webhook"
Expand Down
2 changes: 1 addition & 1 deletion packages/requires.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
idna>=3.3,<4
typer>=0.4.1,<1
fastapi>=0.109.1,<1
fastapi_websocket_pubsub==0.3.7
fastapi_websocket_pubsub==0.3.9
fastapi_websocket_rpc>=0.1.21,<1
gunicorn>=22.0.0,<23
pydantic[email]>=1.9.1,<2
Expand Down

0 comments on commit 6fff757

Please sign in to comment.