Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

initial PoC implementation of UDPJobFactory #644

Merged
merged 22 commits into from
Oct 16, 2024

Conversation

soxofaan
Copy link
Member

for issue #604

self-review PR

tests/extra/test_job_management.py Show resolved Hide resolved
tests/extra/test_job_management.py Show resolved Hide resolved
openeo/extra/job_management.py Outdated Show resolved Hide resolved
openeo/extra/job_management.py Outdated Show resolved Hide resolved
openeo/extra/job_management.py Outdated Show resolved Hide resolved
soxofaan added a commit that referenced this pull request Oct 11, 2024
…alone utility

Finetune openeo.internal.processes.parse to properly support this
(e.g. leverage named tuples for immutability, less boilerplate and out of the box equality checks)
soxofaan added a commit that referenced this pull request Oct 11, 2024
improves long term traceability and observability
…alone utility

Finetune openeo.internal.processes.parse to properly support this
(e.g. leverage named tuples for immutability, less boilerplate and out of the box equality checks)
improves long term traceability and observability
@soxofaan soxofaan force-pushed the issue604-udp-based-job-manager branch from 31d0c76 to ff8b553 Compare October 11, 2024 12:46
@soxofaan soxofaan marked this pull request as ready for review October 11, 2024 21:23
CHANGELOG.md Outdated Show resolved Hide resolved
openeo/extra/job_management.py Show resolved Hide resolved
openeo/extra/job_management.py Show resolved Hide resolved
openeo/extra/job_management.py Outdated Show resolved Hide resolved
openeo/extra/job_management.py Outdated Show resolved Hide resolved
@soxofaan soxofaan mentioned this pull request Oct 11, 2024
soxofaan added a commit that referenced this pull request Oct 14, 2024
@soxofaan soxofaan force-pushed the issue604-udp-based-job-manager branch from 0f7a9bc to 130db87 Compare October 14, 2024 15:02
@soxofaan
Copy link
Member Author

Ok I think this PR is ready for final review and/or merging.

the diff is quite large, but that is mainly because of testing and testing utility improvements.

The core of this PR is the new UDPJobFactory class at

