Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support for save_method PATCH #483

Merged
merged 4 commits into from
Jul 12, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 39 additions & 1 deletion documentation/docs/tutorials/trigger_data_updates.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,45 @@ Example:
- All the APIs in opal are OpenAPI / Swagger based (via FastAPI).
- Check out the [API docs on your running OPAL-server](http://localhost:7002/docs#/Data%20Updates/publish_data_update_event_data_config_post) -- this link assumes you have the server running on `http://localhost:7002`
- You can also [generate an API-client](https://github.com/OpenAPITools/openapi-generator) in the language of your choice using the [OpenAPI spec provided by the server](http://localhost:7002/openapi.json)

#### Using PATCH save method
- There are two save methods of triggering data update, PUT and PATCH defined in the payload using the `save_method` field
- Using PUT basically places(overrides if path exists) data at the specified `dst_path`
- When using PATCH, the `data` field should conform to JSON patch schema according to [RFC 6902](https://datatracker.ietf.org/doc/html/rfc6902#section-3) and this applies patch operation on the data as specified
- Let's say the following is the
- >NOTE: OPA does not support `move` operation of JSON patch, [refer discussion for the same](https://github.com/orgs/open-policy-agent/discussions/462#discussioncomment-6343050) and hence OPAL cannot support `move` operation as well
- Example: Consider this [JSON data](https://github.com/permitio/opal-example-policy-repo/blob/master/data.json) from the [opal-example-policy-repo](https://github.com/permitio/opal-example-policy-repo)
- Let's say a user is deleted from the system and we would want that user details to be removed from the JSON, let's remove bob from the list, we can use the `remove` operation of JSON patch to achieve this
- The following API request will remove `bob` from the `users` JSON
```
curl -X 'POST' \
'http://localhost:7002/data/config' \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-d '{
"id": "randomid",
"entries": [
{
"url": "",
"config": {},
"topics": [
"policy_data"
],
"dst_path": "/users",
"save_method": "PATCH",
"data": [
{
"op": "remove",
"path": "/bob"
}
]
}
],
"reason": "user bob is deleted from the system",
"callback": {
"callbacks": []
}
}'
```
### Option 3: Write your own - import code from the OPAL's packages
- One of the great things about OPAL being written in Python is that you can easily reuse its code.
See the code for the `DataUpdate` model at [opal_common/schemas/data.py](https://github.com/permitio/opal/blob/master/packages/opal-common/opal_common/schemas/data.py) and use it within your own code to send an update to the server
27 changes: 20 additions & 7 deletions packages/opal-client/opal_client/data/updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from opal_common.schemas.store import TransactionType
from opal_common.security.sslcontext import get_custom_ssl_context
from opal_common.utils import get_authorization_header
from pydantic.json import pydantic_encoder


class DataUpdater:
Expand Down Expand Up @@ -296,7 +297,7 @@ def calc_hash(data):
"""
try:
if not isinstance(data, str):
data = json.dumps(data, default=str)
data = json.dumps(data, default=pydantic_encoder)
roekatz marked this conversation as resolved.
Show resolved Hide resolved
return hashlib.sha256(data.encode("utf-8")).hexdigest()
except:
logger.exception("Failed to calculate hash for data {data}", data=data)
Expand Down Expand Up @@ -409,13 +410,17 @@ async def update_policy_data(
and isinstance(policy_data, dict)
):
await self._set_split_policy_data(
store_transaction, url=url, data=policy_data
store_transaction,
url=url,
save_method=entry.save_method,
data=policy_data,
)
else:
await self._set_policy_data(
store_transaction,
url=url,
path=policy_store_path,
save_method=entry.save_method,
data=policy_data,
)
# No exception we we're able to save to the policy-store
Expand Down Expand Up @@ -447,19 +452,27 @@ async def update_policy_data(
)
)

async def _set_split_policy_data(self, tx, url: str, data: Dict[str, Any]):
async def _set_split_policy_data(
self, tx, url: str, save_method: str, data: Dict[str, Any]
):
"""Split data writes to root ("/") path, so they won't overwrite other
sources."""
logger.info("Splitting root data to {n} keys", n=len(data))

for prefix, obj in data.items():
await self._set_policy_data(tx, url=url, path=f"/{prefix}", data=obj)
await self._set_policy_data(
tx, url=url, path=f"/{prefix}", save_method=save_method, data=obj
)

async def _set_policy_data(self, tx, url: str, path: str, data: JsonableValue):
async def _set_policy_data(
self, tx, url: str, path: str, save_method: str, data: JsonableValue
):
logger.info(
"Saving fetched data to policy-store: source url='{url}', destination path='{path}'",
url=url,
path=path or "/",
)

await tx.set_policy_data(data, path=path)
if save_method == "PUT":
await tx.set_policy_data(data, path=path)
else:
roekatz marked this conversation as resolved.
Show resolved Hide resolved
await tx.patch_policy_data(data, path=path)
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import asyncio
import json
from typing import Any, Dict, List, Optional

import jsonpatch
from opal_common.schemas.data import custom_encoder
from opal_common.schemas.policy import PolicyBundle
from opal_common.schemas.store import StoreTransaction
from opal_common.schemas.store import JSONPatchAction, StoreTransaction
from pydantic import BaseModel

from .base_policy_store_client import BasePolicyStoreClient, JsonableValue
Expand Down Expand Up @@ -53,6 +56,22 @@ async def set_policy_data(
self._data[path] = policy_data
self.has_data_event.set()

async def patch_policy_data(
self,
policy_data: List[JSONPatchAction],
path: str = "",
transaction_id: Optional[str] = None,
):
for i, _ in enumerate(policy_data):
if not path == "/":
policy_data[i].path = path + policy_data[i].path
data_str = json.dumps(
policy_data, default=custom_encoder(by_alias=True, exclude_none=True)
)
patch = jsonpatch.JsonPatch.from_string(data_str)
patch.apply(self._data, in_place=True)
self.has_data_event.set()

async def get_data(self, path: str = None) -> Dict:
if path is None or path == "":
return self._data
Expand Down
66 changes: 64 additions & 2 deletions packages/opal-client/opal_client/policy_store/opa_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import aiohttp
import dpath
import jsonpatch
from aiofiles.threadpool.text import AsyncTextIOWrapper
from fastapi import Response, status
from opal_client.config import opal_client_config
Expand All @@ -21,6 +22,7 @@
from opal_common.engine.parsing import get_rego_package
from opal_common.git.bundle_utils import BundleUtils
from opal_common.paths import PathUtils
from opal_common.schemas.data import custom_encoder
from opal_common.schemas.policy import DataModule, PolicyBundle, RegoModule
from opal_common.schemas.store import JSONPatchAction, StoreTransaction, TransactionType
from pydantic import BaseModel
Expand Down Expand Up @@ -262,6 +264,16 @@ def set(self, path, data):
# This would overwrite already existing paths
dpath.new(self._root_data, path, data)

def patch(self, path, data: List[JSONPatchAction]):
for i, _ in enumerate(data):
if not path == "/":
data[i].path = path + data[i].path
data_str = json.dumps(
data, default=custom_encoder(by_alias=True, exclude_none=True)
)
patch = jsonpatch.JsonPatch.from_string(data_str)
patch.apply(self._root_data, in_place=True)

def delete(self, path):
if not path or path == "/":
self._root_data = {}
Expand Down Expand Up @@ -741,10 +753,60 @@ async def set_policy_data(
async with aiohttp.ClientSession() as session:
try:
headers = await self._get_auth_headers()
data = json.dumps(
policy_data,
default=custom_encoder(by_alias=True, exclude_none=True),
)

async with session.put(
f"{self._opa_url}/data{path}",
data=json.dumps(policy_data, default=str),
data=data,
headers=headers,
**self._ssl_context_kwargs,
) as opa_response:
response = await proxy_response_unless_invalid(
opa_response,
accepted_status_codes=[
status.HTTP_204_NO_CONTENT,
status.HTTP_304_NOT_MODIFIED,
],
)
if self._policy_data_cache:
self._policy_data_cache.set(path, json.loads(data))
return response
except aiohttp.ClientError as e:
logger.warning("Opa connection error: {err}", err=repr(e))
raise

@affects_transaction
@retry(**RETRY_CONFIG)
async def patch_policy_data(
self,
policy_data: List[JSONPatchAction],
path: str = "",
transaction_id: Optional[str] = None,
):
path = self._safe_data_module_path(path)

# in OPA, the root document must be an object, so we must wrap list values
if not path and isinstance(policy_data, list):
logger.warning(
"OPAL client was instructed to put a list on OPA's root document. In OPA the root document must be an object so the original value was wrapped."
)
policy_data = {"items": policy_data}

async with aiohttp.ClientSession() as session:
try:
headers = await self._get_auth_headers()
headers["Content-Type"] = "application/json-patch+json"
data = json.dumps(
roekatz marked this conversation as resolved.
Show resolved Hide resolved
policy_data,
default=custom_encoder(by_alias=True, exclude_none=True),
)

async with session.patch(
f"{self._opa_url}/data{path}",
data=data,
headers=headers,
**self._ssl_context_kwargs,
) as opa_response:
Expand All @@ -756,7 +818,7 @@ async def set_policy_data(
],
)
if self._policy_data_cache:
self._policy_data_cache.set(path, policy_data)
self._policy_data_cache.patch(path, policy_data)
return response
except aiohttp.ClientError as e:
logger.warning("Opa connection error: {err}", err=repr(e))
Expand Down
Loading