diff --git a/CHANGELOG.md b/CHANGELOG.md index f8d01fc57..e91aaabfc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,8 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `MultiBackendJobManager`: Added `initialize_from_df(df)` (to `CsvJobDatabase` and `ParquetJobDatabase`) to initialize (and persist) the job database from a given DataFrame. Also added `create_job_db()` factory to easily create a job database from a given dataframe and its type guessed from filename extension. ([#635](https://github.com/Open-EO/openeo-python-client/issues/635)) - - +- `MultiBackendJobManager.run_jobs()` now returns a dictionary with counters/stats about various events during the job run ([#645](https://github.com/Open-EO/openeo-python-client/issues/645)) ### Changed diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py index 7dd43bfed..bc1b9ff8e 100644 --- a/openeo/extra/job_management.py +++ b/openeo/extra/job_management.py @@ -1,4 +1,5 @@ import abc +import collections import contextlib import datetime import functools @@ -380,7 +381,7 @@ def run_jobs( start_job: Callable[[], BatchJob] = _start_job_default, job_db: Union[str, Path, JobDatabaseInterface, None] = None, **kwargs, - ): + ) -> dict: """Runs jobs, specified in a dataframe, and tracks parameters. :param df: @@ -422,6 +423,10 @@ def run_jobs( Support for Parquet files depends on the ``pyarrow`` package as :ref:`optional dependency `. + :return: dictionary with stats collected during the job running loop. + Note that the set of fields in this dictionary is experimental + and subject to change + .. versionchanged:: 0.31.0 Added support for persisting the job metadata in Parquet format. @@ -430,6 +435,9 @@ def run_jobs( which can be a path to a CSV or Parquet file, or a user-defined :py:class:`JobDatabaseInterface` object. The deprecated ``output_file`` argument is still supported for now. + + .. versionchanged:: 0.33.0 + return a stats dictionary """ # TODO: Defining start_jobs as a Protocol might make its usage more clear, and avoid complicated doctrings, # but Protocols are only supported in Python 3.8 and higher. @@ -457,23 +465,35 @@ def run_jobs( # TODO: start showing deprecation warnings for this usage pattern? job_db.initialize_from_df(df) + stats = collections.defaultdict(int) while sum(job_db.count_by_status(statuses=["not_started", "created", "queued", "running"]).values()) > 0: - self._job_update_loop(job_db=job_db, start_job=start_job) + self._job_update_loop(job_db=job_db, start_job=start_job, stats=stats) + stats["run_jobs loop"] += 1 + time.sleep(self.poll_sleep) + stats["sleep"] += 1 - def _job_update_loop(self, job_db: JobDatabaseInterface, start_job: Callable[[], BatchJob]): + return stats + + def _job_update_loop( + self, job_db: JobDatabaseInterface, start_job: Callable[[], BatchJob], stats: Optional[dict] = None + ): """ Inner loop logic of job management: go through the necessary jobs to check for status updates, trigger status events, start new jobs when there is room for them, etc. 
""" + stats = stats if stats is not None else collections.defaultdict(int) + with ignore_connection_errors(context="get statuses"): - self._track_statuses(job_db) + self._track_statuses(job_db, stats=stats) + stats["track_statuses"] += 1 not_started = job_db.get_by_status(statuses=["not_started"], max=200) if len(not_started) > 0: # Check number of jobs running at each backend running = job_db.get_by_status(statuses=["created", "queued", "running"]) + stats["job_db get_by_status"] += 1 per_backend = running.groupby("backend_name").size().to_dict() _log.info(f"Running per backend: {per_backend}") for backend_name in self.backends: @@ -482,10 +502,13 @@ def _job_update_loop(self, job_db: JobDatabaseInterface, start_job: Callable[[], to_add = self.backends[backend_name].parallel_jobs - backend_load to_launch = not_started.iloc[0:to_add] for i in to_launch.index: - self._launch_job(start_job, not_started, i, backend_name) + self._launch_job(start_job, df=not_started, i=i, backend_name=backend_name, stats=stats) + stats["job launch"] += 1 + job_db.persist(to_launch) + stats["job_db persist"] += 1 - def _launch_job(self, start_job, df, i, backend_name): + def _launch_job(self, start_job, df, i, backend_name, stats: Optional[dict] = None): """Helper method for launching jobs :param start_job: @@ -508,6 +531,7 @@ def _launch_job(self, start_job, df, i, backend_name): :param backend_name: name of the backend that will execute the job. """ + stats = stats if stats is not None else collections.defaultdict(int) df.loc[i, "backend_name"] = backend_name row = df.loc[i] @@ -515,6 +539,7 @@ def _launch_job(self, start_job, df, i, backend_name): _log.info(f"Starting job on backend {backend_name} for {row.to_dict()}") connection = self._get_connection(backend_name, resilient=True) + stats["start_job call"] += 1 job = start_job( row=row, connection_provider=self._get_connection, @@ -524,23 +549,30 @@ def _launch_job(self, start_job, df, i, backend_name): except requests.exceptions.ConnectionError as e: _log.warning(f"Failed to start job for {row.to_dict()}", exc_info=True) df.loc[i, "status"] = "start_failed" + stats["start_job error"] += 1 else: df.loc[i, "start_time"] = rfc3339.utcnow() if job: df.loc[i, "id"] = job.job_id with ignore_connection_errors(context="get status"): status = job.status() + stats["job get status"] += 1 df.loc[i, "status"] = status if status == "created": # start job if not yet done by callback try: job.start() + stats["job start"] += 1 df.loc[i, "status"] = job.status() + stats["job get status"] += 1 except OpenEoApiError as e: _log.error(e) df.loc[i, "status"] = "start_failed" + stats["job start error"] += 1 else: + # TODO: what is this "skipping" about actually? df.loc[i, "status"] = "skipped" + stats["start_job skipped"] += 1 def on_job_done(self, job: BatchJob, row): """ @@ -623,11 +655,13 @@ def ensure_job_dir_exists(self, job_id: str) -> Path: if not job_dir.exists(): job_dir.mkdir(parents=True) - def _track_statuses(self, job_db: JobDatabaseInterface): + def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] = None): """ Tracks status (and stats) of running jobs (in place). Optionally cancels jobs when running too long. 
""" + stats = stats if stats is not None else collections.defaultdict(int) + active = job_db.get_by_status(statuses=["created", "queued", "running"]) for i in active.index: job_id = active.loc[i, "id"] @@ -638,6 +672,7 @@ def _track_statuses(self, job_db: JobDatabaseInterface): con = self._get_connection(backend_name) the_job = con.job(job_id) job_metadata = the_job.describe() + stats["job describe"] += 1 new_status = job_metadata["status"] _log.info( @@ -645,15 +680,19 @@ def _track_statuses(self, job_db: JobDatabaseInterface): ) if new_status == "finished": + stats["job finished"] += 1 self.on_job_done(the_job, active.loc[i]) if previous_status != "error" and new_status == "error": + stats["job failed"] += 1 self.on_job_error(the_job, active.loc[i]) if previous_status in {"created", "queued"} and new_status == "running": + stats["job started running"] += 1 active.loc[i, "running_start_time"] = rfc3339.utcnow() if new_status == "canceled": + stats["job canceled"] += 1 self.on_job_cancel(the_job, active.loc[i]) if self._cancel_running_job_after and new_status == "running": @@ -667,10 +706,14 @@ def _track_statuses(self, job_db: JobDatabaseInterface): active.loc[i, key] = _format_usage_stat(job_metadata, key) except OpenEoApiError as e: + stats["job tracking error"] += 1 print(f"error for job {job_id!r} on backend {backend_name}") print(e) + + stats["job_db persist"] += 1 job_db.persist(active) + def _format_usage_stat(job_metadata: dict, field: str) -> str: value = deep_get(job_metadata, "usage", field, "value", default=0) unit = deep_get(job_metadata, "usage", field, "unit", default="") @@ -977,6 +1020,3 @@ def start_job(self, row: pd.Series, connection: Connection, **_) -> BatchJob: def __call__(self, *arg, **kwargs) -> BatchJob: """Syntactic sugar for calling `start_job` directly.""" return self.start_job(*arg, **kwargs) - - - diff --git a/tests/extra/test_job_management.py b/tests/extra/test_job_management.py index de751c218..086faf3a3 100644 --- a/tests/extra/test_job_management.py +++ b/tests/extra/test_job_management.py @@ -29,9 +29,9 @@ CsvJobDatabase, MultiBackendJobManager, ParquetJobDatabase, + UDPJobFactory, create_job_db, get_job_db, - UDPJobFactory, ) from openeo.rest._testing import OPENEO_BACKEND, DummyBackend, build_capabilities from openeo.util import rfc3339 @@ -113,8 +113,17 @@ def start_job(row, connection, **kwargs): year = int(row["year"]) return BatchJob(job_id=f"job-{year}", connection=connection) - manager.run_jobs(df=df, start_job=start_job, output_file=output_file) - assert sleep_mock.call_count > 10 + run_stats = manager.run_jobs(df=df, start_job=start_job, output_file=output_file) + assert run_stats == dirty_equals.IsPartialDict( + { + "sleep": dirty_equals.IsInt(gt=10), + "start_job call": 7, # TODO? + "job started running": 5, + "job finished": 5, + "job_db persist": dirty_equals.IsInt(gt=5), + "run_jobs loop": dirty_equals.IsInt(gt=5), + } + ) result = pd.read_csv(output_file) assert len(result) == 5 @@ -148,8 +157,17 @@ def start_job(row, connection, **kwargs): job_db = CsvJobDatabase(output_file).initialize_from_df(df) - manager.run_jobs(job_db=job_db, start_job=start_job) - assert sleep_mock.call_count > 10 + run_stats = manager.run_jobs(job_db=job_db, start_job=start_job) + assert run_stats == dirty_equals.IsPartialDict( + { + "sleep": dirty_equals.IsInt(gt=10), + "start_job call": 7, # TODO? 
+ "job started running": 5, + "job finished": 5, + "job_db persist": dirty_equals.IsInt(gt=5), + "run_jobs loop": dirty_equals.IsInt(gt=5), + } + ) result = pd.read_csv(output_file) assert len(result) == 5 @@ -176,8 +194,14 @@ def start_job(row, connection, **kwargs): output_file = tmp_path / "jobs.db" job_db = db_class(output_file).initialize_from_df(df) - manager.run_jobs(job_db=job_db, start_job=start_job) - assert sleep_mock.call_count > 10 + run_stats = manager.run_jobs(job_db=job_db, start_job=start_job) + assert run_stats == dirty_equals.IsPartialDict( + { + "start_job call": 7, # TODO? + "job finished": 5, + "job_db persist": dirty_equals.IsInt(gt=5), + } + ) result = job_db.read() assert len(result) == 5 @@ -205,8 +229,14 @@ def start_job(row, connection, **kwargs): output_file = tmp_path / filename job_db = create_job_db(path=output_file, df=df) - manager.run_jobs(job_db=job_db, start_job=start_job) - assert sleep_mock.call_count > 10 + run_stats = manager.run_jobs(job_db=job_db, start_job=start_job) + assert run_stats == dirty_equals.IsPartialDict( + { + "start_job call": 7, # TODO? + "job finished": 5, + "job_db persist": dirty_equals.IsInt(gt=5), + } + ) result = job_db.read() assert len(result) == 5 @@ -235,6 +265,7 @@ def start_job(row, connection, **kwargs): # Trigger context switch to job thread sleep(1) manager.stop_job_thread() + # TODO #645 how to collect stats with the threaded run_job? assert sleep_mock.call_count > 10 result = pd.read_csv(output_file) @@ -543,8 +574,12 @@ def start_job(row, connection_provider, connection, **kwargs): output_file = tmp_path / "jobs.csv" - manager.run_jobs(df=df, start_job=start_job, output_file=output_file) - assert sleep_mock.call_count > 3 + run_stats = manager.run_jobs(df=df, start_job=start_job, output_file=output_file) + assert run_stats == dirty_equals.IsPartialDict( + { + "start_job call": 1, + } + ) # Sanity check: the job succeeded result = pd.read_csv(output_file) @@ -615,6 +650,7 @@ def start_job(row, connection_provider, connection, **kwargs): with pytest.raises(requests.exceptions.RetryError) as exc: manager.run_jobs(df=df, start_job=start_job, output_file=output_file) + # TODO #645 how to still check stats when run_jobs raised exception? assert sleep_mock.call_count > 3 # Sanity check: the job has status "error" @@ -1103,8 +1139,14 @@ def test_udp_job_manager_basic(self, tmp_path, requests_mock, dummy_backend, job # TODO #636 avoid this cumbersome pattern using private _normalize_df API job_db.persist(job_manager._normalize_df(df)) - job_manager.run_jobs(job_db=job_db, start_job=job_starter) - assert sleep_mock.call_count > 0 + stats = job_manager.run_jobs(job_db=job_db, start_job=job_starter) + assert stats == dirty_equals.IsPartialDict( + { + "sleep": dirty_equals.IsInt(gt=1), + "start_job call": 3, + "job start": 3, + } + ) result = job_db.read() assert set(result.status) == {"finished"}