diff --git a/lib/workload/components/icav2-copy-files/check_or_launch_job_lambda_py/check_or_launch_job_lambda.py b/lib/workload/components/icav2-copy-files/check_or_launch_job_lambda_py/check_or_launch_job_lambda.py
index 74d10c778..5b90f2188 100644
--- a/lib/workload/components/icav2-copy-files/check_or_launch_job_lambda_py/check_or_launch_job_lambda.py
+++ b/lib/workload/components/icav2-copy-files/check_or_launch_job_lambda_py/check_or_launch_job_lambda.py
@@ -28,21 +28,25 @@
 # Standard imports
 from pathlib import Path
+from tempfile import NamedTemporaryFile
 from typing import List
 from urllib.parse import urlunparse, urlparse
 import boto3
 from os import environ
 import typing
 import logging
+import re

-from libica.openapi.v2.model.data import Data

 # Wrapica imports
 from wrapica.libica_models import ProjectData
 from wrapica.job import get_job
-from wrapica.enums import ProjectDataStatusValues
+from wrapica.enums import ProjectDataStatusValues, DataType
 from wrapica.project_data import (
-    convert_uri_to_project_data_obj, project_data_copy_batch_handler, delete_project_data,
-    list_project_data_non_recursively
+    convert_uri_to_project_data_obj, project_data_copy_batch_handler,
+    delete_project_data,
+    list_project_data_non_recursively,
+    write_icav2_file_contents, read_icav2_file_contents,
+    get_project_data_obj_from_project_id_and_path,
 )

 if typing.TYPE_CHECKING:
@@ -50,8 +54,9 @@
     from mypy_boto3_secretsmanager import SecretsManagerClient

 # Set logging
-logging.basicConfig(level=logging.INFO)
-logger = logging.getLogger(__name__)
+logging.basicConfig()
+logger = logging.getLogger()
+logger.setLevel(level=logging.INFO)

 # Globals
 SUCCESS_STATES = [
@@ -68,6 +73,9 @@
     "RUNNING"
 ]

+TINY_FILE_SIZE_LIMIT = 8388608  # 8 MiB (8 * 2^20)
+MULTI_PART_ETAG_REGEX = re.compile(r"\w+-\d+")
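+# Note: a multipart-uploaded S3 object has an ETag of the form "<md5>-<part count>",
+# whereas a single-part upload has a bare MD5 ETag, which this regex will not match.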
+
 # Try a job 10 times before giving up
 MAX_JOB_ATTEMPT_COUNTER = 10
 DEFAULT_WAIT_TIME_SECONDS = 10
@@ -120,22 +128,70 @@ def set_icav2_env_vars():
     )


-def submit_copy_job(dest_uri: str, source_uris: List[str]) -> str:
+def tiny_file_transfer(dest_folder_project_data: ProjectData, source_file_project_data: ProjectData):
+    """
+    Transfer a tiny file via download + upload.
+    ICAv2 cannot transfer tiny files (which carry AWS S3 tagging) through copy jobs
+    due to permission errors, so they are copied over manually instead.
+    :param dest_folder_project_data: The destination folder as a ProjectData object
+    :param source_file_project_data: The source file as a ProjectData object
+    :return:
+    """
+    # Skip the transfer if the file already exists at the destination
+    try:
+        get_project_data_obj_from_project_id_and_path(
+            project_id=dest_folder_project_data.project_id,
+            data_path=Path(dest_folder_project_data.data.details.path) / source_file_project_data.data.details.name,
+            data_type=DataType.FILE
+        )
+    except FileNotFoundError:
+        pass
+    else:
+        # File already exists, no need to rerun
+        return None
+
+    # Download / upload tiny files
+    with NamedTemporaryFile() as temp_file_obj:
+        # Pull down the data to a temporary local path
+        read_icav2_file_contents(
+            source_file_project_data.project_id,
+            source_file_project_data.data.id,
+            Path(temp_file_obj.name)
+        )
+        # Write the file contents to the destination path
+        write_icav2_file_contents(
+            project_id=dest_folder_project_data.project_id,
+            data_path=Path(dest_folder_project_data.data.details.path) / source_file_project_data.data.details.name,
+            file_stream_or_path=Path(temp_file_obj.name)
+        )
+
+    # Get the newly created file object
+    dest_file_project_data_obj = get_project_data_obj_from_project_id_and_path(
+        project_id=dest_folder_project_data.project_id,
+        data_path=Path(dest_folder_project_data.data.details.path) / source_file_project_data.data.details.name,
+        data_type=DataType.FILE
+    )
+
+    # Append ilmn tags from old file to new file
+    # FIXME
+
+
+def submit_copy_job(dest_project_data_obj: ProjectData, source_project_data_objs: List[ProjectData]) -> str:
     # Rerun copy batch process
     source_data_ids = list(
         map(
-            lambda source_uri_iter: convert_uri_to_project_data_obj(
-                source_uri_iter
-            ).data.id,
-            source_uris
+            lambda source_project_data_obj_iter_: source_project_data_obj_iter_.data.id,
+            source_project_data_objs
         )
     )

-    dest_project_data_obj = convert_uri_to_project_data_obj(
-        dest_uri,
-        create_data_if_not_found=True
-    )
+    return project_data_copy_batch_handler(
+        source_data_ids=source_data_ids,
+        destination_project_id=dest_project_data_obj.project_id,
+        destination_folder_path=Path(dest_project_data_obj.data.details.path)
+    ).id
+
+
+def delete_existing_partial_data(dest_project_data_obj: ProjectData):
     # Check list of files in the dest project data object and make sure no file has a partial status
     existing_files = list_project_data_non_recursively(
         dest_project_data_obj.project_id,
@@ -151,11 +207,42 @@
             existing_file.data.id
         )

-    return project_data_copy_batch_handler(
-        source_data_ids=source_data_ids,
-        destination_project_id=dest_project_data_obj.project_id,
-        destination_folder_path=Path(dest_project_data_obj.data.details.path)
-    ).id
+
+def get_source_uris_as_project_data_objs(source_uris: List[str]) -> List[ProjectData]:
+    # Convert each source uri into a project data object
+    return list(
+        map(
+            lambda source_uri_iter: convert_uri_to_project_data_obj(
+                source_uri_iter
+            ),
+            source_uris
+        )
+    )
+
+
+def filter_tiny_files_from_source_project_data_objs(
+    source_project_data_objs: List[ProjectData],
+    dest_folder_obj: ProjectData
+) -> List[ProjectData]:
+    source_project_data_list_filtered: List[ProjectData] = []
+    for source_project_data_obj in source_project_data_objs:
+        # Put all the big (multipart-uploaded) files into the job
+        if (
+            source_project_data_obj.data.details.file_size_in_bytes >= TINY_FILE_SIZE_LIMIT and
+            MULTI_PART_ETAG_REGEX.fullmatch(source_project_data_obj.data.details.object_e_tag) is not None
+        ):
+            source_project_data_list_filtered.append(source_project_data_obj)
+            continue
+        # We have a tiny file, transfer it via download + upload
+        logger.info(
+            f"File {source_project_data_obj.data.id} is too small to transfer via a job, "
+            f"transferring via download+upload"
+        )
+        tiny_file_transfer(
+            dest_folder_obj,
+            source_project_data_obj
+        )
+    return source_project_data_list_filtered
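+
+# Note that filter_tiny_files_from_source_project_data_objs is not a pure filter:
+# tiny files are transferred immediately (download + upload) as a side effect,
+# and only the job-eligible files are returned for submit_copy_job to batch up.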


 def handler(event, context):
@@ -172,18 +259,47 @@
     source_uris = event.get("source_uris")
     job_id = event.get("job_id")
     failed_job_list = event.get("failed_job_list")
-    job_status = event.get("job_status")
     wait_time_seconds = event.get("wait_time_seconds")

+    # Get the destination uri as a project data object
+    logger.info("Getting dest project data folder object, creating it if it doesn't exist")
+    dest_project_data_obj = convert_uri_to_project_data_obj(
+        dest_uri,
+        create_data_if_not_found=True
+    )
+
     # Check if job is None
     if job_id is None:
+        logger.info("First time through, deleting any existing partial data")
         # First time through
+        delete_existing_partial_data(dest_project_data_obj)
+
+        # Get source uris as project data objects
+        # Filter out files smaller than the min file size limit
+        # These are transferred over manually
+        source_project_data_list = filter_tiny_files_from_source_project_data_objs(
+            get_source_uris_as_project_data_objs(source_uris),
+            dest_project_data_obj
+        )
+
+        # Check we have a job to run
+        if len(source_project_data_list) == 0:
+            logger.info(f"No file larger than {TINY_FILE_SIZE_LIMIT} bytes, no job to run")
+            return {
+                "dest_uri": dest_uri,
+                "source_uris": source_uris,
+                "job_id": None,
+                "failed_job_list": [],  # Empty list or list of failed jobs
+                "job_status": "SUCCEEDED",
+                "wait_time_seconds": DEFAULT_WAIT_TIME_SECONDS
+            }
+
         return {
             "dest_uri": dest_uri,
             "source_uris": source_uris,
             "job_id": submit_copy_job(
-                dest_uri=dest_uri,
-                source_uris=source_uris,
+                dest_project_data_obj=dest_project_data_obj,
+                source_project_data_objs=source_project_data_list,
             ),
             "failed_job_list": [],  # Empty list or list of failed jobs
             "job_status": "RUNNING",
@@ -237,12 +353,16 @@
         }

     # Return a new job with a new wait time
+    source_project_data_list = filter_tiny_files_from_source_project_data_objs(
+        get_source_uris_as_project_data_objs(source_uris),
+        dest_project_data_obj
+    )
     return {
         "dest_uri": dest_uri,
         "source_uris": source_uris,
         "job_id": submit_copy_job(
-            dest_uri=dest_uri,
-            source_uris=source_uris,
+            dest_project_data_obj=dest_project_data_obj,
+            source_project_data_objs=source_project_data_list,
         ),
         "failed_job_list": failed_job_list,  # Empty list or list of failed jobs
         "job_status": "RUNNING",
@@ -296,12 +416,23 @@
     if has_errors:
         # Add this job id to the failed job list
         failed_job_list.append(job_id)
+
+        # Delete any existing partial data
+        delete_existing_partial_data(dest_project_data_obj)
+
+        # Get the source uris as project data objects, filtering out tiny files
+        source_project_data_list = filter_tiny_files_from_source_project_data_objs(
+            get_source_uris_as_project_data_objs(source_uris),
+            dest_project_data_obj
+        )
+
+        # Resubmit the job
         return {
             "dest_uri": dest_uri,
             "source_uris": source_uris,
             "job_id": submit_copy_job(
-                dest_uri=dest_uri,
-                source_uris=source_uris,
+                dest_project_data_obj=dest_project_data_obj,
+                source_project_data_objs=source_project_data_list,
             ),
             "failed_job_list": failed_job_list,  # Empty list or list of failed jobs
             "job_status": "RUNNING",
@@ -319,39 +450,79 @@
     }


-if __name__ == "__main__":
-    import json
-    environ['AWS_PROFILE'] = 'umccr-development'
-    environ['AWS_DEFAULT_REGION'] = 'ap-southeast-2'
-    environ['ICAV2_ACCESS_TOKEN_SECRET_ID'] = 'ICAv2JWTKey-umccr-prod-service-dev'
-    print(
-        json.dumps(
-            handler(
-                event={
-                    "job_id": None,
-                    "failed_job_list": [],
-                    "wait_time_seconds": 5,
-                    "job_status": None,
-                    "dest_uri": "icav2://ea19a3f5-ec7c-4940-a474-c31cd91dbad4/cache/cttsov2/20241031d8a13553/L2401532/",
-                    "source_uris": [
-                        "s3://pipeline-dev-cache-503977275616-ap-southeast-2/byob-icav2/development/primary/241024_A00130_0336_BHW7MVDSXC/20241030c613872c/Samples/Lane_1/L2401532/L2401532_S7_L001_R1_001.fastq.gz",
-                        "s3://pipeline-dev-cache-503977275616-ap-southeast-2/byob-icav2/development/primary/241024_A00130_0336_BHW7MVDSXC/20241030c613872c/Samples/Lane_1/L2401532/L2401532_S7_L001_R2_001.fastq.gz"
-                    ],
-                },
-                context=None
-            ),
-            indent=4
-        )
-    )
-
-# {
-#     "dest_uri": "s3://pipeline-prod-cache-503977275616-ap-southeast-2/byob-icav2/production/primary/240926_A01052_0232_AHW7LHDSXC/20240928f63332ac/Samples/Lane_4/LPRJ241305/",
-#     "source_uris": [
-#         "icav2://9ec02c1f-53ba-47a5-854d-e6b53101adb7/ilmn-analyses/240926_A01052_0232_AHW7LHDSXC_f5e33a_03217c-BclConvert v4_2_7-792cba71-52fa-42b3-85a0-c6593f199353/output/Samples/Lane_4/LPRJ241305/LPRJ241305_S41_L004_R1_001.fastq.gz",
-#         "icav2://9ec02c1f-53ba-47a5-854d-e6b53101adb7/ilmn-analyses/240926_A01052_0232_AHW7LHDSXC_f5e33a_03217c-BclConvert v4_2_7-792cba71-52fa-42b3-85a0-c6593f199353/output/Samples/Lane_4/LPRJ241305/LPRJ241305_S41_L004_R2_001.fastq.gz"
-#     ],
-#     "job_id": "bdfb0a4d-bcae-4670-b51f-9417d23e777a",
-#     "failed_job_list": [],
-#     "job_status": "SUCCEEDED",
-#     "wait_time_seconds": 10
-# }
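+# Local test harnesses (commented out): run with AWS credentials for the dev
+# account to exercise the handler directly; the observed output is recorded
+# below each block.
+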
+# Just small files
+# if __name__ == "__main__":
+#     import json
+#     environ['AWS_PROFILE'] = 'umccr-development'
+#     environ['AWS_DEFAULT_REGION'] = 'ap-southeast-2'
+#     environ['ICAV2_ACCESS_TOKEN_SECRET_ID'] = 'ICAv2JWTKey-umccr-prod-service-dev'
+#     print(
+#         json.dumps(
+#             handler(
+#                 event={
+#                     "job_id": None,
+#                     "failed_job_list": [],
+#                     "wait_time_seconds": 5,
+#                     "job_status": None,
+#                     "dest_uri": "icav2://ea19a3f5-ec7c-4940-a474-c31cd91dbad4/cache/cttsov2/20241031d8a13553/L2401532/",
+#                     "source_uris": [
+#                         "s3://pipeline-dev-cache-503977275616-ap-southeast-2/byob-icav2/development/primary/241024_A00130_0336_BHW7MVDSXC/20241030c613872c/Samples/Lane_1/L2401532/L2401532_S7_L001_R1_001.fastq.gz",
+#                         "s3://pipeline-dev-cache-503977275616-ap-southeast-2/byob-icav2/development/primary/241024_A00130_0336_BHW7MVDSXC/20241030c613872c/Samples/Lane_1/L2401532/L2401532_S7_L001_R2_001.fastq.gz"
+#                     ],
+#                 },
+#                 context=None
+#             ),
+#             indent=4
+#         )
+#     )
+#
+# # {
+# #     "dest_uri": "icav2://ea19a3f5-ec7c-4940-a474-c31cd91dbad4/cache/cttsov2/20241031d8a13553/L2401532/",
+# #     "source_uris": [
+# #         "s3://pipeline-dev-cache-503977275616-ap-southeast-2/byob-icav2/development/primary/241024_A00130_0336_BHW7MVDSXC/20241030c613872c/Samples/Lane_1/L2401532/L2401532_S7_L001_R1_001.fastq.gz",
+# #         "s3://pipeline-dev-cache-503977275616-ap-southeast-2/byob-icav2/development/primary/241024_A00130_0336_BHW7MVDSXC/20241030c613872c/Samples/Lane_1/L2401532/L2401532_S7_L001_R2_001.fastq.gz"
+# #     ],
+# #     "job_id": null,
+# #     "failed_job_list": [],
+# #     "job_status": "SUCCEEDED",
+# #     "wait_time_seconds": 10
+# # }
+
+
+# Copy files with mixed small and large
+# if __name__ == "__main__":
+#     import json
+#     environ['AWS_PROFILE'] = 'umccr-development'
+#     environ['AWS_DEFAULT_REGION'] = 'ap-southeast-2'
+#     environ['ICAV2_ACCESS_TOKEN_SECRET_ID'] = 'ICAv2JWTKey-umccr-prod-service-dev'
+#     print(
+#         json.dumps(
+#             handler(
+#                 event={
+#                     "job_id": None,
+#                     "failed_job_list": [],
+#                     "wait_time_seconds": 5,
+#                     "job_status": None,
+#                     "dest_uri": "icav2://ea19a3f5-ec7c-4940-a474-c31cd91dbad4/cache/cttsov2/20241031d8a13553/small-files-copy-test/",
+#                     "source_uris": [
+#                         "icav2://development/primary/241024_A00130_0336_BHW7MVDSXC/20241030c613872c/Samples/Lane_1/small-files/chunk_file_size_8mb.bin",
+#                         "icav2://development/primary/241024_A00130_0336_BHW7MVDSXC/20241030c613872c/Samples/Lane_1/small-files/chunk_file_size_8mb_minus_1.bin"
+#                     ],
+#                 },
+#                 context=None
+#             ),
+#             indent=4
+#         )
+#     )
+#
+# # {
+# #     "dest_uri": "icav2://ea19a3f5-ec7c-4940-a474-c31cd91dbad4/cache/cttsov2/20241031d8a13553/small-files-copy-test/",
+# #     "source_uris": [
+# #         "icav2://development/primary/241024_A00130_0336_BHW7MVDSXC/20241030c613872c/Samples/Lane_1/small-files/chunk_file_size_8mb.bin",
+# #         "icav2://development/primary/241024_A00130_0336_BHW7MVDSXC/20241030c613872c/Samples/Lane_1/small-files/chunk_file_size_8mb_minus_1.bin"
+# #     ],
+# #     "job_id": null,
+# #     "failed_job_list": [],
+# #     "job_status": "SUCCEEDED",
+# #     "wait_time_seconds": 10
+# # }
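
For reference, a minimal self-contained sketch of the eligibility rule the new filter applies. The helper name and the sample ETag values are illustrative only; the constants mirror those added in the patch above.

import re

TINY_FILE_SIZE_LIMIT = 8388608  # 8 MiB, as defined in the lambda above
MULTI_PART_ETAG_REGEX = re.compile(r"\w+-\d+")


def is_copy_job_eligible(file_size_in_bytes: int, object_e_tag: str) -> bool:
    # A file is handed to an ICAv2 copy job only if it is at least 8 MiB AND
    # was multipart-uploaded (ETag of the form "<md5>-<part count>"); anything
    # else falls back to the download + upload path (tiny_file_transfer).
    return (
        file_size_in_bytes >= TINY_FILE_SIZE_LIMIT
        and MULTI_PART_ETAG_REGEX.fullmatch(object_e_tag) is not None
    )


# Illustrative values: a 16 MiB two-part upload is job-eligible,
# a 1 KiB single-part upload (bare MD5 ETag) is not.
assert is_copy_job_eligible(16 * 2 ** 20, "9b2cf535f27731c974343645a3985328-2")
assert not is_copy_job_eligible(1024, "9b2cf535f27731c974343645a3985328")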