class UDPJobFactory:
"""
Batch job factory based on a parameterized process definition
(e.g a user-defined process (UDP) or a remote process definition),
to be used together with :py:class:`MultiBackendJobManager`.
Usage example with a remote process definition:
.. code-block:: python
from openeo.extra.job_management import (
MultiBackendJobManager,
create_job_db,
UDPJobFactory,
)
# Job creator, based on a parameterized openEO process definition
job_starter = UDPJobFactory(
namespace="https://example.com/my_process.json",
)
# Initialize job database from dataframe, with parameters to use.
df = pd.DataFrame(...)
job_db = create_job_db("jobs.csv").initialize_from_df(df)
# Create and run job manager
job_manager = MultiBackendJobManager(...)
job_manager.run_jobs(job_db=job_db, start_job=job_starter)
The factory will take care of filling in the process parameters
based on matching column names in dataframe from the job database,
with some additional override/fallback options:
- When provided, ``parameter_column_map`` will be consulted
for resolving a parameter name (key) to a desired column name (value).
- One common case is handled automatically as convenience functionality.
When:
- ``parameter_column_map`` is not provided (or set to ``None``),
- and there is a *single parameter* that accepts inline GeoJSON geometries,
- and the dataframe is a GeoPandas dataframe with a *single geometry* column,
then this parameter and this geometries column will be linked automatically.
- If a parameter can not be matched with a column by name as described above,
a default value will be picked,
first by looking in ``parameter_defaults`` (if provided),
and then by looking up the default value from the parameter schema in the process definition.
- Finally if no (default) value can be determined and the parameter
is not flagged as optional, an error will be raised.
:param process_id: (optional) openEO process identifier.
Can be omitted when working with a remote process definition
given as URL in the ``namespace`` parameter.
:param namespace: (optional) openEO process namespace.
Typically used to provide a URL to a remote process definition.
:param parameter_defaults: Default values for process parameters,
to be used when not provided from the dataframe row in
:py:meth:`MultiBackendJobManager.run_jobs`.
:param parameter_column_map: Optional overrides
for linking parameters to dataframe columns:
mapping of process parameter names as key
to dataframe column names as value.
.. versionadded:: 0.33.0
"""
# TODO: find a better class name (e.g. eliminate over-specificity of "UDP",
# or avoid "factory" as technical mumbo-jumbo)?
def __init__(
self,
*,
process_id: Optional[str] = None,
namespace: Union[str, None] = None,
parameter_defaults: Optional[dict] = None,
parameter_column_map: Optional[dict] = None,
):
if process_id is None and namespace is None:
raise ValueError("At least one of `process_id` and `namespace` should be provided.")
self._process_id = process_id
self._namespace = namespace
self._parameter_defaults = parameter_defaults or {}
self._parameter_column_map = parameter_column_map
self._cache = LazyLoadCache()
def _get_process_definition(self, connection: Connection) -> Process:
if isinstance(self._namespace, str) and re.match("https?://", self._namespace):
# Remote process definition handling
return self._cache.get(
key=("remote_process_definition", self._namespace, self._process_id),
load=lambda: parse_remote_process_definition(namespace=self._namespace, process_id=self._process_id),
)
elif self._namespace is None:
# Handling of a user-specific UDP
udp_raw = connection.user_defined_process(self._process_id).describe()
return Process.from_dict(udp_raw)
else:
raise NotImplementedError(
f"Unsupported process definition source udp_id={self._process_id!r} namespace={self._namespace!r}"
)
def start_job(self, row: pd.Series, connection: Connection, **_) -> BatchJob:
"""
Implementation of the ``start_job`` callable interface
of :py:meth:`MultiBackendJobManager.run_jobs`
to create a job based on given dataframe row
:param row: The row in the pandas dataframe that stores the jobs state and other tracked data.
:param connection: The connection to the backend.
"""
# TODO: refactor out some methods, for better reuse and decoupling:
# `get_arguments()` (to build the arguments dictionary), `get_cube()` (to create the cube),
process_definition = self._get_process_definition(connection=connection)
process_id = process_definition.id
parameters = process_definition.parameters or []
if self._parameter_column_map is None:
self._parameter_column_map = self._guess_parameter_column_map(parameters=parameters, row=row)
arguments = {}
for parameter in parameters:
param_name = parameter.name
column_name = self._parameter_column_map.get(param_name, param_name)
if column_name in row.index:
# Get value from dataframe row
value = row.loc[column_name]
elif param_name in self._parameter_defaults:
# Fallback on default values from constructor
value = self._parameter_defaults[param_name]
elif parameter.has_default():
# Explicitly use default value from parameter schema
value = parameter.default
elif parameter.optional:
# Skip optional parameters without any fallback default value
continue
else:
raise ValueError(f"Missing required parameter {param_name !r} for process {process_id!r}")
# Prepare some values/dtypes for JSON encoding
if isinstance(value, numpy.integer):
value = int(value)
elif isinstance(value, numpy.number):
value = float(value)
elif isinstance(value, shapely.geometry.base.BaseGeometry):
value = shapely.geometry.mapping(value)
arguments[param_name] = value
cube = connection.datacube_from_process(process_id=process_id, namespace=self._namespace, **arguments)
title = row.get("title", f"Process {process_id!r} with {repr_truncate(arguments)}")
description = row.get("description", f"Process {process_id!r} (namespace {self._namespace}) with {arguments}")
job = connection.create_job(cube, title=title, description=description)
return job
def __call__(self, *arg, **kwargs) -> BatchJob:
"""Syntactic sugar for calling :py:meth:`start_job`."""
return self.start_job(*arg, **kwargs)
@staticmethod
def _guess_parameter_column_map(parameters: List[Parameter], row: pd.Series) -> dict:
"""
Guess parameter-column mapping from given parameter list and dataframe row
"""
parameter_column_map = {}
# Geometry based mapping: try to automatically map geometry columns to geojson parameters
geojson_parameters = [p.name for p in parameters if p.schema.accepts_geojson()]
geometry_columns = [i for (i, v) in row.items() if isinstance(v, shapely.geometry.base.BaseGeometry)]
if geojson_parameters and geometry_columns:
if len(geojson_parameters) == 1 and len(geometry_columns) == 1:
# Most common case: one geometry parameter and one geometry column: can be mapped naively
parameter_column_map[geojson_parameters[0]] = geometry_columns[0]
elif all(p in geometry_columns for p in geojson_parameters):
# Each geometry param has geometry column with same name: easy to map
parameter_column_map.update((p, p) for p in geojson_parameters)
else:
raise RuntimeError(
f"Problem with mapping geometry columns ({geometry_columns}) to process parameters ({geojson_parameters})"
)
_log.debug(f"Guessed parameter-column map: {parameter_column_map}")
return parameter_column_map

