Skip to content

Commit

Permalink
add test and update docs for patch data update event
Browse files Browse the repository at this point in the history
  • Loading branch information
thilak reddy committed Jul 4, 2023
1 parent f5a12b6 commit 7230e12
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 5 deletions.
40 changes: 39 additions & 1 deletion documentation/docs/tutorials/trigger_data_updates.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,45 @@ Example:
- All the APIs in opal are OpenAPI / Swagger based (via FastAPI).
- Check out the [API docs on your running OPAL-server](http://localhost:7002/docs#/Data%20Updates/publish_data_update_event_data_config_post) -- this link assumes you have the server running on `http://localhost:7002`
- You can also [generate an API-client](https://github.com/OpenAPITools/openapi-generator) in the language of your choice using the [OpenAPI spec provided by the server](http://localhost:7002/openapi.json)

#### Using PATCH save method
- There are two save methods of triggering data update, PUT and PATCH defined in the payload using the `save_method` field
- Using PUT basically places(overrides if path exists) data at the specified `dst_path`
- When using PATCH, the `data` field should conform to JSON patch schema according to [RFC 6902](https://datatracker.ietf.org/doc/html/rfc6902#section-3) and this applies patch operation on the data as specified
- Let's say the following is the
- >NOTE: OPA does not support `move` operation of JSON patch, [refer discussion for the same](https://github.com/orgs/open-policy-agent/discussions/462#discussioncomment-6343050) and hence OPAL cannot support `move` operation as well
- Example: Consider this [JSON data](https://github.com/permitio/opal-example-policy-repo/blob/master/data.json) from the [opal-example-policy-repo](https://github.com/permitio/opal-example-policy-repo)
- Let's say a user is deleted from the system and we would want that user details to be removed from the JSON, let's remove bob from the list, we can use the `remove` operation of JSON patch to achieve this
- The following API request will remove `bob` from the `users` JSON
```
curl -X 'POST' \
'http://localhost:7002/data/config' \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-d '{
"id": "randomid",
"entries": [
{
"url": "",
"config": {},
"topics": [
"policy_data"
],
"dst_path": "/users",
"save_method": "PATCH",
"data": [
{
"op": "remove",
"path": "/bob"
}
]
}
],
"reason": "user bob is deleted from the system",
"callback": {
"callbacks": []
}
}'
```
### Option 3: Write your own - import code from the OPAL's packages
- One of the great things about OPAL being written in Python is that you can easily reuse its code.
See the code for the `DataUpdate` model at [opal_common/schemas/data.py](https://github.com/permitio/opal/blob/master/packages/opal-common/opal_common/schemas/data.py) and use it within your own code to send an update to the server
3 changes: 2 additions & 1 deletion packages/opal-client/opal_client/data/updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from opal_common.schemas.store import TransactionType
from opal_common.security.sslcontext import get_custom_ssl_context
from opal_common.utils import get_authorization_header
from pydantic.json import pydantic_encoder


class DataUpdater:
Expand Down Expand Up @@ -296,7 +297,7 @@ def calc_hash(data):
"""
try:
if not isinstance(data, str):
data = json.dumps(data, default=str)
data = json.dumps(data, default=pydantic_encoder)
return hashlib.sha256(data.encode("utf-8")).hexdigest()
except:
logger.exception("Failed to calculate hash for data {data}", data=data)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import asyncio
import json
from typing import Any, Dict, List, Optional

import jsonpatch
from opal_common.schemas.data import custom_encoder
from opal_common.schemas.policy import PolicyBundle
from opal_common.schemas.store import StoreTransaction
from opal_common.schemas.store import JSONPatchAction, StoreTransaction
from pydantic import BaseModel

from .base_policy_store_client import BasePolicyStoreClient, JsonableValue
Expand Down Expand Up @@ -53,6 +56,22 @@ async def set_policy_data(
self._data[path] = policy_data
self.has_data_event.set()

async def patch_policy_data(
self,
policy_data: List[JSONPatchAction],
path: str = "",
transaction_id: Optional[str] = None,
):
for i, _ in enumerate(policy_data):
if not path == "/":
policy_data[i].path = path + policy_data[i].path
data_str = json.dumps(
policy_data, default=custom_encoder(by_alias=True, exclude_none=True)
)
patch = jsonpatch.JsonPatch.from_string(data_str)
patch.apply(self._data, in_place=True)
self.has_data_event.set()

async def get_data(self, path: str = None) -> Dict:
if path is None or path == "":
return self._data
Expand Down
97 changes: 96 additions & 1 deletion packages/opal-client/opal_client/tests/data_updater_test.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
import asyncio
import json
import logging
import multiprocessing
import os
import sys
from multiprocessing import Event, Process

import pytest
import requests
import uvicorn
from aiohttp import ClientSession
from fastapi_websocket_pubsub import PubSubClient
from flaky import flaky
from pydantic.json import pydantic_encoder

# Add parent path to use local src as package for tests
root_dir = os.path.abspath(
Expand All @@ -30,6 +33,7 @@
ServerDataSourceConfig,
UpdateCallback,
)
from opal_common.schemas.store import JSONPatchAction
from opal_common.utils import get_authorization_header
from opal_server.config import opal_server_config
from opal_server.server import OpalServer
Expand All @@ -53,6 +57,9 @@
DATA_SOURCES_CONFIG = ServerDataSourceConfig(
config=DataSourceConfig(entries=[{"url": DATA_URL, "topics": DATA_TOPICS}])
)
DATA_UPDATE_ROUTE = f"http://localhost:{PORT}/data/config"

PATCH_DATA_UPDATE = [JSONPatchAction(op="add", path="/", value=TEST_DATA)]


def setup_server(event):
Expand All @@ -75,7 +82,10 @@ def fetchable_data():
# route to report complition to
@server_app.post(DATA_UPDATE_CALLBACK_ROUTE)
def callback(report: DataUpdateReport):
assert report.reports[0].hash == DataUpdater.calc_hash(TEST_DATA)
if len(callbacks) == 1:
assert report.reports[0].hash == DataUpdater.calc_hash(PATCH_DATA_UPDATE)
else:
assert report.reports[0].hash == DataUpdater.calc_hash(TEST_DATA)
callbacks.append(report)
return "OKAY"

Expand Down Expand Up @@ -122,6 +132,33 @@ async def run():
asyncio.run(run())


def trigger_update_patch():
async def run():
# trigger an update
entries = [
DataSourceEntry(
url="",
data=PATCH_DATA_UPDATE,
dst_path="/",
save_method="PATCH",
topics=DATA_TOPICS,
)
]
callback = UpdateCallback(callbacks=[DATA_UPDATE_CALLBACK_URL])
update = DataUpdate(reason="Test", entries=entries, callback=callback)
async with PubSubClient(
server_uri=UPDATES_URL,
methods_class=TenantAwareRpcEventClientMethods,
extra_headers=[get_authorization_header(opal_client_config.CLIENT_TOKEN)],
) as client:
# Channel must be ready before we can publish on it
await asyncio.wait_for(client.wait_until_ready(), 5)
logging.info("Publishing data event")
await client.publish(DATA_TOPICS, data=update)

asyncio.run(run())


@flaky
@pytest.mark.asyncio
async def test_data_updater(server):
Expand All @@ -146,10 +183,50 @@ async def test_data_updater(server):
# wait until new data arrives into the store via the updater
await asyncio.wait_for(policy_store.wait_for_data(), 60)
# cleanup
finally:
await updater.stop()
proc.terminate()
try:
proc = multiprocessing.Process(target=trigger_update_patch, daemon=True)
proc.start()
# wait until new data arrives into the store via the updater
await asyncio.wait_for(policy_store.wait_for_data(), 60)
# cleanup
finally:
await updater.stop()
proc.terminate()

# test PATCH update event via API
entries = [
DataSourceEntry(
url="",
data=PATCH_DATA_UPDATE,
dst_path="/",
topics=DATA_TOPICS,
save_method="PATCH",
)
]
update = DataUpdate(
reason="Test_Patch", entries=entries, callback=UpdateCallback(callbacks=[])
)

headers = {"content-type": "application/json"}
# trigger an update
res = requests.post(
DATA_UPDATE_ROUTE,
data=json.dumps(update, default=pydantic_encoder),
headers=headers,
)
assert res.status_code == 200
# value field is not specified for add operation should fail
entries[0].data = [{"op": "add", "path": "/"}]
res = requests.post(
DATA_UPDATE_ROUTE,
data=json.dumps(update, default=pydantic_encoder),
headers=headers,
)
assert res.status_code == 422


@pytest.mark.asyncio
async def test_data_updater_with_report_callback(server):
Expand Down Expand Up @@ -188,10 +265,28 @@ async def test_data_updater_with_report_callback(server):
# we got one callback in the interim
assert count == current_callback_count + 1

async with ClientSession() as session:
res = await session.get(CHECK_DATA_UPDATE_CALLBACK_URL)
current_callback_count = await res.json()

proc2 = multiprocessing.Process(target=trigger_update_patch, daemon=True)
proc2.start()
# wait until new data arrives into the store via the updater
await asyncio.wait_for(policy_store.wait_for_data(), 15)
# give the callback a chance to arrive
await asyncio.sleep(1)

async with ClientSession() as session:
res = await session.get(CHECK_DATA_UPDATE_CALLBACK_URL)
count = await res.json()
# we got one callback in the interim
assert count == current_callback_count + 1

# cleanup
finally:
await updater.stop()
proc.terminate()
proc2.terminate()


@pytest.mark.asyncio
Expand Down
3 changes: 2 additions & 1 deletion packages/opal-common/opal_common/schemas/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ def name_must_contain_space(cls, value, values):
# see https://www.openpolicyagent.org/docs/latest/rest-api/#data-api path is the path nested under <OPA_SERVER>/<version>/data
dst_path: str = Field("", description="OPA data api path to store the document at")
save_method: str = Field(
"PUT", description="Method used to write into OPA - PUT/PATCH"
"PUT",
description="Method used to write into OPA - PUT/PATCH, when using the PATCH method the data field should conform to the JSON patch schema defined in RFC 6902(https://datatracker.ietf.org/doc/html/rfc6902#section-3)",
)
data: Optional[JsonableValue] = Field(
None,
Expand Down

0 comments on commit 7230e12

Please sign in to comment.