diff --git a/CHANGELOG.md b/CHANGELOG.md
index 361bd339..d6643db6 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,8 +4,12 @@
 All notable changes to this project will be documented in this file.
 The format is roughly based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
 
+## [0.15.x]
 
-## [Current: 0.14.x]
+- Basic support for `POST /validation`: pass-through validation by the best upstream backend for the given process graph ([#42](https://github.com/Open-EO/openeo-aggregator/issues/42))
+
+
+## [0.14.x]
 
 - Disassociate billing plans from user roles and don't list any for now ([openEOPlatform/architecture-docs#381](https://github.com/openEOPlatform/architecture-docs/issues/381))
 
diff --git a/src/openeo_aggregator/about.py b/src/openeo_aggregator/about.py
index 28615649..4af68fa1 100644
--- a/src/openeo_aggregator/about.py
+++ b/src/openeo_aggregator/about.py
@@ -2,7 +2,7 @@
 import sys
 from typing import Optional
 
-__version__ = "0.14.0a1"
+__version__ = "0.15.0a1"
 
 
 def log_version_info(logger: Optional[logging.Logger] = None):
diff --git a/src/openeo_aggregator/backend.py b/src/openeo_aggregator/backend.py
index dd3f8c42..2024b827 100644
--- a/src/openeo_aggregator/backend.py
+++ b/src/openeo_aggregator/backend.py
@@ -355,9 +355,6 @@ def __init__(
         self._catalog = catalog
         self._stream_chunk_size = config.streaming_chunk_size
 
-        # TODO #42 /validation support
-        self.validate = None
-
     def get_process_registry(
         self, api_version: Union[str, ComparableVersion]
     ) -> ProcessRegistry:
@@ -593,6 +590,44 @@ def _process_load_ml_model(
                 return job_backend_id, job_id
         return None, model_id
 
+    def validate(self, process_graph: dict, env: EvalEnv = None) -> List[dict]:
+        """Validate given process graph."""
+        # TODO: validate against each backend and aggregate/combine results?
+        # TODO: do some additional aggregator-level validation?
+        # TODO: support validation of cross-backend process graphs?
+        try:
+            backend_id = self.get_backend_for_process_graph(process_graph, api_version=env.get("version"))
+
+            # Preprocess process graph (e.g. translate job ids and other references)
+            process_graph = self.preprocess_process_graph(process_graph, backend_id=backend_id)
+
+            # Send process graph to backend
+            con = self.backends.get_connection(backend_id=backend_id)
+            post_data = {"process_graph": process_graph}
+            timing_logger = TimingLogger(title=f"Process graph validation on backend {backend_id}", logger=_log.info)
+            with con.authenticated_from_request(flask.request), timing_logger:
+                try:
+                    backend_response = con.post(path="/validation", json=post_data, expected_status=200)
+                    errors = backend_response.json()["errors"]
+                except Exception as e:
+                    _log.error(f"Validation failed on backend {con.id}: {e!r}", exc_info=True)
+                    errors = [
+                        {
+                            "code": "UpstreamValidationFailure",
+                            "message": f"Validation failed on backend {con.id}: {e!r}",
+                        }
+                    ]
+        except Exception as e:
+            _log.error(f"Validation failed: {e!r}", exc_info=True)
+            errors = [
+                {
+                    "code": "InternalValidationFailure",
+                    "message": f"Validation failed: {e!r}",
+                }
+            ]
+        return errors
+
+
 class AggregatorBatchJobs(BatchJobs):
diff --git a/tests/test_views.py b/tests/test_views.py
index f1747f69..b2f20599 100644
--- a/tests/test_views.py
+++ b/tests/test_views.py
@@ -65,7 +65,7 @@ def test_capabilities_validation(self, api100):
         capabilities = res.json
         endpoints = capabilities["endpoints"]
         paths = set(e["path"] for e in endpoints)
-        assert "/validation" not in paths
+        assert "/validation" in paths
 
     def test_billing_plans(self, api100):
         capabilities = api100.get("/").assert_status_code(200).json
@@ -1214,6 +1214,81 @@ def post_result(request: requests.Request, context):
         assert (b1_mock.call_count, b2_mock.call_count) == call_counts
         assert caplog.messages == expected_warnings
 
+    def test_validation_basic(self, api100, requests_mock, backend1):
+        def post_validation(request: requests.Request, context):
+            assert request.headers["Authorization"] == TEST_USER_AUTH_HEADER["Authorization"]
+            assert request.json() == {
+                "process_graph": {"add": {"process_id": "add", "arguments": {"x": 3, "y": 5}, "result": True}}
+            }
+            context.headers["Content-Type"] = "application/json"
+            return {"errors": [{"code": "NoMath", "message": "No math support"}]}
+
+        validation_mock = requests_mock.post(backend1 + "/validation", json=post_validation)
+
+        api100.set_auth_bearer_token(token=TEST_USER_BEARER_TOKEN)
+        post_data = {"process_graph": {"add": {"process_id": "add", "arguments": {"x": 3, "y": 5}, "result": True}}}
+        res = api100.post("/validation", json=post_data).assert_status_code(200)
+        assert res.json == {"errors": [{"code": "NoMath", "message": "No math support"}]}
+        assert validation_mock.call_count == 1
+
+    @pytest.mark.parametrize(
+        ["collection_id", "expected_error", "expected_call_counts"],
+        [
+            ("S1", {"code": "NoData", "message": "No data for S1"}, (1, 0)),
+            ("S2", {"code": "NoData", "message": "No data for S2"}, (0, 1)),
+            (
+                "MEH",
+                {
+                    "code": "InternalValidationFailure",
+                    "message": RegexMatcher(
+                        r"^Validation failed: CollectionNotFoundException\(status_code=404, code='CollectionNotFound', message=\"Collection 'MEH' does not exist."
+                    ),
+                },
+                (0, 0),
+            ),
+        ],
+    )
+    def test_validation_collection_support(
+        self, api100, requests_mock, backend1, backend2, collection_id, expected_error, expected_call_counts
+    ):
+        requests_mock.get(backend1 + "/collections", json={"collections": [{"id": "S1"}]})
+        requests_mock.get(backend2 + "/collections", json={"collections": [{"id": "S2"}]})
+
+        def post_validation(request: requests.Request, context):
+            assert request.headers["Authorization"] == TEST_USER_AUTH_HEADER["Authorization"]
+            collection_id = request.json()["process_graph"]["lc"]["arguments"]["id"]
+            context.headers["Content-Type"] = "application/json"
+            return {"errors": [{"code": "NoData", "message": f"No data for {collection_id}"}]}
+
+        b1_validation_mock = requests_mock.post(backend1 + "/validation", json=post_validation)
+        b2_validation_mock = requests_mock.post(backend2 + "/validation", json=post_validation)
+
+        api100.set_auth_bearer_token(token=TEST_USER_BEARER_TOKEN)
+        post_data = {
+            "process_graph": {
+                "lc": {"process_id": "load_collection", "arguments": {"id": collection_id}, "result": True}
+            }
+        }
+        res = api100.post("/validation", json=post_data).assert_status_code(200)
+        assert res.json == {"errors": [expected_error]}
+        assert (b1_validation_mock.call_count, b2_validation_mock.call_count) == expected_call_counts
+
+    def test_validation_upstream_failure(self, api100, requests_mock, backend1, backend2):
+        validation_mock = requests_mock.post(backend1 + "/validation", content=b"this is not JSON")
+
+        api100.set_auth_bearer_token(token=TEST_USER_BEARER_TOKEN)
+        post_data = {"process_graph": {"add": {"process_id": "add", "arguments": {"x": 3, "y": 5}, "result": True}}}
+        res = api100.post("/validation", json=post_data).assert_status_code(200)
+        assert res.json == {
+            "errors": [
+                {
+                    "code": "UpstreamValidationFailure",
+                    "message": "Validation failed on backend b1: JSONDecodeError('Expecting value: line 1 column 1 (char 0)')",
+                }
+            ]
+        }
+        assert validation_mock.call_count == 1
+
 
 class TestBatchJobs: