Update to jobrunner v2.73.0, including pipeline #245

Merged: 1 commit, Mar 25, 2024
3 changes: 3 additions & 0 deletions opensafely/_vendor/jobrunner/actions.py
@@ -4,6 +4,7 @@
 from typing import Dict, List

 from opensafely._vendor.pipeline.exceptions import ProjectValidationError
+from opensafely._vendor.pipeline.models import Action
 from opensafely._vendor.pipeline.outputs import get_output_dirs

 from opensafely._vendor.jobrunner.lib.path_utils import ensure_unix_path
@@ -19,6 +20,7 @@ class ActionSpecification:
     run: str
     needs: List[str]
     outputs: Dict[str, Dict[str, str]]
+    action: Action
@@ -80,6 +82,7 @@ def get_action_specification(config, action_id, using_dummy_data_backend=False):
         run=run_command,
         needs=action_spec.needs,
         outputs=action_spec.outputs.dict(exclude_unset=True),
+        action=action_spec,
     )
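Note: the new `action` field threads the full pipeline `Action` model through `ActionSpecification`, which is what lets `create_or_update_jobs.py` (further down in this diff) read `action_spec.action.is_database_action`. A minimal sketch of the data flow, using simplified stand-ins rather than the real vendored models:

    from dataclasses import dataclass
    from typing import Dict, List

    @dataclass
    class Action:  # stand-in for pipeline.models.Action
        run: str
        is_database_action: bool = False

    @dataclass
    class ActionSpecification:  # trimmed to the fields shown in this diff
        run: str
        needs: List[str]
        outputs: Dict[str, Dict[str, str]]
        action: Action  # new: the full pipeline model rides along

    spec = ActionSpecification(
        run="python:latest python analysis/report.py",
        needs=["generate_cohort"],
        outputs={"moderately_sensitive": {"report": "output/report.html"}},
        action=Action(run="python:latest python analysis/report.py"),
    )
    # downstream code can now branch on properties of the underlying Action
    assert spec.action.is_database_action is False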
13 changes: 10 additions & 3 deletions opensafely/_vendor/jobrunner/cli/add_job.py
@@ -5,6 +5,7 @@
 import argparse
 import dataclasses
 import pprint
+import sys
 import textwrap
 from pathlib import Path
 from urllib.parse import urlparse
@@ -36,6 +37,7 @@ def main(
             requested_actions=actions,
             force_run_dependencies=force_run_dependencies,
             cancelled_actions=[],
+            codelists_ok=True,
         )
     )
     print("Submitting JobRequest:\n")
@@ -46,6 +48,8 @@
     for job in jobs:
         display_obj(job)

+    return job_request, jobs
+

 def display_obj(obj):
     if hasattr(obj, "asdict"):
@@ -57,7 +61,10 @@ def display_obj(obj):
     print()


-def run():
+def run(argv=None):
+    if argv is None:
+        argv = sys.argv[1:]
+
     configure_logging()
     parser = argparse.ArgumentParser(description=__doc__.partition("\n\n")[0])
     parser.add_argument("repo_url", help="URL (or local path) of git repository")
@@ -82,8 +89,8 @@ def run():
     )
     parser.add_argument("-f", "--force-run-dependencies", action="store_true")

-    args = parser.parse_args()
-    main(**vars(args))
+    args = parser.parse_args(argv)
+    return main(**vars(args))


 if __name__ == "__main__":
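Note: `run(argv=None)` plus `parser.parse_args(argv)` is the standard argparse pattern for a testable entry point: tests pass an argument list directly instead of patching `sys.argv` or spawning a subprocess, and the returned `(job_request, jobs)` can be inspected. A sketch of the pattern in isolation, with a deliberately simplified parser rather than the full argument set above:

    import argparse
    import sys

    def run(argv=None):
        if argv is None:
            argv = sys.argv[1:]  # fall back to the real command line
        parser = argparse.ArgumentParser()
        parser.add_argument("repo_url")
        parser.add_argument("actions", nargs="+")
        args = parser.parse_args(argv)
        return args  # the real run() now returns main(**vars(args))

    # in a test: no subprocess and no sys.argv patching needed
    args = run(["https://github.com/example/study", "run_all"])
    assert args.actions == ["run_all"]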
12 changes: 11 additions & 1 deletion opensafely/_vendor/jobrunner/cli/kill_job.py
@@ -4,6 +4,7 @@
 import argparse

 from opensafely._vendor.jobrunner.executors import local
+from opensafely._vendor.jobrunner.job_executor import JobResults
 from opensafely._vendor.jobrunner.lib import database, docker
 from opensafely._vendor.jobrunner.models import Job, State, StatusCode
 from opensafely._vendor.jobrunner.run import job_to_job_definition, mark_job_as_failed
@@ -32,7 +33,16 @@ def main(partial_job_ids, cleanup=False):
         )
         if container_metadata:
             job = job_to_job_definition(job)
-            metadata = local.get_job_metadata(job, {}, container_metadata)
+            # create a dummy JobResults with just the message we want
+            results = JobResults(
+                outputs=None,
+                unmatched_patterns=None,
+                unmatched_outputs=None,
+                exit_code=container_metadata["State"]["ExitCode"],
+                image_id=container_metadata["Image"],
+                message="job killed by OpenSAFELY administrator",
+            )
+            metadata = local.get_job_metadata(job, {}, container_metadata, results)
             local.write_job_logs(job, metadata, copy_log_to_workspace=False)

         if cleanup:
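Note: the dummy `JobResults` is needed because `get_job_metadata` now takes a `results` argument (see the local.py changes below) and copies `results.message` into the persisted log as `status_message`. A sketch of just that flow, using a hypothetical minimal stand-in; the real `JobResults` has more fields than shown here:

    class FakeJobResults:  # stand-in exposing only the fields this path reads
        def __init__(self, exit_code, image_id, message):
            self.exit_code = exit_code
            self.image_id = image_id
            self.message = message

    container_metadata = {"State": {"ExitCode": 137}, "Image": "sha256:abc"}
    results = FakeJobResults(
        exit_code=container_metadata["State"]["ExitCode"],
        image_id=container_metadata["Image"],
        message="job killed by OpenSAFELY administrator",
    )
    # get_job_metadata stores results.message as job_metadata["status_message"],
    # which write_job_logs then includes in the job's log file
    assert results.message == "job killed by OpenSAFELY administrator"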
23 changes: 4 additions & 19 deletions opensafely/_vendor/jobrunner/config.py
@@ -6,6 +6,8 @@
 from multiprocessing import cpu_count
 from pathlib import Path

+from opensafely._vendor import pipeline
+

 class ConfigException(Exception):
     pass
@@ -46,6 +48,7 @@ def _is_valid_backend_name(name):

 WORKDIR = Path(os.environ.get("WORKDIR", default_work_dir)).resolve()
 DATABASE_FILE = WORKDIR / "db.sqlite"
+METRICS_FILE = WORKDIR / "metrics.sqlite"
 GIT_REPO_DIR = WORKDIR / "repos"

 # valid archive formats
@@ -143,25 +146,7 @@ def database_urls_from_env(env):
 )  # 16mb


-# TODO: we might want to take this list from pipeline if we implement it there.
-LEVEL4_FILE_TYPES = [
-    # tables
-    ".csv",
-    ".tsv",
-    # images
-    ".jpg",
-    ".jpeg",
-    ".png",
-    ".svg",
-    ".svgz",
-    # reports
-    ".html",
-    ".pdf",
-    ".txt",
-    ".log",
-    ".json",
-    ".md",
-]
+LEVEL4_FILE_TYPES = pipeline.constants.LEVEL4_FILE_TYPES

 STATA_LICENSE = os.environ.get("STATA_LICENSE")
 STATA_LICENSE_REPO = os.environ.get(
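Note: this removes the duplicated allow-list (and resolves its TODO) by importing it from the vendored pipeline package, making pipeline the single source of truth. A sketch of the kind of suffix check the constant supports; `is_level4_filetype` is illustrative, not a function in this codebase, and the values are copied from the deleted list:

    from pathlib import Path

    # the live list now comes from pipeline.constants.LEVEL4_FILE_TYPES
    LEVEL4_FILE_TYPES = [
        ".csv", ".tsv",                                    # tables
        ".jpg", ".jpeg", ".png", ".svg", ".svgz",          # images
        ".html", ".pdf", ".txt", ".log", ".json", ".md",   # reports
    ]

    def is_level4_filetype(path: str) -> bool:
        return Path(path).suffix in LEVEL4_FILE_TYPES

    assert is_level4_filetype("output/table_1.csv")
    assert not is_level4_filetype("output/cohort.dta")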
35 changes: 25 additions & 10 deletions opensafely/_vendor/jobrunner/create_or_update_jobs.py
@@ -36,6 +36,10 @@ class JobRequestError(Exception):
     pass


+class StaleCodelistError(JobRequestError):
+    pass
+
+
 class NothingToDoError(JobRequestError):
     pass

@@ -62,10 +66,10 @@ def create_or_update_jobs(job_request):
         JobRequestError,
     ) as e:
         log.info(f"JobRequest failed:\n{e}")
-        create_failed_job(job_request, e)
+        create_job_from_exception(job_request, e)
     except Exception:
         log.exception("Uncaught error while creating jobs")
-        create_failed_job(job_request, JobRequestError("Internal error"))
+        create_job_from_exception(job_request, JobRequestError("Internal error"))
     else:
         if job_request.cancelled_actions:
             log.debug("Cancelling actions: %s", job_request.cancelled_actions)
@@ -114,7 +118,8 @@ def create_jobs(job_request):


 def validate_job_request(job_request):
-    if config.ALLOWED_GITHUB_ORGS:
+    # http prefix allows local git repos, useful for tests
+    if job_request.repo_url.startswith("http") and config.ALLOWED_GITHUB_ORGS:
         validate_repo_url(job_request.repo_url, config.ALLOWED_GITHUB_ORGS)
     if not job_request.requested_actions:
         raise JobRequestError("At least one action must be supplied")
@@ -238,6 +243,7 @@ def recursively_build_jobs(jobs_by_action, job_request, pipeline_config, action)
         commit=job_request.commit,
         workspace=job_request.workspace,
         database_name=job_request.database_name,
+        requires_db=action_spec.action.is_database_action,
         action=action,
         wait_for_job_ids=wait_for_job_ids,
         requires_outputs_from=action_spec.needs,
@@ -311,12 +317,12 @@ def assert_codelists_ok(job_request, new_jobs):
         # Codelists are out of date; fail the entire job request if any job
         # requires database access
         if job.requires_db:
-            raise JobRequestError(
+            raise StaleCodelistError(
                 f"Codelists are out of date (required by action {job.action})"
             )


-def create_failed_job(job_request, exception):
+def create_job_from_exception(job_request, exception):
     """
     Sometimes we want to say to the job-server (and the user): your JobRequest
     was broken so we weren't able to create any jobs for it. But the only way
@@ -327,19 +333,25 @@ def create_job_from_exception(job_request, exception):

     This is a bit of a hack, but it keeps the sync protocol simple.
     """
+    action = "__error__"
+    error = exception
+    state = State.FAILED
+    status_message = str(exception)
+
     # Special case for the NothingToDoError which we treat as a success
     if isinstance(exception, NothingToDoError):
         state = State.SUCCEEDED
         code = StatusCode.SUCCEEDED
         status_message = "All actions have already run"
         action = job_request.requested_actions[0]
         error = None
+    # StaleCodelistError is a failure but not an INTERNAL_ERROR
+    elif isinstance(exception, StaleCodelistError):
+        code = StatusCode.STALE_CODELISTS
     else:
-        state = State.FAILED
         code = StatusCode.INTERNAL_ERROR
         # include exception name in message to aid debugging
         status_message = f"{type(exception).__name__}: {exception}"
-        action = "__error__"
-        error = exception

     now = time.time()
     job = Job(
         job_request_id=job_request.id,
@@ -379,7 +391,10 @@ def set_cancelled_flag_for_actions(job_request_id, actions):
     # working.
     update_where(
         Job,
-        {"cancelled": True},
+        {
+            "cancelled": True,
+            "completed_at": int(time.time()),
+        },
         job_request_id=job_request_id,
         action__in=actions,
     )
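Note: the renamed `create_job_from_exception` now sets the failure defaults up front and then special-cases two exceptions: `NothingToDoError` is reported as a success, and the new `StaleCodelistError` is a failure with the distinct `STALE_CODELISTS` code instead of `INTERNAL_ERROR`. A condensed, self-contained sketch of the resulting mapping, with strings standing in for the real `State`/`StatusCode` enums:

    class JobRequestError(Exception):
        pass

    class StaleCodelistError(JobRequestError):
        pass

    class NothingToDoError(JobRequestError):
        pass

    def classify(exception, requested_actions):
        # defaults: a failed "__error__" job carrying the exception
        action, error = "__error__", exception
        state, status_message = "FAILED", str(exception)
        if isinstance(exception, NothingToDoError):
            state, code = "SUCCEEDED", "SUCCEEDED"
            status_message = "All actions have already run"
            action, error = requested_actions[0], None
        elif isinstance(exception, StaleCodelistError):
            code = "STALE_CODELISTS"  # a failure, but not an internal error
        else:
            code = "INTERNAL_ERROR"
            status_message = f"{type(exception).__name__}: {exception}"
        return state, code, action, error, status_message

    assert classify(NothingToDoError(), ["run_all"])[0] == "SUCCEEDED"
    assert classify(StaleCodelistError("stale"), ["run_all"])[1] == "STALE_CODELISTS"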
37 changes: 28 additions & 9 deletions opensafely/_vendor/jobrunner/executors/local.py
@@ -11,7 +11,7 @@

 from opensafely._vendor.pipeline.legacy import get_all_output_patterns_from_project_file

-from opensafely._vendor.jobrunner import config
+from opensafely._vendor.jobrunner import config, record_stats
 from opensafely._vendor.jobrunner.executors import volumes
 from opensafely._vendor.jobrunner.job_executor import (
     ExecutorAPI,
@@ -241,16 +241,24 @@ def get_status(self, job_definition, timeout=15):
                 f"docker timed out after {timeout}s inspecting container {name}"
             )

+        metrics = record_stats.read_job_metrics(job_definition.id)
+
         if container is None:  # container doesn't exist
             if job_definition.cancelled:
                 if volumes.get_volume_api(job_definition).volume_exists(job_definition):
                     # jobs prepared but not running do not need to finalize, so we
                     # proceed directly to the FINALIZED state here
                     return JobStatus(
-                        ExecutorState.FINALIZED, "Prepared job was cancelled"
+                        ExecutorState.FINALIZED,
+                        "Prepared job was cancelled",
+                        metrics=metrics,
                     )
                 else:
-                    return JobStatus(ExecutorState.UNKNOWN, "Pending job was cancelled")
+                    return JobStatus(
+                        ExecutorState.UNKNOWN,
+                        "Pending job was cancelled",
+                        metrics=metrics,
+                    )

             # timestamp file presence means we have finished preparing
             timestamp_ns = volumes.get_volume_api(job_definition).read_timestamp(
@@ -261,24 +269,31 @@ def get_status(self, job_definition, timeout=15):
             # re-prepare it anyway.
             if timestamp_ns is None:
                 # we are Jon Snow
-                return JobStatus(ExecutorState.UNKNOWN)
+                return JobStatus(ExecutorState.UNKNOWN, metrics={})
             else:
                 # we've finish preparing
-                return JobStatus(ExecutorState.PREPARED, timestamp_ns=timestamp_ns)
+                return JobStatus(
+                    ExecutorState.PREPARED, timestamp_ns=timestamp_ns, metrics=metrics
+                )

         if container["State"]["Running"]:
             timestamp_ns = datestr_to_ns_timestamp(container["State"]["StartedAt"])
-            return JobStatus(ExecutorState.EXECUTING, timestamp_ns=timestamp_ns)
+            return JobStatus(
+                ExecutorState.EXECUTING, timestamp_ns=timestamp_ns, metrics=metrics
+            )
         elif job_definition.id in RESULTS:
             return JobStatus(
                 ExecutorState.FINALIZED,
                 timestamp_ns=RESULTS[job_definition.id].timestamp_ns,
+                metrics=metrics,
             )
         else:
             # container present but not running, i.e. finished
             # Nb. this does not include prepared jobs, as they have a volume but not a container
             timestamp_ns = datestr_to_ns_timestamp(container["State"]["FinishedAt"])
-            return JobStatus(ExecutorState.EXECUTED, timestamp_ns=timestamp_ns)
+            return JobStatus(
+                ExecutorState.EXECUTED, timestamp_ns=timestamp_ns, metrics=metrics
+            )
@@ -409,7 +424,9 @@ def finalize_job(job_definition):
         base_revision=labels.get("org.opensafely.base.vcs-ref", "unknown"),
         base_created=labels.get("org.opencontainers.base.build-date", "unknown"),
     )
-    job_metadata = get_job_metadata(job_definition, outputs, container_metadata)
+    job_metadata = get_job_metadata(
+        job_definition, outputs, container_metadata, results
+    )

     if job_definition.cancelled:
         write_job_logs(job_definition, job_metadata, copy_log_to_workspace=False)
@@ -426,7 +443,7 @@ def finalize_job(job_definition):
     return results


-def get_job_metadata(job_definition, outputs, container_metadata):
+def get_job_metadata(job_definition, outputs, container_metadata, results):
     # job_metadata is a big dict capturing everything we know about the state
     # of the job
     job_metadata = dict()
@@ -437,6 +454,7 @@ def get_job_metadata(job_definition, outputs, container_metadata, results):
     job_metadata["docker_image_id"] = container_metadata["Image"]
     # convert exit code to str so 0 exit codes get logged
     job_metadata["exit_code"] = str(container_metadata["State"]["ExitCode"])
+    job_metadata["status_message"] = results.message
     job_metadata["container_metadata"] = container_metadata
     job_metadata["outputs"] = outputs
     job_metadata["commit"] = job_definition.study.commit
@@ -679,6 +697,7 @@ def write_log_file(job_definition, job_metadata, filename, excluded):
     "commit",
     "docker_image_id",
     "exit_code",
+    "status_message",
     "created_at",
     "completed_at",
     "database_name",
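Note: every `JobStatus` returned by `get_status` now carries per-job metrics, read once up front via the new `record_stats.read_job_metrics` call, so callers receive resource usage alongside execution state without a second lookup. A sketch of a consumer; the metric keys shown are an assumption based on the docker_stats.py change below, not a documented schema:

    import logging

    log = logging.getLogger(__name__)

    def log_job_metrics(status):
        # JobStatus.metrics defaults to {}, so iterating is always safe
        for key, value in sorted(status.metrics.items()):
            # e.g. cpu_percentage=12.5, memory_used=268435456
            log.info("job metric %s=%s", key, value)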
1 change: 1 addition & 0 deletions opensafely/_vendor/jobrunner/job_executor.py
@@ -67,6 +67,7 @@ class JobStatus:
     timestamp_ns: int = (
         None  # timestamp this JobStatus occurred, in integer nanoseconds
     )
+    metrics: dict = field(default_factory=dict)


 @dataclass
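Note: `field(default_factory=dict)` is the required way to give a dataclass field a fresh mutable default: writing `metrics: dict = {}` would raise a `ValueError` at class-definition time, and sharing one dict would leak metrics between instances. A self-contained illustration with a trimmed stand-in for the vendored class:

    from dataclasses import dataclass, field

    @dataclass
    class JobStatus:  # trimmed stand-in for illustration
        timestamp_ns: int = None
        metrics: dict = field(default_factory=dict)

    a, b = JobStatus(), JobStatus()
    a.metrics["cpu_percentage"] = 50.0
    assert b.metrics == {}  # each instance gets its own dict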
19 changes: 0 additions & 19 deletions opensafely/_vendor/jobrunner/lib/commands.py

This file was deleted.

1 change: 1 addition & 0 deletions opensafely/_vendor/jobrunner/lib/docker_stats.py
@@ -30,6 +30,7 @@ def get_container_stats(timeout=DEFAULT_TIMEOUT):
         removeprefix(row["Name"], "os-job-"): {
             "cpu_percentage": float(row["CPUPerc"].rstrip("%")),
             "memory_used": _parse_size(row["MemUsage"].split()[0]),
+            "container_id": row["Container"],
         }
         for row in data
         if row["Name"].startswith("os-job-")
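Note: with `container_id` added, each entry in the returned mapping ties a job id (the container name with its `os-job-` prefix stripped) to the figures parsed from `docker stats`. An illustrative sketch of the resulting shape; the values here are made up:

    stats = {
        "abcd1234": {                        # from container "os-job-abcd1234"
            "cpu_percentage": 12.5,          # parsed from "12.5%"
            "memory_used": 268435456,        # bytes, parsed from e.g. "256MiB"
            "container_id": "0f3e4c9ab12d",  # new: the raw container id
        },
    }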