diff --git a/documentation/docs/tutorials/trigger_data_updates.mdx b/documentation/docs/tutorials/trigger_data_updates.mdx index 2829c9514..7c91ef617 100644 --- a/documentation/docs/tutorials/trigger_data_updates.mdx +++ b/documentation/docs/tutorials/trigger_data_updates.mdx @@ -61,7 +61,45 @@ Example: - All the APIs in opal are OpenAPI / Swagger based (via FastAPI). - Check out the [API docs on your running OPAL-server](http://localhost:7002/docs#/Data%20Updates/publish_data_update_event_data_config_post) -- this link assumes you have the server running on `http://localhost:7002` - You can also [generate an API-client](https://github.com/OpenAPITools/openapi-generator) in the language of your choice using the [OpenAPI spec provided by the server](http://localhost:7002/openapi.json) - +#### Using PATCH save method +- There are two save methods for triggering a data update, PUT and PATCH, selected in the payload using the `save_method` field +- Using PUT places the data at the specified `dst_path` (overriding it if the path already exists) +- When using PATCH, the `data` field should conform to the JSON Patch schema defined in [RFC 6902](https://datatracker.ietf.org/doc/html/rfc6902#section-3); the patch operations are then applied to the existing data as specified +- Let's say the following is the +- >NOTE: OPA does not support the `move` operation of JSON Patch ([see this discussion](https://github.com/orgs/open-policy-agent/discussions/462#discussioncomment-6343050)), and hence OPAL cannot support the `move` operation either +- Example: Consider this [JSON data](https://github.com/permitio/opal-example-policy-repo/blob/master/data.json) from the [opal-example-policy-repo](https://github.com/permitio/opal-example-policy-repo) +- Let's say a user is deleted from the system and we want that user's details to be removed from the JSON. For example, to remove `bob` from the list, we can use the `remove` operation of JSON Patch +- The following API request will remove `bob` from the `users`
JSON + ``` + curl -X 'POST' \ + 'http://localhost:7002/data/config' \ + -H 'accept: application/json' \ + -H 'Content-Type: application/json' \ + -d '{ + "id": "randomid", + "entries": [ + { + "url": "", + "config": {}, + "topics": [ + "policy_data" + ], + "dst_path": "/users", + "save_method": "PATCH", + "data": [ + { + "op": "remove", + "path": "/bob" + } + ] + } + ], + "reason": "user bob is deleted from the system", + "callback": { + "callbacks": [] + } +}' + ``` ### Option 3: Write your own - import code from the OPAL's packages - One of the great things about OPAL being written in Python is that you can easily reuse its code. See the code for the `DataUpdate` model at [opal_common/schemas/data.py](https://github.com/permitio/opal/blob/master/packages/opal-common/opal_common/schemas/data.py) and use it within your own code to send an update to the server diff --git a/packages/opal-client/opal_client/data/updater.py b/packages/opal-client/opal_client/data/updater.py index 607df28fa..0aef7660a 100644 --- a/packages/opal-client/opal_client/data/updater.py +++ b/packages/opal-client/opal_client/data/updater.py @@ -35,6 +35,7 @@ from opal_common.schemas.store import TransactionType from opal_common.security.sslcontext import get_custom_ssl_context from opal_common.utils import get_authorization_header +from pydantic.json import pydantic_encoder class DataUpdater: @@ -296,7 +297,7 @@ def calc_hash(data): """ try: if not isinstance(data, str): - data = json.dumps(data, default=str) + data = json.dumps(data, default=pydantic_encoder) return hashlib.sha256(data.encode("utf-8")).hexdigest() except: logger.exception("Failed to calculate hash for data {data}", data=data) diff --git a/packages/opal-client/opal_client/policy_store/mock_policy_store_client.py b/packages/opal-client/opal_client/policy_store/mock_policy_store_client.py index 2ee52a110..e19b5b460 100644 --- a/packages/opal-client/opal_client/policy_store/mock_policy_store_client.py +++ 
b/packages/opal-client/opal_client/policy_store/mock_policy_store_client.py @@ -1,8 +1,11 @@ import asyncio +import json from typing import Any, Dict, List, Optional +import jsonpatch +from opal_common.schemas.data import custom_encoder from opal_common.schemas.policy import PolicyBundle -from opal_common.schemas.store import StoreTransaction +from opal_common.schemas.store import JSONPatchAction, StoreTransaction from pydantic import BaseModel from .base_policy_store_client import BasePolicyStoreClient, JsonableValue @@ -53,6 +56,22 @@ async def set_policy_data( self._data[path] = policy_data self.has_data_event.set() + async def patch_policy_data( + self, + policy_data: List[JSONPatchAction], + path: str = "", + transaction_id: Optional[str] = None, + ): + for i, _ in enumerate(policy_data): + if not path == "/": + policy_data[i].path = path + policy_data[i].path + data_str = json.dumps( + policy_data, default=custom_encoder(by_alias=True, exclude_none=True) + ) + patch = jsonpatch.JsonPatch.from_string(data_str) + patch.apply(self._data, in_place=True) + self.has_data_event.set() + async def get_data(self, path: str = None) -> Dict: if path is None or path == "": return self._data diff --git a/packages/opal-client/opal_client/tests/data_updater_test.py b/packages/opal-client/opal_client/tests/data_updater_test.py index 1cbcff08d..f3e26ad0c 100644 --- a/packages/opal-client/opal_client/tests/data_updater_test.py +++ b/packages/opal-client/opal_client/tests/data_updater_test.py @@ -1,4 +1,5 @@ import asyncio +import json import logging import multiprocessing import os @@ -6,10 +7,12 @@ from multiprocessing import Event, Process import pytest +import requests import uvicorn from aiohttp import ClientSession from fastapi_websocket_pubsub import PubSubClient from flaky import flaky +from pydantic.json import pydantic_encoder # Add parent path to use local src as package for tests root_dir = os.path.abspath( @@ -30,6 +33,7 @@ ServerDataSourceConfig, UpdateCallback, ) 
+from opal_common.schemas.store import JSONPatchAction from opal_common.utils import get_authorization_header from opal_server.config import opal_server_config from opal_server.server import OpalServer @@ -53,6 +57,9 @@ DATA_SOURCES_CONFIG = ServerDataSourceConfig( config=DataSourceConfig(entries=[{"url": DATA_URL, "topics": DATA_TOPICS}]) ) +DATA_UPDATE_ROUTE = f"http://localhost:{PORT}/data/config" + +PATCH_DATA_UPDATE = [JSONPatchAction(op="add", path="/", value=TEST_DATA)] def setup_server(event): @@ -75,7 +82,10 @@ def fetchable_data(): # route to report complition to @server_app.post(DATA_UPDATE_CALLBACK_ROUTE) def callback(report: DataUpdateReport): - assert report.reports[0].hash == DataUpdater.calc_hash(TEST_DATA) + if len(callbacks) == 1: + assert report.reports[0].hash == DataUpdater.calc_hash(PATCH_DATA_UPDATE) + else: + assert report.reports[0].hash == DataUpdater.calc_hash(TEST_DATA) callbacks.append(report) return "OKAY" @@ -122,6 +132,33 @@ async def run(): asyncio.run(run()) +def trigger_update_patch(): + async def run(): + # trigger an update + entries = [ + DataSourceEntry( + url="", + data=PATCH_DATA_UPDATE, + dst_path="/", + save_method="PATCH", + topics=DATA_TOPICS, + ) + ] + callback = UpdateCallback(callbacks=[DATA_UPDATE_CALLBACK_URL]) + update = DataUpdate(reason="Test", entries=entries, callback=callback) + async with PubSubClient( + server_uri=UPDATES_URL, + methods_class=TenantAwareRpcEventClientMethods, + extra_headers=[get_authorization_header(opal_client_config.CLIENT_TOKEN)], + ) as client: + # Channel must be ready before we can publish on it + await asyncio.wait_for(client.wait_until_ready(), 5) + logging.info("Publishing data event") + await client.publish(DATA_TOPICS, data=update) + + asyncio.run(run()) + + @flaky @pytest.mark.asyncio async def test_data_updater(server): @@ -146,10 +183,50 @@ async def test_data_updater(server): # wait until new data arrives into the store via the updater await 
asyncio.wait_for(policy_store.wait_for_data(), 60) # cleanup + finally: + await updater.stop() + proc.terminate() + try: + proc = multiprocessing.Process(target=trigger_update_patch, daemon=True) + proc.start() + # wait until new data arrives into the store via the updater + await asyncio.wait_for(policy_store.wait_for_data(), 60) + # cleanup finally: await updater.stop() proc.terminate() + # test PATCH update event via API + entries = [ + DataSourceEntry( + url="", + data=PATCH_DATA_UPDATE, + dst_path="/", + topics=DATA_TOPICS, + save_method="PATCH", + ) + ] + update = DataUpdate( + reason="Test_Patch", entries=entries, callback=UpdateCallback(callbacks=[]) + ) + + headers = {"content-type": "application/json"} + # trigger an update + res = requests.post( + DATA_UPDATE_ROUTE, + data=json.dumps(update, default=pydantic_encoder), + headers=headers, + ) + assert res.status_code == 200 + # value field is not specified for add operation should fail + entries[0].data = [{"op": "add", "path": "/"}] + res = requests.post( + DATA_UPDATE_ROUTE, + data=json.dumps(update, default=pydantic_encoder), + headers=headers, + ) + assert res.status_code == 422 + @pytest.mark.asyncio async def test_data_updater_with_report_callback(server): @@ -188,10 +265,28 @@ async def test_data_updater_with_report_callback(server): # we got one callback in the interim assert count == current_callback_count + 1 + async with ClientSession() as session: + res = await session.get(CHECK_DATA_UPDATE_CALLBACK_URL) + current_callback_count = await res.json() + + proc2 = multiprocessing.Process(target=trigger_update_patch, daemon=True) + proc2.start() + # wait until new data arrives into the store via the updater + await asyncio.wait_for(policy_store.wait_for_data(), 15) + # give the callback a chance to arrive + await asyncio.sleep(1) + + async with ClientSession() as session: + res = await session.get(CHECK_DATA_UPDATE_CALLBACK_URL) + count = await res.json() + # we got one callback in the interim + 
assert count == current_callback_count + 1 + # cleanup finally: await updater.stop() proc.terminate() + proc2.terminate() @pytest.mark.asyncio diff --git a/packages/opal-common/opal_common/schemas/data.py b/packages/opal-common/opal_common/schemas/data.py index cc968bb84..5e20e2678 100644 --- a/packages/opal-common/opal_common/schemas/data.py +++ b/packages/opal-common/opal_common/schemas/data.py @@ -56,7 +56,8 @@ def name_must_contain_space(cls, value, values): # see https://www.openpolicyagent.org/docs/latest/rest-api/#data-api path is the path nested under //data dst_path: str = Field("", description="OPA data api path to store the document at") save_method: str = Field( - "PUT", description="Method used to write into OPA - PUT/PATCH" + "PUT", + description="Method used to write into OPA - PUT/PATCH, when using the PATCH method the data field should conform to the JSON patch schema defined in RFC 6902(https://datatracker.ietf.org/doc/html/rfc6902#section-3)", ) data: Optional[JsonableValue] = Field( None,