diff --git a/CHANGELOG.md b/CHANGELOG.md index f8d01fc57..805bfb369 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,8 +13,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `MultiBackendJobManager`: Added `initialize_from_df(df)` (to `CsvJobDatabase` and `ParquetJobDatabase`) to initialize (and persist) the job database from a given DataFrame. Also added `create_job_db()` factory to easily create a job database from a given dataframe and its type guessed from filename extension. ([#635](https://github.com/Open-EO/openeo-python-client/issues/635)) - - +- `MultiBackendJobManager.run_jobs()` now returns a dictionary with counters/stats about various events during the full run of the job manager ([#645](https://github.com/Open-EO/openeo-python-client/issues/645)) +- Added (experimental) `ProcessBasedJobCreator` to be used as `start_job` callable with `MultiBackendJobManager` to create multiple jobs from a single parameterized process (e.g. a UDP or remote process definition) ([#604](https://github.com/Open-EO/openeo-python-client/issues/604)) ### Changed diff --git a/docs/cookbook/job_manager.rst b/docs/cookbook/job_manager.rst index 171cc3fb7..b5219dc72 100644 --- a/docs/cookbook/job_manager.rst +++ b/docs/cookbook/job_manager.rst @@ -2,6 +2,9 @@ Multi Backend Job Manager ==================================== +API +=== + .. warning:: This is a new experimental API, subject to change. @@ -14,3 +17,106 @@ Multi Backend Job Manager .. autoclass:: openeo.extra.job_management.CsvJobDatabase .. autoclass:: openeo.extra.job_management.ParquetJobDatabase + + +.. autoclass:: openeo.extra.job_management.ProcessBasedJobCreator + :members: + :special-members: __call__ + + +.. _job-management-with-process-based-job-creator: + +Job creation based on parameterized processes +=============================================== + +The openEO API supports parameterized processes out of the box, +which allows to work with flexible, reusable openEO building blocks +in the form of :ref:`user-defined processes ` +or `remote openEO process definitions `_. +This can also be leveraged for job creation in the context of the +:py:class:`~openeo.extra.job_management.MultiBackendJobManager`: +define a "template" job as a parameterized process +and let the job manager fill in the parameters +from a given data frame. + +The :py:class:`~openeo.extra.job_management.ProcessBasedJobCreator` helper class +allows to do exactly that. +Given a reference to a parameterized process, +such as a user-defined process or remote process definition, +it can be used directly as ``start_job`` callable to +:py:meth:`~openeo.extra.job_management.MultiBackendJobManager.run_jobs` +which will fill in the process parameters from the dataframe. + +Basic :py:class:`~openeo.extra.job_management.ProcessBasedJobCreator` example +----------------------------------------------------------------------------- + +Basic usage example with a remote process definition: + +.. code-block:: python + :linenos: + :caption: Basic :py:class:`~openeo.extra.job_management.ProcessBasedJobCreator` example snippet + :emphasize-lines: 10-15, 28 + + from openeo.extra.job_management import ( + MultiBackendJobManager, + create_job_db, + ProcessBasedJobCreator, + ) + + # Job creator, based on a parameterized openEO process + # (specified by the remote process definition at given URL) + # which has parameters "start_date" and "bands" for example. 
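+    # (Note: this snippet assumes `import pandas as pd` and a job manager that is
+    # configured with at least one backend via `add_backend`.)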
+ job_starter = ProcessBasedJobCreator( + namespace="https://example.com/my_process.json", + parameter_defaults={ + "bands": ["B02", "B03"], + }, + ) + + # Initialize job database from a dataframe, + # with desired parameter values to fill in. + df = pd.DataFrame({ + "start_date": ["2021-01-01", "2021-02-01", "2021-03-01"], + }) + job_db = create_job_db("jobs.csv").initialize_from_df(df) + + # Create and run job manager, + # which will start a job for each of the `start_date` values in the dataframe + # and use the default band list ["B02", "B03"] for the "bands" parameter. + job_manager = MultiBackendJobManager(...) + job_manager.run_jobs(job_db=job_db, start_job=job_starter) + +In this example, a :py:class:`ProcessBasedJobCreator` is instantiated +based on a remote process definition, +which has parameters ``start_date`` and ``bands``. +When passed to :py:meth:`~openeo.extra.job_management.MultiBackendJobManager.run_jobs`, +a job for each row in the dataframe will be created, +with parameter values based on matching columns in the dataframe: + +- the ``start_date`` parameter will be filled in + with the values from the "start_date" column of the dataframe, +- the ``bands`` parameter has no corresponding column in the dataframe, + and will get its value from the default specified in the ``parameter_defaults`` argument. + + +:py:class:`~openeo.extra.job_management.ProcessBasedJobCreator` with geometry handling +--------------------------------------------------------------------------------------------- + +Apart from the intuitive name-based parameter-column linking, +:py:class:`~openeo.extra.job_management.ProcessBasedJobCreator` +also automatically links: + +- a process parameters that accepts inline GeoJSON geometries/features + (which practically means it has a schema like ``{"type": "object", "subtype": "geojson"}``, + as produced by :py:meth:`Parameter.geojson `). +- with the geometry column in a `GeoPandas `_ dataframe. + +even if the name of the parameter does not exactly match +the name of the GeoPandas geometry column (``geometry`` by default). +This automatic liking is only done if there is only one +GeoJSON parameter and one geometry column in the dataframe. + + +.. admonition:: to do + + Add example with geometry handling. diff --git a/docs/rst-cheatsheet.rst b/docs/rst-cheatsheet.rst index d1bd37360..c02e4a15d 100644 --- a/docs/rst-cheatsheet.rst +++ b/docs/rst-cheatsheet.rst @@ -50,6 +50,15 @@ More explicit code block with language hint (and no need for double colon) >>> 3 + 5 8 +Code block with additional features (line numbers, caption, highlighted lines, +for more see https://www.sphinx-doc.org/en/master/usage/restructuredtext/directives.html#directive-code-block) + +.. 
code-block:: python + :linenos: + :caption: how to say hello + :emphasize-lines: 1 + + print("hello world") References: @@ -60,4 +69,8 @@ References: - refer to the reference with:: - :ref:`target` + :ref:`target` or :ref:`custom text ` + +- inline URL references:: + + `Python `_ diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py index eb6dd86db..d7f370291 100644 --- a/openeo/extra/job_management.py +++ b/openeo/extra/job_management.py @@ -1,23 +1,32 @@ import abc +import collections import contextlib import datetime import json import logging +import re import time import warnings from pathlib import Path from threading import Thread from typing import Callable, Dict, List, NamedTuple, Optional, Union +import numpy import pandas as pd import requests import shapely.errors +import shapely.geometry.base import shapely.wkt from requests.adapters import HTTPAdapter, Retry from openeo import BatchJob, Connection +from openeo.internal.processes.parse import ( + Parameter, + Process, + parse_remote_process_definition, +) from openeo.rest import OpenEoApiError -from openeo.util import deep_get, rfc3339 +from openeo.util import LazyLoadCache, deep_get, repr_truncate, rfc3339 _log = logging.getLogger(__name__) @@ -376,7 +385,7 @@ def run_jobs( start_job: Callable[[], BatchJob] = _start_job_default, job_db: Union[str, Path, JobDatabaseInterface, None] = None, **kwargs, - ): + ) -> dict: """Runs jobs, specified in a dataframe, and tracks parameters. :param df: @@ -418,6 +427,10 @@ def run_jobs( Support for Parquet files depends on the ``pyarrow`` package as :ref:`optional dependency `. + :return: dictionary with stats collected during the job running loop. + Note that the set of fields in this dictionary is experimental + and subject to change + .. versionchanged:: 0.31.0 Added support for persisting the job metadata in Parquet format. @@ -426,6 +439,9 @@ def run_jobs( which can be a path to a CSV or Parquet file, or a user-defined :py:class:`JobDatabaseInterface` object. The deprecated ``output_file`` argument is still supported for now. + + .. versionchanged:: 0.33.0 + return a stats dictionary """ # TODO: Defining start_jobs as a Protocol might make its usage more clear, and avoid complicated doctrings, # but Protocols are only supported in Python 3.8 and higher. @@ -453,35 +469,49 @@ def run_jobs( # TODO: start showing deprecation warnings for this usage pattern? job_db.initialize_from_df(df) + stats = collections.defaultdict(int) while sum(job_db.count_by_status(statuses=["not_started", "created", "queued", "running"]).values()) > 0: - self._job_update_loop(job_db=job_db, start_job=start_job) + self._job_update_loop(job_db=job_db, start_job=start_job, stats=stats) + stats["run_jobs loop"] += 1 + time.sleep(self.poll_sleep) + stats["sleep"] += 1 + + return stats - def _job_update_loop(self, job_db: JobDatabaseInterface, start_job: Callable[[], BatchJob]): + def _job_update_loop( + self, job_db: JobDatabaseInterface, start_job: Callable[[], BatchJob], stats: Optional[dict] = None + ): """ Inner loop logic of job management: go through the necessary jobs to check for status updates, trigger status events, start new jobs when there is room for them, etc. 
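+
+        When a ``stats`` dictionary (counter mapping) is given, it is updated in place
+        with counts of the events observed during this iteration.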
""" + stats = stats if stats is not None else collections.defaultdict(int) + with ignore_connection_errors(context="get statuses"): - self._track_statuses(job_db) + self._track_statuses(job_db, stats=stats) + stats["track_statuses"] += 1 not_started = job_db.get_by_status(statuses=["not_started"], max=200) if len(not_started) > 0: # Check number of jobs running at each backend running = job_db.get_by_status(statuses=["created", "queued", "running"]) + stats["job_db get_by_status"] += 1 per_backend = running.groupby("backend_name").size().to_dict() _log.info(f"Running per backend: {per_backend}") for backend_name in self.backends: backend_load = per_backend.get(backend_name, 0) if backend_load < self.backends[backend_name].parallel_jobs: to_add = self.backends[backend_name].parallel_jobs - backend_load - to_launch = not_started.iloc[0:to_add] - for i in to_launch.index: - self._launch_job(start_job, not_started, i, backend_name) - job_db.persist(to_launch) + for i in not_started.index[0:to_add]: + self._launch_job(start_job, df=not_started, i=i, backend_name=backend_name, stats=stats) + stats["job launch"] += 1 + + job_db.persist(not_started.loc[i : i + 1]) + stats["job_db persist"] += 1 - def _launch_job(self, start_job, df, i, backend_name): + def _launch_job(self, start_job, df, i, backend_name, stats: Optional[dict] = None): """Helper method for launching jobs :param start_job: @@ -504,6 +534,7 @@ def _launch_job(self, start_job, df, i, backend_name): :param backend_name: name of the backend that will execute the job. """ + stats = stats if stats is not None else collections.defaultdict(int) df.loc[i, "backend_name"] = backend_name row = df.loc[i] @@ -511,6 +542,7 @@ def _launch_job(self, start_job, df, i, backend_name): _log.info(f"Starting job on backend {backend_name} for {row.to_dict()}") connection = self._get_connection(backend_name, resilient=True) + stats["start_job call"] += 1 job = start_job( row=row, connection_provider=self._get_connection, @@ -520,23 +552,30 @@ def _launch_job(self, start_job, df, i, backend_name): except requests.exceptions.ConnectionError as e: _log.warning(f"Failed to start job for {row.to_dict()}", exc_info=True) df.loc[i, "status"] = "start_failed" + stats["start_job error"] += 1 else: df.loc[i, "start_time"] = rfc3339.utcnow() if job: df.loc[i, "id"] = job.job_id with ignore_connection_errors(context="get status"): status = job.status() + stats["job get status"] += 1 df.loc[i, "status"] = status if status == "created": # start job if not yet done by callback try: job.start() + stats["job start"] += 1 df.loc[i, "status"] = job.status() + stats["job get status"] += 1 except OpenEoApiError as e: _log.error(e) df.loc[i, "status"] = "start_failed" + stats["job start error"] += 1 else: + # TODO: what is this "skipping" about actually? df.loc[i, "status"] = "skipped" + stats["start_job skipped"] += 1 def on_job_done(self, job: BatchJob, row): """ @@ -619,11 +658,13 @@ def ensure_job_dir_exists(self, job_id: str) -> Path: if not job_dir.exists(): job_dir.mkdir(parents=True) - def _track_statuses(self, job_db: JobDatabaseInterface): + def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] = None): """ Tracks status (and stats) of running jobs (in place). Optionally cancels jobs when running too long. 
""" + stats = stats if stats is not None else collections.defaultdict(int) + active = job_db.get_by_status(statuses=["created", "queued", "running"]) for i in active.index: job_id = active.loc[i, "id"] @@ -634,6 +675,7 @@ def _track_statuses(self, job_db: JobDatabaseInterface): con = self._get_connection(backend_name) the_job = con.job(job_id) job_metadata = the_job.describe() + stats["job describe"] += 1 new_status = job_metadata["status"] _log.info( @@ -641,15 +683,19 @@ def _track_statuses(self, job_db: JobDatabaseInterface): ) if new_status == "finished": + stats["job finished"] += 1 self.on_job_done(the_job, active.loc[i]) if previous_status != "error" and new_status == "error": + stats["job failed"] += 1 self.on_job_error(the_job, active.loc[i]) if previous_status in {"created", "queued"} and new_status == "running": + stats["job started running"] += 1 active.loc[i, "running_start_time"] = rfc3339.utcnow() if new_status == "canceled": + stats["job canceled"] += 1 self.on_job_cancel(the_job, active.loc[i]) if self._cancel_running_job_after and new_status == "running": @@ -663,10 +709,14 @@ def _track_statuses(self, job_db: JobDatabaseInterface): active.loc[i, key] = _format_usage_stat(job_metadata, key) except OpenEoApiError as e: + stats["job tracking error"] += 1 print(f"error for job {job_id!r} on backend {backend_name}") print(e) + + stats["job_db persist"] += 1 job_db.persist(active) + def _format_usage_stat(job_metadata: dict, field: str) -> str: value = deep_get(job_metadata, "usage", field, "value", default=0) unit = deep_get(job_metadata, "usage", field, "unit", default="") @@ -886,3 +936,180 @@ def create_job_db(path: Union[str, Path], df: pd.DataFrame, *, on_exists: str = else: raise NotImplementedError(f"Initialization of {type(job_db)} is not supported.") return job_db + + +class ProcessBasedJobCreator: + """ + Batch job creator + (to be used together with :py:class:`MultiBackendJobManager`) + that takes a parameterized openEO process definition + (e.g a user-defined process (UDP) or a remote openEO process definition), + and creates a batch job + for each row of the dataframe managed by the :py:class:`MultiBackendJobManager` + by filling in the process parameters with corresponding row values. + + .. seealso:: + See :ref:`job-management-with-process-based-job-creator` + for more information and examples. + + Process parameters are linked to dataframe columns by name. + While this intuitive name-based matching should cover most use cases, + there are additional options for overrides or fallbacks: + + - When provided, ``parameter_column_map`` will be consulted + for resolving a process parameter name (key in the dictionary) + to a desired dataframe column name (corresponding value). + - One common case is handled automatically as convenience functionality. + + When: + + - ``parameter_column_map`` is not provided (or set to ``None``), + - and there is a *single parameter* that accepts inline GeoJSON geometries, + - and the dataframe is a GeoPandas dataframe with a *single geometry* column, + + then this parameter and this geometries column will be linked automatically. + + - If a parameter can not be matched with a column by name as described above, + a default value will be picked, + first by looking in ``parameter_defaults`` (if provided), + and then by looking up the default value from the parameter schema in the process definition. + - Finally if no (default) value can be determined and the parameter + is not flagged as optional, an error will be raised. 
+ + + :param process_id: (optional) openEO process identifier. + Can be omitted when working with a remote process definition + that is fully defined with a URL in the ``namespace`` parameter. + :param namespace: (optional) openEO process namespace. + Typically used to provide a URL to a remote process definition. + :param parameter_defaults: (optional) default values for process parameters, + to be used when not available in the dataframe managed by + :py:class:`MultiBackendJobManager`. + :param parameter_column_map: Optional overrides + for linking process parameters to dataframe columns: + mapping of process parameter names as key + to dataframe column names as value. + + .. versionadded:: 0.33.0 + + .. warning:: + This is an experimental API subject to change, + and we greatly welcome + `feedback and suggestions for improvement `_. + + """ + def __init__( + self, + *, + process_id: Optional[str] = None, + namespace: Union[str, None] = None, + parameter_defaults: Optional[dict] = None, + parameter_column_map: Optional[dict] = None, + ): + if process_id is None and namespace is None: + raise ValueError("At least one of `process_id` and `namespace` should be provided.") + self._process_id = process_id + self._namespace = namespace + self._parameter_defaults = parameter_defaults or {} + self._parameter_column_map = parameter_column_map + self._cache = LazyLoadCache() + + def _get_process_definition(self, connection: Connection) -> Process: + if isinstance(self._namespace, str) and re.match("https?://", self._namespace): + # Remote process definition handling + return self._cache.get( + key=("remote_process_definition", self._namespace, self._process_id), + load=lambda: parse_remote_process_definition(namespace=self._namespace, process_id=self._process_id), + ) + elif self._namespace is None: + # Handling of a user-specific UDP + udp_raw = connection.user_defined_process(self._process_id).describe() + return Process.from_dict(udp_raw) + else: + raise NotImplementedError( + f"Unsupported process definition source udp_id={self._process_id!r} namespace={self._namespace!r}" + ) + + + def start_job(self, row: pd.Series, connection: Connection, **_) -> BatchJob: + """ + Implementation of the ``start_job`` callable interface + of :py:meth:`MultiBackendJobManager.run_jobs` + to create a job based on given dataframe row + + :param row: The row in the pandas dataframe that stores the jobs state and other tracked data. + :param connection: The connection to the backend. 
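+        :return: The batch job created for the given row (created, but not started yet).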
+ """ + # TODO: refactor out some methods, for better reuse and decoupling: + # `get_arguments()` (to build the arguments dictionary), `get_cube()` (to create the cube), + + process_definition = self._get_process_definition(connection=connection) + process_id = process_definition.id + parameters = process_definition.parameters or [] + + if self._parameter_column_map is None: + self._parameter_column_map = self._guess_parameter_column_map(parameters=parameters, row=row) + + arguments = {} + for parameter in parameters: + param_name = parameter.name + column_name = self._parameter_column_map.get(param_name, param_name) + if column_name in row.index: + # Get value from dataframe row + value = row.loc[column_name] + elif param_name in self._parameter_defaults: + # Fallback on default values from constructor + value = self._parameter_defaults[param_name] + elif parameter.has_default(): + # Explicitly use default value from parameter schema + value = parameter.default + elif parameter.optional: + # Skip optional parameters without any fallback default value + continue + else: + raise ValueError(f"Missing required parameter {param_name !r} for process {process_id!r}") + + # Prepare some values/dtypes for JSON encoding + if isinstance(value, numpy.integer): + value = int(value) + elif isinstance(value, numpy.number): + value = float(value) + elif isinstance(value, shapely.geometry.base.BaseGeometry): + value = shapely.geometry.mapping(value) + + arguments[param_name] = value + + cube = connection.datacube_from_process(process_id=process_id, namespace=self._namespace, **arguments) + + title = row.get("title", f"Process {process_id!r} with {repr_truncate(arguments)}") + description = row.get("description", f"Process {process_id!r} (namespace {self._namespace}) with {arguments}") + job = connection.create_job(cube, title=title, description=description) + + return job + + def __call__(self, *arg, **kwargs) -> BatchJob: + """Syntactic sugar for calling :py:meth:`start_job`.""" + return self.start_job(*arg, **kwargs) + + @staticmethod + def _guess_parameter_column_map(parameters: List[Parameter], row: pd.Series) -> dict: + """ + Guess parameter-column mapping from given parameter list and dataframe row + """ + parameter_column_map = {} + # Geometry based mapping: try to automatically map geometry columns to geojson parameters + geojson_parameters = [p.name for p in parameters if p.schema.accepts_geojson()] + geometry_columns = [i for (i, v) in row.items() if isinstance(v, shapely.geometry.base.BaseGeometry)] + if geojson_parameters and geometry_columns: + if len(geojson_parameters) == 1 and len(geometry_columns) == 1: + # Most common case: one geometry parameter and one geometry column: can be mapped naively + parameter_column_map[geojson_parameters[0]] = geometry_columns[0] + elif all(p in geometry_columns for p in geojson_parameters): + # Each geometry param has geometry column with same name: easy to map + parameter_column_map.update((p, p) for p in geojson_parameters) + else: + raise RuntimeError( + f"Problem with mapping geometry columns ({geometry_columns}) to process parameters ({geojson_parameters})" + ) + _log.debug(f"Guessed parameter-column map: {parameter_column_map}") + return parameter_column_map diff --git a/openeo/internal/processes/parse.py b/openeo/internal/processes/parse.py index afb97dfdb..1e22ba6bc 100644 --- a/openeo/internal/processes/parse.py +++ b/openeo/internal/processes/parse.py @@ -6,18 +6,18 @@ from __future__ import annotations import json +import re import typing from 
pathlib import Path -from typing import Iterator, List, Union +from typing import Any, Iterator, List, Optional, Union import requests -class Schema: +class Schema(typing.NamedTuple): """Schema description of an openEO process parameter or return value.""" - def __init__(self, schema: Union[dict, list]): - self.schema = schema + schema: Union[dict, list] @classmethod def from_dict(cls, data: dict) -> Schema: @@ -31,20 +31,31 @@ def is_process_graph(self) -> bool: and self.schema.get("subtype") == "process-graph" ) + def accepts_geojson(self) -> bool: + """Does this schema accept inline GeoJSON objects?""" -class Parameter: - """openEO process parameter""" + def is_geojson_schema(schema) -> bool: + return isinstance(schema, dict) and schema.get("type") == "object" and schema.get("subtype") == "geojson" - # TODO unify with openeo.api.process.Parameter? + if isinstance(self.schema, dict): + return is_geojson_schema(self.schema) + elif isinstance(self.schema, list): + return any(is_geojson_schema(s) for s in self.schema) + return False - NO_DEFAULT = object() - def __init__(self, name: str, description: str, schema: Schema, default=NO_DEFAULT, optional: bool = False): - self.name = name - self.description = description - self.schema = schema - self.default = default - self.optional = optional +_NO_DEFAULT = object() + + +class Parameter(typing.NamedTuple): + """openEO process parameter""" + # TODO unify with openeo.api.process.Parameter? + + name: str + description: str + schema: Schema + default: Any = _NO_DEFAULT + optional: bool = False @classmethod def from_dict(cls, data: dict) -> Parameter: @@ -52,12 +63,12 @@ def from_dict(cls, data: dict) -> Parameter: name=data["name"], description=data["description"], schema=Schema.from_dict(data["schema"]), - default=data.get("default", cls.NO_DEFAULT), + default=data.get("default", _NO_DEFAULT), optional=data.get("optional", False), ) def has_default(self): - return self.default is not self.NO_DEFAULT + return self.default is not _NO_DEFAULT class Returns: @@ -73,13 +84,20 @@ def from_dict(cls, data: dict) -> Returns: class Process(typing.NamedTuple): - """An openEO process""" - + """ + Container for a opneEO process definition of an openEO process, + covering pre-defined processes, user-defined processes, + remote process definitions, etc. + """ + + # Common-denominator-wise only the process id is a required field in a process definition. + # Depending on the context in the openEO API, some other fields (e.g. "process_graph") + # may also be required. id: str - parameters: List[Parameter] - returns: Returns - description: str = "" - summary: str = "" + parameters: Optional[List[Parameter]] = None + returns: Optional[Returns] = None + description: Optional[str] = None + summary: Optional[str] = None # TODO: more properties? 
@classmethod @@ -87,10 +105,10 @@ def from_dict(cls, data: dict) -> Process: """Construct openEO process from dictionary values""" return cls( id=data["id"], - parameters=[Parameter.from_dict(d) for d in data["parameters"]], - returns=Returns.from_dict(data["returns"]), - description=data["description"], - summary=data["summary"], + parameters=[Parameter.from_dict(d) for d in data["parameters"]] if "parameters" in data else None, + returns=Returns.from_dict(data["returns"]) if "returns" in data else None, + description=data.get("description"), + summary=data.get("summary"), ) @classmethod @@ -114,3 +132,33 @@ def parse_all_from_dir(path: Union[str, Path], pattern="*.json") -> Iterator[Pro """Parse all openEO process files in given directory""" for p in sorted(Path(path).glob(pattern)): yield Process.from_json_file(p) + + +def parse_remote_process_definition(namespace: str, process_id: Optional[str] = None) -> Process: + """ + Parse a process definition as defined by the "Remote Process Definition Extension" spec + https://github.com/Open-EO/openeo-api/tree/draft/extensions/remote-process-definition + """ + if not re.match("https?://", namespace): + raise ValueError(f"Expected absolute URL, but got {namespace!r}") + + resp = requests.get(url=namespace) + resp.raise_for_status() + data = resp.json() + assert isinstance(data, dict) + + if "id" not in data and "processes" in data and isinstance(data["processes"], list): + # Handle process listing: filter out right process + if not isinstance(process_id, str): + raise ValueError(f"Working with process listing, but got invalid process id {process_id!r}") + processes = [p for p in data["processes"] if p.get("id") == process_id] + if len(processes) != 1: + raise LookupError(f"Process {process_id!r} not found in process listing {namespace!r}") + (data,) = processes + + # Some final validation. 
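+    # (the definition must at least have an "id", and it must match the requested process_id, if one was given)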
+ assert "id" in data, "Process definition should at least have an 'id' field" + if process_id is not None and data["id"] != process_id: + raise LookupError(f"Expected process id {process_id!r}, but found {data['id']!r}") + + return Process.from_dict(data) diff --git a/openeo/rest/_testing.py b/openeo/rest/_testing.py index b44002687..2f8209dc2 100644 --- a/openeo/rest/_testing.py +++ b/openeo/rest/_testing.py @@ -1,10 +1,13 @@ +import collections import json import re -from typing import Optional, Union +from typing import Callable, Iterator, Optional, Sequence, Union from openeo import Connection, DataCube from openeo.rest.vectorcube import VectorCube +OPENEO_BACKEND = "https://openeo.test/" + class OpeneoTestingException(Exception): pass @@ -23,6 +26,8 @@ class DummyBackend: "validation_requests", "next_result", "next_validation_errors", + "job_status_updater", + "extra_job_metadata_fields", ) # Default result (can serve both as JSON or binary data) @@ -35,6 +40,14 @@ def __init__(self, requests_mock, connection: Connection): self.validation_requests = [] self.next_result = self.DEFAULT_RESULT self.next_validation_errors = [] + self.extra_job_metadata_fields = [] + + # Job status update hook: + # callable that is called on starting a job, and getting job metadata + # allows to dynamically change how the status of a job evolves + # By default: immediately set to "finished" once job is started + self.job_status_updater = lambda job_id, current_status: "finished" + requests_mock.post( connection.build_url("/result"), content=self._handle_post_result, @@ -70,9 +83,13 @@ def _handle_post_result(self, request, context): def _handle_post_jobs(self, request, context): """handler of `POST /jobs` (create batch job)""" - pg = request.json()["process"]["process_graph"] + post_data = request.json() + pg = post_data["process"]["process_graph"] job_id = f"job-{len(self.batch_jobs):03d}" - self.batch_jobs[job_id] = {"job_id": job_id, "pg": pg, "status": "created"} + job_data = {"job_id": job_id, "pg": pg, "status": "created"} + for field in self.extra_job_metadata_fields: + job_data[field] = post_data.get(field) + self.batch_jobs[job_id] = job_data context.status_code = 201 context.headers["openeo-identifier"] = job_id @@ -88,13 +105,19 @@ def _handle_post_job_results(self, request, context): """Handler of `POST /job/{job_id}/results` (start batch job).""" job_id = self._get_job_id(request) assert self.batch_jobs[job_id]["status"] == "created" - # TODO: support custom status sequence (instead of directly going to status "finished")? 
- self.batch_jobs[job_id]["status"] = "finished" + self.batch_jobs[job_id]["status"] = self.job_status_updater( + job_id=job_id, current_status=self.batch_jobs[job_id]["status"] + ) context.status_code = 202 def _handle_get_job(self, request, context): """Handler of `GET /job/{job_id}` (get batch job status and metadata).""" job_id = self._get_job_id(request) + # Allow updating status with `job_status_setter` once job got past status "created" + if self.batch_jobs[job_id]["status"] != "created": + self.batch_jobs[job_id]["status"] = self.job_status_updater( + job_id=job_id, current_status=self.batch_jobs[job_id]["status"] + ) return {"id": job_id, "status": self.batch_jobs[job_id]["status"]} def _handle_get_job_results(self, request, context): @@ -160,6 +183,21 @@ def execute(self, cube: Union[DataCube, VectorCube], process_id: Optional[str] = cube.execute() return self.get_pg(process_id=process_id) + def setup_simple_job_status_flow(self, *, queued: int = 1, running: int = 4, final: str = "finished"): + """ + Set up simple job status flow: + queued (a couple of times) -> running (a couple of times) -> finished/error. + """ + template = ["queued"] * queued + ["running"] * running + [final] + job_stacks = collections.defaultdict(template.copy) + + def get_status(job_id: str, current_status: str) -> str: + stack = job_stacks[job_id] + # Pop first item each time, but repeat the last one at the end + return stack.pop(0) if len(stack) > 1 else stack[0] + + self.job_status_updater = get_status + def build_capabilities( *, diff --git a/openeo/rest/connection.py b/openeo/rest/connection.py index 25ba304ac..65126cf9a 100644 --- a/openeo/rest/connection.py +++ b/openeo/rest/connection.py @@ -1732,7 +1732,7 @@ def execute( def create_job( self, - process_graph: Union[dict, str, Path], + process_graph: Union[dict, str, Path, FlatGraphableMixin], *, title: Optional[str] = None, description: Optional[str] = None, diff --git a/tests/extra/test_job_management.py b/tests/extra/test_job_management.py index 87a483652..c9a612af9 100644 --- a/tests/extra/test_job_management.py +++ b/tests/extra/test_job_management.py @@ -1,3 +1,4 @@ +import copy import json import re import threading @@ -29,16 +30,28 @@ CsvJobDatabase, MultiBackendJobManager, ParquetJobDatabase, + ProcessBasedJobCreator, create_job_db, get_job_db, ) +from openeo.rest._testing import OPENEO_BACKEND, DummyBackend, build_capabilities from openeo.util import rfc3339 +@pytest.fixture +def con(requests_mock) -> openeo.Connection: + requests_mock.get(OPENEO_BACKEND, json=build_capabilities(api_version="1.2.0", udp=True)) + con = openeo.Connection(OPENEO_BACKEND) + return con + + class FakeBackend: """ Fake openEO backend with some basic job management functionality for testing job manager logic. 
""" + + # TODO: replace/merge with openeo.rest._testing.DummyBackend + def __init__(self, *, backend_root_url: str = "http://openeo.test", requests_mock): self.url = backend_root_url.rstrip("/") requests_mock.get(f"{self.url}/", json={"api_version": "1.1.0"}) @@ -73,13 +86,14 @@ def _handle_cancel_job(self, request, context): context.status_code = 204 +@pytest.fixture +def sleep_mock(): + with mock.patch("time.sleep") as sleep: + yield sleep + class TestMultiBackendJobManager: - @pytest.fixture - def sleep_mock(self): - with mock.patch("time.sleep") as sleep: - yield sleep def test_basic_legacy(self, tmp_path, requests_mock, sleep_mock): """ @@ -100,8 +114,17 @@ def start_job(row, connection, **kwargs): year = int(row["year"]) return BatchJob(job_id=f"job-{year}", connection=connection) - manager.run_jobs(df=df, start_job=start_job, output_file=output_file) - assert sleep_mock.call_count > 10 + run_stats = manager.run_jobs(df=df, start_job=start_job, output_file=output_file) + assert run_stats == dirty_equals.IsPartialDict( + { + "sleep": dirty_equals.IsInt(gt=10), + "start_job call": 7, # TODO? + "job started running": 5, + "job finished": 5, + "job_db persist": dirty_equals.IsInt(gt=5), + "run_jobs loop": dirty_equals.IsInt(gt=5), + } + ) result = pd.read_csv(output_file) assert len(result) == 5 @@ -135,8 +158,17 @@ def start_job(row, connection, **kwargs): job_db = CsvJobDatabase(output_file).initialize_from_df(df) - manager.run_jobs(job_db=job_db, start_job=start_job) - assert sleep_mock.call_count > 10 + run_stats = manager.run_jobs(job_db=job_db, start_job=start_job) + assert run_stats == dirty_equals.IsPartialDict( + { + "sleep": dirty_equals.IsInt(gt=10), + "start_job call": 7, # TODO? + "job started running": 5, + "job finished": 5, + "job_db persist": dirty_equals.IsInt(gt=5), + "run_jobs loop": dirty_equals.IsInt(gt=5), + } + ) result = pd.read_csv(output_file) assert len(result) == 5 @@ -163,8 +195,14 @@ def start_job(row, connection, **kwargs): output_file = tmp_path / "jobs.db" job_db = db_class(output_file).initialize_from_df(df) - manager.run_jobs(job_db=job_db, start_job=start_job) - assert sleep_mock.call_count > 10 + run_stats = manager.run_jobs(job_db=job_db, start_job=start_job) + assert run_stats == dirty_equals.IsPartialDict( + { + "start_job call": 7, # TODO? + "job finished": 5, + "job_db persist": dirty_equals.IsInt(gt=5), + } + ) result = job_db.read() assert len(result) == 5 @@ -192,8 +230,14 @@ def start_job(row, connection, **kwargs): output_file = tmp_path / filename job_db = create_job_db(path=output_file, df=df) - manager.run_jobs(job_db=job_db, start_job=start_job) - assert sleep_mock.call_count > 10 + run_stats = manager.run_jobs(job_db=job_db, start_job=start_job) + assert run_stats == dirty_equals.IsPartialDict( + { + "start_job call": 7, # TODO? + "job finished": 5, + "job_db persist": dirty_equals.IsInt(gt=5), + } + ) result = job_db.read() assert len(result) == 5 @@ -222,6 +266,7 @@ def start_job(row, connection, **kwargs): # Trigger context switch to job thread sleep(1) manager.stop_job_thread() + # TODO #645 how to collect stats with the threaded run_job? 
assert sleep_mock.call_count > 10 result = pd.read_csv(output_file) @@ -530,8 +575,12 @@ def start_job(row, connection_provider, connection, **kwargs): output_file = tmp_path / "jobs.csv" - manager.run_jobs(df=df, start_job=start_job, output_file=output_file) - assert sleep_mock.call_count > 3 + run_stats = manager.run_jobs(df=df, start_job=start_job, output_file=output_file) + assert run_stats == dirty_equals.IsPartialDict( + { + "start_job call": 1, + } + ) # Sanity check: the job succeeded result = pd.read_csv(output_file) @@ -602,6 +651,7 @@ def start_job(row, connection_provider, connection, **kwargs): with pytest.raises(requests.exceptions.RetryError) as exc: manager.run_jobs(df=df, start_job=start_job, output_file=output_file) + # TODO #645 how to still check stats when run_jobs raised exception? assert sleep_mock.call_count > 3 # Sanity check: the job has status "error" @@ -957,3 +1007,648 @@ def test_create_job_db(tmp_path, filename, expected): db = create_job_db(path=path, df=df) assert isinstance(db, expected) assert path.exists() + + +class TestProcessBasedJobCreator: + @pytest.fixture + def dummy_backend(self, requests_mock, con) -> DummyBackend: + dummy = DummyBackend(requests_mock=requests_mock, connection=con) + dummy.setup_simple_job_status_flow(queued=2, running=3, final="finished") + return dummy + + PG_3PLUS5 = { + "id": "3plus5", + "process_graph": {"process_id": "add", "arguments": {"x": 3, "y": 5}, "result": True}, + } + PG_INCREMENT = { + "id": "increment", + "parameters": [ + {"name": "data", "description": "data", "schema": {"type": "number"}}, + { + "name": "increment", + "description": "increment", + "schema": {"type": "number"}, + "optional": True, + "default": 1, + }, + ], + "process_graph": { + "process_id": "add", + "arguments": {"x": {"from_parameter": "data"}, "y": {"from_parameter": "increment"}}, + "result": True, + }, + } + PG_OFFSET_POLYGON = { + "id": "offset_polygon", + "parameters": [ + {"name": "data", "description": "data", "schema": {"type": "number"}}, + { + "name": "polygons", + "description": "polygons", + "schema": { + "title": "GeoJSON", + "type": "object", + "subtype": "geojson", + }, + }, + { + "name": "offset", + "description": "Offset", + "schema": {"type": "number"}, + "optional": True, + "default": 0, + }, + ], + } + + @pytest.fixture(autouse=True) + def remote_process_definitions(self, requests_mock) -> dict: + mocks = {} + processes = [self.PG_3PLUS5, self.PG_INCREMENT, self.PG_OFFSET_POLYGON] + mocks["_all"] = requests_mock.get("https://remote.test/_all", json={"processes": processes, "links": []}) + for pg in processes: + process_id = pg["id"] + mocks[process_id] = requests_mock.get(f"https://remote.test/{process_id}.json", json=pg) + return mocks + + def test_minimal(self, con, dummy_backend, remote_process_definitions): + """Bare minimum: just start a job, no parameters/arguments""" + job_factory = ProcessBasedJobCreator(process_id="3plus5", namespace="https://remote.test/3plus5.json") + + job = job_factory.start_job(row=pd.Series({"foo": 123}), connection=con) + assert isinstance(job, BatchJob) + assert dummy_backend.batch_jobs == { + "job-000": { + "job_id": "job-000", + "pg": { + "3plus51": { + "process_id": "3plus5", + "namespace": "https://remote.test/3plus5.json", + "arguments": {}, + "result": True, + } + }, + "status": "created", + } + } + + assert remote_process_definitions["3plus5"].call_count == 1 + + def test_basic(self, con, dummy_backend, remote_process_definitions): + """Basic parameterized UDP job generation""" + 
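+        # Let the dummy backend also record the "title" and "description" fields
+        # of the job creation request, so they can be asserted below.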
dummy_backend.extra_job_metadata_fields = ["title", "description"] + job_factory = ProcessBasedJobCreator(process_id="increment", namespace="https://remote.test/increment.json") + + job = job_factory.start_job(row=pd.Series({"data": 123}), connection=con) + assert isinstance(job, BatchJob) + assert dummy_backend.batch_jobs == { + "job-000": { + "job_id": "job-000", + "pg": { + "increment1": { + "process_id": "increment", + "namespace": "https://remote.test/increment.json", + "arguments": {"data": 123, "increment": 1}, + "result": True, + } + }, + "status": "created", + "title": "Process 'increment' with {'data': 123, 'increment': 1}", + "description": "Process 'increment' (namespace https://remote.test/increment.json) with {'data': 123, 'increment': 1}", + } + } + assert remote_process_definitions["increment"].call_count == 1 + + @pytest.mark.parametrize( + ["parameter_defaults", "row", "expected_arguments"], + [ + (None, {"data": 123}, {"data": 123, "increment": 1}), + (None, {"data": 123, "increment": 5}, {"data": 123, "increment": 5}), + ({"increment": 5}, {"data": 123}, {"data": 123, "increment": 5}), + ({"increment": 5}, {"data": 123, "increment": 1000}, {"data": 123, "increment": 1000}), + ], + ) + def test_basic_parameterization(self, con, dummy_backend, parameter_defaults, row, expected_arguments): + """Basic parameterized UDP job generation""" + job_factory = ProcessBasedJobCreator( + process_id="increment", + namespace="https://remote.test/increment.json", + parameter_defaults=parameter_defaults, + ) + + job = job_factory.start_job(row=pd.Series(row), connection=con) + assert isinstance(job, BatchJob) + assert dummy_backend.batch_jobs == { + "job-000": { + "job_id": "job-000", + "pg": { + "increment1": { + "process_id": "increment", + "namespace": "https://remote.test/increment.json", + "arguments": expected_arguments, + "result": True, + } + }, + "status": "created", + } + } + + @pytest.mark.parametrize( + ["process_id", "namespace", "expected"], + [ + ( + # Classic UDP reference + "3plus5", + None, + {"process_id": "3plus5"}, + ), + ( + # Remote process definition (with "redundant" process_id) + "3plus5", + "https://remote.test/3plus5.json", + {"process_id": "3plus5", "namespace": "https://remote.test/3plus5.json"}, + ), + ( + # Remote process definition with just namespace (process_id should be inferred from that) + None, + "https://remote.test/3plus5.json", + {"process_id": "3plus5", "namespace": "https://remote.test/3plus5.json"}, + ), + ( + # Remote process definition from listing + "3plus5", + "https://remote.test/_all", + {"process_id": "3plus5", "namespace": "https://remote.test/_all"}, + ), + ], + ) + def test_process_references_in_constructor( + self, con, requests_mock, dummy_backend, remote_process_definitions, process_id, namespace, expected + ): + """Various ways to provide process references in the constructor""" + + # Register personal UDP + requests_mock.get(con.build_url("/process_graphs/3plus5"), json=self.PG_3PLUS5) + + job_factory = ProcessBasedJobCreator(process_id=process_id, namespace=namespace) + + job = job_factory.start_job(row=pd.Series({"foo": 123}), connection=con) + assert isinstance(job, BatchJob) + assert dummy_backend.batch_jobs == { + "job-000": { + "job_id": "job-000", + "pg": {"3plus51": {**expected, "arguments": {}, "result": True}}, + "status": "created", + } + } + + def test_no_process_id_nor_namespace(self): + with pytest.raises(ValueError, match="At least one of `process_id` and `namespace` should be provided"): + _ = 
ProcessBasedJobCreator() + + @pytest.fixture + def job_manager(self, tmp_path, dummy_backend) -> MultiBackendJobManager: + job_manager = MultiBackendJobManager(root_dir=tmp_path / "job_mgr_root") + job_manager.add_backend("dummy", connection=dummy_backend.connection, parallel_jobs=1) + return job_manager + + def test_with_job_manager_remote_basic( + self, tmp_path, requests_mock, dummy_backend, job_manager, sleep_mock, remote_process_definitions + ): + job_starter = ProcessBasedJobCreator( + process_id="increment", + namespace="https://remote.test/increment.json", + parameter_defaults={"increment": 5}, + ) + + df = pd.DataFrame({"data": [1, 2, 3]}) + job_db = CsvJobDatabase(tmp_path / "jobs.csv").initialize_from_df(df) + + stats = job_manager.run_jobs(job_db=job_db, start_job=job_starter) + assert stats == dirty_equals.IsPartialDict( + { + "sleep": dirty_equals.IsInt(gt=1), + "start_job call": 3, + "job start": 3, + "job started running": 3, + "job finished": 3, + } + ) + assert set(job_db.read().status) == {"finished"} + + # Verify caching of HTTP request of remote process definition + assert remote_process_definitions["increment"].call_count == 1 + + assert dummy_backend.batch_jobs == { + "job-000": { + "job_id": "job-000", + "pg": { + "increment1": { + "process_id": "increment", + "namespace": "https://remote.test/increment.json", + "arguments": {"data": 1, "increment": 5}, + "result": True, + } + }, + "status": "finished", + }, + "job-001": { + "job_id": "job-001", + "pg": { + "increment1": { + "process_id": "increment", + "namespace": "https://remote.test/increment.json", + "arguments": {"data": 2, "increment": 5}, + "result": True, + } + }, + "status": "finished", + }, + "job-002": { + "job_id": "job-002", + "pg": { + "increment1": { + "process_id": "increment", + "namespace": "https://remote.test/increment.json", + "arguments": {"data": 3, "increment": 5}, + "result": True, + } + }, + "status": "finished", + }, + } + + @pytest.mark.parametrize( + ["parameter_defaults", "df_data", "expected_arguments"], + [ + ( + {"increment": 5}, + {"data": [1, 2, 3]}, + { + "job-000": {"data": 1, "increment": 5}, + "job-001": {"data": 2, "increment": 5}, + "job-002": {"data": 3, "increment": 5}, + }, + ), + ( + None, + {"data": [1, 2, 3], "increment": [44, 55, 66]}, + { + "job-000": {"data": 1, "increment": 44}, + "job-001": {"data": 2, "increment": 55}, + "job-002": {"data": 3, "increment": 66}, + }, + ), + ( + {"increment": 5555}, + {"data": [1, 2, 3], "increment": [44, 55, 66]}, + { + "job-000": {"data": 1, "increment": 44}, + "job-001": {"data": 2, "increment": 55}, + "job-002": {"data": 3, "increment": 66}, + }, + ), + ], + ) + def test_with_job_manager_remote_parameter_handling( + self, + tmp_path, + requests_mock, + dummy_backend, + job_manager, + sleep_mock, + parameter_defaults, + df_data, + expected_arguments, + ): + job_starter = ProcessBasedJobCreator( + process_id="increment", + namespace="https://remote.test/increment.json", + parameter_defaults=parameter_defaults, + ) + + df = pd.DataFrame(df_data) + job_db = CsvJobDatabase(tmp_path / "jobs.csv").initialize_from_df(df) + + stats = job_manager.run_jobs(job_db=job_db, start_job=job_starter) + assert stats == dirty_equals.IsPartialDict( + { + "sleep": dirty_equals.IsInt(gt=1), + "start_job call": 3, + "job start": 3, + "job finished": 3, + } + ) + assert set(job_db.read().status) == {"finished"} + + assert dummy_backend.batch_jobs == { + "job-000": { + "job_id": "job-000", + "pg": { + "increment1": { + "process_id": "increment", + 
"namespace": "https://remote.test/increment.json", + "arguments": expected_arguments["job-000"], + "result": True, + } + }, + "status": "finished", + }, + "job-001": { + "job_id": "job-001", + "pg": { + "increment1": { + "process_id": "increment", + "namespace": "https://remote.test/increment.json", + "arguments": expected_arguments["job-001"], + "result": True, + } + }, + "status": "finished", + }, + "job-002": { + "job_id": "job-002", + "pg": { + "increment1": { + "process_id": "increment", + "namespace": "https://remote.test/increment.json", + "arguments": expected_arguments["job-002"], + "result": True, + } + }, + "status": "finished", + }, + } + + def test_with_job_manager_remote_geometry(self, tmp_path, requests_mock, dummy_backend, job_manager, sleep_mock): + job_starter = ProcessBasedJobCreator( + process_id="offset_polygon", + namespace="https://remote.test/offset_polygon.json", + parameter_defaults={"data": 123}, + ) + + df = geopandas.GeoDataFrame.from_features( + { + "type": "FeatureCollection", + "features": [ + { + "type": "Feature", + "id": "one", + "properties": {"offset": 11}, + "geometry": {"type": "Point", "coordinates": (1.0, 2.0)}, + }, + { + "type": "Feature", + "id": "two", + "properties": {"offset": 22}, + "geometry": {"type": "Point", "coordinates": (3.0, 4.0)}, + }, + ], + } + ) + + job_db = CsvJobDatabase(tmp_path / "jobs.csv").initialize_from_df(df) + + stats = job_manager.run_jobs(job_db=job_db, start_job=job_starter) + assert stats == dirty_equals.IsPartialDict( + { + "sleep": dirty_equals.IsInt(gt=1), + "start_job call": 2, + "job start": 2, + "job finished": 2, + } + ) + assert set(job_db.read().status) == {"finished"} + + assert dummy_backend.batch_jobs == { + "job-000": { + "job_id": "job-000", + "pg": { + "offsetpolygon1": { + "process_id": "offset_polygon", + "namespace": "https://remote.test/offset_polygon.json", + "arguments": { + "data": 123, + "polygons": {"type": "Point", "coordinates": [1.0, 2.0]}, + "offset": 11, + }, + "result": True, + } + }, + "status": "finished", + }, + "job-001": { + "job_id": "job-001", + "pg": { + "offsetpolygon1": { + "process_id": "offset_polygon", + "namespace": "https://remote.test/offset_polygon.json", + "arguments": { + "data": 123, + "polygons": {"type": "Point", "coordinates": [3.0, 4.0]}, + "offset": 22, + }, + "result": True, + } + }, + "status": "finished", + }, + } + + @pytest.mark.parametrize( + ["db_class"], + [ + (CsvJobDatabase,), + (ParquetJobDatabase,), + ], + ) + def test_with_job_manager_remote_geometry_after_resume( + self, tmp_path, requests_mock, dummy_backend, job_manager, sleep_mock, db_class + ): + """Test if geometry handling works properly after resuming from CSV serialized job db.""" + job_starter = ProcessBasedJobCreator( + process_id="offset_polygon", + namespace="https://remote.test/offset_polygon.json", + parameter_defaults={"data": 123}, + ) + + df = geopandas.GeoDataFrame.from_features( + { + "type": "FeatureCollection", + "features": [ + { + "type": "Feature", + "id": "one", + "properties": {"offset": 11}, + "geometry": {"type": "Point", "coordinates": (1.0, 2.0)}, + }, + { + "type": "Feature", + "id": "two", + "properties": {"offset": 22}, + "geometry": {"type": "Point", "coordinates": (3.0, 4.0)}, + }, + ], + } + ) + + # Persist the job db to CSV/Parquet/... 
+ job_db_path = tmp_path / "jobs.db" + _ = db_class(job_db_path).initialize_from_df(df) + assert job_db_path.exists() + + # Resume from persisted job db + job_db = db_class(job_db_path) + + stats = job_manager.run_jobs(job_db=job_db, start_job=job_starter) + assert stats == dirty_equals.IsPartialDict( + { + "sleep": dirty_equals.IsInt(gt=1), + "start_job call": 2, + "job start": 2, + "job finished": 2, + } + ) + assert set(job_db.read().status) == {"finished"} + + assert dummy_backend.batch_jobs == { + "job-000": { + "job_id": "job-000", + "pg": { + "offsetpolygon1": { + "process_id": "offset_polygon", + "namespace": "https://remote.test/offset_polygon.json", + "arguments": { + "data": 123, + "polygons": {"type": "Point", "coordinates": [1.0, 2.0]}, + "offset": 11, + }, + "result": True, + } + }, + "status": "finished", + }, + "job-001": { + "job_id": "job-001", + "pg": { + "offsetpolygon1": { + "process_id": "offset_polygon", + "namespace": "https://remote.test/offset_polygon.json", + "arguments": { + "data": 123, + "polygons": {"type": "Point", "coordinates": [3.0, 4.0]}, + "offset": 22, + }, + "result": True, + } + }, + "status": "finished", + }, + } + + def test_with_job_manager_udp_basic( + self, tmp_path, requests_mock, con, dummy_backend, job_manager, sleep_mock, remote_process_definitions + ): + # make deep copy + udp = copy.deepcopy(self.PG_INCREMENT) + # Register personal UDP + increment_udp_mock = requests_mock.get(con.build_url("/process_graphs/increment"), json=udp) + + job_starter = ProcessBasedJobCreator( + process_id="increment", + # No namespace to trigger personal UDP mode + namespace=None, + parameter_defaults={"increment": 5}, + ) + assert increment_udp_mock.call_count == 0 + + df = pd.DataFrame({"data": [3, 5]}) + job_db = CsvJobDatabase(tmp_path / "jobs.csv").initialize_from_df(df) + + stats = job_manager.run_jobs(job_db=job_db, start_job=job_starter) + assert stats == dirty_equals.IsPartialDict( + { + "start_job call": 2, + "job finished": 2, + } + ) + assert increment_udp_mock.call_count == 2 + assert set(job_db.read().status) == {"finished"} + + assert dummy_backend.batch_jobs == { + "job-000": { + "job_id": "job-000", + "pg": { + "increment1": { + "process_id": "increment", + "arguments": {"data": 3, "increment": 5}, + "result": True, + } + }, + "status": "finished", + }, + "job-001": { + "job_id": "job-001", + "pg": { + "increment1": { + "process_id": "increment", + "arguments": {"data": 5, "increment": 5}, + "result": True, + } + }, + "status": "finished", + }, + } + + def test_with_job_manager_parameter_column_map( + self, tmp_path, requests_mock, dummy_backend, job_manager, sleep_mock, remote_process_definitions + ): + job_starter = ProcessBasedJobCreator( + process_id="increment", + namespace="https://remote.test/increment.json", + parameter_column_map={"data": "numberzzz", "increment": "add_thiz"}, + ) + + df = pd.DataFrame( + { + "data": [1, 2], + "increment": [-1, -2], + "numberzzz": [3, 5], + "add_thiz": [100, 200], + } + ) + job_db = CsvJobDatabase(tmp_path / "jobs.csv").initialize_from_df(df) + + stats = job_manager.run_jobs(job_db=job_db, start_job=job_starter) + assert stats == dirty_equals.IsPartialDict( + { + "start_job call": 2, + "job finished": 2, + } + ) + assert dummy_backend.batch_jobs == { + "job-000": { + "job_id": "job-000", + "pg": { + "increment1": { + "process_id": "increment", + "namespace": "https://remote.test/increment.json", + "arguments": {"data": 3, "increment": 100}, + "result": True, + } + }, + "status": "finished", + }, + 
"job-001": { + "job_id": "job-001", + "pg": { + "increment1": { + "process_id": "increment", + "namespace": "https://remote.test/increment.json", + "arguments": {"data": 5, "increment": 200}, + "result": True, + } + }, + "status": "finished", + }, + } diff --git a/tests/internal/processes/test_parse.py b/tests/internal/processes/test_parse.py index db2dbf351..48405e537 100644 --- a/tests/internal/processes/test_parse.py +++ b/tests/internal/processes/test_parse.py @@ -1,4 +1,13 @@ -from openeo.internal.processes.parse import Parameter, Process, Returns, Schema +import pytest + +from openeo.internal.processes.parse import ( + _NO_DEFAULT, + Parameter, + Process, + Returns, + Schema, + parse_remote_process_definition, +) def test_schema(): @@ -6,6 +15,27 @@ def test_schema(): assert s.schema == {"type": "number"} +def test_schema_equality(): + assert Schema({"type": "number"}) == Schema({"type": "number"}) + assert Schema({"type": "number"}) == Schema.from_dict({"type": "number"}) + + assert Schema({"type": "number"}) != Schema({"type": "string"}) + + +@pytest.mark.parametrize( + ["schema", "expected"], + [ + ({"type": "object", "subtype": "geojson"}, True), + ({"type": "object"}, False), + ({"subtype": "geojson"}, False), + ({"type": "object", "subtype": "vectorzz"}, False), + ], +) +def test_schema_accepts_geojson(schema, expected): + assert Schema(schema).accepts_geojson() == expected + assert Schema([{"type": "number"}, schema]).accepts_geojson() == expected + + def test_parameter(): p = Parameter.from_dict({ "name": "foo", @@ -15,7 +45,7 @@ def test_parameter(): assert p.name == "foo" assert p.description == "Foo amount" assert p.schema.schema == {"type": "number"} - assert p.default is Parameter.NO_DEFAULT + assert p.default is _NO_DEFAULT assert p.optional is False @@ -39,6 +69,14 @@ def test_parameter_default_none(): assert p.default is None +def test_parameter_equality(): + p1 = Parameter.from_dict({"name": "foo", "description": "Foo", "schema": {"type": "number"}}) + p2 = Parameter.from_dict({"name": "foo", "description": "Foo", "schema": {"type": "number"}}) + p3 = Parameter.from_dict({"name": "foo", "description": "Foo", "schema": {"type": "string"}}) + assert p1 == p2 + assert p1 != p3 + + def test_returns(): r = Returns.from_dict({ "description": "Roo", @@ -98,3 +136,94 @@ def test_process_from_json(): assert p.parameters[0].schema.schema == {"type": ["number", "null"]} assert p.returns.description == "The computed absolute value." 
assert p.returns.schema.schema == {"type": ["number", "null"], "minimum": 0} + + +def test_parse_remote_process_definition_minimal(requests_mock): + url = "https://example.com/ndvi.json" + requests_mock.get(url, json={"id": "ndvi"}) + process = parse_remote_process_definition(url) + assert process.id == "ndvi" + assert process.parameters is None + assert process.returns is None + assert process.description is None + assert process.summary is None + + +def test_parse_remote_process_definition_parameters(requests_mock): + url = "https://example.com/ndvi.json" + requests_mock.get( + url, + json={ + "id": "ndvi", + "parameters": [ + {"name": "incr", "description": "Increment", "schema": {"type": "number"}}, + {"name": "scales", "description": "Scales", "default": [1, 1], "schema": {"type": "number"}}, + ], + }, + ) + process = parse_remote_process_definition(url) + assert process.id == "ndvi" + assert process.parameters == [ + Parameter(name="incr", description="Increment", schema=Schema({"type": "number"})), + Parameter(name="scales", description="Scales", default=[1, 1], schema=Schema({"type": "number"})), + ] + assert process.returns is None + assert process.description is None + assert process.summary is None + + +def test_parse_remote_process_definition_listing(requests_mock): + url = "https://example.com/processes.json" + requests_mock.get( + url, + json={ + "processes": [ + { + "id": "ndvi", + "parameters": [{"name": "incr", "description": "Incr", "schema": {"type": "number"}}], + }, + { + "id": "scale", + "parameters": [ + {"name": "factor", "description": "Factor", "default": 1, "schema": {"type": "number"}} + ], + }, + ], + "links": [], + }, + ) + + # No process id given + with pytest.raises(ValueError, match="Working with process listing, but got invalid process id None"): + parse_remote_process_definition(url) + + # Process id not found + with pytest.raises(LookupError, match="Process 'mehblargh' not found in process listing"): + parse_remote_process_definition(url, process_id="mehblargh") + + # Valid proces id + process = parse_remote_process_definition(url, process_id="ndvi") + assert process.id == "ndvi" + assert process.parameters == [ + Parameter(name="incr", description="Incr", schema=Schema({"type": "number"})), + ] + assert process.returns is None + assert process.description is None + assert process.summary is None + + # Another proces id + process = parse_remote_process_definition(url, process_id="scale") + assert process.id == "scale" + assert process.parameters == [ + Parameter(name="factor", description="Factor", default=1, schema=Schema({"type": "number"})), + ] + assert process.returns is None + assert process.description is None + assert process.summary is None + + +def test_parse_remote_process_definition_inconsistency(requests_mock): + url = "https://example.com/ndvi.json" + requests_mock.get(url, json={"id": "nnddvvii"}) + with pytest.raises(LookupError, match="Expected process id 'ndvi', but found 'nnddvvii'"): + _ = parse_remote_process_definition(url, process_id="ndvi") diff --git a/tests/rest/test_testing.py b/tests/rest/test_testing.py new file mode 100644 index 000000000..8feb63aa0 --- /dev/null +++ b/tests/rest/test_testing.py @@ -0,0 +1,64 @@ +import pytest + +from openeo.rest._testing import DummyBackend + + +@pytest.fixture +def dummy_backend(requests_mock, con120): + return DummyBackend(requests_mock=requests_mock, connection=con120) + + +DUMMY_PG_ADD35 = { + "add35": {"process_id": "add", "arguments": {"x": 3, "y": 5}, "result": True}, +} + + +class 
TestDummyBackend: + def test_create_job(self, dummy_backend, con120): + assert dummy_backend.batch_jobs == {} + _ = con120.create_job(DUMMY_PG_ADD35) + assert dummy_backend.batch_jobs == { + "job-000": { + "job_id": "job-000", + "pg": {"add35": {"process_id": "add", "arguments": {"x": 3, "y": 5}, "result": True}}, + "status": "created", + } + } + + def test_start_job(self, dummy_backend, con120): + job = con120.create_job(DUMMY_PG_ADD35) + assert dummy_backend.batch_jobs == { + "job-000": {"job_id": "job-000", "pg": DUMMY_PG_ADD35, "status": "created"}, + } + job.start() + assert dummy_backend.batch_jobs == { + "job-000": {"job_id": "job-000", "pg": DUMMY_PG_ADD35, "status": "finished"}, + } + + def test_job_status_updater_error(self, dummy_backend, con120): + dummy_backend.job_status_updater = lambda job_id, current_status: "error" + + job = con120.create_job(DUMMY_PG_ADD35) + assert dummy_backend.batch_jobs["job-000"]["status"] == "created" + job.start() + assert dummy_backend.batch_jobs["job-000"]["status"] == "error" + + @pytest.mark.parametrize("final", ["finished", "error"]) + def test_setup_simple_job_status_flow(self, dummy_backend, con120, final): + dummy_backend.setup_simple_job_status_flow(queued=2, running=3, final=final) + job = con120.create_job(DUMMY_PG_ADD35) + assert dummy_backend.batch_jobs["job-000"]["status"] == "created" + + # Note that first status update (to queued here) is triggered from `start()`, not `status()` like below + job.start() + assert dummy_backend.batch_jobs["job-000"]["status"] == "queued" + + # Now go through rest of status flow, through `status()` calls + assert job.status() == "queued" + assert job.status() == "running" + assert job.status() == "running" + assert job.status() == "running" + assert job.status() == final + assert job.status() == final + assert job.status() == final + assert job.status() == final
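The geometry-handling example is still flagged as "to do" in docs/cookbook/job_manager.rst above.
A minimal sketch of what it could look like, modeled on ``test_with_job_manager_remote_geometry``
(the process URL and the ``offset_polygon``/``polygons``/``data``/``offset`` names are
illustrative assumptions, not a published remote process definition):

.. code-block:: python

    # Sketch only: URL and parameter names mirror the test fixtures above
    # and are assumptions, not a real remote process definition.
    import geopandas
    import openeo
    import shapely.geometry

    from openeo.extra.job_management import (
        MultiBackendJobManager,
        ProcessBasedJobCreator,
        create_job_db,
    )

    # Job creator for a parameterized process with a GeoJSON "polygons" parameter.
    job_starter = ProcessBasedJobCreator(
        namespace="https://example.com/offset_polygon.json",
        parameter_defaults={"data": 123},
    )

    # GeoPandas dataframe with a single geometry column:
    # it is linked automatically to the single GeoJSON-accepting parameter,
    # while "offset" is matched to the parameter of the same name.
    df = geopandas.GeoDataFrame(
        {"offset": [11, 22]},
        geometry=[shapely.geometry.Point(1.0, 2.0), shapely.geometry.Point(3.0, 4.0)],
    )
    job_db = create_job_db("jobs.csv", df=df)

    job_manager = MultiBackendJobManager()
    job_manager.add_backend("example", connection=openeo.connect("https://openeo.example"))
    stats = job_manager.run_jobs(job_db=job_db, start_job=job_starter)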