Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

introduce retry for fetching data source configurations #502

Merged
merged 7 commits into from
Dec 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions documentation/docs/getting-started/configuration.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ Please use this table as a reference.
| POLICY_STORE_AUTH_OAUTH_CLIENT_SECRET | The client secret OPAL will use to authenticate against the OAuth server. | |
| POLICY_STORE_CONN_RETRY | Retry options when connecting to the policy store (i.e. the agent that handles the policy, e.g. OPA). | |
| POLICY_STORE_POLICY_PATHS_TO_IGNORE | Which policy paths pushed to the client should be ignored. List of glob style paths, or paths without wildcards but ending with "/\*\*" indicating a parent path (ignoring all under it). | |
| POLICY_UPDATER_CONN_RETRY | Retry options when connecting to the policy source (e.g. the policy bundle server). | |
| INLINE_OPA_ENABLED | Whether or not OPAL should run OPA by itself in the same container. | |
| INLINE_OPA_CONFIG | If inline OPA is indeed enabled, the user can set the [server configuration options](https://docs.opal.ac/getting-started/running-opal/run-opal-client/opa-runner-parameters) that affects how OPA will start when running `opa run --server` inline. Watch escaping quotes. | {"config_file":"/mnt/opa/config"} |
| INLINE_OPA_LOG_FORMAT | | |
Expand All @@ -140,9 +139,10 @@ Please use this table as a reference.

## Policy Updater Configuration Variables

| Variables | Description | Example |
| ------------------------ | --------------------------------------------------------------------------------------- | ------- |
| POLICY_SUBSCRIPTION_DIRS | The directories in a policy repo we should subscribe to for policy code (rego) modules. | |
| Variables | Description | Example |
| ------------------------- | --------------------------------------------------------------------------------------- | ------- |
| POLICY_SUBSCRIPTION_DIRS | The directories in a policy repo we should subscribe to for policy code (rego) modules. | |
| POLICY_UPDATER_CONN_RETRY | Retry options when connecting to the policy source (e.g. the policy bundle server | |

## Data Updater Configuration Variables

Expand All @@ -155,6 +155,7 @@ Please use this table as a reference.
| SHOULD_REPORT_ON_DATA_UPDATES | Should the client report on updates to callbacks defined in DEFAULT_UPDATE_CALLBACKS or within the given updates. | |
| DEFAULT_UPDATE_CALLBACK_CONFIG | | |
| DEFAULT_UPDATE_CALLBACKS | Where/How the client should report on the completion of data updates. | |
| DATA_STORE_CONN_RETRY | Retry options when connecting to the base data source (e.g. an external API server which returns data snapshot). | |

## OPA Transaction Log / Healthcheck Configuration Variables

Expand Down
22 changes: 17 additions & 5 deletions packages/opal-client/opal_client/config.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from enum import Enum

from opal_client.engine.options import CedarServerOptions, OpaServerOptions
from opal_client.policy.options import PolicyConnRetryOptions
from opal_client.policy.options import ConnRetryOptions
from opal_client.policy_store.schemas import PolicyStoreAuth, PolicyStoreTypes
from opal_common.confi import Confi, confi
from opal_common.config import opal_common_config
Expand Down Expand Up @@ -49,16 +49,16 @@ class OpalClientConfig(Confi):
description="the client secret OPAL will use to authenticate against the OAuth server.",
)

POLICY_STORE_CONN_RETRY: PolicyConnRetryOptions = confi.model(
POLICY_STORE_CONN_RETRY: ConnRetryOptions = confi.model(
"POLICY_STORE_CONN_RETRY",
PolicyConnRetryOptions,
ConnRetryOptions,
# defaults are being set according to PolicyStoreConnRetryOptions pydantic definitions (see class)
{},
description="retry options when connecting to the policy store (i.e. the agent that handles the policy, e.g. OPA)",
)
POLICY_UPDATER_CONN_RETRY: PolicyConnRetryOptions = confi.model(
POLICY_UPDATER_CONN_RETRY: ConnRetryOptions = confi.model(
"POLICY_UPDATER_CONN_RETRY",
PolicyConnRetryOptions,
ConnRetryOptions,
{
"wait_strategy": "random_exponential",
"max_wait": 10,
Expand All @@ -68,6 +68,18 @@ class OpalClientConfig(Confi):
description="retry options when connecting to the policy source (e.g. the policy bundle server)",
)

DATA_STORE_CONN_RETRY: ConnRetryOptions = confi.model(
"DATA_STORE_CONN_RETRY",
ConnRetryOptions,
{
"wait_strategy": "random_exponential",
"max_wait": 10,
"attempts": 5,
"wait_time": 1,
},
description="retry options when connecting to the base data source (e.g. an external API server which returns data snapshot)",
)

POLICY_STORE_POLICY_PATHS_TO_IGNORE = confi.list(
"POLICY_STORE_POLICY_PATHS_TO_IGNORE",
[],
Expand Down
13 changes: 12 additions & 1 deletion packages/opal-client/opal_client/data/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,14 @@
class DataFetcher:
"""fetches policy data from backend."""

def __init__(self, default_data_url: str = None, token: str = None):
# Use as default config the configuration provider by opal_client_config.DATA_STORE_CONN_RETRY
# Add reraise as true (an option not available for control from the higher-level config)
DEFAULT_RETRY_CONFIG = opal_client_config.DATA_STORE_CONN_RETRY.toTenacityConfig()
DEFAULT_RETRY_CONFIG["reraise"] = True
roekatz marked this conversation as resolved.
Show resolved Hide resolved

def __init__(
self, default_data_url: str = None, token: str = None, retry_config=None
):
"""

Args:
Expand All @@ -24,11 +31,15 @@ def __init__(self, default_data_url: str = None, token: str = None):
# defaults
default_data_url: str = default_data_url or opal_client_config.DEFAULT_DATA_URL
token: str = token or opal_client_config.CLIENT_TOKEN
self._retry_config = (
retry_config if retry_config is not None else self.DEFAULT_RETRY_CONFIG
)
# The underlying fetching engine
self._engine = FetchingEngine(
worker_count=opal_common_config.FETCHING_WORKER_COUNT,
callback_timeout=opal_common_config.FETCHING_CALLBACK_TIMEOUT,
enqueue_timeout=opal_common_config.FETCHING_ENQUEUE_TIMEOUT,
retry_config=self._retry_config,
)
self._data_url = default_data_url
self._token = token
Expand Down
2 changes: 1 addition & 1 deletion packages/opal-client/opal_client/policy/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class WaitStrategy(str, Enum):
random_exponential = "random_exponential"


class PolicyConnRetryOptions(BaseModel):
class ConnRetryOptions(BaseModel):
wait_strategy: WaitStrategy = Field(
WaitStrategy.fixed,
description="waiting strategy (e.g. fixed for fixed-time waiting, exponential for exponential back-off) (default fixed)",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def __init__(
worker_count: int = DEFAULT_WORKER_COUNT,
callback_timeout: int = DEFAULT_CALLBACK_TIMEOUT,
enqueue_timeout: int = DEFAULT_ENQUEUE_TIMEOUT,
retry_config=None,
) -> None:
# The internal task queue (created at start_workers)
self._queue: asyncio.Queue = None
Expand All @@ -51,6 +52,7 @@ def __init__(
self._callback_timeout = callback_timeout
# time in seconds before time out on adding a task to queue (when full)
self._enqueue_timeout = enqueue_timeout
self._retry_config = retry_config

def start_workers(self):
if self._queue is None:
Expand Down Expand Up @@ -145,7 +147,9 @@ async def queue_url(
fetcher = config.fetcher

# init a URL event
event = FetchEvent(url=url, fetcher=fetcher, config=config)
event = FetchEvent(
url=url, fetcher=fetcher, config=config, retry=self._retry_config
)
return await self.queue_fetch_event(event, callback)

async def queue_fetch_event(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ async def __aenter__(self):
headers = {}
if self._event.config.headers is not None:
headers = self._event.config.headers
self._session = await ClientSession(headers=headers).__aenter__()
self._session = await ClientSession(
headers=headers, raise_for_status=True
).__aenter__()
return self

async def __aexit__(self, exc_type=None, exc_val=None, tb=None):
Expand Down
Loading