Skip to content

Commit

Permalink
Issue #42 basic support /validation: pass through from best upstream backend
Browse files Browse the repository at this point in the history
  • Loading branch information
soxofaan committed Dec 5, 2023
1 parent e17e9be commit 91cde5d
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 6 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@ All notable changes to this project will be documented in this file.

The format is roughly based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

## [0.15.x]

## [Current: 0.14.x]
- Basic support for `POST /validation`: pass through validation of best upstream backend for given process graph ([#42](https://github.com/Open-EO/openeo-aggregator/issues/42))


## [0.14.x]

- Disassociate billing plans from user roles and don't list any for now ([openEOPlatform/architecture-docs#381](https://github.com/openEOPlatform/architecture-docs/issues/381))

Expand Down
2 changes: 1 addition & 1 deletion src/openeo_aggregator/about.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import sys
from typing import Optional

__version__ = "0.14.0a1"
__version__ = "0.15.0a1"


def log_version_info(logger: Optional[logging.Logger] = None):
Expand Down
41 changes: 38 additions & 3 deletions src/openeo_aggregator/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,9 +355,6 @@ def __init__(
self._catalog = catalog
self._stream_chunk_size = config.streaming_chunk_size

# TODO #42 /validation support
self.validate = None

def get_process_registry(
self, api_version: Union[str, ComparableVersion]
) -> ProcessRegistry:
Expand Down Expand Up @@ -593,6 +590,44 @@ def _process_load_ml_model(
return job_backend_id, job_id
return None, model_id

def validate(self, process_graph: dict, env: EvalEnv = None) -> List[dict]:
"""Validate given process graph."""
# TODO: validate against each backend and aggregate/combine results?
# TODO: do some additional aggregator-level validation?
# TODO: support validation of cross-backend process graphs?
try:
backend_id = self.get_backend_for_process_graph(process_graph, api_version=env.get("version"))

# Preprocess process graph (e.g. translate job ids and other references)
process_graph = self.preprocess_process_graph(process_graph, backend_id=backend_id)

# Send process graph to backend
con = self.backends.get_connection(backend_id=backend_id)
post_data = {"process_graph": process_graph}
timing_logger = TimingLogger(title=f"Process graph validation on backend {backend_id}", logger=_log.info)
with con.authenticated_from_request(flask.request), timing_logger:
try:
backend_response = con.post(path="/validation", json=post_data, expected_status=200)
errors = backend_response.json()["errors"]
except Exception as e:
_log.error(f"Validation failed on backend {con.id}: {e!r}", exc_info=True)
errors = [
{
"code": "UpstreamValidationFailure",
"message": f"Validation failed on backend {con.id}: {e!r}",
}
]
except Exception as e:
_log.error(f"Validation failed: {e!r}", exc_info=True)
errors = [
{
"code": "InternalValidationFailure",
"message": f"Validation failed: {e!r}",
}
]
return errors



class AggregatorBatchJobs(BatchJobs):

Expand Down
77 changes: 76 additions & 1 deletion tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def test_capabilities_validation(self, api100):
capabilities = res.json
endpoints = capabilities["endpoints"]
paths = set(e["path"] for e in endpoints)
assert "/validation" not in paths
assert "/validation" in paths

def test_billing_plans(self, api100):
capabilities = api100.get("/").assert_status_code(200).json
Expand Down Expand Up @@ -1214,6 +1214,81 @@ def post_result(request: requests.Request, context):
assert (b1_mock.call_count, b2_mock.call_count) == call_counts
assert caplog.messages == expected_warnings

def test_validation_basic(self, api100, requests_mock, backend1):
def post_validation(request: requests.Request, context):
assert request.headers["Authorization"] == TEST_USER_AUTH_HEADER["Authorization"]
assert request.json() == {
"process_graph": {"add": {"process_id": "add", "arguments": {"x": 3, "y": 5}, "result": True}}
}
context.headers["Content-Type"] = "application/json"
return {"errors": [{"code": "NoMath", "message": "No math support"}]}

validation_mock = requests_mock.post(backend1 + "/validation", json=post_validation)

api100.set_auth_bearer_token(token=TEST_USER_BEARER_TOKEN)
post_data = {"process_graph": {"add": {"process_id": "add", "arguments": {"x": 3, "y": 5}, "result": True}}}
res = api100.post("/validation", json=post_data).assert_status_code(200)
assert res.json == {"errors": [{"code": "NoMath", "message": "No math support"}]}
assert validation_mock.call_count == 1

@pytest.mark.parametrize(
["collection_id", "expected_error", "expected_call_counts"],
[
("S1", {"code": "NoData", "message": "No data for S1"}, (1, 0)),
("S2", {"code": "NoData", "message": "No data for S2"}, (0, 1)),
(
"MEH",
{
"code": "InternalValidationFailure",
"message": RegexMatcher(
r"^Validation failed: CollectionNotFoundException\(status_code=404, code='CollectionNotFound', message=\"Collection 'MEH' does not exist."
),
},
(0, 0),
),
],
)
def test_validation_collection_support(
self, api100, requests_mock, backend1, backend2, collection_id, expected_error, expected_call_counts
):
requests_mock.get(backend1 + "/collections", json={"collections": [{"id": "S1"}]})
requests_mock.get(backend2 + "/collections", json={"collections": [{"id": "S2"}]})

def post_validation(request: requests.Request, context):
assert request.headers["Authorization"] == TEST_USER_AUTH_HEADER["Authorization"]
collection_id = request.json()["process_graph"]["lc"]["arguments"]["id"]
context.headers["Content-Type"] = "application/json"
return {"errors": [{"code": "NoData", "message": f"No data for {collection_id}"}]}

b1_validation_mock = requests_mock.post(backend1 + "/validation", json=post_validation)
b2_validation_mock = requests_mock.post(backend2 + "/validation", json=post_validation)

api100.set_auth_bearer_token(token=TEST_USER_BEARER_TOKEN)
post_data = {
"process_graph": {
"lc": {"process_id": "load_collection", "arguments": {"id": collection_id}, "result": True}
}
}
res = api100.post("/validation", json=post_data).assert_status_code(200)
assert res.json == {"errors": [expected_error]}
assert (b1_validation_mock.call_count, b2_validation_mock.call_count) == expected_call_counts

def test_validation_upstream_failure(self, api100, requests_mock, backend1, backend2):
validation_mock = requests_mock.post(backend1 + "/validation", content=b"this is not JSON")

api100.set_auth_bearer_token(token=TEST_USER_BEARER_TOKEN)
post_data = {"process_graph": {"add": {"process_id": "add", "arguments": {"x": 3, "y": 5}, "result": True}}}
res = api100.post("/validation", json=post_data).assert_status_code(200)
assert res.json == {
"errors": [
{
"code": "UpstreamValidationFailure",
"message": "Validation failed on backend b1: JSONDecodeError('Expecting value: line 1 column 1 (char 0)')",
}
]
}
assert validation_mock.call_count == 1


class TestBatchJobs:

Expand Down

0 comments on commit 91cde5d

Please sign in to comment.