Merge branch 'issue115-crossbackend'
soxofaan committed Sep 11, 2023
Commit 47b8ef4 (2 parents: 461f144 + f500b33)
Showing 14 changed files with 930 additions and 117 deletions.
17 changes: 16 additions & 1 deletion CHANGELOG.md
@@ -4,7 +4,22 @@ All notable changes to this project will be documented in this file.
 
 The format is roughly based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
 
-## [Current: 0.8.x]
+## [Current: 0.9.x]
+
+
+### Added
+
+- Initial aggregator-level implementation of cross-backend processing
+  ([#115](https://github.com/Open-EO/openeo-aggregator/issues/115))
+
+
+### Changed
+
+### Fixed
+
+
+## [0.8.x]
 
 ### Added
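The new cross-backend feature is opt-in via a job option at batch job creation time (see the `create_job` changes in `src/openeo_aggregator/backend.py` below). Here is a minimal client-side sketch of triggering it, under two assumptions not shown in this diff: that the `JOB_OPTION_SPLIT_STRATEGY` constant resolves to the body field `"split_strategy"`, and that the aggregator, like other openeo-driver based implementations, reads job options from a top-level `"job_options"` field of the `POST /jobs` body. The aggregator URL is hypothetical.

import requests

AGGREGATOR_URL = "https://openeo.example/openeo/1.1"  # hypothetical endpoint

process_graph = {
    "lc": {"process_id": "load_collection", "arguments": {"id": "SENTINEL2_L2A"}, "result": True},
}

# POST /jobs per the openEO API; the "job_options" field would route this
# request into _create_crossbackend_job() in the aggregator's create_job().
resp = requests.post(
    f"{AGGREGATOR_URL}/jobs",
    headers={"Authorization": "Bearer <token>"},
    json={
        "process": {"process_graph": process_graph},
        "job_options": {"split_strategy": "crossbackend"},  # assumed option key
    },
)
resp.raise_for_status()
# Per the openEO API spec, the created job id is returned in this header:
print("Created batch job:", resp.headers.get("OpenEO-Identifier"))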
1 change: 0 additions & 1 deletion scripts/crossbackend-processing-poc.py
@@ -58,7 +58,6 @@ def main():
     with TimingLogger(title=f"Connecting to {backend_url}", logger=_log):
         connection = openeo.connect(url=backend_url).authenticate_oidc()
 
-    @functools.lru_cache(maxsize=100)
     def backend_for_collection(collection_id) -> str:
         metadata = connection.describe_collection(collection_id)
         return metadata["summaries"][STAC_PROPERTY_FEDERATION_BACKENDS][0]
2 changes: 1 addition & 1 deletion setup.py
@@ -28,7 +28,7 @@
         "requests",
         "attrs",
         "openeo>=0.17.0",
-        "openeo_driver>=0.57.1.dev",
+        "openeo_driver>=0.65.0.dev",
         "flask~=2.0",
         "gunicorn~=20.0",
         "python-json-logger>=2.0.0",
2 changes: 1 addition & 1 deletion src/openeo_aggregator/about.py
@@ -1 +1 @@
-__version__ = "0.8.5a1"
+__version__ = "0.9.0a1"
102 changes: 85 additions & 17 deletions src/openeo_aggregator/backend.py
@@ -85,12 +85,17 @@
 )
 from openeo_aggregator.metadata.reporter import LoggerReporter
 from openeo_aggregator.partitionedjobs import PartitionedJob
+from openeo_aggregator.partitionedjobs.crossbackend import (
+    CrossBackendSplitter,
+    SubGraphId,
+)
 from openeo_aggregator.partitionedjobs.splitting import FlimsySplitter, TileGridSplitter
 from openeo_aggregator.partitionedjobs.tracking import (
     PartitionedJobConnection,
     PartitionedJobTracker,
 )
 from openeo_aggregator.utils import (
+    Clock,
     FlatPG,
     PGWithMetadata,
     dict_merge,
@@ -111,6 +116,7 @@ def set_backends_for_collection(self, cid: str, backends: Iterable[str]):
         self._data[cid]["backends"] = list(backends)
 
     def get_backends_for_collection(self, cid: str) -> List[str]:
+        """Get backend ids that provide given collection id."""
         if cid not in self._data:
             raise CollectionNotFoundException(collection_id=cid)
         return self._data[cid]["backends"]
@@ -205,6 +211,11 @@ def evaluate(backend_id, pg):
 
         return [functools.partial(evaluate, pg=pg) for pg in process_graphs]
 
+    def get_backends_for_collection(self, cid: str) -> List[str]:
+        """Get backend ids that provide given collection id."""
+        metadata, internal = self._get_all_metadata_cached()
+        return internal.get_backends_for_collection(cid=cid)
+
     def get_backend_candidates_for_collections(self, collections: Iterable[str]) -> List[str]:
         """
         Get backend ids providing all given collections
@@ -568,13 +579,16 @@ def _process_load_ml_model(
 class AggregatorBatchJobs(BatchJobs):
 
     def __init__(
-        self,
-        backends: MultiBackendConnection,
-        processing: AggregatorProcessing,
-        partitioned_job_tracker: Optional[PartitionedJobTracker] = None,
+        self,
+        *,
+        backends: MultiBackendConnection,
+        catalog: AggregatorCollectionCatalog,
+        processing: AggregatorProcessing,
+        partitioned_job_tracker: Optional[PartitionedJobTracker] = None,
     ):
         super(AggregatorBatchJobs, self).__init__()
         self.backends = backends
+        self._catalog = catalog
         self.processing = processing
         self.partitioned_job_tracker = partitioned_job_tracker
 
@@ -611,18 +625,29 @@ def get_user_jobs(self, user_id: str) -> Union[List[BatchJobMetadata], dict]:
         })
 
     def create_job(
-        self, user_id: str, process: dict, api_version: str,
-        metadata: dict, job_options: dict = None
+        self,
+        user_id: str,
+        process: dict,
+        api_version: str,
+        metadata: dict,
+        job_options: Optional[dict] = None,
     ) -> BatchJobMetadata:
         if "process_graph" not in process:
             raise ProcessGraphMissingException()
 
         # TODO: better, more generic/specific job_option(s)?
-        if job_options and (
-            job_options.get(JOB_OPTION_SPLIT_STRATEGY)
-            or job_options.get(JOB_OPTION_TILE_GRID)
-        ):
-            return self._create_partitioned_job(
+        if job_options and (job_options.get(JOB_OPTION_SPLIT_STRATEGY) or job_options.get(JOB_OPTION_TILE_GRID)):
+            if job_options.get(JOB_OPTION_SPLIT_STRATEGY) == "crossbackend":
+                # TODO: this is a temporary feature flag to trigger "crossbackend" splitting
+                return self._create_crossbackend_job(
+                    user_id=user_id,
+                    process=process,
+                    api_version=api_version,
+                    metadata=metadata,
+                    job_options=job_options,
+                )
+            else:
+                return self._create_partitioned_job(
                 user_id=user_id,
                 process=process,
                 api_version=api_version,
@@ -681,8 +706,9 @@ def _create_job_standard(
             raise OpenEOApiException(f"Failed to create job on backend {backend_id!r}: {e!r}")
         return BatchJobMetadata(
             id=JobIdMapping.get_aggregator_job_id(backend_job_id=job.job_id, backend_id=backend_id),
-            # Note: required, but unused metadata
-            status="dummy", created="dummy", process={"dummy": "dummy"}
+            # Note: additional required, but unused metadata
+            status="dummy",
+            created="dummy",
         )
 
     def _create_partitioned_job(
@@ -710,11 +736,52 @@ def _create_partitioned_job(
             raise ValueError("Could not determine splitting strategy from job options")
         pjob: PartitionedJob = splitter.split(process=process, metadata=metadata, job_options=job_options)
 
-        job_id = self.partitioned_job_tracker.create(user_id=user_id, pjob=pjob, flask_request=flask.request)
+        pjob_id = self.partitioned_job_tracker.create(user_id=user_id, pjob=pjob, flask_request=flask.request)
 
+        return BatchJobMetadata(
+            id=JobIdMapping.get_aggregator_job_id(backend_job_id=pjob_id, backend_id=JobIdMapping.AGG),
+            # Note: additional required, but unused metadata
+            status="dummy",
+            created="dummy",
+        )
+
+    def _create_crossbackend_job(
+        self,
+        user_id: str,
+        process: PGWithMetadata,
+        api_version: str,
+        metadata: dict,
+        job_options: Optional[dict] = None,
+    ) -> BatchJobMetadata:
+        """
+        Advanced/handled batch job creation:
+        - split original job in (possibly) multiple sub-jobs,
+          e.g. split the process graph based on `load_collection` availability
+        - distribute sub-jobs across (possibly) multiple back-ends
+        - keep track of them through a "parent job" in a `PartitionedJobTracker`.
+        """
+        if not self.partitioned_job_tracker:
+            raise FeatureUnsupportedException(message="Partitioned job tracking is not supported")
+
+        def backend_for_collection(collection_id) -> str:
+            return self._catalog.get_backends_for_collection(cid=collection_id)[0]
+
+        splitter = CrossBackendSplitter(
+            backend_for_collection=backend_for_collection,
+            # TODO: job option for `always_split` feature?
+            always_split=True,
+        )
+
+        pjob_id = self.partitioned_job_tracker.create_crossbackend_pjob(
+            user_id=user_id, process=process, metadata=metadata, job_options=job_options, splitter=splitter
+        )
+
         return BatchJobMetadata(
-            id=JobIdMapping.get_aggregator_job_id(backend_job_id=job_id, backend_id=JobIdMapping.AGG),
-            status="dummy", created="dummy", process={"dummy": "dummy"}
+            id=JobIdMapping.get_aggregator_job_id(backend_job_id=pjob_id, backend_id=JobIdMapping.AGG),
+            # Note: additional required, but unused metadata
+            status="dummy",
+            created="dummy",
         )
 
     def _get_connection_and_backend_job_id(
@@ -1127,8 +1194,9 @@ def __init__(self, backends: MultiBackendConnection, config: AggregatorConfig):
 
         batch_jobs = AggregatorBatchJobs(
             backends=backends,
+            catalog=catalog,
             processing=processing,
-            partitioned_job_tracker=partitioned_job_tracker
+            partitioned_job_tracker=partitioned_job_tracker,
         )
 
         secondary_services = AggregatorSecondaryServices(backends=backends, processing=processing, config=config)
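For orientation, here is a conceptual sketch of the idea behind `_create_crossbackend_job`, not the actual `CrossBackendSplitter` implementation from `openeo_aggregator.partitionedjobs.crossbackend` (which is part of this commit but not shown above): resolve each `load_collection` node to a backend through the catalog, and split into sub-jobs when more than one backend is involved. The `FlatPG` alias is redefined here for self-containment; in the real code it comes from `openeo_aggregator.utils`, and `backends_involved` is a hypothetical helper.

from typing import Callable, Dict

FlatPG = Dict[str, dict]  # flat process graph: node id -> node dict

def backends_involved(process_graph: FlatPG, backend_for_collection: Callable[[str], str]) -> Dict[str, str]:
    """Map each load_collection node id to the backend providing its collection."""
    return {
        node_id: backend_for_collection(node["arguments"]["id"])
        for node_id, node in process_graph.items()
        if node.get("process_id") == "load_collection"
    }

pg = {
    "lc1": {"process_id": "load_collection", "arguments": {"id": "S2"}},
    "lc2": {"process_id": "load_collection", "arguments": {"id": "AGERA5"}},
    "merge": {"process_id": "merge_cubes", "arguments": {}, "result": True},
}
# Toy resolver, standing in for the backend_for_collection() closure above,
# which takes the first backend the catalog lists for a collection.
resolver = {"S2": "backend-a", "AGERA5": "backend-b"}.get
assignment = backends_involved(pg, resolver)
# {'lc1': 'backend-a', 'lc2': 'backend-b'}: two backends are involved, so the
# splitter would produce sub-jobs and track them under one parent job.
print(assignment)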
1 change: 1 addition & 0 deletions src/openeo_aggregator/partitionedjobs/__init__.py
@@ -34,6 +34,7 @@ def to_subjobs_dict(
        """Helper to convert a collection of SubJobs to a dictionary"""
        # TODO: hide this logic in a setter or __init__ (e.g. when outgrowing the constraints of typing.NamedTuple)
        if isinstance(subjobs, Sequence):
+           # TODO: eliminate this `Sequence` code path, and just always work with dict?
            return {f"{i:04d}": j for i, j in enumerate(subjobs)}
        elif isinstance(subjobs, dict):
            return {str(k): v for k, v in subjobs.items()}
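As an aside, the normalization that `to_subjobs_dict` applies (visible in the comprehensions above) turns a sequence of sub-jobs into a dict with zero-padded index keys; strings stand in for SubJob instances here:

subjobs = ["subjob-a", "subjob-b"]  # stand-ins for SubJob instances

# Same expression as the Sequence code path of to_subjobs_dict:
as_dict = {f"{i:04d}": j for i, j in enumerate(subjobs)}
assert as_dict == {"0000": "subjob-a", "0001": "subjob-b"}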