Commit cac8ce0

Merge pull request #643 from umccr/bugfix/copy-small-files-via-download

alexiswl authored Nov 3, 2024
2 parents fe3896f + d44db80
Showing 1 changed file with 234 additions and 63 deletions.

@@ -28,30 +28,35 @@

# Standard imports
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import List
from urllib.parse import urlunparse, urlparse
import boto3
from os import environ
import typing
import logging
import re

from libica.openapi.v2.model.data import Data
# Wrapica imports
from wrapica.libica_models import ProjectData
from wrapica.job import get_job
from wrapica.enums import ProjectDataStatusValues
from wrapica.enums import ProjectDataStatusValues, DataType
from wrapica.project_data import (
convert_uri_to_project_data_obj, project_data_copy_batch_handler, delete_project_data,
list_project_data_non_recursively
convert_uri_to_project_data_obj, project_data_copy_batch_handler,
delete_project_data,
list_project_data_non_recursively,
write_icav2_file_contents, read_icav2_file_contents,
get_project_data_obj_from_project_id_and_path,
)

if typing.TYPE_CHECKING:
from mypy_boto3_ssm import SSMClient
from mypy_boto3_secretsmanager import SecretsManagerClient

# Set logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logging.basicConfig()
logger = logging.getLogger()
logger.setLevel(level=logging.INFO)
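# Configuring the root logger (rather than a module-level logger) means log records from imported libraries such as wrapica are emitted too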

# Globals
SUCCESS_STATES = [
@@ -68,6 +73,9 @@
"RUNNING"
]

TINY_FILE_SIZE_LIMIT = 8388608 # 8 MiB (8 * 2^20)
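# S3 multipart uploads produce ETags of the form "<digest>-<part-count>"; single-part uploads yield a bare digest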
MULTI_PART_ETAG_REGEX = re.compile(r"\w+-\d+")

# Try a job 10 times before giving up
MAX_JOB_ATTEMPT_COUNTER = 10
DEFAULT_WAIT_TIME_SECONDS = 10
@@ -120,22 +128,70 @@ def set_icav2_env_vars():
)


def submit_copy_job(dest_uri: str, source_uris: List[str]) -> str:
def tiny_file_transfer(dest_folder_project_data: ProjectData, source_file_project_data: ProjectData):
"""
ICAv2 cannot transfer tiny files (those carrying AWS S3 tags) through
copy jobs due to permission errors, so transfer them via an explicit
download and re-upload instead.
:param dest_folder_project_data: destination folder as a ProjectData object
:param source_file_project_data: source file as a ProjectData object
:return: None
"""
# Check file does not exist
try:
get_project_data_obj_from_project_id_and_path(
project_id=dest_folder_project_data.project_id,
data_path=Path(dest_folder_project_data.data.details.path) / source_file_project_data.data.details.name,
data_type=DataType.FILE
)
except FileNotFoundError:
pass
else:
# File already exists, no need to rerun
return None

# Download / upload tiny files
with NamedTemporaryFile() as temp_file_obj:
# Pull down data
read_icav2_file_contents(
source_file_project_data.project_id,
source_file_project_data.data.id,
Path(temp_file_obj.name)
)
# Write file contents
write_icav2_file_contents(
project_id=dest_folder_project_data.project_id,
data_path=Path(dest_folder_project_data.data.details.path) / source_file_project_data.data.details.name,
file_stream_or_path=Path(temp_file_obj.name)
)

# Get new file object
dest_file_project_data_obj = get_project_data_obj_from_project_id_and_path(
project_id=dest_folder_project_data.project_id,
data_path=Path(dest_folder_project_data.data.details.path) / source_file_project_data.data.details.name,
data_type=DataType.FILE
)

# Append ilmn tags from old file to new file
# FIXME


def submit_copy_job(dest_project_data_obj: ProjectData, source_project_data_objs: List[ProjectData]) -> str:
# Submit (or resubmit) the copy batch job
source_data_ids = list(
map(
lambda source_uri_iter: convert_uri_to_project_data_obj(
source_uri_iter
).data.id,
source_uris
lambda source_project_data_obj_iter_: source_project_data_obj_iter_.data.id,
source_project_data_objs
)
)

dest_project_data_obj = convert_uri_to_project_data_obj(
dest_uri,
create_data_if_not_found=True
)
return project_data_copy_batch_handler(
source_data_ids=source_data_ids,
destination_project_id=dest_project_data_obj.project_id,
destination_folder_path=Path(dest_project_data_obj.data.details.path)
).id


def delete_existing_partial_data(dest_project_data_obj: ProjectData):
# Check list of files in the dest project data object and make sure no file has a partial status
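# Files left in a PARTIAL state by an interrupted copy are deleted so a retried job can recreate them cleanly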
existing_files = list_project_data_non_recursively(
dest_project_data_obj.project_id,
@@ -151,11 +207,42 @@ def submit_copy_job(dest_uri: str, source_uris: List[str]) -> str:
existing_file.data.id
)

return project_data_copy_batch_handler(
source_data_ids=source_data_ids,
destination_project_id=dest_project_data_obj.project_id,
destination_folder_path=Path(dest_project_data_obj.data.details.path)
).id

def get_source_uris_as_project_data_objs(source_uris: List[str]) -> List[ProjectData]:
# Convert each source URI into a ProjectData object
return list(
map(
lambda source_uri_iter: convert_uri_to_project_data_obj(
source_uri_iter
),
source_uris
)
)


def filter_tiny_files_from_source_project_data_objs(
source_project_data_objs: List[ProjectData],
dest_folder_obj: ProjectData
) -> List[ProjectData]:
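# Returns only the job-eligible files; ineligible files are transferred immediately (download + upload) as a side effect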
source_project_data_list_filtered: List[ProjectData] = []
for source_project_data_obj in source_project_data_objs:
# Put all the big files into the job
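# Eligible means: at least TINY_FILE_SIZE_LIMIT bytes AND an ETag matching the multipart "<digest>-<part-count>" shape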
if (
source_project_data_obj.data.details.file_size_in_bytes >= TINY_FILE_SIZE_LIMIT and
MULTI_PART_ETAG_REGEX.fullmatch(source_project_data_obj.data.details.object_e_tag) is not None
):
source_project_data_list_filtered.append(source_project_data_obj)
continue
# File is ineligible for a copy job, transfer via download + upload
logger.info(
f"File {source_project_data_obj.data.id} cannot be transferred via a copy job "
f"(too small or not a multipart upload), transferring via download+upload"
)
tiny_file_transfer(
dest_folder_obj,
source_project_data_obj
)
return source_project_data_list_filtered


def handler(event, context):
@@ -172,18 +259,47 @@ def handler(event, context):
source_uris = event.get("source_uris")
job_id = event.get("job_id")
failed_job_list = event.get("failed_job_list")
job_status = event.get("job_status")
wait_time_seconds = event.get("wait_time_seconds")
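# Every branch below returns this same payload shape, so an outer poller (e.g. a Step Functions wait loop) can feed the output straight back in as the next event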

# Get destination uri as project data object
logger.info("Get dest project data folder object, create if it doesn't exist")
dest_project_data_obj = convert_uri_to_project_data_obj(
dest_uri,
create_data_if_not_found=True
)

# Check if job is None
if job_id is None:
logger.info("First time, though delete any existing partial data")
# First time through
delete_existing_partial_data(dest_project_data_obj)

# Get source URIs as project data objects,
# then filter out files below the tiny-file size limit;
# these are transferred manually via download + upload
source_project_data_list = filter_tiny_files_from_source_project_data_objs(
get_source_uris_as_project_data_objs(source_uris),
dest_project_data_obj
)

# Check we have a job to run
if len(source_project_data_list) == 0:
logger.info(f"No file larger than {TINY_FILE_SIZE_LIMIT} in size, no job to run")
return {
"dest_uri": dest_uri,
"source_uris": source_uris,
"job_id": None,
"failed_job_list": [], # Empty list or list of failed jobs
"job_status": "SUCCEEDED",
"wait_time_seconds": DEFAULT_WAIT_TIME_SECONDS
}

return {
"dest_uri": dest_uri,
"source_uris": source_uris,
"job_id": submit_copy_job(
dest_uri=dest_uri,
source_uris=source_uris,
dest_project_data_obj=dest_project_data_obj,
source_project_data_objs=source_project_data_list,
),
"failed_job_list": [], # Empty list or list of failed jobs
"job_status": "RUNNING",
@@ -237,12 +353,16 @@ def handler(event, context):
}

# Return a new job with new wait time
source_project_data_list = filter_tiny_files_from_source_project_data_objs(
get_source_uris_as_project_data_objs(source_uris),
dest_project_data_obj
)
return {
"dest_uri": dest_uri,
"source_uris": source_uris,
"job_id": submit_copy_job(
dest_uri=dest_uri,
source_uris=source_uris,
dest_project_data_obj=dest_project_data_obj,
source_project_data_objs=source_project_data_list,
),
"failed_job_list": failed_job_list, # Empty list or list of failed jobs
"job_status": "RUNNING",
@@ -296,12 +416,23 @@ def handler(event, context):
if has_errors:
# Add this job id to the failed job list
failed_job_list.append(job_id)

# Delete any existing partial data
delete_existing_partial_data(dest_project_data_obj)

# Rebuild the filtered source project data list (tiny files are transferred manually)
source_project_data_list = filter_tiny_files_from_source_project_data_objs(
get_source_uris_as_project_data_objs(source_uris),
dest_project_data_obj
)

# Resubmit job
return {
"dest_uri": dest_uri,
"source_uris": source_uris,
"job_id": submit_copy_job(
dest_uri=dest_uri,
source_uris=source_uris,
dest_project_data_obj=dest_project_data_obj,
source_project_data_objs=source_project_data_list,
),
"failed_job_list": failed_job_list, # Empty list or list of failed jobs
"job_status": "RUNNING",
Expand All @@ -319,39 +450,79 @@ def handler(event, context):
}


if __name__ == "__main__":
import json
environ['AWS_PROFILE'] = 'umccr-development'
environ['AWS_DEFAULT_REGION'] = 'ap-southeast-2'
environ['ICAV2_ACCESS_TOKEN_SECRET_ID'] = 'ICAv2JWTKey-umccr-prod-service-dev'
print(
json.dumps(
handler(
event={
"job_id": None,
"failed_job_list": [],
"wait_time_seconds": 5,
"job_status": None,
"dest_uri": "icav2://ea19a3f5-ec7c-4940-a474-c31cd91dbad4/cache/cttsov2/20241031d8a13553/L2401532/",
"source_uris": [
"s3://pipeline-dev-cache-503977275616-ap-southeast-2/byob-icav2/development/primary/241024_A00130_0336_BHW7MVDSXC/20241030c613872c/Samples/Lane_1/L2401532/L2401532_S7_L001_R1_001.fastq.gz",
"s3://pipeline-dev-cache-503977275616-ap-southeast-2/byob-icav2/development/primary/241024_A00130_0336_BHW7MVDSXC/20241030c613872c/Samples/Lane_1/L2401532/L2401532_S7_L001_R2_001.fastq.gz"
],
},
context=None
),
indent=4
)
)

# {
# "dest_uri": "s3://pipeline-prod-cache-503977275616-ap-southeast-2/byob-icav2/production/primary/240926_A01052_0232_AHW7LHDSXC/20240928f63332ac/Samples/Lane_4/LPRJ241305/",
# "source_uris": [
# "icav2://9ec02c1f-53ba-47a5-854d-e6b53101adb7/ilmn-analyses/240926_A01052_0232_AHW7LHDSXC_f5e33a_03217c-BclConvert v4_2_7-792cba71-52fa-42b3-85a0-c6593f199353/output/Samples/Lane_4/LPRJ241305/LPRJ241305_S41_L004_R1_001.fastq.gz",
# "icav2://9ec02c1f-53ba-47a5-854d-e6b53101adb7/ilmn-analyses/240926_A01052_0232_AHW7LHDSXC_f5e33a_03217c-BclConvert v4_2_7-792cba71-52fa-42b3-85a0-c6593f199353/output/Samples/Lane_4/LPRJ241305/LPRJ241305_S41_L004_R2_001.fastq.gz"
# ],
# "job_id": "bdfb0a4d-bcae-4670-b51f-9417d23e777a",
# "failed_job_list": [],
# "job_status": "SUCCEEDED",
# "wait_time_seconds": 10
# }
# Just small files
# if __name__ == "__main__":
# import json
# environ['AWS_PROFILE'] = 'umccr-development'
# environ['AWS_DEFAULT_REGION'] = 'ap-southeast-2'
# environ['ICAV2_ACCESS_TOKEN_SECRET_ID'] = 'ICAv2JWTKey-umccr-prod-service-dev'
# print(
# json.dumps(
# handler(
# event={
# "job_id": None,
# "failed_job_list": [],
# "wait_time_seconds": 5,
# "job_status": None,
# "dest_uri": "icav2://ea19a3f5-ec7c-4940-a474-c31cd91dbad4/cache/cttsov2/20241031d8a13553/L2401532/",
# "source_uris": [
# "s3://pipeline-dev-cache-503977275616-ap-southeast-2/byob-icav2/development/primary/241024_A00130_0336_BHW7MVDSXC/20241030c613872c/Samples/Lane_1/L2401532/L2401532_S7_L001_R1_001.fastq.gz",
# "s3://pipeline-dev-cache-503977275616-ap-southeast-2/byob-icav2/development/primary/241024_A00130_0336_BHW7MVDSXC/20241030c613872c/Samples/Lane_1/L2401532/L2401532_S7_L001_R2_001.fastq.gz"
# ],
# },
# context=None
# ),
# indent=4
# )
# )
#
# # {
# # "dest_uri": "icav2://ea19a3f5-ec7c-4940-a474-c31cd91dbad4/cache/cttsov2/20241031d8a13553/L2401532/",
# # "source_uris": [
# # "s3://pipeline-dev-cache-503977275616-ap-southeast-2/byob-icav2/development/primary/241024_A00130_0336_BHW7MVDSXC/20241030c613872c/Samples/Lane_1/L2401532/L2401532_S7_L001_R1_001.fastq.gz",
# # "s3://pipeline-dev-cache-503977275616-ap-southeast-2/byob-icav2/development/primary/241024_A00130_0336_BHW7MVDSXC/20241030c613872c/Samples/Lane_1/L2401532/L2401532_S7_L001_R2_001.fastq.gz"
# # ],
# # "job_id": null,
# # "failed_job_list": [],
# # "job_status": "SUCCEEDED",
# # "wait_time_seconds": 10
# # }


# Copy files with mixed small and large
# if __name__ == "__main__":
# import json
# environ['AWS_PROFILE'] = 'umccr-development'
# environ['AWS_DEFAULT_REGION'] = 'ap-southeast-2'
# environ['ICAV2_ACCESS_TOKEN_SECRET_ID'] = 'ICAv2JWTKey-umccr-prod-service-dev'
# print(
# json.dumps(
# handler(
# event={
# "job_id": None,
# "failed_job_list": [],
# "wait_time_seconds": 5,
# "job_status": None,
# "dest_uri": "icav2://ea19a3f5-ec7c-4940-a474-c31cd91dbad4/cache/cttsov2/20241031d8a13553/small-files-copy-test/",
# "source_uris": [
# "icav2://development/primary/241024_A00130_0336_BHW7MVDSXC/20241030c613872c/Samples/Lane_1/small-files/chunk_file_size_8mb.bin",
# "icav2://development/primary/241024_A00130_0336_BHW7MVDSXC/20241030c613872c/Samples/Lane_1/small-files/chunk_file_size_8mb_minus_1.bin"
# ],
# },
# context=None
# ),
# indent=4
# )
# )
#
# # {
# # "dest_uri": "icav2://ea19a3f5-ec7c-4940-a474-c31cd91dbad4/cache/cttsov2/20241031d8a13553/L2401532/",
# # "source_uris": [
# # "s3://pipeline-dev-cache-503977275616-ap-southeast-2/byob-icav2/development/primary/241024_A00130_0336_BHW7MVDSXC/20241030c613872c/Samples/Lane_1/L2401532/L2401532_S7_L001_R1_001.fastq.gz",
# # "s3://pipeline-dev-cache-503977275616-ap-southeast-2/byob-icav2/development/primary/241024_A00130_0336_BHW7MVDSXC/20241030c613872c/Samples/Lane_1/L2401532/L2401532_S7_L001_R2_001.fastq.gz"
# # ],
# # "job_id": null,
# # "failed_job_list": [],
# # "job_status": "SUCCEEDED",
# # "wait_time_seconds": 10
# # }
