Skip to content

Commit

Permalink
Merge branch 'issue635-job-manager-initialize'
Browse files Browse the repository at this point in the history
  • Loading branch information
soxofaan committed Oct 11, 2024
2 parents f965ddf + f7d3070 commit 40af9cd
Show file tree
Hide file tree
Showing 3 changed files with 301 additions and 28 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added

- Added `DataCube.load_stac()` to also support creating a `load_stac` based cube without a connection ([#638](https://github.com/Open-EO/openeo-python-client/issues/638))
- `MultiBackendJobManager`: Added `initialize_from_df(df)` (to `CsvJobDatabase` and `ParquetJobDatabase`) to initialize (and persist) the job database from a given DataFrame.
Also added `create_job_db()` factory to easily create a job database from a given dataframe and its type guessed from filename extension.
([#635](https://github.com/Open-EO/openeo-python-client/issues/635))



### Changed

Expand Down
90 changes: 77 additions & 13 deletions openeo/extra/job_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,14 +258,15 @@ def _make_resilient(connection):
connection.session.mount("https://", HTTPAdapter(max_retries=retries))
connection.session.mount("http://", HTTPAdapter(max_retries=retries))

def _normalize_df(self, df: pd.DataFrame) -> pd.DataFrame:
"""Ensure we have the required columns and the expected type for the geometry column.
@staticmethod
def _normalize_df(df: pd.DataFrame) -> pd.DataFrame:
"""
Normalize given pandas dataframe (creating a new one):
ensure we have the required columns.
:param df: The dataframe to normalize.
:return: a new dataframe that is normalized.
"""
# TODO: this was originally an internal helper, but we need a clean public API for the user

# check for some required columns.
required_with_default = [
("status", "not_started"),
Expand Down Expand Up @@ -440,13 +441,7 @@ def run_jobs(
assert not kwargs, f"Unexpected keyword arguments: {kwargs!r}"

if isinstance(job_db, (str, Path)):
job_db_path = Path(job_db)
if job_db_path.suffix.lower() == ".csv":
job_db = CsvJobDatabase(path=job_db_path)
elif job_db_path.suffix.lower() == ".parquet":
job_db = ParquetJobDatabase(path=job_db_path)
else:
raise ValueError(f"Unsupported job database file type {job_db_path!r}")
job_db = get_job_db(path=job_db)

if not isinstance(job_db, JobDatabaseInterface):
raise ValueError(f"Unsupported job_db {job_db!r}")
Expand All @@ -456,8 +451,7 @@ def run_jobs(
_log.info(f"Resuming `run_jobs` from existing {job_db}")
elif df is not None:
# TODO: start showing deprecation warnings for this usage pattern?
df = self._normalize_df(df)
job_db.persist(df)
job_db.initialize_from_df(df)

while sum(job_db.count_by_status(statuses=["not_started", "created", "queued", "running"]).values()) > 0:
self._job_update_loop(job_db=job_db, start_job=start_job)
Expand Down Expand Up @@ -697,6 +691,35 @@ def __init__(self):
super().__init__()
self._df = None

def initialize_from_df(self, df: pd.DataFrame, *, on_exists: str = "error"):
"""
Initialize the job database from a given dataframe,
which will be first normalized to be compatible
with :py:class:`MultiBackendJobManager` usage.
:param df: dataframe with some columns your ``start_job`` callable expects
:param on_exists: what to do when the job database already exists (persisted on disk):
- "error": (default) raise an exception
- "skip": work with existing database, ignore given dataframe and skip any initialization
:return: initialized job database.
.. versionadded:: 0.33.0
"""
# TODO: option to provide custom MultiBackendJobManager subclass with custom normalize?
if self.exists():
if on_exists == "skip":
return self
elif on_exists == "error":
raise FileExistsError(f"Job database {self!r} already exists.")
else:
# TODO handle other on_exists modes: e.g. overwrite, merge, ...
raise ValueError(f"Invalid on_exists={on_exists!r}")
df = MultiBackendJobManager._normalize_df(df)
self.persist(df)
# Return self to allow chaining with constructor.
return self

@property
def df(self) -> pd.DataFrame:
if self._df is None:
Expand Down Expand Up @@ -822,3 +845,44 @@ def persist(self, df: pd.DataFrame):
self._merge_into_df(df)
self.path.parent.mkdir(parents=True, exist_ok=True)
self.df.to_parquet(self.path, index=False)


def get_job_db(path: Union[str, Path]) -> JobDatabaseInterface:
"""
Factory to get a job database at a given path,
guessing the database type from filename extension.
:param path: path to job database file.
.. versionadded:: 0.33.0
"""
path = Path(path)
if path.suffix.lower() in {".csv"}:
job_db = CsvJobDatabase(path=path)
elif path.suffix.lower() in {".parquet", ".geoparquet"}:
job_db = ParquetJobDatabase(path=path)
else:
raise ValueError(f"Could not guess job database type from {path!r}")
return job_db


def create_job_db(path: Union[str, Path], df: pd.DataFrame, *, on_exists: str = "error"):
"""
Factory to create a job database at given path,
initialized from a given dataframe,
and its database type guessed from filename extension.
:param path: Path to the job database file.
:param df: DataFrame to store in the job database.
:param on_exists: What to do when the job database already exists:
- "error": (default) raise an exception
- "skip": work with existing database, ignore given dataframe and skip any initialization
.. versionadded:: 0.33.0
"""
job_db = get_job_db(path)
if isinstance(job_db, FullDataFrameJobDatabase):
job_db.initialize_from_df(df=df, on_exists=on_exists)
else:
raise NotImplementedError(f"Initialization of {type(job_db)} is not supported.")
return job_db
Loading

0 comments on commit 40af9cd

Please sign in to comment.