Skip to content

Commit

Permalink
Issue #604/#644 UDPJobFactory: make process_id optional (if namespace…
Browse files Browse the repository at this point in the history
… is given)
  • Loading branch information
soxofaan committed Oct 14, 2024
1 parent a92e47f commit 84ee8ea
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 10 deletions.
17 changes: 8 additions & 9 deletions openeo/extra/job_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -957,7 +957,6 @@ class UDPJobFactory:
# Job creator, based on a parameterized openEO process definition
job_starter = UDPJobFactory(
process_id="my_process",
namespace="https://example.com/my_process.json",
)
Expand All @@ -983,13 +982,14 @@ class UDPJobFactory:

def __init__(
self,
process_id: str,
*,
process_id: Optional[str] = None,
namespace: Union[str, None] = None,
parameter_defaults: Optional[dict] = None,
parameter_column_map: Optional[dict] = None,
):
# TODO: allow process_id to be None too? when remote process definition fully comes from URL
if process_id is None and namespace is None:
raise ValueError("At least one of `process_id` and `namespace` should be provided.")
self._process_id = process_id
self._namespace = namespace
self._parameter_defaults = parameter_defaults or {}
Expand Down Expand Up @@ -1024,6 +1024,7 @@ def start_job(self, row: pd.Series, connection: Connection, **_) -> BatchJob:
"""

process_definition = self._get_process_definition(connection=connection)
process_id = process_definition.id
parameters = process_definition.parameters or []

if self._parameter_column_map is None:
Expand All @@ -1046,7 +1047,7 @@ def start_job(self, row: pd.Series, connection: Connection, **_) -> BatchJob:
# Skip optional parameters without any fallback default value
continue
else:
raise ValueError(f"Missing required parameter {param_name !r} for process {self._process_id!r}")
raise ValueError(f"Missing required parameter {param_name !r} for process {process_id!r}")

# Prepare some values/dtypes for JSON encoding
if isinstance(value, numpy.integer):
Expand All @@ -1058,12 +1059,10 @@ def start_job(self, row: pd.Series, connection: Connection, **_) -> BatchJob:

arguments[param_name] = value

cube = connection.datacube_from_process(process_id=self._process_id, namespace=self._namespace, **arguments)
cube = connection.datacube_from_process(process_id=process_id, namespace=self._namespace, **arguments)

title = row.get("title", f"Process {self._process_id!r} with {repr_truncate(arguments)}")
description = row.get(
"description", f"Process {self._process_id!r} (namespace {self._namespace}) with {arguments}"
)
title = row.get("title", f"Process {process_id!r} with {repr_truncate(arguments)}")
description = row.get("description", f"Process {process_id!r} (namespace {self._namespace}) with {arguments}")
job = connection.create_job(cube, title=title, description=description)

return job
Expand Down
5 changes: 5 additions & 0 deletions openeo/internal/processes/parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,4 +156,9 @@ def parse_remote_process_definition(namespace: str, process_id: Optional[str] =
raise LookupError(f"Process {process_id!r} not found in process listing {namespace!r}")
(data,) = processes

# Some final validation.
assert "id" in data, "Process definition should at least have an 'id' field"
if process_id is not None and data["id"] != process_id:
raise LookupError(f"Expected process id {process_id!r}, but found {data['id']!r}")

return Process.from_dict(data)
56 changes: 55 additions & 1 deletion tests/extra/test_job_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -1064,7 +1064,9 @@ def dummy_backend(self, requests_mock, con) -> DummyBackend:
@pytest.fixture(autouse=True)
def remote_process_definitions(self, requests_mock) -> dict:
mocks = {}
for pg in [self.PG_3PLUS5, self.PG_INCREMENT, self.PG_OFFSET_POLYGON]:
processes = [self.PG_3PLUS5, self.PG_INCREMENT, self.PG_OFFSET_POLYGON]
mocks["_all"] = requests_mock.get("https://remote.test/_all", json={"processes": processes, "links": []})
for pg in processes:
process_id = pg["id"]
mocks[process_id] = requests_mock.get(f"https://remote.test/{process_id}.json", json=pg)
return mocks
Expand Down Expand Up @@ -1151,6 +1153,58 @@ def test_basic_parameterization(self, con, dummy_backend, parameter_defaults, ro
}
}

@pytest.mark.parametrize(
["process_id", "namespace", "expected"],
[
(
# Classic UDP reference
"3plus5",
None,
{"process_id": "3plus5"},
),
(
# Remote process definition (with "redundant" process_id)
"3plus5",
"https://remote.test/3plus5.json",
{"process_id": "3plus5", "namespace": "https://remote.test/3plus5.json"},
),
(
# Remote process definition with just namespace (process_id should be inferred from that)
None,
"https://remote.test/3plus5.json",
{"process_id": "3plus5", "namespace": "https://remote.test/3plus5.json"},
),
(
# Remote process definition from listing
"3plus5",
"https://remote.test/_all",
{"process_id": "3plus5", "namespace": "https://remote.test/_all"},
),
],
)
def test_process_references_in_constructor(
self, con, requests_mock, dummy_backend, remote_process_definitions, process_id, namespace, expected
):
"""Various ways to provide process references in the constructor"""

# Register personal UDP
requests_mock.get(con.build_url("/process_graphs/3plus5"), json=self.PG_3PLUS5)

job_factory = UDPJobFactory(process_id=process_id, namespace=namespace)

job = job_factory.start_job(row=pd.Series({"foo": 123}), connection=con)
assert isinstance(job, BatchJob)
assert dummy_backend.batch_jobs == {
"job-000": {
"job_id": "job-000",
"pg": {"3plus51": {**expected, "arguments": {}, "result": True}},
"status": "created",
}
}

def test_no_process_id_nor_namespace(self):
with pytest.raises(ValueError, match="At least one of `process_id` and `namespace` should be provided"):
_ = UDPJobFactory()

@pytest.fixture
def job_manager(self, tmp_path, dummy_backend) -> MultiBackendJobManager:
Expand Down
7 changes: 7 additions & 0 deletions tests/internal/processes/test_parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,3 +220,10 @@ def test_parse_remote_process_definition_listing(requests_mock):
assert process.returns is None
assert process.description is None
assert process.summary is None


def test_parse_remote_process_definition_inconsistency(requests_mock):
url = "https://example.com/ndvi.json"
requests_mock.get(url, json={"id": "nnddvvii"})
with pytest.raises(LookupError, match="Expected process id 'ndvi', but found 'nnddvvii'"):
_ = parse_remote_process_definition(url, process_id="ndvi")

0 comments on commit 84ee8ea

Please sign in to comment.