Skip to content

Commit

Permalink
Issue #115 switch to canonical batch job result URL
Browse files Browse the repository at this point in the history
  • Loading branch information
soxofaan committed Sep 8, 2023
1 parent 7eda0c9 commit 1523f89
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 7 deletions.
18 changes: 15 additions & 3 deletions src/openeo_aggregator/partitionedjobs/tracking.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
STATUS_INSERTED,
STATUS_RUNNING,
PartitionedJob,
PartitionedJobFailure,
SubJob,
)
from openeo_aggregator.partitionedjobs.crossbackend import (
Expand Down Expand Up @@ -103,9 +104,20 @@ def create_crossbackend_pjob(

def get_replacement(node_id: str, node: dict, subgraph_id: SubGraphId) -> dict:
nonlocal batch_jobs
# TODO: use canonical URL to circumvent auth issues
# but how does `parial` work there? (https://github.com/Open-EO/openeo-api/issues/507)
stac_url = batch_jobs[subgraph_id].get_results_metadata_url(full=True) + "?partial=true"
try:
job: BatchJob = batch_jobs[subgraph_id]
with job.connection.authenticated_from_request(flask.request):
result_url = job.get_results_metadata_url(full=True)
result_metadata = job.connection.get(
result_url, params={"partial": "true"}, expected_status=200
).json()
# Will canonical link also be partial? (https://github.com/Open-EO/openeo-api/issues/507)
canonical_links = [link for link in result_metadata.get("links", {}) if link.get("rel") == "canonical"]
stac_url = canonical_links[0]["href"]
except Exception as e:
msg = f"Failed to obtain partial canonical batch job result URL for {subgraph_id=}: {e}"
_log.exception(msg)
raise PartitionedJobFailure(msg) from e
return {
node_id: {
"process_id": "load_stac",
Expand Down
18 changes: 14 additions & 4 deletions tests/partitionedjobs/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -695,7 +695,7 @@ def test_create_job_simple(self, flask_app, api100, zk_db, dummy1):
assert pg == {"lc1": {"arguments": {"id": "S2"}, "process_id": "load_collection", "result": True}}

@now.mock
def test_create_job_basic(self, flask_app, api100, zk_db, dummy1):
def test_create_job_basic(self, flask_app, api100, zk_db, dummy1, requests_mock):
api100.set_auth_bearer_token(token=TEST_USER_BEARER_TOKEN)

pg = {
Expand All @@ -708,6 +708,11 @@ def test_create_job_basic(self, flask_app, api100, zk_db, dummy1):
},
}

requests_mock.get(
"https://b1.test/v1/jobs/1-jb-0/results?partial=true",
json={"links": [{"rel": "canonical", "href": "https://data.b1.test/123abc"}]},
)

res = api100.post(
"/jobs",
json={"process": {"process_graph": pg}, "job_options": {"split_strategy": "crossbackend"}},
Expand Down Expand Up @@ -765,7 +770,7 @@ def test_create_job_basic(self, flask_app, api100, zk_db, dummy1):
"lc1": {"process_id": "load_collection", "arguments": {"id": "S2"}},
"lc2": {
"process_id": "load_stac",
"arguments": {"url": "https://b1.test/v1/jobs/1-jb-0/results?partial=true"},
"arguments": {"url": "https://data.b1.test/123abc"},
},
"merge": {
"process_id": "merge_cubes",
Expand All @@ -790,7 +795,7 @@ def test_create_job_basic(self, flask_app, api100, zk_db, dummy1):
"lc1": {"process_id": "load_collection", "arguments": {"id": "S2"}},
"lc2": {
"process_id": "load_stac",
"arguments": {"url": "https://b1.test/v1/jobs/1-jb-0/results?partial=true"},
"arguments": {"url": "https://data.b1.test/123abc"},
},
"merge": {
"process_id": "merge_cubes",
Expand Down Expand Up @@ -818,7 +823,7 @@ def test_create_job_basic(self, flask_app, api100, zk_db, dummy1):
}

@now.mock
def test_start_and_job_results(self, flask_app, api100, zk_db, dummy1):
def test_start_and_job_results(self, flask_app, api100, zk_db, dummy1, requests_mock):
"""Run the jobs and get results"""
api100.set_auth_bearer_token(token=TEST_USER_BEARER_TOKEN)

Expand All @@ -832,6 +837,11 @@ def test_start_and_job_results(self, flask_app, api100, zk_db, dummy1):
},
}

requests_mock.get(
"https://b1.test/v1/jobs/1-jb-0/results?partial=true",
json={"links": [{"rel": "canonical", "href": "https://data.b1.test/123abc"}]},
)

res = api100.post(
"/jobs",
json={
Expand Down

0 comments on commit 1523f89

Please sign in to comment.