user-facing usage (as documented):

        from openeo.extra.job_management import (
            MultiBackendJobManager,
            create_job_db,
            UDPJobFactory,
        )

        # Job creator, based on a parameterized openEO process definition
        job_starter = UDPJobFactory(
            namespace="https://example.com/my_process.json",
        )

        # Initialize job database from dataframe, with parameters to use.
        df = pd.DataFrame(...)
        job_db = create_job_db("jobs.csv").initialize_from_df(df)

        # Create and run job manager
        job_manager = MultiBackendJobManager(...)
        job_manager.run_jobs(job_db=job_db, start_job=job_starter)

@soxofaan
Copy link
Member Author

soxofaan commented Oct 14, 2024

Also note that this is inspired by PR ESA-APEx/esa-apex-toolbox-python#1 as suggested in #604

That PR ESA-APEx/esa-apex-toolbox-python#1 was however outdated due to recent job manager changes and not really mergeable as-is.
I also explicitly choose an approach that avoids subclassing MultiBackendJobManager and found a way to implement it as decoupled component (a callable that can be passed as start_job to run_jobs), which should make it more reusable and more robust against future job manager changes

@soxofaan
Copy link
Member Author

soxofaan commented Oct 15, 2024

Just found out there was actually another PR about this:

interesting idea from that PR to add here:

  • add support for adding save_result

@soxofaan soxofaan requested a review from jdries October 15, 2024 17:23
Copy link
Collaborator

@jdries jdries left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code wise this looks quite fine, especially for an MVP.
For API testing, the opinions of users could be valuable, or maybe data engineering.
@HansVRP Feel free to provide feedback.
Otherwise, I suggest to merge it, so that we can work it into the apex demo, and then also gain some experience ourselves.

@HansVRP
Copy link
Contributor

HansVRP commented Oct 15, 2024

Will review this sprint. Any deadline I need to be aware of?

Copy link
Contributor

@HansVRP HansVRP left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some initial comments; still going through the remainder


class UDPJobFactory:
"""
Batch job factory based on a parameterized process definition
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder whether UDP Job Factory is the clearest name. Without having read the function definition, I am not really sure what to expect. The definition does not necessarily clarify it either.

Does factory here mean that we create jobs based on UDPs?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes I'm also not very happy with the current name, but haven't found something significant better yet

some ideas

  • UDPJobCreator (but I'd actually prefer to avoid "UDP")
  • ParameterizedProcessBasedJobCreator is quite long, ProcessBasedJobCreator is doable
  • ProcessParameterFiller, ProcessParameterInjector, ProcessTemplate (are quite cryptic)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I already decided to pick ProcessBasedJobCreator, which should already improve the situation.

but still open for better options

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that option sounds good to me.

Process in itself is also still a bit vague, but preferable over openeoprocessjobcreator

openeo/extra/job_management.py Outdated Show resolved Hide resolved
openeo/extra/job_management.py Show resolved Hide resolved
openeo/extra/job_management.py Outdated Show resolved Hide resolved
openeo/extra/job_management.py Show resolved Hide resolved
openeo/extra/job_management.py Outdated Show resolved Hide resolved
openeo/extra/job_management.py Show resolved Hide resolved
soxofaan added a commit that referenced this pull request Oct 16, 2024
@soxofaan
Copy link
Member Author

Thanks a lot for the review notes! I worked some more on the documentation part to clarify some things.

Regardless, I'd propose to already merge this, because it is a large PR in terms of lines of code, so it's easy to get conflicts with other PRs. Functionality-wise it just adds something new, which I made sure to flag clearly as experimental, to give us some wiggle room to clean up later

soxofaan added a commit that referenced this pull request Oct 16, 2024
soxofaan added a commit that referenced this pull request Oct 16, 2024
@soxofaan soxofaan force-pushed the issue604-udp-based-job-manager branch from 2030fac to 7bf73de Compare October 16, 2024 13:11
@soxofaan soxofaan merged commit dcc7119 into master Oct 16, 2024
15 checks passed
@soxofaan soxofaan deleted the issue604-udp-based-job-manager branch October 16, 2024 13:12
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants