diff --git a/taskcluster/kinds/finetune-student/kind.yml b/taskcluster/kinds/finetune-student/kind.yml
index a9c252e0e..24e4bdbe7 100644
--- a/taskcluster/kinds/finetune-student/kind.yml
+++ b/taskcluster/kinds/finetune-student/kind.yml
@@ -61,6 +61,8 @@ tasks:
         worker-type: b-largegpu-xlargedisk
         worker:
             max-run-time: 2592000
+            # train_taskcluster.py exits with 17 if a request to Taskcluster fails
+            retry-exit-status: [17]
             env:
                 ARTIFACT_EXT: zst
                 COMPRESSION_CMD: zstdmt
diff --git a/taskcluster/kinds/train-backwards/kind.yml b/taskcluster/kinds/train-backwards/kind.yml
index 947f0dcb4..0b94b0bf9 100644
--- a/taskcluster/kinds/train-backwards/kind.yml
+++ b/taskcluster/kinds/train-backwards/kind.yml
@@ -61,6 +61,8 @@ tasks:
                 default: b-largegpu-largedisk
         worker:
             max-run-time: 2592000
+            # train_taskcluster.py exits with 17 if a request to Taskcluster fails
+            retry-exit-status: [17]
             env:
                 ARTIFACT_EXT: zst
                 COMPRESSION_CMD: zstdmt
diff --git a/taskcluster/kinds/train-student/kind.yml b/taskcluster/kinds/train-student/kind.yml
index 4470ca7bc..e7ee47e3c 100644
--- a/taskcluster/kinds/train-student/kind.yml
+++ b/taskcluster/kinds/train-student/kind.yml
@@ -59,6 +59,8 @@ tasks:
                 default: b-largegpu-xlargedisk
         worker:
             max-run-time: 2592000
+            # train_taskcluster.py exits with 17 if a request to Taskcluster fails
+            retry-exit-status: [17]
             env:
                 ARTIFACT_EXT: zst
                 COMPRESSION_CMD: zstdmt
diff --git a/taskcluster/kinds/train-teacher/kind.yml b/taskcluster/kinds/train-teacher/kind.yml
index 4adb3c490..f05fd87a1 100644
--- a/taskcluster/kinds/train-teacher/kind.yml
+++ b/taskcluster/kinds/train-teacher/kind.yml
@@ -85,6 +85,8 @@ tasks:
                 default: b-largegpu-xlargedisk
         worker:
             max-run-time: 2592000
+            # train_taskcluster.py exits with 17 if a request to Taskcluster fails
+            retry-exit-status: [17]
             env:
                 ARTIFACT_EXT: zst
                 COMPRESSION_CMD: zstdmt
diff --git a/taskcluster/scripts/pipeline/train_taskcluster.py b/taskcluster/scripts/pipeline/train_taskcluster.py
index 921cce709..f2d3035ad 100755
--- a/taskcluster/scripts/pipeline/train_taskcluster.py
+++ b/taskcluster/scripts/pipeline/train_taskcluster.py
@@ -1,14 +1,125 @@
 #!/usr/bin/env python3

+import logging
+import os
 import os.path
+import requests
 import subprocess
 import sys

 TRAINING_SCRIPT = os.path.join(os.path.dirname(__file__), "train-taskcluster.sh")

+CONTINUATION_ARTIFACTS = {
+    "config.opustrainer.yml",
+    "config.opustrainer.yml.state",
+    "devset.out",
+    "model.npz",
+    "model.npz.best-bleu-detok.npz",
+    "model.npz.best-bleu-detok.npz.decoder.yml",
+    "model.npz.best-ce-mean-words.npz",
+    "model.npz.best-ce-mean-words.npz.decoder.yml",
+    "model.npz.best-chrf.npz",
+    "model.npz.best-chrf.npz.decoder.yml",
+    "model.npz.decoder.yml",
+    "model.npz.optimizer.npz",
+    "model.npz.progress.yml",
+    "model.npz.yml",
+    "opustrainer.log",
+    "train.log",
+    "valid.log",
+    "vocab.spm",
+}
+
+
+ARTIFACTS_URL = "{root_url}/api/queue/v1/task/{task_id}/runs/{run_id}/artifacts"
+ARTIFACT_URL = "{root_url}/api/queue/v1/task/{task_id}/runs/{run_id}/artifacts/{artifact_name}"
+# The argument number where pretrained model mode is expected.
+# This is 1-indexed, not 0-indexed, so it lines up with the argument number
+# it is fetched as in train-taskcluster.sh.
+PRETRAINED_MODEL_MODE_ARG_NUMBER = 12
+# Nothing special about 17... just a number plucked out of thin air that
+# should be distinct enough to retry on.
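+# Keep it in sync with the `retry-exit-status` values in the training kind.yml
+# files above, so that Taskcluster retries the task instead of failing it.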
+DOWNLOAD_ERROR_EXIT_CODE = 17


 def main(args):
-    subprocess.run([TRAINING_SCRIPT, *args], check=True)
+    logging.basicConfig(level=logging.INFO)
+
+    script_args = list(args)
+    task_id = os.environ["TASK_ID"]
+    run_id = int(os.environ["RUN_ID"])
+    root_url = os.environ["TASKCLUSTER_ROOT_URL"]
+    # Must line up with where model_dir is in `train-taskcluster.sh` while that
+    # script still exists.
+    model_dir = script_args[6]
+    pretrained_model_mode = None
+    if len(args) >= PRETRAINED_MODEL_MODE_ARG_NUMBER:
+        pretrained_model_mode = script_args[PRETRAINED_MODEL_MODE_ARG_NUMBER - 1]
+
+    if not os.path.exists(model_dir):
+        os.makedirs(model_dir)
+
+    if run_id > 0:
+        logging.info("run_id > 0, attempting to resume training from an earlier run...")
+        prev_run_id = run_id - 1
+
+        while prev_run_id >= 0:
+            try:
+                resp = requests.get(
+                    ARTIFACTS_URL.format(root_url=root_url, task_id=task_id, run_id=prev_run_id)
+                )
+                resp.raise_for_status()
+            except Exception:
+                logging.exception("Caught exception, exiting with distinct code...")
+                sys.exit(DOWNLOAD_ERROR_EXIT_CODE)
+
+            run_artifacts = set([os.path.basename(a["name"]) for a in resp.json()["artifacts"]])
+
+            if run_artifacts.issuperset(CONTINUATION_ARTIFACTS):
+                logging.info(
+                    f"Run {prev_run_id} appears to have the artifacts we need! Downloading them..."
+                )
+            else:
+                logging.info(f"Run {prev_run_id} is missing some necessary artifacts...")
+                prev_run_id -= 1
+                continue
+
+            for artifact in resp.json()["artifacts"]:
+                # Skip Taskcluster logs - we only care about artifacts that the
+                # training tools create.
+                if artifact["name"].startswith("public/log"):
+                    continue
+                out_name = os.path.basename(artifact["name"])
+                logging.info(f"Fetching {artifact['name']}...")
+
+                try:
+                    r = requests.get(
+                        ARTIFACT_URL.format(
+                            root_url=root_url,
+                            task_id=task_id,
+                            run_id=prev_run_id,
+                            artifact_name=artifact["name"],
+                        ),
+                        stream=True,
+                    )
+                    r.raise_for_status()
+                except Exception:
+                    logging.exception("Caught exception, exiting with distinct code...")
+                    sys.exit(DOWNLOAD_ERROR_EXIT_CODE)
+
+                with open(os.path.join(model_dir, out_name), "wb+") as fd:
+                    for chunk in r.iter_content(chunk_size=8192):
+                        fd.write(chunk)
+
+            # We successfully downloaded all the artifacts from a previous run.
+            # Override the pretrained model mode and we're done!
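+            # ("continue" makes the training script resume from the checkpoint
+            # and optimizer state we just put into model_dir.)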
+            pretrained_model_mode = "continue"
+            break
+
+    if pretrained_model_mode:
+        if len(script_args) < PRETRAINED_MODEL_MODE_ARG_NUMBER:
+            script_args.append(pretrained_model_mode)
+        else:
+            script_args[PRETRAINED_MODEL_MODE_ARG_NUMBER - 1] = pretrained_model_mode
+    subprocess.run([TRAINING_SCRIPT, *script_args], check=True)


 if __name__ == "__main__":
diff --git a/tests/test_train_taskcluster.py b/tests/test_train_taskcluster.py
index 17426fa2e..71918b4ea 100644
--- a/tests/test_train_taskcluster.py
+++ b/tests/test_train_taskcluster.py
@@ -1,8 +1,12 @@
+import io
+import json
 import os
 from unittest import mock

 import pytest
+import requests

 import train_taskcluster
+from fixtures import DataDir

 TRAIN_TASKCLUSTER_SH = os.path.normpath(
     os.path.join(
@@ -76,8 +80,262 @@
     ),
 )
 def test_all_args_forwarded(args):
-    with mock.patch("train_taskcluster.subprocess") as mocked_subprocess:
-        train_taskcluster.main(args)
-        assert mocked_subprocess.run.call_args_list == [
-            mock.call([TRAIN_TASKCLUSTER_SH] + args, check=True),
-        ]
+    with mock.patch.multiple(
+        train_taskcluster, subprocess=mock.DEFAULT, requests=mock.DEFAULT
+    ) as tt_mock:
+        with mock.patch.dict(os.environ) as mocked_env:
+            mocked_env["TASK_ID"] = "abcdef"
+            mocked_env["RUN_ID"] = "0"
+            mocked_env["TASKCLUSTER_ROOT_URL"] = "https://some.cluster"
+            train_taskcluster.main(args)
+            assert tt_mock["subprocess"].run.call_args_list == [
+                mock.call([TRAIN_TASKCLUSTER_SH] + args, check=True),
+            ]
+
+
+FULL_ARTIFACTS = [
+    {
+        "storageType": "s3",
+        "name": f"public/build/{artifact}",
+        "expires": "2035-04-23T16:22:13.477Z",
+        "contentType": "application/x-yaml",
+    }
+    # Note: we are depending on something that is itself under test for the
+    # construction of parts of these tests. However, the value of duplicating
+    # this list into the tests instead is debatable, as what we're _really_
+    # testing is the logic around these artifacts, not which ones are or are
+    # not required. It seems reasonable to keep a single source of truth for
+    # this.
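+    # (A Taskcluster log artifact is appended below so that these tests also
+    # cover the code that skips downloading task logs.)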
+    for artifact in train_taskcluster.CONTINUATION_ARTIFACTS
+]
+FULL_ARTIFACTS.append(
+    {
+        "storageType": "s3",
+        "name": "public/logs/live.log",
+        "expires": "2035-04-23T16:22:13.477Z",
+        "contentType": "text/plain; charset=utf-8",
+    },
+)
+PARTIAL_ARTIFACTS = [
+    {
+        "storageType": "s3",
+        "name": "public/build/model.npz",
+        "expires": "2035-04-23T16:22:13.477Z",
+        "contentType": "application/x-yaml",
+    },
+    {
+        "storageType": "s3",
+        "name": "public/logs/live.log",
+        "expires": "2035-04-23T16:22:13.477Z",
+        "contentType": "text/plain; charset=utf-8",
+    },
+]
+
+
+@pytest.mark.parametrize(
+    "current_run_id,resumable_run_id,reasons_created,run_artifacts,orig_pretrained_model_mode,expected_pretrained_model_mode",
+    (
+        pytest.param(
+            0,
+            None,
+            ["scheduled"],
+            # not used unless resumable_run_id is set
+            {},
+            "",
+            "",
+            id="run_0_no_continuation",
+        ),
+        pytest.param(
+            0,
+            None,
+            ["scheduled"],
+            # not used unless resumable_run_id is set
+            {},
+            "init",
+            "init",
+            id="run_0_no_continuation_with_pretrained_model",
+        ),
+        # TODO: add some cases that test that pretrained model mode is
+        # preserved when not doing autocontinuation, and also that it's
+        # overridden when expected
+        pytest.param(
+            1,
+            0,
+            ["scheduled", "retry"],
+            {0: FULL_ARTIFACTS},
+            "",
+            "continue",
+            id="run_1_continues_run_0",
+        ),
+        pytest.param(
+            2,
+            1,
+            ["scheduled", "retry", "retry"],
+            {1: FULL_ARTIFACTS},
+            "",
+            "continue",
+            id="run_2_continues_run_1",
+        ),
+        pytest.param(
+            2,
+            0,
+            ["scheduled", "rerun", "retry"],
+            {1: PARTIAL_ARTIFACTS, 0: FULL_ARTIFACTS},
+            "",
+            "continue",
+            id="run_2_continues_run_0",
+        ),
+        pytest.param(
+            3,
+            1,
+            ["scheduled", "rerun", "exception", "retry"],
+            {2: PARTIAL_ARTIFACTS, 1: FULL_ARTIFACTS, 0: PARTIAL_ARTIFACTS},
+            "",
+            "continue",
+            id="run_3_continues_run_1",
+        ),
+        pytest.param(
+            2,
+            None,
+            ["scheduled", "rerun", "exception"],
+            {1: PARTIAL_ARTIFACTS, 0: PARTIAL_ARTIFACTS},
+            "",
+            "",
+            id="run_2_cant_continue_earlier_runs",
+        ),
+        pytest.param(
+            2,
+            None,
+            ["scheduled", "retry", "rerun"],
+            {1: PARTIAL_ARTIFACTS, 0: PARTIAL_ARTIFACTS},
+            "use",
+            "use",
+            id="run_2_cant_continue_earlier_runs_preserves_pretrained_model_mode",
+        ),
+    ),
+)
+def test_autocontinue(
+    current_run_id,
+    resumable_run_id,
+    reasons_created,
+    run_artifacts,
+    orig_pretrained_model_mode,
+    expected_pretrained_model_mode,
+):
+    with mock.patch.multiple(
+        train_taskcluster, subprocess=mock.DEFAULT, requests=mock.DEFAULT
+    ) as tt_mock:
+        with mock.patch.dict(os.environ) as mocked_env:
+            # In production, these are set by the Taskcluster worker.
+            mocked_env["TASK_ID"] = "abcdef"
+            mocked_env["RUN_ID"] = str(current_run_id)
+            mocked_env["TASKCLUSTER_ROOT_URL"] = "https://some.cluster"
+
+            def fake_get(url, *args, **kwargs):
+                """Handles the expected requests to the Taskcluster API, and
+                returns error responses for any requests that shouldn't have
+                been made."""
+
+                resp = requests.Response()
+                if url.endswith("artifacts"):
+                    resp.status_code = 200
+                    resp.headers = {"Content-Type": "application/json"}
+                    run_id = int(url.split("/runs/", 1)[1].split("/")[0])
+                    resp._content = json.dumps(
+                        {
+                            "artifacts": run_artifacts[run_id],
+                        }
+                    ).encode("utf-8")
+                elif any(
+                    [
+                        url.endswith(artifact)
+                        for artifact in train_taskcluster.CONTINUATION_ARTIFACTS
+                    ]
+                ):
+                    # No action needed here; we will check that the right calls
+                    # were made based on the current_run_id later.
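+                    # Hand back an empty, streamable body - the artifact
+                    # contents don't matter for these tests.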
+                    resp.status_code = 200
+                    resp._content = b""
+                    resp.raw = io.StringIO("")
+                elif url.endswith("live.log") or url.endswith("live_backing.log"):
+                    resp.status_code = 400
+                    resp._content = (
+                        f"train_taskcluster.py wrongly tried to download a task log: {url}"
+                    )
+                else:
+                    resp.status_code = 400
+                    resp._content = f"train_taskcluster.py made a call to an unexpected URL: {url}"
+
+                return resp
+
+            tt_mock["requests"].get = mock.Mock(wraps=fake_get)
+
+            model_dir = DataDir("test_train_taskcluster").path
+            train_taskcluster.main(
+                [
+                    "model-type",
+                    "training-type",
+                    "src",
+                    "trg",
+                    "train-set-prefix",
+                    "valid-set-prefix",
+                    model_dir,
+                    "best-model-metric",
+                    "alignments",
+                    "seed",
+                    "mode",
+                    orig_pretrained_model_mode,
+                ]
+            )
+
+            # The calls we're expecting are different when we resume vs. when we
+            # don't. There is some overlap, but for clarity it's much simpler to
+            # separate these cases into different branches, at the cost of a
+            # small amount of repetition.
+            if resumable_run_id is not None:
+                calls = []
+                prev_run_id = current_run_id - 1
+                while prev_run_id >= resumable_run_id:
+                    # For each previous run until we reach a resumable one, we
+                    # expect to fetch the artifact list.
+                    calls.append(
+                        mock.call(
+                            f"https://some.cluster/api/queue/v1/task/abcdef/runs/{prev_run_id}/artifacts"
+                        )
+                    )
+
+                    # However, we only expect to fetch the artifacts for the run
+                    # we resume from...
+                    if prev_run_id == resumable_run_id:
+                        for artifact in run_artifacts[prev_run_id]:
+                            # ...but even then, we don't expect to download the
+                            # Taskcluster logs.
+                            if not artifact["name"].startswith("public/logs"):
+                                calls.append(
+                                    mock.call(
+                                        f"https://some.cluster/api/queue/v1/task/abcdef/runs/{prev_run_id}/artifacts/{artifact['name']}",
+                                        stream=True,
+                                    ),
+                                )
+                    prev_run_id = prev_run_id - 1
+
+                assert tt_mock["requests"].get.call_args_list == calls
+            else:
+                # We are not continuing training - if there are earlier runs we
+                # should just see calls to fetch a list of their artifacts.
+                calls = []
+                prev_run_id = current_run_id - 1
+                while prev_run_id >= 0:
+                    calls.append(
+                        mock.call(
+                            f"https://some.cluster/api/queue/v1/task/abcdef/runs/{prev_run_id}/artifacts"
+                        )
+                    )
+                    prev_run_id -= 1
+
+                assert tt_mock["requests"].get.call_args_list == calls
+
+            assert tt_mock["subprocess"].run.call_count == 1
+            # pretrained model mode is the 12th argument to the training script,
+            # but subprocess is also given the script name - so we look for the
+            # expected pretrained model mode in the 13th element of the
+            # subprocess.run call.
+            assert (
+                tt_mock["subprocess"].run.call_args_list[0][0][0][12]
+                == expected_pretrained_model_mode
+            )
diff --git a/utils/preflight_check.py b/utils/preflight_check.py
index fdcc19673..bff0c6d5f 100755
--- a/utils/preflight_check.py
+++ b/utils/preflight_check.py
@@ -56,6 +56,7 @@ def load_yml(filename: str) -> any:
 def get_taskgraph_parameters() -> Parameters:
     # These are required by taskgraph.
     os.environ["TASK_ID"] = "fake_id"
+    os.environ["RUN_ID"] = "0"
     os.environ["TASKCLUSTER_ROOT_URL"] = "https://firefox-ci-tc.services.mozilla.com"

     # Load taskcluster/config.yml