Skip to content

Commit

Permalink
introduce retry for fetching data source configurations (#502)
Browse files Browse the repository at this point in the history
* introduce retry for fetching data source configurations

* set raise_for_status to true for HttpFetchProvider to raise exception for >400 status code for retry

* update docs for moving policy and data retry config variables under respective headings

---------

Co-authored-by: thilak reddy <[email protected]>
  • Loading branch information
thilak009 and thilak reddy authored Dec 12, 2023
1 parent ba35715 commit 73b60fa
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 13 deletions.
9 changes: 5 additions & 4 deletions documentation/docs/getting-started/configuration.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ Please use this table as a reference.
| POLICY_STORE_AUTH_OAUTH_CLIENT_SECRET | The client secret OPAL will use to authenticate against the OAuth server. | |
| POLICY_STORE_CONN_RETRY | Retry options when connecting to the policy store (i.e. the agent that handles the policy, e.g. OPA). | |
| POLICY_STORE_POLICY_PATHS_TO_IGNORE | Which policy paths pushed to the client should be ignored. List of glob style paths, or paths without wildcards but ending with "/\*\*" indicating a parent path (ignoring all under it). | |
| POLICY_UPDATER_CONN_RETRY | Retry options when connecting to the policy source (e.g. the policy bundle server). | |
| INLINE_OPA_ENABLED | Whether or not OPAL should run OPA by itself in the same container. | |
| INLINE_OPA_CONFIG | If inline OPA is indeed enabled, the user can set the [server configuration options](https://docs.opal.ac/getting-started/running-opal/run-opal-client/opa-runner-parameters) that affects how OPA will start when running `opa run --server` inline. Watch escaping quotes. | {"config_file":"/mnt/opa/config"} |
| INLINE_OPA_LOG_FORMAT | | |
Expand All @@ -140,9 +139,10 @@ Please use this table as a reference.

## Policy Updater Configuration Variables

| Variables | Description | Example |
| ------------------------ | --------------------------------------------------------------------------------------- | ------- |
| POLICY_SUBSCRIPTION_DIRS | The directories in a policy repo we should subscribe to for policy code (rego) modules. | |
| Variables | Description | Example |
| ------------------------- | --------------------------------------------------------------------------------------- | ------- |
| POLICY_SUBSCRIPTION_DIRS | The directories in a policy repo we should subscribe to for policy code (rego) modules. | |
| POLICY_UPDATER_CONN_RETRY | Retry options when connecting to the policy source (e.g. the policy bundle server | |

## Data Updater Configuration Variables

Expand All @@ -155,6 +155,7 @@ Please use this table as a reference.
| SHOULD_REPORT_ON_DATA_UPDATES | Should the client report on updates to callbacks defined in DEFAULT_UPDATE_CALLBACKS or within the given updates. | |
| DEFAULT_UPDATE_CALLBACK_CONFIG | | |
| DEFAULT_UPDATE_CALLBACKS | Where/How the client should report on the completion of data updates. | |
| DATA_STORE_CONN_RETRY | Retry options when connecting to the base data source (e.g. an external API server which returns data snapshot). | |

## OPA Transaction Log / Healthcheck Configuration Variables

Expand Down
22 changes: 17 additions & 5 deletions packages/opal-client/opal_client/config.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from enum import Enum

from opal_client.engine.options import CedarServerOptions, OpaServerOptions
from opal_client.policy.options import PolicyConnRetryOptions
from opal_client.policy.options import ConnRetryOptions
from opal_client.policy_store.schemas import PolicyStoreAuth, PolicyStoreTypes
from opal_common.confi import Confi, confi
from opal_common.config import opal_common_config
Expand Down Expand Up @@ -49,16 +49,16 @@ class OpalClientConfig(Confi):
description="the client secret OPAL will use to authenticate against the OAuth server.",
)

POLICY_STORE_CONN_RETRY: PolicyConnRetryOptions = confi.model(
POLICY_STORE_CONN_RETRY: ConnRetryOptions = confi.model(
"POLICY_STORE_CONN_RETRY",
PolicyConnRetryOptions,
ConnRetryOptions,
# defaults are being set according to PolicyStoreConnRetryOptions pydantic definitions (see class)
{},
description="retry options when connecting to the policy store (i.e. the agent that handles the policy, e.g. OPA)",
)
POLICY_UPDATER_CONN_RETRY: PolicyConnRetryOptions = confi.model(
POLICY_UPDATER_CONN_RETRY: ConnRetryOptions = confi.model(
"POLICY_UPDATER_CONN_RETRY",
PolicyConnRetryOptions,
ConnRetryOptions,
{
"wait_strategy": "random_exponential",
"max_wait": 10,
Expand All @@ -68,6 +68,18 @@ class OpalClientConfig(Confi):
description="retry options when connecting to the policy source (e.g. the policy bundle server)",
)

DATA_STORE_CONN_RETRY: ConnRetryOptions = confi.model(
"DATA_STORE_CONN_RETRY",
ConnRetryOptions,
{
"wait_strategy": "random_exponential",
"max_wait": 10,
"attempts": 5,
"wait_time": 1,
},
description="retry options when connecting to the base data source (e.g. an external API server which returns data snapshot)",
)

POLICY_STORE_POLICY_PATHS_TO_IGNORE = confi.list(
"POLICY_STORE_POLICY_PATHS_TO_IGNORE",
[],
Expand Down
13 changes: 12 additions & 1 deletion packages/opal-client/opal_client/data/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,14 @@
class DataFetcher:
"""fetches policy data from backend."""

def __init__(self, default_data_url: str = None, token: str = None):
# Use as default config the configuration provider by opal_client_config.DATA_STORE_CONN_RETRY
# Add reraise as true (an option not available for control from the higher-level config)
DEFAULT_RETRY_CONFIG = opal_client_config.DATA_STORE_CONN_RETRY.toTenacityConfig()
DEFAULT_RETRY_CONFIG["reraise"] = True

def __init__(
self, default_data_url: str = None, token: str = None, retry_config=None
):
"""
Args:
Expand All @@ -24,11 +31,15 @@ def __init__(self, default_data_url: str = None, token: str = None):
# defaults
default_data_url: str = default_data_url or opal_client_config.DEFAULT_DATA_URL
token: str = token or opal_client_config.CLIENT_TOKEN
self._retry_config = (
retry_config if retry_config is not None else self.DEFAULT_RETRY_CONFIG
)
# The underlying fetching engine
self._engine = FetchingEngine(
worker_count=opal_common_config.FETCHING_WORKER_COUNT,
callback_timeout=opal_common_config.FETCHING_CALLBACK_TIMEOUT,
enqueue_timeout=opal_common_config.FETCHING_ENQUEUE_TIMEOUT,
retry_config=self._retry_config,
)
self._data_url = default_data_url
self._token = token
Expand Down
2 changes: 1 addition & 1 deletion packages/opal-client/opal_client/policy/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class WaitStrategy(str, Enum):
random_exponential = "random_exponential"


class PolicyConnRetryOptions(BaseModel):
class ConnRetryOptions(BaseModel):
wait_strategy: WaitStrategy = Field(
WaitStrategy.fixed,
description="waiting strategy (e.g. fixed for fixed-time waiting, exponential for exponential back-off) (default fixed)",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def __init__(
worker_count: int = DEFAULT_WORKER_COUNT,
callback_timeout: int = DEFAULT_CALLBACK_TIMEOUT,
enqueue_timeout: int = DEFAULT_ENQUEUE_TIMEOUT,
retry_config=None,
) -> None:
# The internal task queue (created at start_workers)
self._queue: asyncio.Queue = None
Expand All @@ -51,6 +52,7 @@ def __init__(
self._callback_timeout = callback_timeout
# time in seconds before time out on adding a task to queue (when full)
self._enqueue_timeout = enqueue_timeout
self._retry_config = retry_config

def start_workers(self):
if self._queue is None:
Expand Down Expand Up @@ -145,7 +147,9 @@ async def queue_url(
fetcher = config.fetcher

# init a URL event
event = FetchEvent(url=url, fetcher=fetcher, config=config)
event = FetchEvent(
url=url, fetcher=fetcher, config=config, retry=self._retry_config
)
return await self.queue_fetch_event(event, callback)

async def queue_fetch_event(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ async def __aenter__(self):
headers = {}
if self._event.config.headers is not None:
headers = self._event.config.headers
self._session = await ClientSession(headers=headers).__aenter__()
self._session = await ClientSession(
headers=headers, raise_for_status=True
).__aenter__()
return self

async def __aexit__(self, exc_type=None, exc_val=None, tb=None):
Expand Down

0 comments on commit 73b60fa

Please sign in to comment.