Skip to content

Commit

Permalink
fix: Update to job-runner v2.74.1
Browse files Browse the repository at this point in the history
  • Loading branch information
evansd committed Oct 1, 2024
1 parent c0469e3 commit f0b067c
Show file tree
Hide file tree
Showing 19 changed files with 194 additions and 93 deletions.
1 change: 1 addition & 0 deletions opensafely/_vendor/jobrunner/cli/local_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ def create_and_run_jobs(
config.USING_DUMMY_DATA_BACKEND = True
config.CLEAN_UP_DOCKER_OBJECTS = clean_up_docker_objects
config.MAX_WORKERS = concurrency
config.MAX_DB_WORKERS = concurrency
config.DEFAULT_JOB_MEMORY_LIMIT = memory
config.DEFAULT_JOB_CPU_COUNT = cpus

Expand Down
114 changes: 96 additions & 18 deletions opensafely/_vendor/jobrunner/cli/manifests.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@
from opensafely._vendor.jobrunner.models import Job


def main(workspaces=None):
def main():
conn = database.get_connection()

workspaces = [
w["workspace"] for w in conn.execute("SELECT DISTINCT(workspace) FROM job;")
]
Expand All @@ -20,19 +21,68 @@ def main(workspaces=None):

level4_dir = local.get_medium_privacy_workspace(workspace)

sentinel = level4_dir / ".manifest-backfill"
if sentinel.exists():
print(" - already done, skipping")
if not level4_dir.exists():
print(" - L4 dir doesn't exist")
continue

write_manifest(workspace)
manifest_path = level4_dir / "metadata/manifest.json"

sentinel.touch()
if manifest_path.exists():
update_manifest(workspace)
else:
# this was skipped in initial backfill, as L3 dir was archived
# but we still need manifest.json for the L4 dir for airlock to
# work. But it's constructed differently - we'll only have L4 output
# files available.
write_archived_manifest(workspace)


def write_manifest(workspace):
conn = database.get_connection()
# Largest file size, in bytes (16 MiB), for which update_manifest will read a
# CSV to count its rows and columns; bigger files are reported and skipped.
LIMIT = 16 * 1024 * 1024


def update_manifest(workspace):
    """Backfill repo URLs and CSV row/column counts in an existing manifest.

    Reads the manifest for *workspace* from its medium privacy (L4)
    directory, then for each recorded output:

    * adds a ``repo`` entry, looked up from the job that produced the
      output, when the entry does not already have one;
    * for ``moderately_sensitive`` ``.csv`` outputs, recomputes
      ``row_count`` and ``col_count`` from the file on disk (the L4 copy,
      falling back to the L3 copy), skipping files that no longer exist or
      are larger than ``LIMIT`` bytes.

    Note that the counts are recomputed even when the entry already has
    them. The updated manifest is written back to the L4 directory.
    """
    level4_dir = local.get_medium_privacy_workspace(workspace)
    workspace_dir = local.get_high_privacy_workspace(workspace)
    manifest = local.read_manifest_file(level4_dir, workspace)

    for output, metadata in manifest["outputs"].items():
        if "repo" not in metadata:
            job = database.find_one(Job, id=metadata["job_id"])
            metadata["repo"] = job.repo_url
            print(f" - updating repo for {output}")

        if metadata["level"] == "moderately_sensitive" and output.endswith(".csv"):
            abspath = level4_dir / output
            if not abspath.exists():
                # excluded file, so look at L3 file
                abspath = workspace_dir / output

            if not abspath.exists():
                print(f" - {output} does not exist any more")
                continue

            if abspath.stat().st_size > LIMIT:
                print(f" - {output} is too large to measure rows")
                continue

            try:
                # only the counts are needed here; the headers are discarded
                csv_counts, _headers = local.get_csv_counts(abspath)
            except Exception:
                # best effort: an unreadable CSV gets null counts rather than
                # aborting the whole backfill
                csv_counts = {}

            print(f" - updating row/col counts for {output}")
            metadata["row_count"] = csv_counts.get("rows")
            metadata["col_count"] = csv_counts.get("cols")

    # this path handles workspaces whose manifest already exists, so do not
    # describe them as archived in the log output
    print(
        f" - writing updated manifest for workspace {workspace} with {len(manifest['outputs'])} outputs"
    )
    local.write_manifest_file(level4_dir, manifest)


def write_archived_manifest(workspace):
conn = database.get_connection()
level4_dir = local.get_medium_privacy_workspace(workspace)

# ordering by most recent ensures we find the job that generated the
Expand Down Expand Up @@ -60,21 +110,49 @@ def write_manifest(workspace):
# older version of the file, ignore
continue

abspath = workspace_dir / output
if level != "moderately_sensitive":
continue

# use presence of message file to detect excluded files
abspath = level4_dir / output
message_file = level4_dir / (output + ".txt")

excluded = message_file.exists()

metadata = local.get_output_metadata(
abspath,
level,
job_id=job_id,
job_request=job.job_request_id,
action=job.action,
commit=job.commit,
excluded=excluded,
)
if abspath.exists():
csv_counts = {}
if abspath.name.suffix == ".csv":
csv_counts = local.get_csv_counts()

metadata = local.get_output_metadata(
abspath,
level,
job_id=job_id,
job_request=job.job_request_id,
action=job.action,
commit=job.commit,
repo=job.repo_url,
excluded=excluded,
csv_counts=csv_counts,
)

else:
# we don't have the source file to hash or inspect, probably because it was excluded
metadata = {
"level": level,
"job_id": job.id,
"job_request": job.job_request_id,
"action": job.action,
"commit": job.commit,
"repo": job.repo_url,
"size": 0,
"timestamp": message_file.stat().st_mtime if excluded else None,
"content_hash": None,
"excluded": excluded,
"message": None,
"row_count": None,
"col_count": None,
}

outputs[output] = metadata

Expand Down
64 changes: 39 additions & 25 deletions opensafely/_vendor/jobrunner/executors/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -541,22 +541,26 @@ def persist_outputs(job_definition, outputs, job_metadata):
# if it previously had a message, delete it
delete_files_from_directory(medium_privacy_dir, [message_file])

# Update manifest with file metdata
manifest = read_manifest_file(medium_privacy_dir, job_definition.workspace)
new_outputs = {}

for filename, level in outputs.items():
abspath = workspace_dir / filename
manifest["outputs"][filename] = get_output_metadata(
new_outputs[filename] = get_output_metadata(
abspath,
level,
job_id=job_definition.id,
job_request=job_definition.job_request_id,
action=job_definition.action,
commit=job_definition.study.commit,
repo=job_definition.study.git_repo_url,
excluded=filename in excluded_file_msgs,
message=excluded_job_msgs.get(filename),
csv_counts=csv_metadata.get(filename),
)

# Update manifest with file metadata
manifest = read_manifest_file(medium_privacy_dir, job_definition.workspace)
manifest["outputs"].update(**new_outputs)
write_manifest_file(medium_privacy_dir, manifest)

return excluded_job_msgs
Expand All @@ -569,6 +573,7 @@ def get_output_metadata(
job_request,
action,
commit,
repo,
excluded,
message=None,
csv_counts=None,
Expand All @@ -582,6 +587,7 @@ def get_output_metadata(
"job_id": job_id,
"job_request": job_request,
"action": action,
"repo": repo,
"commit": commit,
"size": stat.st_size,
"timestamp": stat.st_mtime,
Expand Down Expand Up @@ -646,16 +652,44 @@ def get_output_metadata(
"""


def get_csv_counts(path):
    """Count the data rows and columns of the CSV file at *path*.

    The file is parsed with ``csv.DictReader``, so its first line is taken
    as the header row. Returns a ``(csv_counts, headers)`` tuple where
    ``csv_counts`` is a dict with ``"cols"`` and ``"rows"`` keys and
    ``headers`` is the reader's ``fieldnames``. When there are no data rows
    both counts are 0.
    """
    with path.open() as handle:
        rows = csv.DictReader(handle)
        fieldnames = rows.fieldnames
        leading = next(rows, None)
        if not leading:
            # no data rows at all (empty or header-only file)
            return {"cols": 0, "rows": 0}, fieldnames

        # column count is taken from the first data row; the row total is
        # that first row plus everything left in the reader
        n_cols = len(leading)
        n_rows = 1 + sum(1 for _ in rows)

    return {"cols": n_cols, "rows": n_rows}, fieldnames


def check_l4_file(job_definition, filename, size, workspace_dir):
def mb(b):
return round(b / (1024 * 1024), 2)

job_msgs = []
file_msgs = []
csv_counts = {"rows": None, "cols": None}
headers = []

suffix = Path(filename).suffix
if suffix not in config.LEVEL4_FILE_TYPES:

if size > job_definition.level4_max_filesize:
job_msgs.append(
f"File size of {mb(size)}Mb is larger that limit of {mb(job_definition.level4_max_filesize)}Mb."
)
file_msgs.append(
MAX_SIZE_MSG.format(
filename=filename,
size=mb(size),
limit=mb(job_definition.level4_max_filesize),
)
)
elif suffix not in config.LEVEL4_FILE_TYPES:
job_msgs.append(f"File type of {suffix} is not valid level 4 file")
file_msgs.append(INVALID_FILE_TYPE_MSG.format(filename=filename, suffix=suffix))

Expand All @@ -664,15 +698,7 @@ def mb(b):
# this may need to be abstracted in future
actual_file = workspace_dir / filename
try:
with actual_file.open() as f:
reader = csv.DictReader(f)
headers = reader.fieldnames
first_row = next(reader, None)
if first_row:
csv_counts["cols"] = len(first_row)
csv_counts["rows"] = sum(1 for _ in reader) + 1
else:
csv_counts["cols"] = csv_counts["rows"] = 0
csv_counts, headers = get_csv_counts(actual_file)
except Exception:
pass
else:
Expand All @@ -691,18 +717,6 @@ def mb(b):
)
)

if size > job_definition.level4_max_filesize:
job_msgs.append(
f"File size of {mb(size)}Mb is larger that limit of {mb(job_definition.level4_max_filesize)}Mb."
)
file_msgs.append(
MAX_SIZE_MSG.format(
filename=filename,
size=mb(size),
limit=mb(job_definition.level4_max_filesize),
)
)

if job_msgs:
return False, ",".join(job_msgs), "\n\n".join(file_msgs), csv_counts
else:
Expand Down
6 changes: 6 additions & 0 deletions opensafely/_vendor/jobrunner/lib/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,12 @@ def get_connection(filename=None):
conn.row_factory = sqlite3.Row
cache[filename] = conn

# use WAL to enable other processes (e.g. operational tasks) to read the DB.
# job-runner should be the only active writer, which means if we need
# some other process to write the db (e.g. a backfill), then we should
# stop job-runner.
conn.execute("PRAGMA journal_mode=WAL")

return cache[filename]


Expand Down
2 changes: 2 additions & 0 deletions opensafely/_vendor/jobrunner/tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,8 @@ def trace_attributes(job, results=None):
# convert float seconds to ns integer
created_at=int(job.created_at * 1e9),
started_at=int(job.started_at * 1e9) if job.started_at else None,
# when did the state last change?
status_code_updated_at=job.status_code_updated_at,
requires_db=job.requires_db,
)

Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from opensafely_jobrunner-2.74.1.dist-info import *
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Metadata-Version: 2.1
Name: opensafely-jobrunner
Version: 2.74.0
Version: 2.74.1
Summary: OpenSAFELY job scheduling and executor
Author-email: OpenSAFELY <[email protected]>
License: OpenSAFELY Job Runner
Expand All @@ -26,7 +26,7 @@ Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
Requires-Python: >=3.8
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: opensafely-pipeline @ git+https://github.com/opensafely-core/[email protected]
Requires-Dist: opensafely-pipeline@ git+https://github.com/opensafely-core/[email protected]
Requires-Dist: ruyaml
Requires-Dist: requests
Requires-Dist: opentelemetry-exporter-otlp-proto-http
Expand Down
Original file line number Diff line number Diff line change
@@ -1,30 +1,30 @@
../../bin/add_job,sha256=toLR2KSS5cK3diLpL2Ajp_8tgNMruAyHs0StB_zG8F8,253
../../bin/flags,sha256=2k4O7Xrn-VLneAhXopy6EwaiVZFNGvZbNGRWEfbisdo,251
../../bin/kill_job,sha256=mJAcqJU9vpGr_ELZ9SeFlx8nIc3tPcfqFoQ0LOZd-g4,254
../../bin/local_run,sha256=jkMxn9s6xP_cEcjcRGwJatLY4WRjegxb8Px1HS45COQ,255
../../bin/migrate,sha256=HrGrHt548NRrLzHGT6nypdb_APyAh3gtX1Rs43PESCE,253
../../bin/prepare_for_reboot,sha256=V4JSzjPsMR5Jv6gO1u1EsjZwZ48J3i2IhrX07op-yUc,264
../../bin/retry_job,sha256=w3ThBEgbjoJMaJCmiyIs6tPrzGT1sZS6UrhJ19QrjzE,255
../../bin/add_job,sha256=auF1lMohrk8Fk05a3Ezlpi8IVlS7xVzeZReWtP-IsWw,255
../../bin/flags,sha256=epygIZezoksuFAsTDKYklZoNvcE9q_H-HxEQ6sV_Ppo,253
../../bin/kill_job,sha256=jHNhQDHDyPzXXTpvr1vkB76xacBvcPy7vci3pAzPmxc,256
../../bin/local_run,sha256=Wffot83T6YFDtKAVtlQm6c9mGigY7F1jCBIKgds8OVk,257
../../bin/migrate,sha256=wd49OrcUdgPqBacylXP1JhbH961m-gRtcsa5sdMHzTo,255
../../bin/prepare_for_reboot,sha256=oY3JNDwuV91RqasOVovpZWkfEUGo5WhVxNf2nOA9Bk0,266
../../bin/retry_job,sha256=t_oJk2Zi_rocmK5tmd2k3qRCQ0Ec_nC1PLCYa_O95XY,257
jobrunner/__init__.py,sha256=47DEQpj8HBSa-_TImW-5JCeuQeRkm5NMpJWZG3hSuFU,0
jobrunner/actions.py,sha256=bGcLGnsgTKW9mHkZSiN5HdauFADCiUu6v_VlsWhNAOc,3128
jobrunner/cli/__init__.py,sha256=47DEQpj8HBSa-_TImW-5JCeuQeRkm5NMpJWZG3hSuFU,0
jobrunner/cli/add_job.py,sha256=hbOFNJx4bUTXch3IriMOwUG66lfOHIbyGnTdPYnCvmA,2968
jobrunner/cli/flags.py,sha256=6dFPWab0vB6CGCLg5yMAp9iQJo8Y6rN5cfQ7bEXhgqs,2704
jobrunner/cli/kill_job.py,sha256=_cTNi-JjirvCVfK2vSs2SQYREkCiyj2jLeNG4yYaZAc,3333
jobrunner/cli/local_run.py,sha256=G1UQ-NEijyB4dAiC18fqTbPI4k18BGyPfj-DnOGyc6c,25679
jobrunner/cli/manifests.py,sha256=JC3QMS6zl4360J0OKtS50cnqSWcLFbMEh28UmRZQzbg,2622
jobrunner/cli/local_run.py,sha256=iWfbL4DZ0dXSqNkNCF5XdkdY0Hnc7_EH0NfZfvl7YVA,25719
jobrunner/cli/manifests.py,sha256=dVFDE4OLmS75HHaWxWqToDz4ZdpLL1mJh_3sH4Ff43c,5619
jobrunner/cli/migrate.py,sha256=V2cI3Kee67DNhqJUiDnALNvzHlP4mx_myKQycD4E9uI,609
jobrunner/cli/prepare_for_reboot.py,sha256=Zdajs1cnkCCsKklFjg3mynU2NJqd5n0zFct3SdA9Mig,1493
jobrunner/cli/retry_job.py,sha256=qDTiYwxc59QYZBLfgv_t-CAa6kmmhiCKh0sLpv5xhwA,2458
jobrunner/config.py,sha256=HU1SNXMzjyDNQO0pqFvNaoeOZyheO8QnGJlf5GdOxT8,9531
jobrunner/create_or_update_jobs.py,sha256=eA-wK0VwWwRFF5_t8Qfz4ddCOL5COnNyAAXCZWKuTgY,15344
jobrunner/executors/__init__.py,sha256=4-eu9LwIzhALtsq1LDC9NQ_5nbcjsPDdIEGvRvZwIbo,283
jobrunner/executors/local.py,sha256=k9b181DDoMYg9_lpAym7a8LJ8TQM7omICcsUlpMCEiY,35197
jobrunner/executors/local.py,sha256=fXUOiNg4uqMZswGeYhnnvS6YM23SnVGJwS80U6fjbYs,35423
jobrunner/executors/logging.py,sha256=iCISXFR8sbtCrp-E3jaQlC1Kw6Huf65b-dqomrJzywI,2104
jobrunner/executors/volumes.py,sha256=H8lISCydAyi9-g3p344KkwVhNhrqfWO5RO-NZRFVM5c,7102
jobrunner/job_executor.py,sha256=523lwyHQRPeup6dZKC-hLjW_LYCb9ppiR0tZGLssHr4,13928
jobrunner/lib/__init__.py,sha256=EwrN6m71VpDCvi-vTzcuIvelJ6gZOBB1rf5KyF13xjc,4388
jobrunner/lib/database.py,sha256=tAa650LomUvf4Gc0q0A-2cXswqoSSGPKmc1fh9SURFQ,12272
jobrunner/lib/database.py,sha256=AWwUI9AdVNSIKhyqt8H4p0bIuBQ7DUbkgYHlBgq2__Q,12590
jobrunner/lib/docker.py,sha256=C2fp3quN4vkaqg2MvMNC_k6Zbz8awN-oFtFuSLqA6xY,15825
jobrunner/lib/docker_stats.py,sha256=PBx1eU7Rax-Q-fRLKXGSvFv-UD6IYIEENqH6_hoWpKU,1357
jobrunner/lib/git.py,sha256=5Bw3bRk4EJaNEmcOADFk8Ww_NHeF5GtqDpJ5rR3KYFA,13145
Expand All @@ -42,13 +42,13 @@ jobrunner/reusable_actions.py,sha256=yt9qSKXUPIPxI-2wM7tgUFJdQlOMneqZqqT8JUpVQow
jobrunner/run.py,sha256=gr2GIrplwgKVHQqzVZUSKftcp8cF-wOOi4fGtR3QZ1U,27565
jobrunner/service.py,sha256=MhppSwuGiDTrkcduxGfmHLoUpD1Ao0fRI2lfuQkb11Y,4182
jobrunner/sync.py,sha256=nRyHluwAxQjSNw36xpq5sJXhJNLmtREOHjFjhCT7P7A,5127
jobrunner/tracing.py,sha256=O9CGARNrGYpEqxze2hJYqXJYVb8DJxfasJZFQ4NmdbI,13043
opensafely_jobrunner-2.74.0.dist-info/INSTALLER,sha256=zuuue4knoyJ-UwPPXg8fezS7VCrXJQrAP7zeNuwvFQg,4
opensafely_jobrunner-2.74.0.dist-info/LICENSE,sha256=F5fS3mizkbW4yOk3XP--G0oDJbZAovAKuSIZShtkCw4,671
opensafely_jobrunner-2.74.0.dist-info/METADATA,sha256=OPWQrg9ykaVDVN4IUZblXHlv29YsJqI1B-SXvSvDLhM,8212
opensafely_jobrunner-2.74.0.dist-info/RECORD,,
opensafely_jobrunner-2.74.0.dist-info/REQUESTED,sha256=47DEQpj8HBSa-_TImW-5JCeuQeRkm5NMpJWZG3hSuFU,0
opensafely_jobrunner-2.74.0.dist-info/WHEEL,sha256=GJ7t_kWBFywbagK5eo9IoUwLW6oyOeTKmQ-9iHFVNxQ,92
opensafely_jobrunner-2.74.0.dist-info/direct_url.json,sha256=XLe5Ipno5ybN2wjgjKC78_YrAA5dnXQZIb16GjCIhBU,174
opensafely_jobrunner-2.74.0.dist-info/entry_points.txt,sha256=hat6DNe6ZtwPqk0GIs5BOzd-18yfWfwJrouA1YAmBJY,298
opensafely_jobrunner-2.74.0.dist-info/top_level.txt,sha256=dHLIHTr12iPEGMfrfPkXrkh8qGsw52DE0cbpHQVbiic,10
jobrunner/tracing.py,sha256=F_q9wM7Do5u5GroGvqTL1VW5vElr092wg_L3CMFBHaE,13144
opensafely_jobrunner-2.74.1.dist-info/INSTALLER,sha256=zuuue4knoyJ-UwPPXg8fezS7VCrXJQrAP7zeNuwvFQg,4
opensafely_jobrunner-2.74.1.dist-info/LICENSE,sha256=F5fS3mizkbW4yOk3XP--G0oDJbZAovAKuSIZShtkCw4,671
opensafely_jobrunner-2.74.1.dist-info/METADATA,sha256=HumP74hsF-6zd8Em2KQgtDOLIKFye2JbYn_QCDz--Hs,8211
opensafely_jobrunner-2.74.1.dist-info/RECORD,,
opensafely_jobrunner-2.74.1.dist-info/REQUESTED,sha256=47DEQpj8HBSa-_TImW-5JCeuQeRkm5NMpJWZG3hSuFU,0
opensafely_jobrunner-2.74.1.dist-info/WHEEL,sha256=GV9aMThwP_4oNCtvEC2ec3qUYutgWeAzklro_0m4WJQ,91
opensafely_jobrunner-2.74.1.dist-info/direct_url.json,sha256=mfNX3sM6Fdj5fLMywICmzf-i4KV4qcNFRjJ6bBFzIy0,174
opensafely_jobrunner-2.74.1.dist-info/entry_points.txt,sha256=hat6DNe6ZtwPqk0GIs5BOzd-18yfWfwJrouA1YAmBJY,298
opensafely_jobrunner-2.74.1.dist-info/top_level.txt,sha256=dHLIHTr12iPEGMfrfPkXrkh8qGsw52DE0cbpHQVbiic,10
Loading

0 comments on commit f0b067c

Please sign in to comment.