Bump job-runner to v2.69.0
ghickman committed Aug 16, 2023
1 parent d9ad894 commit 0e62380
Showing 89 changed files with 5,518 additions and 3,637 deletions.
2 changes: 1 addition & 1 deletion opensafely/_vendor/chardet-3.0.4.dist-info/RECORD
@@ -1,4 +1,4 @@
../../bin/chardetect,sha256=Iim50Xo_A-6jNKXhng0od1YWXEr5OEzrv2MBb1TfLes,256
../../bin/chardetect,sha256=k6iJnsNSUeMlNutkq2v__zPIQpuglvel238-vfYDBlA,283
chardet-3.0.4.dist-info/DESCRIPTION.rst,sha256=PQ4sBsMyKFZkjC6QpmbpLn0UtCNyeb-ZqvCGEgyZMGk,2174
chardet-3.0.4.dist-info/INSTALLER,sha256=zuuue4knoyJ-UwPPXg8fezS7VCrXJQrAP7zeNuwvFQg,4
chardet-3.0.4.dist-info/METADATA,sha256=RV_2I4B1Z586DL8oVO5Kp7X5bUdQ5EuKAvNoAEF8wSw,3239
2 changes: 1 addition & 1 deletion opensafely/_vendor/distro-1.8.0.dist-info/RECORD
@@ -1,4 +1,4 @@
../../bin/distro,sha256=LL7TkGdbIp5yO7jgBe9K5tlwiCJXMlG9hKhj5qmlETo,247
../../bin/distro,sha256=24TzW2cWStxsIbDGwqb31a4yu0LPhBq-sA9gV1wIgWs,274
distro-1.8.0.dist-info/INSTALLER,sha256=zuuue4knoyJ-UwPPXg8fezS7VCrXJQrAP7zeNuwvFQg,4
distro-1.8.0.dist-info/LICENSE,sha256=y16Ofl9KOYjhBjwULGDcLfdWBfTEZRXnduOspt-XbhQ,11325
distro-1.8.0.dist-info/METADATA,sha256=NhYw94UPXb78_Z3_VtLxTJ1zQgUUKoTndg10uKJX800,6915
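
Both RECORD updates follow the standard wheel metadata format: each row is path,hash,size, where the hash is the urlsafe-base64 SHA-256 digest of the file (padding stripped), so re-vendoring the console scripts changes both the digest and the byte count. A minimal sketch of how such a row could be regenerated for checking; the helper below is an editor's illustration, not part of the commit, and the example path is taken from the listing above:

import base64
import hashlib
from pathlib import Path

def record_row(path):
    # Build a RECORD-style "path,sha256=...,size" entry for a file on disk
    data = Path(path).read_bytes()
    digest = base64.urlsafe_b64encode(hashlib.sha256(data).digest()).rstrip(b"=")
    return f"{path},sha256={digest.decode()},{len(data)}"

# Compare a freshly computed row against the one stored in RECORD
print(record_row("opensafely/_vendor/distro-1.8.0.dist-info/INSTALLER"))
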
27 changes: 9 additions & 18 deletions opensafely/_vendor/jobrunner/actions.py
@@ -8,8 +8,6 @@

from opensafely._vendor.jobrunner.lib.path_utils import ensure_unix_path

from .extractors import is_extraction_command


class UnknownActionError(ProjectValidationError):
pass
@@ -56,7 +54,7 @@ def get_action_specification(config, action_id, using_dummy_data_backend=False):
]

# Special case handling for the `cohortextractor generate_cohort` command
if is_extraction_command(run_parts, require_version=1):
if is_cohortextractor_generate_cohort(run_parts):
# Set the size of the dummy data population, if that's what we're
# generating. Possibly this should be moved to the study definition
# anyway, which would make this unnecessary.
@@ -76,25 +74,18 @@ def get_action_specification(config, action_id, using_dummy_data_backend=False):
# directory the `outputs` spec is expecting.
run_parts.append(f"--output-dir={output_dirs[0]}")

elif is_extraction_command(run_parts, require_version=2):
# databuilder expects all command line arguments to be
# specified in the run command
target = "--dummy-data-file"
if using_dummy_data_backend and not any(
arg == target or arg.startswith(f"{target}=") for arg in run_parts
):
raise ProjectValidationError(
"--dummy-data-file is required for a local run"
)

# TODO: we can probably remove this fork since the v1&2 forks cover it
elif is_extraction_command(run_parts): # pragma: no cover
raise RuntimeError("Unhandled cohortextractor version")

run_command = shlex.join(run_parts)

return ActionSpecification(
run=run_command,
needs=action_spec.needs,
outputs=action_spec.outputs.dict(exclude_unset=True),
)


def is_cohortextractor_generate_cohort(args):
return (
len(args) > 1
and args[0].startswith("cohortextractor:")
and args[1] == "generate_cohort"
)
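
The new helper above replaces the deleted is_extraction_command()/extractors.py logic with a direct check on the split run command. A minimal sketch of how it classifies commands, added for illustration (the sample run strings are made up and not part of the commit):

import shlex

def is_cohortextractor_generate_cohort(args):
    return (
        len(args) > 1
        and args[0].startswith("cohortextractor:")
        and args[1] == "generate_cohort"
    )

# A cohortextractor v1 extraction action is recognised...
assert is_cohortextractor_generate_cohort(
    shlex.split("cohortextractor:latest generate_cohort --study-definition study_definition")
)
# ...while other actions, e.g. an analysis script, are not
assert not is_cohortextractor_generate_cohort(shlex.split("python:latest analysis.py"))
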
2 changes: 0 additions & 2 deletions opensafely/_vendor/jobrunner/cli/local_run.py
@@ -251,8 +251,6 @@ def create_and_run_jobs(
):
# Fiddle with the configuration to suit what we need for running local jobs
docker.LABEL = docker_label
# It's more helpful in this context to have things consistent
config.RANDOMISE_JOB_ORDER = False
config.HIGH_PRIVACY_WORKSPACES_DIR = project_dir.parent
config.DATABASE_FILE = project_dir / "metadata" / "db.sqlite"
config.TMP_DIR = temp_dir
21 changes: 12 additions & 9 deletions opensafely/_vendor/jobrunner/config.py
@@ -127,12 +127,6 @@ def _is_valid_backend_name(name):
MAX_DB_WORKERS = int(os.environ.get("MAX_DB_WORKERS") or MAX_WORKERS)
MAX_RETRIES = int(os.environ.get("MAX_RETRIES", 0))

# This is a crude mechanism for preventing a single large JobRequest with lots
# of associated Jobs from hogging all the resources. We want this configurable
# because it's useful to be able to disable this during tests and when running
# locally
RANDOMISE_JOB_ORDER = True


STATA_LICENSE = os.environ.get("STATA_LICENSE")
STATA_LICENSE_REPO = os.environ.get(
@@ -248,9 +242,11 @@ def parse_job_resource_weights(config_file):
# docker specific exit codes we understand
DOCKER_EXIT_CODES = {
# 137 = 128+9, which means was killed by signal 9, SIGKILL
# Note: this can also mean killed by OOM killer, but that's explicitly
# handled already.
137: "Killed by an OpenSAFELY admin",
# This could be killed externally by an admin, or terminated through the
# cancellation process.
# Note: this can also mean killed by OOM killer, if the value of OOMKilled
# is incorrect (as sometimes recently observed)
137: "Job killed by OpenSAFELY admin or memory limits",
}

# BindMountVolumeAPI config
@@ -277,3 +273,10 @@ def parse_job_resource_weights(config_file):
else:
DOCKER_USER_ID = None
DOCKER_GROUP_ID = None


# The name of a Docker network configured to allow access to just the database and
# nothing else. Setup and configuration of this network is expected to be managed
# externally. See:
# https://github.com/opensafely-core/backend-server/pull/105
DATABASE_ACCESS_NETWORK = os.environ.get("DATABASE_ACCESS_NETWORK", "jobrunner-db")
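
The "137 = 128 + 9" convention noted in DOCKER_EXIT_CODES above is the usual shell/Docker encoding of death-by-signal. A small illustrative sketch, not part of the commit and assuming a Unix signal table:

import signal

def describe_exit_code(exit_code):
    # Exit codes above 128 conventionally encode "killed by signal (code - 128)"
    if exit_code > 128:
        return f"killed by {signal.Signals(exit_code - 128).name}"
    return f"exited with code {exit_code}"

assert describe_exit_code(137) == "killed by SIGKILL"  # SIGKILL == 9
assert describe_exit_code(0) == "exited with code 0"
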
138 changes: 120 additions & 18 deletions opensafely/_vendor/jobrunner/executors/local.py
@@ -1,9 +1,11 @@
import datetime
import json
import logging
import socket
import subprocess
import tempfile
import time
import urllib.parse
from pathlib import Path

from opensafely._vendor.pipeline.legacy import get_all_output_patterns_from_project_file
@@ -84,6 +86,20 @@ def workspace_is_archived(workspace):
return False


def was_oomkilled(container):
# Nb. this flag has been observed to be unreliable on some versions of Linux
return container["State"]["ExitCode"] == 137 and container["State"]["OOMKilled"]


def oomkilled_message(container):
message = "Job ran out of memory"
memory_limit = container.get("HostConfig", {}).get("Memory", 0)
if memory_limit > 0:
gb_limit = memory_limit / (1024**3)
message += f" (limit was {gb_limit:.2f}GB)"
return message
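
As a worked example of the two helpers above (an editor's illustration, not part of the commit): a made-up fragment of docker inspect output with a 2 GiB limit, i.e. 2 * 1024**3 bytes, produces the message "Job ran out of memory (limit was 2.00GB)".

container = {
    "State": {"ExitCode": 137, "OOMKilled": True},  # killed by SIGKILL with the OOM flag set
    "HostConfig": {"Memory": 2 * 1024**3},          # 2 GiB limit, as docker inspect reports it
}
assert was_oomkilled(container)
assert oomkilled_message(container) == "Job ran out of memory (limit was 2.00GB)"
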


class LocalDockerAPI(ExecutorAPI):
"""ExecutorAPI implementation using local docker service."""

@@ -139,6 +155,13 @@ def execute(self, job_definition):
extra_args.extend(["--cpus", str(job_definition.cpu_count)])
if job_definition.memory_limit:
extra_args.extend(["--memory", job_definition.memory_limit])
# We use a custom Docker network configured so that database jobs can access the
# database and nothing else
if job_definition.allow_database_access and config.DATABASE_ACCESS_NETWORK:
extra_args.extend(["--network", config.DATABASE_ACCESS_NETWORK])
extra_args.extend(
get_dns_args_for_docker(job_definition.env.get("DATABASE_URL"))
)

if not volume_api.requires_root:
if config.DOCKER_USER_ID and config.DOCKER_GROUP_ID:
@@ -175,9 +198,12 @@ def execute(self, job_definition):

def finalize(self, job_definition):

current = self.get_status(job_definition)
if current.state != ExecutorState.EXECUTED:
return current
current_status = self.get_status(job_definition)
if current_status.state == ExecutorState.UNKNOWN:
# job had not started running, so do not finalize
return current_status

assert current_status.state in [ExecutorState.EXECUTED, ExecutorState.ERROR]

try:
finalize_job(job_definition)
@@ -188,8 +214,22 @@ def finalize(self, job_definition):
return JobStatus(ExecutorState.FINALIZED)

def terminate(self, job_definition):
current_status = self.get_status(job_definition)
if current_status.state == ExecutorState.UNKNOWN:
# job was pending, so do not go to EXECUTED
return current_status

assert current_status.state in [
ExecutorState.EXECUTING,
ExecutorState.ERROR,
ExecutorState.PREPARED,
]

docker.kill(container_name(job_definition))
return JobStatus(ExecutorState.ERROR, "terminated by api")

return JobStatus(
ExecutorState.EXECUTED, f"Job terminated by {job_definition.cancelled}"
)

def cleanup(self, job_definition):
if config.CLEAN_UP_DOCKER_OBJECTS:
@@ -216,6 +256,16 @@ def get_status(self, job_definition, timeout=15):
)

if container is None: # container doesn't exist
if job_definition.cancelled:
if volumes.get_volume_api(job_definition).volume_exists(job_definition):
# jobs prepared but not running do not need to finalize, so we
# proceed directly to the FINALIZED state here
return JobStatus(
ExecutorState.FINALIZED, "Prepared job was cancelled"
)
else:
return JobStatus(ExecutorState.UNKNOWN, "Pending job was cancelled")

# timestamp file presence means we have finished preparing
timestamp_ns = volumes.get_volume_api(job_definition).read_timestamp(
job_definition, TIMESTAMP_REFERENCE_FILE, 10
@@ -238,7 +288,22 @@ def get_status(self, job_definition, timeout=15):
ExecutorState.FINALIZED,
timestamp_ns=RESULTS[job_definition.id].timestamp_ns,
)
else: # container present but not running, i.e. finished
else:
# container present but not running, i.e. finished
# Nb. this does not include prepared jobs, as they have a volume but not a container
if job_definition.cancelled:
return JobStatus(
ExecutorState.EXECUTED,
f"Job cancelled by {job_definition.cancelled}",
)
if was_oomkilled(container):
return JobStatus(ExecutorState.ERROR, oomkilled_message(container))
if container["State"]["ExitCode"] == 137:
return JobStatus(
ExecutorState.ERROR,
"Job either ran out of memory or was killed by an admin",
)

timestamp_ns = datestr_to_ns_timestamp(container["State"]["FinishedAt"])
return JobStatus(ExecutorState.EXECUTED, timestamp_ns=timestamp_ns)

@@ -313,11 +378,23 @@ def finalize_job(job_definition):
container_name(job_definition), none_if_not_exists=True
)
if not container_metadata:
raise LocalDockerError("Job container has vanished")
if job_definition.cancelled:
# no logs to retain if the container didn't start yet
return
else:
raise LocalDockerError(
f"Job container {container_name(job_definition)} has vanished"
)
redact_environment_variables(container_metadata)

outputs, unmatched_patterns = find_matching_outputs(job_definition)
unmatched_outputs = get_unmatched_outputs(job_definition, outputs)
if job_definition.cancelled:
# assume no outputs because our job didn't finish
outputs = {}
unmatched_patterns = []
unmatched_outputs = []
else:
outputs, unmatched_patterns = find_matching_outputs(job_definition)
unmatched_outputs = get_unmatched_outputs(job_definition, outputs)

exit_code = container_metadata["State"]["ExitCode"]
labels = container_metadata.get("Config", {}).get("Labels", {})
@@ -326,21 +403,18 @@ def finalize_job(job_definition):
# that have db access
message = None

# special case OOMKilled
if container_metadata["State"]["OOMKilled"]:
message = "Ran out of memory"
memory_limit = container_metadata.get("HostConfig", {}).get("Memory", 0)
if memory_limit > 0:
gb_limit = memory_limit / (1024**3)
message += f" (limit for this job was {gb_limit:.2f}GB)"
if exit_code == 137 and job_definition.cancelled:
message = f"Job cancelled by {job_definition.cancelled}"
elif was_oomkilled(container_metadata):
message = oomkilled_message(container_metadata)
else:
message = config.DOCKER_EXIT_CODES.get(exit_code)

results = JobResults(
outputs=outputs,
unmatched_patterns=unmatched_patterns,
unmatched_outputs=unmatched_outputs,
exit_code=container_metadata["State"]["ExitCode"],
exit_code=exit_code,
image_id=container_metadata["Image"],
message=message,
timestamp_ns=time.time_ns(),
@@ -351,8 +425,12 @@ def finalize_job(job_definition):
base_created=labels.get("org.opencontainers.base.build-date", "unknown"),
)
job_metadata = get_job_metadata(job_definition, outputs, container_metadata)
write_job_logs(job_definition, job_metadata)
persist_outputs(job_definition, results.outputs, job_metadata)

if job_definition.cancelled:
write_job_logs(job_definition, job_metadata, copy_log_to_workspace=False)
else:
write_job_logs(job_definition, job_metadata, copy_log_to_workspace=True)
persist_outputs(job_definition, results.outputs, job_metadata)
RESULTS[job_definition.id] = results


@@ -602,3 +680,27 @@ def write_manifest_file(workspace_dir, manifest):
manifest_file_tmp = manifest_file.with_suffix(".tmp")
manifest_file_tmp.write_text(json.dumps(manifest, indent=2))
manifest_file_tmp.replace(manifest_file)


def get_dns_args_for_docker(database_url):
# This is various shades of horrible. For containers on a custom network, Docker
# creates an embedded DNS server, available on 127.0.0.11 from within the container.
# This proxies non-local requests out to the host DNS server. We want to lock these
# containers down the absolute bare minimum of network access, which does not
# include DNS. However there is no way of disabling this embedded server, see:
# https://github.com/moby/moby/issues/19474
#
# As a workaround, we give it a "dummy" IP in place of the host resolver so that
# requests from inside the container never go anywhere. This IP was taken from the
# reserved test range specified in:
# https://www.rfc-editor.org/rfc/rfc5737
args = ["--dns", "192.0.2.0"]

# Where the database URL uses a hostname rather than an IP, we resolve that here and
# use the `--add-host` option to include it in the container's `/etc/hosts` file.
if database_url:
database_host = urllib.parse.urlparse(database_url).hostname
database_ip = socket.gethostbyname(database_host)
if database_host != database_ip:
args.extend(["--add-host", f"{database_host}:{database_ip}"])
return args
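
To make the DNS workaround above concrete, here is an editor-added variant of the same logic with an injectable resolver so it can run without real DNS; the database hostname and IP below are purely illustrative:

import socket
import urllib.parse

def dns_args_for_docker(database_url, resolve=socket.gethostbyname):
    # Same shape as get_dns_args_for_docker(), with the resolver made injectable
    args = ["--dns", "192.0.2.0"]  # RFC 5737 test address: DNS queries go nowhere
    if database_url:
        host = urllib.parse.urlparse(database_url).hostname
        ip = resolve(host)
        if host != ip:
            args.extend(["--add-host", f"{host}:{ip}"])
    return args

# Hypothetical database URL, resolved via a stub
print(dns_args_for_docker("mssql://user:pass@db.example.com/OpenSAFELY", resolve=lambda host: "10.0.0.5"))
# -> ['--dns', '192.0.2.0', '--add-host', 'db.example.com:10.0.0.5']
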
21 changes: 0 additions & 21 deletions opensafely/_vendor/jobrunner/extractors.py

This file was deleted.

12 changes: 10 additions & 2 deletions opensafely/_vendor/jobrunner/job_executor.py
@@ -32,6 +32,8 @@ class JobDefinition:
allow_database_access: bool # whether this job should have access to the database
cpu_count: str = None # number of CPUs to be allocated
memory_limit: str = None # memory limit to apply
# if a job has been cancelled, the name of the canceller - either "user" or "admin"
cancelled: str = None


class ExecutorState(Enum):
@@ -193,6 +195,8 @@ def finalize(self, job_definition: JobDefinition) -> JobStatus:
The action log file and any useful metadata from the job run should also be written to a separate log storage
area in long-term storage.
If the job has been cancelled, it should only preserve the action log file.
When the finalize task finishes, the get_status() call should now return FINALIZED for this job, and
get_results() call should return the JobResults for this job.
@@ -203,11 +207,15 @@ def finalize(self, job_definition: JobDefinition) -> JobStatus:

def terminate(self, job_definition: JobDefinition) -> JobStatus:
"""
Terminate a running job, transitioning to the ERROR state.
Terminate a running job, transitioning to the EXECUTED state.
1. If any task for this job is running, terminate it, do not wait for it to complete.
2. Return ERROR state with a message.
2. Return EXECUTED state with a message.
Terminating a running job is considered an expected state, not an error state. This decision
also makes it easier for current executor implementations to cleanup after termination, and
is consistent with the handling of programs that exit of their own accord with a return code.
"""
