diff --git a/src/loaders/common/callback_server_wrapper.py b/src/loaders/common/callback_server_wrapper.py index 3ba1663a2..7431bb975 100644 --- a/src/loaders/common/callback_server_wrapper.py +++ b/src/loaders/common/callback_server_wrapper.py @@ -16,17 +16,36 @@ class Conf: """ Configuration class for the workspace downloader and workspace uploader scripts. + + Instance variables: + + token - a KBase token appropriate for the KBase environment + callback_url - the url of the callback service to contact + job_data_dir - the directory for SDK jobs per user + input_queue - queue for the workspace downloader tasks + output_dir - the directory for a specific workspace id under sourcedata/ws + retrieve_sample - whether to retrieve sample for each genome object + ignore_no_sample_error - whether to ignore the error when no sample data is found + ws - workspace client + asu - assemblyUtil client + ss - sampleService client + pools - a pool of worker processes + """ def __init__( - self, - job_dir: str, - output_dir: str, - worker_function: Callable, - kb_base_url: str = "https://ci.kbase.us/services/", - token_filepath: str | None = None, - workers: int = 5, - retrieve_sample: bool = False, - ignore_no_sample_error: bool = False, + self, + job_dir: str, + output_dir: str | None = None, + kb_base_url: str = "https://ci.kbase.us/services/", + token_filepath: str | None = None, + au_service_ver: str = "release", + workers: int = 5, + max_callback_server_tasks: int = 20, + worker_function: Callable | None = None, + retrieve_sample: bool = False, + ignore_no_sample_error: bool = False, + workspace_downloader: bool = False, + catalog_admin: bool = False, ) -> None: """ Initialize the configuration class. @@ -34,52 +53,79 @@ def __init__( Args: job_dir (str): The directory for SDK jobs per user. output_dir (str): The directory for a specific workspace id under sourcedata/ws. - worker_function (Callable): The function that will be called by the workers. kb_base_url (str): The base url of the KBase services. - token_filepath (str): The file path that stores a KBase token appropriate for the KBase environment. + token_filepath (str): The file path that stores a KBase token appropriate for the KBase environment. If not supplied, the token must be provided in the environment variable KB_AUTH_TOKEN. + The KB_ADMIN_AUTH_TOKEN environment variable will get set by this token if the user runs as catalog admin. + au_service_ver (str): The service version of AssemblyUtilClient + ('dev', 'beta', 'release', or a git commit). workers (int): The number of workers to use for multiprocessing. + max_callback_server_tasks (int): The maximum number of subtasks for the callback server. + worker_function (Callable): The function that will be called by the workers. retrieve_sample (bool): Whether to retrieve sample for each genome object. ignore_no_sample_error (bool): Whether to ignore the error when no sample data is found. + workspace_downloader (bool): Whether to be used for the workspace downloader script. + catalog_admin (bool): Whether to run the callback server as catalog admin. 
""" - port = loader_helper.find_free_port() - token = loader_helper.get_token(token_filepath) - self.retrieve_sample = retrieve_sample - self.ignore_no_sample_error = ignore_no_sample_error ipv4 = loader_helper.get_ip() + port = loader_helper.find_free_port() + + # common instance variables + + self.token = loader_helper.get_token(token_filepath) + + # setup and run callback server container self._start_callback_server( docker.from_env(), uuid.uuid4().hex, job_dir, kb_base_url, - token, + self.token, port, + max_callback_server_tasks, ipv4, + catalog_admin, ) - ws_url = os.path.join(kb_base_url, "ws") - sample_url = os.path.join(kb_base_url, "sampleservice") - callback_url = "http://" + ipv4 + ":" + str(port) - print("callback_url:", callback_url) - - self.ws = Workspace(ws_url, token=token) - self.asu = AssemblyUtil(callback_url, token=token) - self.ss = SampleService(sample_url, token=token) + self.callback_url = "http://" + ipv4 + ":" + str(port) + print("callback_url:", self.callback_url) - self.workers = workers - self.output_dir = output_dir - self.input_queue = Queue() - self.output_queue = Queue() self.job_data_dir = loader_helper.make_job_data_dir(job_dir) - self.pools = Pool(workers, worker_function, [self]) + + # unique to downloader + if workspace_downloader: + if worker_function is None: + raise ValueError( + "worker_function cannot be None for the workspace downloader script" + ) + + self.input_queue = Queue() + self.output_dir = output_dir + + self.retrieve_sample = retrieve_sample + self.ignore_no_sample_error = ignore_no_sample_error + + ws_url = os.path.join(kb_base_url, "ws") + self.ws = Workspace(ws_url, token=self.token) + + self.asu = AssemblyUtil( + self.callback_url, service_ver=au_service_ver, token=self.token + ) + + sample_url = os.path.join(kb_base_url, "sampleservice") + self.ss = SampleService(sample_url, token=self.token) + + self.pools = Pool(workers, worker_function, [self]) def _setup_callback_server_envs( - self, - job_dir: str, - kb_base_url: str, - token: str, - port: int, - ipv4: str, + self, + job_dir: str, + kb_base_url: str, + token: str, + port: int, + max_callback_server_tasks: int, + ipv4: str, + catalog_admin: bool, ) -> Tuple[dict[str, Union[int, str]], dict[str, dict[str, str]]]: """ Setup the environment variables and volumes for the callback server. @@ -89,6 +135,9 @@ def _setup_callback_server_envs( kb_base_url (str): The base url of the KBase services. token (str): The KBase token. port (int): The port number for the callback server. + max_callback_server_tasks (int): The maximum number of subtasks for the callback server. + ipv4 (str): The ipv4 address for the callback server. + catalog_admin (bool): Whether to run the callback server as catalog admin. Returns: tuple: A tuple of the environment variables and volumes for the callback server. 
@@ -102,9 +151,14 @@ def _setup_callback_server_envs( env["KB_BASE_URL"] = kb_base_url env["JOB_DIR"] = job_dir env["CALLBACK_PORT"] = port + env["JR_MAX_TASKS"] = max_callback_server_tasks env["CALLBACK_IP"] = ipv4 # specify an ipv4 address for the callback server # otherwise, the callback container will use the an ipv6 address + # set admin token to get catalog secure params + if catalog_admin: + env["KB_ADMIN_AUTH_TOKEN"] = token + # setup volumes required for docker container docker_host = os.environ["DOCKER_HOST"] if docker_host.startswith("unix:"): @@ -116,14 +170,16 @@ def _setup_callback_server_envs( return env, vol def _start_callback_server( - self, - client: docker.client, - container_name: str, - job_dir: str, - kb_base_url: str, - token: str, - port: int, - ipv4: str, + self, + client: docker.client, + container_name: str, + job_dir: str, + kb_base_url: str, + token: str, + port: int, + max_callback_server_tasks: int, + ipv4: str, + catalog_admin: bool, ) -> None: """ Start the callback server. @@ -134,9 +190,20 @@ def _start_callback_server( job_dir (str): The directory for SDK jobs per user. kb_base_url (str): The base url of the KBase services. token (str): The KBase token. + max_callback_server_tasks (int): The maximum number of subtasks for the callback server. port (int): The port number for the callback server. + ipv4 (str): The ipv4 address for the callback server. + catalog_admin (bool): Whether to run the callback server as catalog admin. """ - env, vol = self._setup_callback_server_envs(job_dir, kb_base_url, token, port, ipv4) + env, vol = self._setup_callback_server_envs( + job_dir, + kb_base_url, + token, + port, + max_callback_server_tasks, + ipv4, + catalog_admin, + ) self.container = client.containers.run( name=container_name, image=CALLBACK_IMAGE_NAME, @@ -147,9 +214,21 @@ def _start_callback_server( ) time.sleep(2) + def _get_container_logs(self) -> None: + """ + Get logs from the callback server container. + """ + logs = self.container.logs() + if logs: + print("\n****** Logs from the Callback Server ******\n") + logs = logs.decode("utf-8") + for line in logs.split("\n"): + print(line) + def stop_callback_server(self) -> None: """ Stop the callback server. 
""" + self._get_container_logs() self.container.stop() self.container.remove() diff --git a/src/loaders/common/loader_common_names.py b/src/loaders/common/loader_common_names.py index 8c1484fde..d8e1c17c3 100644 --- a/src/loaders/common/loader_common_names.py +++ b/src/loaders/common/loader_common_names.py @@ -129,6 +129,13 @@ 'APPDEV': 'https://appdev.kbase.us/services/', 'PROD': 'https://kbase.us/services/'} +# containers.conf path +CONTAINERS_CONF_PATH = "~/.config/containers/containers.conf" +# params in containers.conf file +CONTAINERS_CONF_PARAMS = { + "seccomp_profile": "\"unconfined\"", + "log_driver": "\"k8s-file\"" +} # field name for Kbase object metadata FLD_KB_OBJ_UPA = "upa" FLD_KB_OBJ_NAME = "name" diff --git a/src/loaders/common/loader_helper.py b/src/loaders/common/loader_helper.py index ecc2ff702..f3be1feac 100644 --- a/src/loaders/common/loader_helper.py +++ b/src/loaders/common/loader_helper.py @@ -1,4 +1,6 @@ import argparse +import configparser +import fcntl import itertools import json import os @@ -26,6 +28,8 @@ ) from src.loaders.common.loader_common_names import ( COLLECTION_SOURCE_DIR, + CONTAINERS_CONF_PARAMS, + CONTAINERS_CONF_PATH, DOCKER_HOST, FATAL_ERROR, FATAL_STACKTRACE, @@ -298,6 +302,45 @@ def start_podman_service(uid: int): return proc +def _get_containers_config(conf_path: str): + """Get containers.conf file at home directory.""" + config = configparser.ConfigParser() + config.read(conf_path) + return config + + +def is_config_modification_required(): + """check if the config requires modification.""" + conf_path = os.path.expanduser(CONTAINERS_CONF_PATH) + config = _get_containers_config(conf_path) + if not config.has_section("containers"): + return True + for key, val in CONTAINERS_CONF_PARAMS.items(): + if config.get("containers", key, fallback=None) != val: + return True + return False + + +def setup_callback_server_logs(): + """Set up containers.conf file for the callback server logs.""" + conf_path = os.path.expanduser(CONTAINERS_CONF_PATH) + with open(conf_path, "w") as writer: + try: + fcntl.flock(writer.fileno(), fcntl.LOCK_EX) + config = _get_containers_config(conf_path) + + if not config.has_section("containers"): + config.add_section("containers") + + for key, val in CONTAINERS_CONF_PARAMS.items(): + config.set("containers", key, val) + + config.write(writer) + print(f"containers.conf is modified and saved to path: {conf_path}") + finally: + fcntl.flock(writer.fileno(), fcntl.LOCK_UN) + + def is_upa_info_complete(upa_dir: str): """ Check whether an UPA needs to be downloaded or not by loading the metadata file. @@ -430,19 +473,30 @@ def create_hardlink_between_files(new_file, target_file): os.link(target_file, new_file) -def list_objects(wsid, conf, object_type, include_metadata=False, batch_size=10000): +def list_objects(wsid, ws, object_type, include_metadata=False, batch_size=10000): """ List all objects information given a workspace ID. 
+ + Args: + wsid (int): Target workspace addressed by the permanent ID + ws (Workspace): Workspace client + object_type (str): Type of the objects to be listed + include_metadata (boolean): Whether to include the user provided metadata in the returned object_info + batch_size (int): Number of objects to process in each batch + + Returns: + list: a list of objects on the target workspace + """ if batch_size > 10000: raise ValueError("Maximum value for listing workspace objects is 10000") - maxObjectID = conf.ws.get_workspace_info({"id": wsid})[4] + maxObjectID = ws.get_workspace_info({"id": wsid})[4] batch_input = [ [idx + 1, idx + batch_size] for idx in range(0, maxObjectID, batch_size) ] objs = [ - conf.ws.list_objects( + ws.list_objects( _list_objects_params(wsid, min_id, max_id, object_type, include_metadata) ) for min_id, max_id in batch_input diff --git a/src/loaders/workspace_downloader/workspace_downloader.py b/src/loaders/workspace_downloader/workspace_downloader.py index c45d5db75..4ce670ae3 100644 --- a/src/loaders/workspace_downloader/workspace_downloader.py +++ b/src/loaders/workspace_downloader/workspace_downloader.py @@ -1,6 +1,8 @@ """ -usage: workspace_downloader.py [-h] --workspace_id WORKSPACE_ID [--kbase_collection KBASE_COLLECTION] [--source_version SOURCE_VERSION] [--root_dir ROOT_DIR] - [--env {CI,NEXT,APPDEV,PROD}] [--workers WORKERS] [--token_filepath TOKEN_FILEPATH] [--keep_job_dir] [--retrieve_sample] [--ignore_no_sample_error] +usage: workspace_downloader.py [-h] --workspace_id WORKSPACE_ID [--kbase_collection KBASE_COLLECTION] [--source_version SOURCE_VERSION] + [--root_dir ROOT_DIR] [--env {CI,NEXT,APPDEV,PROD}] [--workers WORKERS] [--token_filepath TOKEN_FILEPATH] + [--cbs_max_tasks CBS_MAX_TASKS] [--au_service_ver AU_SERVICE_VER] [--keep_job_dir] [--retrieve_sample] + [--ignore_no_sample_error] PROTOTYPE - Download genome files from the workspace service (WSS). @@ -21,7 +23,12 @@ KBase environment, defaulting to PROD (default: PROD) --workers WORKERS Number of workers for multiprocessing (default: 5) --token_filepath TOKEN_FILEPATH - A file path that stores KBase token + A file path that stores a KBase token appropriate for the KBase environment + If not provided, the token must be provided in the `KB_AUTH_TOKEN` environment variable + --cbs_max_tasks CBS_MAX_TASKS + The maximum number of subtasks for the callback server (default: 20) + --au_service_ver AU_SERVICE_VER + The service version of AssemblyUtil client('dev', 'beta', 'release', or a git commit) (default: release) --keep_job_dir Keep SDK job directory after download task is completed --retrieve_sample Retrieve sample for each genome object --ignore_no_sample_error @@ -382,7 +389,21 @@ def main(): optional.add_argument( "--token_filepath", type=str, - help="A file path that stores KBase token", + help="A file path that stores a KBase token appropriate for the KBase environment. " + "If not provided, the token must be provided in the `KB_AUTH_TOKEN` environment variable. 
" + ) + optional.add_argument( + "--cbs_max_tasks", + type=int, + default=20, + help="The maximum number of subtasks for the callback server", + ) + optional.add_argument( + "--au_service_ver", + type=str, + default="release", + help="The service version of AssemblyUtil client" + "('dev', 'beta', 'release', or a git commit)", ) optional.add_argument( "--keep_job_dir", @@ -409,6 +430,8 @@ def main(): env = args.env workers = args.workers token_filepath = args.token_filepath + cbs_max_tasks = args.cbs_max_tasks + au_service_ver = args.au_service_ver keep_job_dir = args.keep_job_dir retrieve_sample = args.retrieve_sample ignore_no_sample_error = args.ignore_no_sample_error @@ -447,19 +470,22 @@ def main(): # set up conf and start callback server conf = Conf( - job_dir, - output_dir, - _process_input, - kb_base_url, - token_filepath, - workers, - retrieve_sample, - ignore_no_sample_error, + job_dir=job_dir, + output_dir=output_dir, + kb_base_url=kb_base_url, + token_filepath=token_filepath, + au_service_ver=au_service_ver, + workers=workers, + max_callback_server_tasks=cbs_max_tasks, + worker_function=_process_input, + retrieve_sample=retrieve_sample, + ignore_no_sample_error=ignore_no_sample_error, + workspace_downloader=True, ) genome_objs = loader_helper.list_objects( workspace_id, - conf, + conf.ws, loader_common_names.OBJECTS_NAME_GENOME, include_metadata=True, ) @@ -470,7 +496,7 @@ def main(): assembly_objs = loader_helper.list_objects( workspace_id, - conf, + conf.ws, loader_common_names.OBJECTS_NAME_ASSEMBLY, include_metadata=True, ) diff --git a/src/loaders/workspace_uploader/workspace_uploader.py b/src/loaders/workspace_uploader/workspace_uploader.py index b76b84b61..65cd27b97 100644 --- a/src/loaders/workspace_uploader/workspace_uploader.py +++ b/src/loaders/workspace_uploader/workspace_uploader.py @@ -1,9 +1,13 @@ """ usage: workspace_uploader.py [-h] --workspace_id WORKSPACE_ID [--kbase_collection KBASE_COLLECTION] [--source_ver SOURCE_VER] - [--root_dir ROOT_DIR] [--token_filepath TOKEN_FILEPATH] [--env {CI,NEXT,APPDEV,PROD}] - [--upload_file_ext UPLOAD_FILE_EXT [UPLOAD_FILE_EXT ...]] [--workers WORKERS] [--keep_job_dir] + [--root_dir ROOT_DIR] [--load_id LOAD_ID] [--token_filepath TOKEN_FILEPATH] [--env {CI,NEXT,APPDEV,PROD}] + [--upload_file_ext UPLOAD_FILE_EXT [UPLOAD_FILE_EXT ...]] [--batch_size BATCH_SIZE] + [--cbs_max_tasks CBS_MAX_TASKS] [--au_service_ver AU_SERVICE_VER] [--keep_job_dir] [--as_catalog_admin] -PROTOTYPE - Upload assembly files to the workspace service (WSS). +PROTOTYPE - Upload assembly files to the workspace service (WSS). Note that the uploader determines whether a genome is already uploaded in +one of two ways. First it consults the *.yaml files in each genomes directory; if that file shows the genome has been uploaded it skips it +regardless of the current state of the workspace. Second, it checks that the most recent version of the genome object in the workspace, if it +exists, was part of the current load ID (see the load ID parameter description below). If so, the genome is skipped. options: -h, --help show this help message and exit @@ -22,15 +26,26 @@ optional arguments: --root_dir ROOT_DIR Root directory for the collections project. (default: /global/cfs/cdirs/kbase/collections) + --load_id LOAD_ID The load id of the objects being uploaded to a workspace. If not provided, a random load_id will be generated. For a + particular load, any restarts / resumes of the load should use the same load ID to prevent reuploading the same data. 
+ A new load ID will make new versions of all the objects from the prior upload. --token_filepath TOKEN_FILEPATH - A file path that stores a KBase token appropriate for the KBase environment - If not provided, the token must be provided in the `KB_AUTH_TOKEN` environment variable + A file path that stores a KBase token appropriate for the KBase environment. + If not provided, the token must be provided in the `KB_AUTH_TOKEN` environment variable. --env {CI,NEXT,APPDEV,PROD} KBase environment (default: PROD) --upload_file_ext UPLOAD_FILE_EXT [UPLOAD_FILE_EXT ...] Upload only files that match given extensions (default: ['genomic.fna.gz']) - --workers WORKERS Number of workers for multiprocessing (default: 5) + --batch_size BATCH_SIZE + Number of files to upload per batch (default: 2500) + --cbs_max_tasks CBS_MAX_TASKS + The maximum number of subtasks for the callback server (default: 20) + --au_service_ver AU_SERVICE_VER + The service version of AssemblyUtil client('dev', 'beta', 'release', or a git commit) (default: release) --keep_job_dir Keep SDK job directory after upload task is completed + --as_catalog_admin True means the provided user token has catalog admin privileges and will be used to retrieve secure SDK app + parameters from the catalog. If false, the default, SDK apps run as part of this application will not have access to + catalog secure parameters. e.g. PYTHONPATH=. python src/loaders/workspace_uploader/workspace_uploader.py --workspace_id 69046 --kbase_collection GTDB --source_ver 207 --env CI --keep_job_dir @@ -48,30 +63,46 @@ e.g. /global/cfs/cdirs/kbase/collections/collectionssource/ -> ENV -> kbase_collection -> source_ver -> UPA -> .fna.gz file """ import argparse +import click import os import shutil import time +import uuid +from collections import namedtuple from datetime import datetime -from multiprocessing import cpu_count from pathlib import Path -from typing import Tuple +from typing import Generator import yaml +from src.clients.AssemblyUtilClient import AssemblyUtil +from src.clients.workspaceClient import Workspace from src.loaders.common import loader_common_names, loader_helper from src.loaders.common.callback_server_wrapper import Conf # setup KB_AUTH_TOKEN as env or provide a token_filepath in --token_filepath # export KB_AUTH_TOKEN="your-kb-auth-token" -UPLOAD_FILE_EXT = ["genomic.fna.gz"] # uplaod only files that match given extensions -JOB_DIR_IN_ASSEMBLYUTIL_CONTAINER = "/kb/module/work/tmp" -UPLOADED_YAML = "uploaded.yaml" +_UPLOAD_FILE_EXT = ["genomic.fna.gz"] # uplaod only files that match given extensions +_JOB_DIR_IN_ASSEMBLYUTIL_CONTAINER = "/kb/module/work/tmp" +_UPLOADED_YAML = "uploaded.yaml" +_WS_MAX_BATCH_SIZE = 10000 + +_AssemblyTuple = namedtuple( + "AssemblyTuple", + ["assembly_name", "host_assembly_dir", "container_internal_assembly_path"], +) def _get_parser(): parser = argparse.ArgumentParser( - description="PROTOTYPE - Upload assembly files to the workspace service (WSS).", + description="PROTOTYPE - Upload assembly files to the workspace service (WSS).\n\n" + "Note that the uploader determines whether a genome is already uploaded in one of two ways. " + "First it consults the *.yaml files in each genomes directory; if that file shows the genome " + "has been uploaded it skips it regardless of the current state of the workspace. " + "Second, it checks that the most recent version of the genome object in the workspace, " + "if it exists, was part of the current load ID (see the load ID parameter description below). 
" + "If so, the genome is skipped.", formatter_class=loader_helper.ExplicitDefaultsHelpFormatter, ) @@ -89,15 +120,15 @@ def _get_parser(): f"--{loader_common_names.KBASE_COLLECTION_ARG_NAME}", type=str, help="The name of the collection being processed. " - "Specifies where the files to be uploaded exist (in the default NONE environment) " - "and the name of the collection to be created in the specific KBase environment in the 'collectionsource' directory.", + "Specifies where the files to be uploaded exist (in the default NONE environment) " + "and the name of the collection to be created in the specific KBase environment in the 'collectionsource' directory.", ) required.add_argument( f"--{loader_common_names.SOURCE_VER_ARG_NAME}", type=str, help="The source version of the collection being processed. " - "Specifies where the files to be uploaded exist (in the default NONE environment) " - "and the source version of the collection to be created in the specific KBase environment in the 'collectionsource' directory.", + "Specifies where the files to be uploaded exist (in the default NONE environment) " + "and the source version of the collection to be created in the specific KBase environment in the 'collectionsource' directory.", ) # Optional argument @@ -105,13 +136,21 @@ def _get_parser(): f"--{loader_common_names.ROOT_DIR_ARG_NAME}", type=str, default=loader_common_names.ROOT_DIR, - help=loader_common_names.ROOT_DIR_DESCR + help=loader_common_names.ROOT_DIR_DESCR, + ) + optional.add_argument( + "--load_id", + type=str, + help="The load id of the objects being uploaded to a workspace. " + "If not provided, a random load_id will be generated. " + "For a particular load, any restarts / resumes of the load should use the same load ID to prevent reuploading the same data. " + "A new load ID will make new versions of all the objects from the prior upload.", ) optional.add_argument( "--token_filepath", type=str, help="A file path that stores a KBase token appropriate for the KBase environment. " - "If not provided, the token must be provided in the `KB_AUTH_TOKEN` environment variable.", + "If not provided, the token must be provided in the `KB_AUTH_TOKEN` environment variable.", ) optional.add_argument( "--env", @@ -124,20 +163,40 @@ def _get_parser(): "--upload_file_ext", type=str, nargs="+", - default=UPLOAD_FILE_EXT, + default=_UPLOAD_FILE_EXT, help="Upload only files that match given extensions", ) optional.add_argument( - "--workers", + "--batch_size", type=int, - default=5, - help="Number of workers for multiprocessing", + default=2500, + help="Number of files to upload per batch", + ) + optional.add_argument( + "--cbs_max_tasks", + type=int, + default=20, + help="The maximum number of subtasks for the callback server", + ) + optional.add_argument( + "--au_service_ver", + type=str, + default="release", + help="The service version of AssemblyUtil client" + "('dev', 'beta', 'release', or a git commit)", ) optional.add_argument( "--keep_job_dir", action="store_true", help="Keep SDK job directory after upload task is completed", ) + optional.add_argument( + "--as_catalog_admin", + action="store_true", + help="True means the provided user token has catalog admin privileges and will " + "be used to retrieve secure SDK app parameters from the catalog. 
If false, the default, " + "SDK apps run as part of this application will not have access to catalog secure parameters.", + ) return parser @@ -145,7 +204,7 @@ def _get_yaml_file_path(assembly_dir: str) -> str: """ Get the uploaded.yaml file path from collections source directory. """ - file_path = os.path.join(assembly_dir, UPLOADED_YAML) + file_path = os.path.join(assembly_dir, _UPLOADED_YAML) Path(file_path).touch(exist_ok=True) return file_path @@ -160,51 +219,56 @@ def _get_source_file(assembly_dir: str, assembly_file: str) -> str: return src_file -def _upload_assembly_to_workspace( - conf: Conf, +def _upload_assemblies_to_workspace( + asu: AssemblyUtil, workspace_id: int, - file_path: str, - assembly_name: str, -) -> str: - """Upload an assembly file to workspace.""" - success, attempts, max_attempts = False, 0, 3 - while attempts < max_attempts and not success: - try: - time.sleep(attempts) - assembly_ref = conf.asu.save_assembly_from_fasta2( - { - "file": {"path": file_path}, - "workspace_id": workspace_id, - "assembly_name": assembly_name, - } - ) - success = True - except Exception as e: - print(f"Error:\n{e}\nfrom attempt {attempts + 1}.\nTrying to rerun.") - attempts += 1 + load_id: int, + assembly_tuples: list[_AssemblyTuple], +) -> tuple[str, ...]: + """ + Upload assembly files to the target workspace in batch. The bulk method fails + and an error will be thrown if any of the assembly files in batch fails to upload. + Returns the list of workspace UPAs for the created objects in the same order as + `assembly_tuples`. + """ + inputs = [ + { + "file": assembly_tuple.container_internal_assembly_path, + "assembly_name": assembly_tuple.assembly_name, + "object_metadata": {"load_id": load_id}, + } + for assembly_tuple in assembly_tuples + ] - if not success: - raise ValueError( - f"Upload Failed for {file_path} after {max_attempts} attempts!" - ) + assembly_ref = asu.save_assemblies_from_fastas( + {"workspace_id": workspace_id, "inputs": inputs} + ) - upa = assembly_ref["upa"].replace("/", "_") - return upa + upas = tuple( + [ + result_dict["upa"].replace("/", "_") + for result_dict in assembly_ref["results"] + ] + ) + return upas def _read_upload_status_yaml_file( - upload_env_key: str, - workspace_id: int, - assembly_dir: str, - assembly_name: str, -) -> Tuple[dict[str, dict[int, list[str]]], bool]: + upload_env_key: str, + workspace_id: int, + load_id: str, + assembly_dir: str, + assembly_name: str, +) -> tuple[dict[str, dict[int, list[str]]], bool]: """ Get metadata and upload status of an assembly from the uploaded.yaml file. 
""" uploaded = False if upload_env_key not in loader_common_names.KB_ENV: - raise ValueError(f"Currently only support these {loader_common_names.KB_ENV} envs for upload") + raise ValueError( + f"Currently only support these {loader_common_names.KB_ENV} envs for upload" + ) file_path = _get_yaml_file_path(assembly_dir) @@ -217,40 +281,57 @@ def _read_upload_status_yaml_file( if workspace_id not in data[upload_env_key]: data[upload_env_key][workspace_id] = dict() - assembly_dict = data[upload_env_key][workspace_id] - if assembly_dict and assembly_dict["file_name"] == assembly_name: + workspace_dict = data[upload_env_key][workspace_id] + + if "file_name" not in workspace_dict: + workspace_dict["file_name"] = assembly_name + + if "loads" not in workspace_dict: + workspace_dict["loads"] = dict() + + if load_id in workspace_dict["loads"]: uploaded = True + return data, uploaded def _update_upload_status_yaml_file( - upload_env_key: str, - workspace_id: int, - upa: str, - assembly_dir: str, - assembly_name: str, + upload_env_key: str, + workspace_id: int, + load_id: str, + upa: str, + assembly_tuple: _AssemblyTuple, ) -> None: """ Update the uploaded.yaml file in target genome_dir with newly uploaded assembly names and upa info. """ - data, uploaded = _read_upload_status_yaml_file(upload_env_key, workspace_id, assembly_dir, assembly_name) + data, uploaded = _read_upload_status_yaml_file( + upload_env_key, + workspace_id, + load_id, + assembly_tuple.host_assembly_dir, + assembly_tuple.assembly_name, + ) if uploaded: - raise ValueError(f"Assembly {assembly_name} already exists in workspace {workspace_id}") + raise ValueError( + f"Assembly {assembly_tuple.assembly_name} already exists in workspace {workspace_id}" + ) - data[upload_env_key][workspace_id] = {"file_name": assembly_name, "upa": upa} + data[upload_env_key][workspace_id]["loads"][load_id] = {"upa": upa} - file_path = _get_yaml_file_path(assembly_dir) + file_path = _get_yaml_file_path(assembly_tuple.host_assembly_dir) with open(file_path, "w") as file: yaml.dump(data, file) def _fetch_assemblies_to_upload( - upload_env_key: str, - workspace_id: int, - collection_source_dir: str, - upload_file_ext: list[str], -) -> Tuple[int, dict[str, str]]: + upload_env_key: str, + workspace_id: int, + load_id: str, + collection_source_dir: str, + upload_file_ext: list[str], +) -> tuple[int, dict[str, str]]: count = 0 wait_to_upload_assemblies = dict() @@ -261,7 +342,6 @@ def _fetch_assemblies_to_upload( ] for assembly_dir in assembly_dirs: - assembly_file_list = [ f for f in os.listdir(assembly_dir) @@ -270,17 +350,26 @@ def _fetch_assemblies_to_upload( ] if len(assembly_file_list) != 1: - raise ValueError(f"One and only one assembly file that ends with {upload_file_ext} " - f"must be present in {assembly_dir} directory") + raise ValueError( + f"One and only one assembly file that ends with {upload_file_ext} " + f"must be present in {assembly_dir} directory" + ) count += 1 assembly_name = assembly_file_list[0] - _, uploaded = _read_upload_status_yaml_file(upload_env_key, workspace_id, assembly_dir, assembly_name) + _, uploaded = _read_upload_status_yaml_file( + upload_env_key, + workspace_id, + load_id, + assembly_dir, + assembly_name, + ) if uploaded: print( - f"Assembly {assembly_name} already exists in workspace {workspace_id}. Skipping." + f"Assembly {assembly_name} already exists in " + f"workspace {workspace_id} load {load_id}. Skipping." 
) continue @@ -289,7 +378,54 @@ def _fetch_assemblies_to_upload( return count, wait_to_upload_assemblies -def _prepare_skd_job_dir_to_upload(conf: Conf, wait_to_upload_assemblies: dict[str, str]) -> str: +def _query_workspace_with_load_id( + ws: Workspace, + workspace_id: int, + load_id: str, + assembly_names: list[str], +) -> tuple[list[str], list[str]]: + if len(assembly_names) > _WS_MAX_BATCH_SIZE: + raise ValueError( + f"The effective max batch size must be <= {_WS_MAX_BATCH_SIZE}" + ) + refs = [{"wsid": workspace_id, "name": name} for name in assembly_names] + res = ws.get_object_info3( + {"objects": refs, "ignoreErrors": 1, "includeMetadata": 1} + ) + uploaded_obj_names_batch = [ + info[1] + for info in res["infos"] + if info is not None and "load_id" in info[10] and info[10]["load_id"] == load_id + ] + uploaded_obj_upas_batch = [ + path[0].replace("/", "_") for path in res["paths"] if path is not None + ] + return uploaded_obj_names_batch, uploaded_obj_upas_batch + + +def _query_workspace_with_load_id_mass( + ws: Workspace, + workspace_id: int, + load_id: str, + assembly_names: list[str], + batch_size: int = _WS_MAX_BATCH_SIZE, +) -> tuple[list[str], list[str]]: + uploaded_obj_names = [] + uploaded_obj_upas = [] + + for idx in range(0, len(assembly_names), batch_size): + obj_names_batch, obj_upas_batch = _query_workspace_with_load_id( + ws, workspace_id, load_id, assembly_names[idx : idx + batch_size] + ) + uploaded_obj_names.extend(obj_names_batch) + uploaded_obj_upas.extend(obj_upas_batch) + + return uploaded_obj_names, uploaded_obj_upas + + +def _prepare_skd_job_dir_to_upload( + conf: Conf, wait_to_upload_assemblies: dict[str, str] +) -> str: """ Prepare SDK job directory to upload. """ @@ -302,13 +438,13 @@ def _prepare_skd_job_dir_to_upload(conf: Conf, wait_to_upload_assemblies: dict[s def _post_process( - upload_env_key: str, - workspace_id: int, - host_assembly_dir: str, - assembly_name: str, - upload_dir: str, - output_dir: str, - upa: str, + upload_env_key: str, + workspace_id: int, + load_id: str, + assembly_tuple: _AssemblyTuple, + upload_dir: str, + output_dir: str, + upa: str, ) -> None: """ Create a standard entry in sourcedata/workspace for each assembly. @@ -318,129 +454,147 @@ def _post_process( """ # Create a standard entry in sourcedata/workspace # hardlink to the original assembly file in sourcedata - src_file = _get_source_file(host_assembly_dir, assembly_name) + src_file = _get_source_file( + assembly_tuple.host_assembly_dir, + assembly_tuple.assembly_name, + ) target_dir = os.path.join(output_dir, upa) os.makedirs(target_dir, exist_ok=True) dest_file = os.path.join(target_dir, f"{upa}.fna.gz") loader_helper.create_hardlink_between_files(dest_file, src_file) # Update the uploaded.yaml file - _update_upload_status_yaml_file(upload_env_key, workspace_id, upa, host_assembly_dir, assembly_name) + _update_upload_status_yaml_file( + upload_env_key, + workspace_id, + load_id, + upa, + assembly_tuple, + ) # Creates a softlink from new_dir to the contents of upa_dir. new_dir = os.path.join(upload_dir, upa) loader_helper.create_softlink_between_dirs(new_dir, target_dir) -def _process_input(conf: Conf) -> None: - """ - Process input from input_queue and put the result in output_queue. 
- """ - while True: - task = conf.input_queue.get(block=True) - if not task: - print("Stopping") - break - - upa = None - ( - upload_env_key, - workspace_id, - container_internal_assembly_path, - host_assembly_dir, - assembly_name, - upload_dir, - counter, - assembly_files_len, - ) = task - - try: - upa = _upload_assembly_to_workspace( - conf, workspace_id, container_internal_assembly_path, assembly_name - ) - _post_process( - upload_env_key, - workspace_id, - host_assembly_dir, - assembly_name, - upload_dir, - conf.output_dir, - upa - ) - except Exception as e: - print(f"Failed assembly name: {assembly_name}. Exception:") - print(e) - - conf.output_queue.put((assembly_name, upa)) - - if counter % 3000 == 0: - print(f"Assemblies processed: {counter}/{assembly_files_len}, " - f"Percentage: {counter / assembly_files_len * 100:.2f}%, " - f"Time: {datetime.now()}") - - def _upload_assembly_files_in_parallel( - conf: Conf, - upload_env_key: str, - workspace_id: int, - upload_dir: str, - wait_to_upload_assemblies: dict[str, str], + asu: AssemblyUtil, + ws: Workspace, + upload_env_key: str, + workspace_id: int, + load_id: str, + upload_dir: str, + wait_to_upload_assemblies: dict[str, str], + batch_size: int, + output_dir: str, ) -> list[str]: """ Upload assembly files to the target workspace in parallel using multiprocessing. Parameters: - conf: Conf object + asu: AssemblyUtil client + ws: Workspace client upload_env_key: environment variable key in uploaded.yaml file workspace_id: target workspace id + load_id: load id upload_dir: a directory in collectionssource that creates new directories linking to sourcedata wait_to_upload_assemblies: a dictionary that maps assembly file name to assembly directory + batch_size: a number of files to upload per batch + output_dir: a directory in sourcedata/workspace to store new assembly entries Returns: - a list of assembly names that failed to upload + number of assembly files have been sucessfully uploaded from wait_to_upload_assemblies """ assembly_files_len = len(wait_to_upload_assemblies) - print(f"Start uploading {assembly_files_len} assembly files with {conf.workers} workers\n") + print(f"Start uploading {assembly_files_len} assembly files\n") - # Put the assembly files in the input_queue - counter = 1 - for assembly_name, host_assembly_dir in wait_to_upload_assemblies.items(): + uploaded_count = 0 + uploaded_fail = False + for assembly_tuples in _gen(wait_to_upload_assemblies, batch_size): + batch_upas = tuple() + batch_uploaded_tuples = [] - container_internal_assembly_path = os.path.join( - JOB_DIR_IN_ASSEMBLYUTIL_CONTAINER, assembly_name - ) - conf.input_queue.put( - ( + try: + batch_upas = _upload_assemblies_to_workspace( + asu, workspace_id, load_id, assembly_tuples + ) + batch_uploaded_tuples = assembly_tuples + except Exception as e: + print(e) + uploaded_fail = True + + try: + # figure out uploads that succeeded + name2tuple = { + assembly_tuple.assembly_name: assembly_tuple + for assembly_tuple in assembly_tuples + } + ( + uploaded_obj_names, + uploaded_obj_upas, + ) = _query_workspace_with_load_id_mass( + ws, workspace_id, load_id, list(name2tuple.keys()) + ) + + batch_upas = tuple(uploaded_obj_upas) + batch_uploaded_tuples = [ + name2tuple[name] for name in uploaded_obj_names + ] + + except Exception as e: + print( + f"WARNING: There are inconsistencies between " + f"the workspace and the yaml files as the result of {e}\n" + f"Run the script again to attempt resolution." 
+ ) + + # post process on sucessful uploads + for assembly_tuple, upa in zip(batch_uploaded_tuples, batch_upas): + _post_process( upload_env_key, workspace_id, - container_internal_assembly_path, - host_assembly_dir, - assembly_name, + load_id, + assembly_tuple, upload_dir, - counter, - assembly_files_len, + output_dir, + upa, ) - ) - if counter % 5000 == 0: - print(f"Jobs added to the queue: {counter}/{assembly_files_len}, " - f"Percentage: {counter / assembly_files_len * 100:.2f}%, " - f"Time: {datetime.now()}") + uploaded_count += len(batch_uploaded_tuples) + if uploaded_count % 100 == 0: + print( + f"Assemblies uploaded: {uploaded_count}/{assembly_files_len}, " + f"Percentage: {uploaded_count / assembly_files_len * 100:.2f}%, " + f"Time: {datetime.now()}" + ) - counter += 1 + if uploaded_fail: + return uploaded_count - # Signal the workers to terminate when they finish uploading assembly files - for _ in range(conf.workers): - conf.input_queue.put(None) + return uploaded_count - results = [conf.output_queue.get() for _ in range(assembly_files_len)] - failed_names = [assembly_name for assembly_name, upa in results if upa is None] - # Close and join the processes - conf.pools.close() - conf.pools.join() +def _dict2tuple_list(assemblies_dict: dict[str, str]) -> list[_AssemblyTuple]: + assemblyTuple_list = [ + _AssemblyTuple( + i[0], i[1], os.path.join(_JOB_DIR_IN_ASSEMBLYUTIL_CONTAINER, i[0]) + ) + for i in assemblies_dict.items() + ] + return assemblyTuple_list + - return failed_names +def _gen( + wait_to_upload_assemblies: dict[str, str], + batch_size: int, +) -> Generator[list[_AssemblyTuple], None, None]: + """ + Generator function to yield the assembly files to upload. + """ + assemblyTuple_list = _dict2tuple_list(wait_to_upload_assemblies) + # yield AssemblyTuples in batch + for idx in range(0, len(wait_to_upload_assemblies), batch_size): + yield assemblyTuple_list[idx : idx + batch_size] def main(): @@ -453,16 +607,29 @@ def main(): root_dir = getattr(args, loader_common_names.ROOT_DIR_ARG_NAME) token_filepath = args.token_filepath upload_file_ext = args.upload_file_ext - workers = args.workers + batch_size = args.batch_size + cbs_max_tasks = args.cbs_max_tasks + au_service_ver = args.au_service_ver keep_job_dir = args.keep_job_dir + catalog_admin = args.as_catalog_admin + load_id = args.load_id + if not load_id: + print("load_id is not provided. Generating a load_id ...") + load_id = uuid.uuid4().hex + print( + f"load_id is {load_id}.\n" + f"Please keep using this load version until the load is complete!" 
+ ) env = args.env kb_base_url = loader_common_names.KB_BASE_URL_MAP[env] if workspace_id <= 0: parser.error(f"workspace_id needs to be > 0") - if workers < 1 or workers > cpu_count(): - parser.error(f"minimum worker is 1 and maximum worker is {cpu_count()}") + if batch_size <= 0: + parser.error(f"batch_size needs to be > 0") + if cbs_max_tasks <= 0: + parser.error(f"cbs_max_tasks needs to be > 0") uid = os.getuid() username = os.getlogin() @@ -471,35 +638,81 @@ def main(): collection_source_dir = loader_helper.make_collection_source_dir( root_dir, loader_common_names.DEFAULT_ENV, kbase_collection, source_version ) - upload_dir = loader_helper.make_collection_source_dir(root_dir, env, kbase_collection, source_version) + upload_dir = loader_helper.make_collection_source_dir( + root_dir, env, kbase_collection, source_version + ) output_dir = loader_helper.make_sourcedata_ws_dir(root_dir, env, workspace_id) proc = None conf = None try: + # setup container.conf file for the callback server logs if needed + if loader_helper.is_config_modification_required(): + if click.confirm( + f"The config file at {loader_common_names.CONTAINERS_CONF_PATH}\n" + f"needs to be modified to allow for container logging.\n" + f"Params 'seccomp_profile' and 'log_driver' will be added/updated under section [containers]. Do so now?\n" + ): + loader_helper.setup_callback_server_logs() + else: + print("Permission denied and exiting ...") + return + # start podman service proc = loader_helper.start_podman_service(uid) - # set up conf, start callback server, and upload assemblies to workspace + # set up conf for uploader, start callback server, and upload assemblies to workspace conf = Conf( - job_dir, - output_dir, - _process_input, - kb_base_url, - token_filepath, - workers, + job_dir=job_dir, + kb_base_url=kb_base_url, + token_filepath=token_filepath, + max_callback_server_tasks=cbs_max_tasks, + catalog_admin=catalog_admin, ) count, wait_to_upload_assemblies = _fetch_assemblies_to_upload( - env, - workspace_id, + env, + workspace_id, + load_id, collection_source_dir, upload_file_ext, ) + # set up workspace client + ws_url = os.path.join(kb_base_url, "ws") + ws = Workspace(ws_url, token=conf.token) + + uploaded_obj_names, uploaded_obj_upas = _query_workspace_with_load_id_mass( + ws, workspace_id, load_id, list(wait_to_upload_assemblies.keys()) + ) + # fix inconsistencies between the workspace and the local yaml files + if uploaded_obj_names: + print("Start failure recovery process ...") + wait_to_update_assemblies = { + assembly_name: wait_to_upload_assemblies[assembly_name] + for assembly_name in uploaded_obj_names + } + uploaded_tuples = _dict2tuple_list(wait_to_update_assemblies) + for assembly_tuple, upa in zip(uploaded_tuples, uploaded_obj_upas): + _post_process( + env, + workspace_id, + load_id, + assembly_tuple, + upload_dir, + output_dir, + upa, + ) + # remove assemblies that are already uploaded + for assembly_name in uploaded_obj_names: + wait_to_upload_assemblies.pop(assembly_name) + print("Recovery process completed ...") + if not wait_to_upload_assemblies: - print(f"All {count} assembly files already exist in workspace {workspace_id}") + print( + f"All {count} assembly files already exist in workspace {workspace_id}" + ) return wtus_len = len(wait_to_upload_assemblies) @@ -507,26 +720,33 @@ def main(): print(f"Detected {count - wtus_len} assembly files already exist in workspace") data_dir = _prepare_skd_job_dir_to_upload(conf, wait_to_upload_assemblies) - print(f"{wtus_len} assemblies in {data_dir} are 
ready to upload to workspace {workspace_id}") + print( + f"{wtus_len} assemblies in {data_dir} are ready to upload to workspace {workspace_id}" + ) + asu = AssemblyUtil( + conf.callback_url, service_ver=au_service_ver, token=conf.token + ) start = time.time() - failed_names = _upload_assembly_files_in_parallel( - conf, + uploaded_count = _upload_assembly_files_in_parallel( + asu, + ws, env, workspace_id, + load_id, upload_dir, wait_to_upload_assemblies, + batch_size, + output_dir, ) - - assembly_count = wtus_len - len(failed_names) - upload_time = (time.time() - start) / 60 - assy_per_min = assembly_count / upload_time - print(f"\n{workers} workers took {upload_time:.2f} minutes to upload {assembly_count} assemblies, " - f"averaging {assy_per_min:.2f} assemblies per minute") + upload_time = (time.time() - start) / 60 + assy_per_min = uploaded_count / upload_time - if failed_names: - raise ValueError(f"\nFailed to upload {failed_names}") + print( + f"\ntook {upload_time:.2f} minutes to upload {uploaded_count} assemblies, " + f"averaging {assy_per_min:.2f} assemblies per minute" + ) finally: # stop callback server if it is on diff --git a/test/src/loaders/workspace_uploader/workspace_uploader_test.py b/test/src/loaders/workspace_uploader/workspace_uploader_test.py index f63452d09..ce5534ea7 100644 --- a/test/src/loaders/workspace_uploader/workspace_uploader_test.py +++ b/test/src/loaders/workspace_uploader/workspace_uploader_test.py @@ -1,7 +1,6 @@ import os import shutil import uuid -from multiprocessing import Queue from pathlib import Path from typing import NamedTuple from unittest.mock import Mock, create_autospec @@ -9,8 +8,8 @@ import pytest from src.clients.AssemblyUtilClient import AssemblyUtil +from src.clients.workspaceClient import Workspace from src.loaders.common import loader_helper -from src.loaders.common.callback_server_wrapper import Conf from src.loaders.workspace_uploader import workspace_uploader ASSEMBLY_DIR_NAMES = ["GCF_000979855.1", "GCF_000979175.1"] @@ -65,7 +64,7 @@ def test_get_yaml_file_path(setup_and_teardown): assembly_dir = params.assembly_dirs[0] yaml_path = workspace_uploader._get_yaml_file_path(assembly_dir) - expected_yaml_path = os.path.join(assembly_dir, workspace_uploader.UPLOADED_YAML) + expected_yaml_path = os.path.join(assembly_dir, workspace_uploader._UPLOADED_YAML) assert expected_yaml_path == yaml_path assert os.path.exists(yaml_path) @@ -86,11 +85,11 @@ def test_read_upload_status_yaml_file(setup_and_teardown): # test empty yaml file in assembly_dir data, uploaded = workspace_uploader._read_upload_status_yaml_file( - "CI", 12345, assembly_dir, assembly_name + "CI", 12345, "214", assembly_dir, assembly_name ) - - expected_data = {"CI": {12345: dict()}} - + expected_data = { + "CI": {12345: {"file_name": assembly_name, "loads": {}}} + } assert not uploaded assert expected_data == data @@ -99,22 +98,27 @@ def test_update_upload_status_yaml_file(setup_and_teardown): params = setup_and_teardown assembly_dir = params.assembly_dirs[0] assembly_name = ASSEMBLY_NAMES[0] + assembly_tuple = workspace_uploader._AssemblyTuple( + assembly_name, assembly_dir, "/path/to/file/in/AssembilyUtil" + ) workspace_uploader._update_upload_status_yaml_file( - "CI", 12345, "12345_58_1", assembly_dir, assembly_name + "CI", 12345, "214", "12345_58_1", assembly_tuple ) data, uploaded = workspace_uploader._read_upload_status_yaml_file( - "CI", 12345, assembly_dir, assembly_name + "CI", 12345, "214", assembly_dir, assembly_name ) - expected_data = {"CI": {12345: {"file_name": 
assembly_name, "upa": "12345_58_1"}}} + expected_data = { + "CI": {12345: {"file_name": assembly_name, "loads": {"214": {"upa": "12345_58_1"}}}} + } assert uploaded assert expected_data == data with pytest.raises(ValueError, match=f"already exists in workspace"): workspace_uploader._update_upload_status_yaml_file( - "CI", 12345, "12345_58_1", assembly_dir, assembly_name + "CI", 12345, "214", "12345_58_1", assembly_tuple ) @@ -126,8 +130,9 @@ def test_fetch_assemblies_to_upload(setup_and_teardown): count, wait_to_upload_assemblies = workspace_uploader._fetch_assemblies_to_upload( "CI", 12345, + "214", collection_source_dir, - workspace_uploader.UPLOAD_FILE_EXT, + workspace_uploader._UPLOAD_FILE_EXT, ) expected_count = len(ASSEMBLY_NAMES) @@ -144,8 +149,11 @@ def test_fetch_assemblies_to_upload(setup_and_teardown): # Both assemnly files will be skipped in the next fetch_assemblies_to_upload call upas = ["12345_58_1", "12345_58_2"] for assembly_name, assembly_dir, upa in zip(ASSEMBLY_NAMES, assembly_dirs, upas): + assembly_tuple = workspace_uploader._AssemblyTuple( + assembly_name, assembly_dir, "/path/to/file/in/AssembilyUtil" + ) workspace_uploader._update_upload_status_yaml_file( - "CI", 12345, upa, assembly_dir, assembly_name + "CI", 12345, "214", upa, assembly_tuple ) ( @@ -154,8 +162,9 @@ def test_fetch_assemblies_to_upload(setup_and_teardown): ) = workspace_uploader._fetch_assemblies_to_upload( "CI", 12345, + "214", collection_source_dir, - workspace_uploader.UPLOAD_FILE_EXT, + workspace_uploader._UPLOAD_FILE_EXT, ) assert expected_count == new_count @@ -192,21 +201,26 @@ def test_post_process(setup_and_teardown): host_assembly_dir = params.assembly_dirs[0] assembly_name = ASSEMBLY_NAMES[0] src_file = params.target_files[0] + assembly_tuple = workspace_uploader._AssemblyTuple( + assembly_name, host_assembly_dir, "/path/to/file/in/AssembilyUtil" + ) workspace_uploader._post_process( "CI", 88888, - host_assembly_dir, - assembly_name, + "214", + assembly_tuple, upload_dir, output_dir, "12345_58_1", ) data, uploaded = workspace_uploader._read_upload_status_yaml_file( - "CI", 88888, host_assembly_dir, assembly_name + "CI", 88888, "214", host_assembly_dir, assembly_name ) - expected_data = {"CI": {88888: {"file_name": assembly_name, "upa": "12345_58_1"}}} + expected_data = { + "CI": {88888: {"file_name": assembly_name, "loads": {"214": {"upa": "12345_58_1"}}}} + } dest_file = os.path.join( os.path.join(output_dir, "12345_58_1"), f"12345_58_1.fna.gz" @@ -223,78 +237,295 @@ def test_post_process(setup_and_teardown): def test_upload_assembly_to_workspace(setup_and_teardown): - _ = setup_and_teardown + params = setup_and_teardown assembly_name = ASSEMBLY_NAMES[0] + host_assembly_dir = params.assembly_dirs[0] - conf = Mock() - conf.asu = create_autospec(AssemblyUtil, spec_set=True, instance=True) - conf.asu.save_assembly_from_fasta2.return_value = {"upa": "12345/58/1"} - upa = workspace_uploader._upload_assembly_to_workspace( - conf, 12345, "/path/to/file/in/AssembilyUtil", assembly_name + asu = create_autospec(AssemblyUtil, spec_set=True, instance=True) + asu.save_assemblies_from_fastas.return_value = {"results":[{"upa": "12345/58/1"}]} + assembly_tuple = workspace_uploader._AssemblyTuple( + assembly_name, host_assembly_dir, "/path/to/file/in/AssembilyUtil" ) - - assert upa == "12345_58_1" - conf.asu.save_assembly_from_fasta2.assert_called_once_with( + upas = workspace_uploader._upload_assemblies_to_workspace( + asu, 12345, "214", [assembly_tuple] + ) + assert upas == tuple(["12345_58_1"]) + 
asu.save_assemblies_from_fastas.assert_called_once_with( { - "file": {"path": "/path/to/file/in/AssembilyUtil"}, "workspace_id": 12345, - "assembly_name": assembly_name, + "inputs": [ + { + "file": assembly_tuple.container_internal_assembly_path, + "assembly_name": assembly_tuple.assembly_name, + "object_metadata": {"load_id": "214"}, + } + ] } ) -def test_assembly_files_in_parallel(setup_and_teardown): +def test_generator(setup_and_teardown): + params = setup_and_teardown + assembly_dirs = params.assembly_dirs + wait_to_upload_assemblies = { + assembly_name: assembly_dir + for assembly_name, assembly_dir in zip(ASSEMBLY_NAMES, assembly_dirs) + } + assemblyTuple_list = list(workspace_uploader._gen(wait_to_upload_assemblies, 1)) + expected_assemblyTuple_list = [ + [ + workspace_uploader._AssemblyTuple( + "GCF_000979855.1_gtlEnvA5udCFS_genomic.fna.gz", + assembly_dirs[0], + "/kb/module/work/tmp/GCF_000979855.1_gtlEnvA5udCFS_genomic.fna.gz", + ) + ], + [ + workspace_uploader._AssemblyTuple( + "GCF_000979175.1_gtlEnvA5udCFS_genomic.fna.gz", + assembly_dirs[1], + "/kb/module/work/tmp/GCF_000979175.1_gtlEnvA5udCFS_genomic.fna.gz", + ) + ], + ] + assert assemblyTuple_list == expected_assemblyTuple_list + + +def test_upload_assembly_files_in_parallel(setup_and_teardown): params = setup_and_teardown + src_files = params.target_files + assembly_dirs = params.assembly_dirs upload_dir = Path(params.tmp_dir) / "upload_dir" upload_dir.mkdir() - assembly_dirs = params.assembly_dirs + output_dir = Path(params.tmp_dir) / "output_dir" + output_dir.mkdir() wait_to_upload_assemblies = { assembly_name: assembly_dir for assembly_name, assembly_dir in zip(ASSEMBLY_NAMES, assembly_dirs) } - conf = Mock() - conf.workers = 5 - conf.input_queue = Queue() - conf.output_queue = Queue() - - # an uploaded successful - conf.output_queue.put((ASSEMBLY_NAMES[0], "12345_58_1")) - # an upload failed - conf.output_queue.put((ASSEMBLY_NAMES[1], None)) - - failed_names = workspace_uploader._upload_assembly_files_in_parallel( - conf, "CI", 12345, upload_dir, wait_to_upload_assemblies - ) + # ws.get_object_info3() is unused in this test case + ws = create_autospec(Workspace, spec_set=True, instance=True) + asu = create_autospec(AssemblyUtil, spec_set=True, instance=True) + asu.save_assemblies_from_fastas.return_value = { + "results": [ + {"upa": "12345/58/1"}, + {"upa": "12345/60/1"} + ] + } - expected_tuple1 = ( + uploaded_count = workspace_uploader._upload_assembly_files_in_parallel( + asu, + ws, "CI", 12345, - os.path.join( - workspace_uploader.JOB_DIR_IN_ASSEMBLYUTIL_CONTAINER, ASSEMBLY_NAMES[0] - ), - assembly_dirs[0], - ASSEMBLY_NAMES[0], + "214", upload_dir, - 1, - len(ASSEMBLY_NAMES), + wait_to_upload_assemblies, + 2, + output_dir, + ) + + assert uploaded_count == 2 + + # assert that no interactions occurred with ws + ws.get_object_info3.assert_not_called() + + # assert that asu was called correctly + asu.save_assemblies_from_fastas.assert_called_once_with( + { + "workspace_id": 12345, + "inputs": [ + { + "file": "/kb/module/work/tmp/GCF_000979855.1_gtlEnvA5udCFS_genomic.fna.gz", + "assembly_name": "GCF_000979855.1_gtlEnvA5udCFS_genomic.fna.gz", + "object_metadata": {"load_id": "214"}, + }, + { + "file": "/kb/module/work/tmp/GCF_000979175.1_gtlEnvA5udCFS_genomic.fna.gz", + "assembly_name": "GCF_000979175.1_gtlEnvA5udCFS_genomic.fna.gz", + "object_metadata": {"load_id": "214"}, + } + ] + } ) - expected_tuple2 = ( + # check softlink for post_process + assert os.readlink(os.path.join(upload_dir, "12345_58_1")) == 
os.path.join( + output_dir, "12345_58_1" + ) + assert os.readlink(os.path.join(upload_dir, "12345_60_1")) == os.path.join( + output_dir, "12345_60_1" + ) + + # check hardlink for post_process + assert os.path.samefile( + src_files[0], + os.path.join(output_dir, "12345_58_1", "12345_58_1.fna.gz") + ) + + assert os.path.samefile( + src_files[1], + os.path.join(output_dir, "12345_60_1", "12345_60_1.fna.gz") + ) + + +def test_fail_upload_assembly_files_in_parallel(setup_and_teardown): + params = setup_and_teardown + assembly_dirs = params.assembly_dirs + upload_dir = Path(params.tmp_dir) / "upload_dir" + upload_dir.mkdir() + output_dir = Path(params.tmp_dir) / "output_dir" + output_dir.mkdir() + + wait_to_upload_assemblies = { + assembly_name: assembly_dir + for assembly_name, assembly_dir in zip(ASSEMBLY_NAMES, assembly_dirs) + } + + ws = create_autospec(Workspace, spec_set=True, instance=True) + asu = create_autospec(AssemblyUtil, spec_set=True, instance=True) + asu.save_assemblies_from_fastas.side_effect = Exception("Illegal character in object name") + ws.get_object_info3.return_value = { + 'infos': [None, None], 'paths': [None, None] + } + + uploaded_count = workspace_uploader._upload_assembly_files_in_parallel( + asu, + ws, "CI", 12345, - os.path.join( - workspace_uploader.JOB_DIR_IN_ASSEMBLYUTIL_CONTAINER, ASSEMBLY_NAMES[1] - ), - assembly_dirs[1], - ASSEMBLY_NAMES[1], + "214", upload_dir, + wait_to_upload_assemblies, 2, - len(ASSEMBLY_NAMES), + output_dir, + ) + + assert uploaded_count == 0 + + # assert that asu was called correctly + asu.save_assemblies_from_fastas.assert_called_once_with( + { + "workspace_id": 12345, + "inputs": [ + { + "file": "/kb/module/work/tmp/GCF_000979855.1_gtlEnvA5udCFS_genomic.fna.gz", + "assembly_name": "GCF_000979855.1_gtlEnvA5udCFS_genomic.fna.gz", + "object_metadata": {"load_id": "214"}, + }, + { + "file": "/kb/module/work/tmp/GCF_000979175.1_gtlEnvA5udCFS_genomic.fna.gz", + "assembly_name": "GCF_000979175.1_gtlEnvA5udCFS_genomic.fna.gz", + "object_metadata": {"load_id": "214"}, + } + ] + } + ) + + # assert that ws was called correctly + ws.get_object_info3.assert_called_once_with( + { + "objects": [ + {"wsid": 12345, "name": "GCF_000979855.1_gtlEnvA5udCFS_genomic.fna.gz"}, + {"wsid": 12345, "name": "GCF_000979175.1_gtlEnvA5udCFS_genomic.fna.gz"} + ], + "ignoreErrors": 1, + "includeMetadata": 1 + } ) - assert conf.input_queue.get() == expected_tuple1 - assert conf.input_queue.get() == expected_tuple2 - assert conf.output_queue.empty() - assert failed_names == [ASSEMBLY_NAMES[1]] + +def test_fail_query_workspace_with_load_id_mass(setup_and_teardown): + ws = create_autospec(Workspace, spec_set=True, instance=True) + with pytest.raises( + Exception, match="The effective max batch size must be <= 10000" + ): + workspace_uploader._query_workspace_with_load_id_mass( + ws, + 12345, + "214", + [str(num) for num in range(100001)], + 10001, + ) + + # assert that no interactions occurred with ws + ws.get_object_info3.assert_not_called() + + +def test_query_workspace_with_load_id_mass(setup_and_teardown): + # happy test + ws = create_autospec(Workspace, spec_set=True, instance=True) + ws.get_object_info3.return_value = { + 'infos': [ + [ + 1086, + 'GCF_000980105.1_gtlEnvA5udCFS_genomic.fna.gz', + 'KBaseGenomeAnnotations.Assembly-6.3', + '2024-01-18T23:12:44+0000', + 18, + 'sijiex', + 69046, + 'sijiex:narrative_1688077625427', + 'aaa726d2b976e27e729ac288812e81f6', + 71823, + { + 'GC content': '0.41571', + 'Size': '4079204', + 'N Contigs': '260', + 'MD5': 
'8aa6b1244e18c4f93bb3307902bd3a4d', + "load_id": "998" + } + ], + [ + 1068, + 'GCF_000979375.1_gtlEnvA5udCFS_genomic.fna.gz', + 'KBaseGenomeAnnotations.Assembly-6.3', + '2024-01-18T23:12:35+0000', + 18, + 'sijiex', + 69046, + 'sijiex:narrative_1688077625427', + '866033827fd54569c953e8b3dd58d0aa', + 38242, + { + 'GC content': '0.41526', + 'Size': '4092300', + 'N Contigs': '136', + 'MD5': '1e007bad0811a6d6e09a882d3bf802ab', + "load_id": "998" + } + ], + None], + 'paths': [['69046/1086/18'], ['69046/1068/18'], None] + } + + obj_names, obj_upas = workspace_uploader._query_workspace_with_load_id_mass( + ws, + 69046, + "998", + [ + "GCF_000980105.1_gtlEnvA5udCFS_genomic.fna.gz", + "GCF_000979375.1_gtlEnvA5udCFS_genomic.fna.gz", + "aloha", + ] + ) + assert obj_names == [ + "GCF_000980105.1_gtlEnvA5udCFS_genomic.fna.gz", + "GCF_000979375.1_gtlEnvA5udCFS_genomic.fna.gz", + ] + assert obj_upas == ["69046_1086_18", "69046_1068_18"] + + # assert that ws was called correctly + ws.get_object_info3.assert_called_once_with( + { + "objects": [ + {"wsid": 69046, "name": "GCF_000980105.1_gtlEnvA5udCFS_genomic.fna.gz"}, + {"wsid": 69046, "name": "GCF_000979375.1_gtlEnvA5udCFS_genomic.fna.gz"}, + {"wsid": 69046, "name": "aloha"} + ], + "ignoreErrors": 1, + "includeMetadata": 1 + } + )
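Note on the new bookkeeping format (an illustrative sketch derived from the test fixtures above, not an additional hunk of the patch): each assembly directory's uploaded.yaml now records UPAs per load ID under a "loads" mapping instead of a single top-level "upa" key. That is what lets a rerun with the same --load_id skip assemblies that are already uploaded, while a new load ID creates fresh versions of the objects. Expressed as the Python dict that yaml.dump serializes, using the values from workspace_uploader_test.py (environment "CI", workspace 12345, load ID "214"), the structure after one upload would look roughly like this:

    # Shape written by _update_upload_status_yaml_file and read back by
    # _read_upload_status_yaml_file; the values below mirror the test data.
    uploaded_yaml = {
        "CI": {                     # KBase environment key
            12345: {                # target workspace ID
                "file_name": "GCF_000979855.1_gtlEnvA5udCFS_genomic.fna.gz",
                "loads": {
                    "214": {"upa": "12345_58_1"},   # one entry per load ID
                },
            },
        },
    }

On a restart, _read_upload_status_yaml_file reports uploaded=True whenever the current load ID is already present under "loads"; _query_workspace_with_load_id_mass performs the equivalent check against the workspace itself by matching the "load_id" key in each object's metadata.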