
Commit

Issue #645 introduce returning event stats from MultiBackendJobManager.run_jobs
soxofaan committed Oct 11, 2024
1 parent ff8b553 commit 64cafcf
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 25 deletions.
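In short: before this commit, `MultiBackendJobManager.run_jobs()` returned nothing; now it returns a dictionary of event counters. A minimal usage sketch of the changed API (the backend URL, collection id and dataframe columns below are illustrative assumptions, not taken from this commit):

import openeo
import pandas as pd
from openeo.extra.job_management import MultiBackendJobManager, create_job_db

def start_job(row: pd.Series, connection: openeo.Connection, **kwargs):
    # Hypothetical job factory: build a batch job for one dataframe row.
    cube = connection.load_collection("SENTINEL2_L2A")  # assumed collection id
    return cube.create_job(title=f"job for year {row['year']}")

manager = MultiBackendJobManager()
manager.add_backend("example", connection=openeo.connect("https://openeo.example"), parallel_jobs=2)

df = pd.DataFrame({"year": [2021, 2022, 2023]})
job_db = create_job_db(path="jobs.csv", df=df)

# New in this commit: run_jobs() returns a dict of counters, e.g.
# {"start_job call": 3, "job finished": 3, ...}. The field names are
# experimental and subject to change, so treat them as informational.
stats = manager.run_jobs(job_db=job_db, start_job=start_job)
print(stats)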
CHANGELOG.md (3 changes: 1 addition & 2 deletions)
@@ -13,8 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - `MultiBackendJobManager`: Added `initialize_from_df(df)` (to `CsvJobDatabase` and `ParquetJobDatabase`) to initialize (and persist) the job database from a given DataFrame.
   Also added `create_job_db()` factory to easily create a job database from a given dataframe and its type guessed from filename extension.
   ([#635](https://github.com/Open-EO/openeo-python-client/issues/635))
-
-
+- `MultiBackendJobManager.run_jobs()` now returns a dictionary with counters/stats about various events during the job run ([#645](https://github.com/Open-EO/openeo-python-client/issues/645))
 
 ### Changed
 
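For reference, the #635 entry above covers this initialization pattern (the file names and dataframe content here are illustrative):

import pandas as pd
from openeo.extra.job_management import CsvJobDatabase, create_job_db

df = pd.DataFrame({"year": [2021, 2022]})

# Explicit variant: pick the job database class yourself.
job_db = CsvJobDatabase("jobs.csv").initialize_from_df(df)

# Factory variant: the database type is guessed from the filename extension.
job_db = create_job_db(path="jobs.parquet", df=df)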
openeo/extra/job_management.py (60 changes: 50 additions & 10 deletions)
@@ -1,4 +1,5 @@
 import abc
+import collections
 import contextlib
 import datetime
 import functools
@@ -380,7 +381,7 @@ def run_jobs(
         start_job: Callable[[], BatchJob] = _start_job_default,
         job_db: Union[str, Path, JobDatabaseInterface, None] = None,
         **kwargs,
-    ):
+    ) -> dict:
         """Runs jobs, specified in a dataframe, and tracks parameters.
 
         :param df:
@@ -422,6 +423,10 @@ def run_jobs(
             Support for Parquet files depends on the ``pyarrow`` package
             as :ref:`optional dependency <installation-optional-dependencies>`.
 
+        :return: dictionary with stats collected during the job running loop.
+            Note that the set of fields in this dictionary is experimental
+            and subject to change.
+
         .. versionchanged:: 0.31.0
             Added support for persisting the job metadata in Parquet format.
@@ -430,6 +435,9 @@
             which can be a path to a CSV or Parquet file,
             or a user-defined :py:class:`JobDatabaseInterface` object.
             The deprecated ``output_file`` argument is still supported for now.
+
+        .. versionchanged:: 0.33.0
+            return a stats dictionary
         """
         # TODO: Defining start_jobs as a Protocol might make its usage more clear, and avoid complicated docstrings,
         # but Protocols are only supported in Python 3.8 and higher.
@@ -457,23 +465,35 @@
         # TODO: start showing deprecation warnings for this usage pattern?
         job_db.initialize_from_df(df)
 
+        stats = collections.defaultdict(int)
         while sum(job_db.count_by_status(statuses=["not_started", "created", "queued", "running"]).values()) > 0:
-            self._job_update_loop(job_db=job_db, start_job=start_job)
+            self._job_update_loop(job_db=job_db, start_job=start_job, stats=stats)
+            stats["run_jobs loop"] += 1
+
             time.sleep(self.poll_sleep)
+            stats["sleep"] += 1
 
-    def _job_update_loop(self, job_db: JobDatabaseInterface, start_job: Callable[[], BatchJob]):
+        return stats
+
+    def _job_update_loop(
+        self, job_db: JobDatabaseInterface, start_job: Callable[[], BatchJob], stats: Optional[dict] = None
+    ):
         """
         Inner loop logic of job management:
         go through the necessary jobs to check for status updates,
         trigger status events, start new jobs when there is room for them, etc.
         """
+        stats = stats if stats is not None else collections.defaultdict(int)
+
         with ignore_connection_errors(context="get statuses"):
-            self._track_statuses(job_db)
+            self._track_statuses(job_db, stats=stats)
+            stats["track_statuses"] += 1
 
         not_started = job_db.get_by_status(statuses=["not_started"], max=200)
         if len(not_started) > 0:
             # Check number of jobs running at each backend
             running = job_db.get_by_status(statuses=["created", "queued", "running"])
+            stats["job_db get_by_status"] += 1
             per_backend = running.groupby("backend_name").size().to_dict()
             _log.info(f"Running per backend: {per_backend}")
             for backend_name in self.backends:
@@ -482,10 +502,13 @@ def _job_update_loop(self, job_db: JobDatabaseInterface, start_job: Callable[[],
                     to_add = self.backends[backend_name].parallel_jobs - backend_load
                     to_launch = not_started.iloc[0:to_add]
                     for i in to_launch.index:
-                        self._launch_job(start_job, not_started, i, backend_name)
+                        self._launch_job(start_job, df=not_started, i=i, backend_name=backend_name, stats=stats)
+                        stats["job launch"] += 1
 
             job_db.persist(to_launch)
+            stats["job_db persist"] += 1
 
-    def _launch_job(self, start_job, df, i, backend_name):
+    def _launch_job(self, start_job, df, i, backend_name, stats: Optional[dict] = None):
         """Helper method for launching jobs
 
         :param start_job:
@@ -508,13 +531,15 @@ def _launch_job(self, start_job, df, i, backend_name):
         :param backend_name:
             name of the backend that will execute the job.
         """
+        stats = stats if stats is not None else collections.defaultdict(int)
+
         df.loc[i, "backend_name"] = backend_name
         row = df.loc[i]
         try:
             _log.info(f"Starting job on backend {backend_name} for {row.to_dict()}")
             connection = self._get_connection(backend_name, resilient=True)
 
+            stats["start_job call"] += 1
             job = start_job(
                 row=row,
                 connection_provider=self._get_connection,
@@ -524,23 +549,30 @@
         except requests.exceptions.ConnectionError as e:
             _log.warning(f"Failed to start job for {row.to_dict()}", exc_info=True)
             df.loc[i, "status"] = "start_failed"
+            stats["start_job error"] += 1
         else:
             df.loc[i, "start_time"] = rfc3339.utcnow()
             if job:
                 df.loc[i, "id"] = job.job_id
                 with ignore_connection_errors(context="get status"):
                     status = job.status()
+                    stats["job get status"] += 1
                     df.loc[i, "status"] = status
                     if status == "created":
                         # start job if not yet done by callback
                         try:
                             job.start()
+                            stats["job start"] += 1
                             df.loc[i, "status"] = job.status()
+                            stats["job get status"] += 1
                         except OpenEoApiError as e:
                             _log.error(e)
                             df.loc[i, "status"] = "start_failed"
+                            stats["job start error"] += 1
             else:
                 df.loc[i, "status"] = "skipped"
+                stats["start_job skipped"] += 1
 
     def on_job_done(self, job: BatchJob, row):
         """
@@ -623,11 +655,13 @@ def ensure_job_dir_exists(self, job_id: str) -> Path:
         if not job_dir.exists():
             job_dir.mkdir(parents=True)
 
-    def _track_statuses(self, job_db: JobDatabaseInterface):
+    def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] = None):
         """
         Tracks status (and stats) of running jobs (in place).
         Optionally cancels jobs when running too long.
         """
+        stats = stats if stats is not None else collections.defaultdict(int)
+
         active = job_db.get_by_status(statuses=["created", "queued", "running"])
         for i in active.index:
             job_id = active.loc[i, "id"]
@@ -638,22 +672,27 @@ def _track_statuses(self, job_db: JobDatabaseInterface):
                 con = self._get_connection(backend_name)
                 the_job = con.job(job_id)
                 job_metadata = the_job.describe()
+                stats["job describe"] += 1
                 new_status = job_metadata["status"]
 
                 _log.info(
                     f"Status of job {job_id!r} (on backend {backend_name}) is {new_status!r} (previously {previous_status!r})"
                 )
 
                 if new_status == "finished":
+                    stats["job finished"] += 1
                     self.on_job_done(the_job, active.loc[i])
 
                 if previous_status != "error" and new_status == "error":
+                    stats["job failed"] += 1
                     self.on_job_error(the_job, active.loc[i])
 
                 if previous_status in {"created", "queued"} and new_status == "running":
+                    stats["job started running"] += 1
                     active.loc[i, "running_start_time"] = rfc3339.utcnow()
 
                 if new_status == "canceled":
+                    stats["job canceled"] += 1
                     self.on_job_cancel(the_job, active.loc[i])
 
                 if self._cancel_running_job_after and new_status == "running":
@@ -667,10 +706,14 @@ def _track_statuses(self, job_db: JobDatabaseInterface):
                         active.loc[i, key] = _format_usage_stat(job_metadata, key)
 
             except OpenEoApiError as e:
+                stats["job tracking error"] += 1
                 print(f"error for job {job_id!r} on backend {backend_name}")
                 print(e)
 
+        stats["job_db persist"] += 1
         job_db.persist(active)
 
 
 def _format_usage_stat(job_metadata: dict, field: str) -> str:
     value = deep_get(job_metadata, "usage", field, "value", default=0)
     unit = deep_get(job_metadata, "usage", field, "unit", default="")
@@ -977,6 +1020,3 @@ def start_job(self, row: pd.Series, connection: Connection, **_) -> BatchJob:
     def __call__(self, *arg, **kwargs) -> BatchJob:
         """Syntactic sugar for calling `start_job` directly."""
         return self.start_job(*arg, **kwargs)
-
-
-
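The recurring pattern in the diff above, distilled into a standalone sketch (the names here are illustrative, not from the source): each helper accepts an optional shared `stats` dict and falls back to a throwaway `defaultdict(int)`, so helpers keep working standalone while `run_jobs()` can thread one dict through the whole loop and return it.

import collections
from typing import Optional

def helper(stats: Optional[dict] = None):
    # Fall back to a local defaultdict so standalone calls need no None checks.
    stats = stats if stats is not None else collections.defaultdict(int)
    stats["helper call"] += 1

def run() -> dict:
    stats = collections.defaultdict(int)
    for _ in range(3):
        helper(stats=stats)  # all helpers increment the same counters
        stats["loop"] += 1
    return stats

print(dict(run()))  # {'helper call': 3, 'loop': 3}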
tests/extra/test_job_management.py (68 changes: 55 additions & 13 deletions)
@@ -29,9 +29,9 @@
     CsvJobDatabase,
     MultiBackendJobManager,
     ParquetJobDatabase,
+    UDPJobFactory,
     create_job_db,
     get_job_db,
-    UDPJobFactory,
 )
 from openeo.rest._testing import OPENEO_BACKEND, DummyBackend, build_capabilities
 from openeo.util import rfc3339
@@ -113,8 +113,17 @@ def start_job(row, connection, **kwargs):
             year = int(row["year"])
             return BatchJob(job_id=f"job-{year}", connection=connection)
 
-        manager.run_jobs(df=df, start_job=start_job, output_file=output_file)
-        assert sleep_mock.call_count > 10
+        run_stats = manager.run_jobs(df=df, start_job=start_job, output_file=output_file)
+        assert run_stats == dirty_equals.IsPartialDict(
+            {
+                "sleep": dirty_equals.IsInt(gt=10),
+                "start_job call": 7,  # TODO?
+                "job started running": 5,
+                "job finished": 5,
+                "job_db persist": dirty_equals.IsInt(gt=5),
+                "run_jobs loop": dirty_equals.IsInt(gt=5),
+            }
+        )
 
         result = pd.read_csv(output_file)
         assert len(result) == 5
@@ -148,8 +157,17 @@ def start_job(row, connection, **kwargs):
 
         job_db = CsvJobDatabase(output_file).initialize_from_df(df)
 
-        manager.run_jobs(job_db=job_db, start_job=start_job)
-        assert sleep_mock.call_count > 10
+        run_stats = manager.run_jobs(job_db=job_db, start_job=start_job)
+        assert run_stats == dirty_equals.IsPartialDict(
+            {
+                "sleep": dirty_equals.IsInt(gt=10),
+                "start_job call": 7,  # TODO?
+                "job started running": 5,
+                "job finished": 5,
+                "job_db persist": dirty_equals.IsInt(gt=5),
+                "run_jobs loop": dirty_equals.IsInt(gt=5),
+            }
+        )
 
         result = pd.read_csv(output_file)
         assert len(result) == 5
@@ -176,8 +194,14 @@ def start_job(row, connection, **kwargs):
         output_file = tmp_path / "jobs.db"
         job_db = db_class(output_file).initialize_from_df(df)
 
-        manager.run_jobs(job_db=job_db, start_job=start_job)
-        assert sleep_mock.call_count > 10
+        run_stats = manager.run_jobs(job_db=job_db, start_job=start_job)
+        assert run_stats == dirty_equals.IsPartialDict(
+            {
+                "start_job call": 7,  # TODO?
+                "job finished": 5,
+                "job_db persist": dirty_equals.IsInt(gt=5),
+            }
+        )
 
         result = job_db.read()
         assert len(result) == 5
Expand Down Expand Up @@ -205,8 +229,14 @@ def start_job(row, connection, **kwargs):
output_file = tmp_path / filename
job_db = create_job_db(path=output_file, df=df)

manager.run_jobs(job_db=job_db, start_job=start_job)
assert sleep_mock.call_count > 10
run_stats = manager.run_jobs(job_db=job_db, start_job=start_job)
assert run_stats == dirty_equals.IsPartialDict(
{
"start_job call": 7, # TODO?
"job finished": 5,
"job_db persist": dirty_equals.IsInt(gt=5),
}
)

result = job_db.read()
assert len(result) == 5
Expand Down Expand Up @@ -235,6 +265,7 @@ def start_job(row, connection, **kwargs):
# Trigger context switch to job thread
sleep(1)
manager.stop_job_thread()
# TODO #645 how to collect stats with the threaded run_job?
assert sleep_mock.call_count > 10

result = pd.read_csv(output_file)
@@ -543,8 +574,12 @@ def start_job(row, connection_provider, connection, **kwargs):
 
         output_file = tmp_path / "jobs.csv"
 
-        manager.run_jobs(df=df, start_job=start_job, output_file=output_file)
-        assert sleep_mock.call_count > 3
+        run_stats = manager.run_jobs(df=df, start_job=start_job, output_file=output_file)
+        assert run_stats == dirty_equals.IsPartialDict(
+            {
+                "start_job call": 1,
+            }
+        )
 
         # Sanity check: the job succeeded
         result = pd.read_csv(output_file)
Expand Down Expand Up @@ -615,6 +650,7 @@ def start_job(row, connection_provider, connection, **kwargs):
with pytest.raises(requests.exceptions.RetryError) as exc:
manager.run_jobs(df=df, start_job=start_job, output_file=output_file)

# TODO #645 how to still check stats when run_jobs raised exception?
assert sleep_mock.call_count > 3

# Sanity check: the job has status "error"
@@ -1103,8 +1139,14 @@ def test_udp_job_manager_basic(self, tmp_path, requests_mock, dummy_backend, job
         # TODO #636 avoid this cumbersome pattern using private _normalize_df API
         job_db.persist(job_manager._normalize_df(df))
 
-        job_manager.run_jobs(job_db=job_db, start_job=job_starter)
-        assert sleep_mock.call_count > 0
+        stats = job_manager.run_jobs(job_db=job_db, start_job=job_starter)
+        assert stats == dirty_equals.IsPartialDict(
+            {
+                "sleep": dirty_equals.IsInt(gt=1),
+                "start_job call": 3,
+                "job start": 3,
+            }
+        )
 
         result = job_db.read()
         assert set(result.status) == {"finished"}
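The test assertions above lean on the dirty_equals library; a standalone sketch of how those partial-dict comparisons behave (the stats values here are made up):

from dirty_equals import IsInt, IsPartialDict

stats = {"sleep": 42, "start_job call": 7, "job finished": 5, "extra counter": 1}

# IsPartialDict only requires the listed keys to match, so new counters can be
# added to run_jobs() later without breaking this assertion.
assert stats == IsPartialDict(
    {
        "sleep": IsInt(gt=10),  # a bound, not an exact value: sleep count is timing-dependent
        "job finished": 5,
    }
)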
