From 719d1242db0ff1235679411faadb17c66c91c4db Mon Sep 17 00:00:00 2001
From: ErinWeisbart <54687786+ErinWeisbart@users.noreply.github.com>
Date: Tue, 8 Apr 2025 13:56:30 -0700
Subject: [PATCH 1/3] force workspace download

---
 worker/cp-worker.py | 58 ++++++++++++++++++++++-----------------------
 1 file changed, 29 insertions(+), 29 deletions(-)

diff --git a/worker/cp-worker.py b/worker/cp-worker.py
index 4fad002..c739cba 100644
--- a/worker/cp-worker.py
+++ b/worker/cp-worker.py
@@ -179,26 +179,38 @@ def runCellProfiler(message):
             return 'SUCCESS'
         except KeyError: #Returned if that folder does not exist
             pass
+
+    # Download load data file
+    data_file_path = os.path.join(localIn,message['data_file'])
+    printandlog(f"Downloading {message['data_file']} from {WORKSPACE_BUCKET}", logger)
+    csv_insubfolders = message['data_file'].split('/')
+    subfolders = '/'.join((csv_insubfolders)[:-1])
+    if not os.path.exists(os.path.join(localIn,subfolders)):
+        os.makedirs(os.path.join(localIn,subfolders), exist_ok=True)
+    s3client=boto3.client('s3')
+    try:
+        s3client.download_file(WORKSPACE_BUCKET, message['data_file'], data_file_path)
+    except botocore.exceptions.ClientError:
+        printandlog(f"Can't find load data file in S3. Looking for {message['data_file']} in {WORKSPACE_BUCKET}",logger)
+        printandlog("Aborting. Can't run without load data.",logger)
+        logger.removeHandler(watchtowerlogger)
+        return 'DOWNLOAD_PROBLEM'
+
+    # Download pipeline and update pipeline path in message
+    printandlog(f"Downloading {message['pipeline']} from {WORKSPACE_BUCKET}", logger)
+    pipepath = os.path.join(localIn, message['pipeline'].split('/')[-1])
+    try:
+        s3client.download_file(WORKSPACE_BUCKET, message['pipeline'], pipepath)
+    except botocore.exceptions.ClientError:
+        printandlog(f"Can't find pipeline in S3. Looking for {message['pipeline']} in {WORKSPACE_BUCKET}",logger)
+        printandlog("Aborting. Can't run without pipeline.",logger)
+        logger.removeHandler(watchtowerlogger)
+        return 'DOWNLOAD_PROBLEM'
 
     downloaded_files = []
 
-    # Optional - download all files, bypass S3 mounting
+    # Optional - download image files, bypass S3 mounting
     if DOWNLOAD_FILES.lower() == 'true':
-        # Download load data file and image files
-        data_file_path = os.path.join(localIn,message['data_file'])
-        printandlog(f"Downloading {message['data_file']} from {WORKSPACE_BUCKET}", logger)
-        csv_insubfolders = message['data_file'].split('/')
-        subfolders = '/'.join((csv_insubfolders)[:-1])
-        if not os.path.exists(os.path.join(localIn,subfolders)):
-            os.makedirs(os.path.join(localIn,subfolders), exist_ok=True)
-        s3client=boto3.client('s3')
-        try:
-            s3client.download_file(WORKSPACE_BUCKET, message['data_file'], data_file_path)
-        except botocore.exceptions.ClientError:
-            printandlog(f"Can't find load data file in S3. Looking for {message['data_file']} in {WORKSPACE_BUCKET}",logger)
-            printandlog("Aborting. Can't run without load data.",logger)
-            logger.removeHandler(watchtowerlogger)
-            return 'DOWNLOAD_PROBLEM'
         if message['data_file'][-4:]=='.csv':
            printandlog('Figuring which files to download', logger)
            import pandas
@@ -273,20 +285,8 @@ def runCellProfiler(message):
             printandlog(f'Downloaded {str(len(downloaded_files))} files',logger)
         else:
             printandlog("Couldn't parse data file for file download. Not supported input of .csv or .txt",logger)
-        # Download pipeline and update pipeline path in message
-        printandlog(f"Downloading {message['pipeline']} from {WORKSPACE_BUCKET}", logger)
-        pipepath = os.path.join(localIn, message['pipeline'].split('/')[-1])
-        try:
-            s3client.download_file(WORKSPACE_BUCKET, message['pipeline'], pipepath)
-        except botocore.exceptions.ClientError:
-            printandlog(f"Can't find pipeline in S3. Looking for {message['pipeline']} in {WORKSPACE_BUCKET}",logger)
-            printandlog("Aborting. Can't run without pipeline.",logger)
-            logger.removeHandler(watchtowerlogger)
-            return 'DOWNLOAD_PROBLEM'
-
     else:
-        data_file_path = os.path.join(DATA_ROOT,message['data_file'])
-        pipepath = os.path.join(DATA_ROOT,message["pipeline"])
+        printandlog('Using bucket mount for image files', logger)
 
     # Build and run CellProfiler command
     cpDone = f'{localOut}/cp.is.done'

From f59bd10af8081b4b83926a7ffbc5b965c14670de Mon Sep 17 00:00:00 2001
From: ErinWeisbart <54687786+ErinWeisbart@users.noreply.github.com>
Date: Wed, 9 Apr 2025 09:03:47 -0700
Subject: [PATCH 2/3] docs: auto-download workspace

---
 documentation/DCP-documentation/step_1_configuration.md | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/documentation/DCP-documentation/step_1_configuration.md b/documentation/DCP-documentation/step_1_configuration.md
index 3dc89a2..c5dddf3 100644
--- a/documentation/DCP-documentation/step_1_configuration.md
+++ b/documentation/DCP-documentation/step_1_configuration.md
@@ -28,8 +28,10 @@ For more information and examples, see [External Buckets](external_buckets.md).
 This is generally the bucket in the account in which you are running compute.
 * **SOURCE_BUCKET:** The bucket where the image files you will be reading are.
 Often, this is the same as AWS_BUCKET.
-* **WORKSPACE:** The bucket where non-image files you will be reading are (e.g. pipeline, load_data.csv, etc.).
+These files can be downloaded or read directly off the bucket (see `DOWNLOAD_FILES` below for more).
+* **WORKSPACE_BUCKET:** The bucket where non-image files you will be reading are (e.g. pipeline, load_data.csv, etc.).
 Often, this is the same as AWS_BUCKET.
+Workspace files will always be automatically downloaded to your EC2 instance (as of v2.2.1).
 * **DESTINATION_BUCKET:** The bucket where you want to write your output files.
 Often, this is the same as AWS_BUCKET.
 * **UPLOAD_FLAGS:** If you need to add flags to an AWS CLI command to upload files to your DESTINATION_BUCKET, this is where you enter them.
@@ -57,6 +59,7 @@ If you have multiple Dockers running per machine, each Docker will have access t
 This typically requires a larger EBS volume (depending on the size of your image sets, and how many sets are processed per group), but avoids occasional issues with S3FS that can crop up on longer runs.
 By default, DCP uses S3FS to mount the S3 `SOURCE_BUCKET` as a pseudo-file system on each EC2 instance in your spot fleet to avoid file download.
 If you are unable to mount the `SOURCE_BUCKET` (perhaps because of a permissions issue) you should proceed with `DOWNLOAD_FILES = 'True'`.
+Note that as of v2.2.1, all non-image files (e.g. load_data.csv files and pipelines) are downloaded regardless of this setting and regardless of whether `SOURCE_BUCKET` and `WORKSPACE_BUCKET` are the same.
 * **ASSIGN_IP:** Whether or not to assign a public IPv4 address to each instance in the spot fleet.
 If set to 'False', it will overwrite whatever is in the Fleet file.
 If set to 'True', it will respect whatever is in the Fleet file.
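Taken together, patches 1 and 2 make the worker pull the load data file and pipeline from `WORKSPACE_BUCKET` unconditionally, with `DOWNLOAD_FILES` now governing only image files. Below is a minimal standalone sketch of that download pattern, not the worker's verbatim code: the bucket, keys, and local root are hypothetical, and boto3 credentials (e.g. an instance role) are assumed to be configured.

    import os
    import boto3
    from botocore.exceptions import ClientError

    WORKSPACE_BUCKET = 'my-workspace-bucket'   # hypothetical bucket name
    localIn = '/home/ubuntu/local_input'       # hypothetical local input root

    def download_workspace_file(s3client, key, flatten=False):
        # The worker keeps load_data.csv under its bucket subfolders locally
        # but saves the pipeline by basename only; `flatten` models that.
        local_path = os.path.join(localIn, os.path.basename(key) if flatten else key)
        subdir = os.path.dirname(local_path)
        if subdir:
            os.makedirs(subdir, exist_ok=True)
        try:
            s3client.download_file(WORKSPACE_BUCKET, key, local_path)
        except ClientError:
            # At this point the worker logs the missing file and the job
            # returns 'DOWNLOAD_PROBLEM'.
            return None
        return local_path

    s3client = boto3.client('s3')
    data_file_path = download_workspace_file(s3client, 'project/load_data.csv')
    pipepath = download_workspace_file(s3client, 'project/analysis.cppipe', flatten=True)

Because these downloads now run before the `DOWNLOAD_FILES` branch, a missing pipeline or load data file fails fast with `DOWNLOAD_PROBLEM` whether or not the image files are served through the S3FS mount.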
From 6b37607f1a43a16a196f8caa46068f8914a3f326 Mon Sep 17 00:00:00 2001
From: ErinWeisbart <54687786+ErinWeisbart@users.noreply.github.com>
Date: Wed, 9 Apr 2025 09:05:21 -0700
Subject: [PATCH 3/3] cleanup

---
 worker/cp-worker.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/worker/cp-worker.py b/worker/cp-worker.py
index c739cba..e54b6b8 100644
--- a/worker/cp-worker.py
+++ b/worker/cp-worker.py
@@ -109,6 +109,8 @@ def printandlog(text,logger):
 #################################
 
 def runCellProfiler(message):
+    s3client=boto3.client('s3')
+
     #List the directories in the bucket- this prevents a strange s3fs error
     rootlist=os.listdir(DATA_ROOT)
     for eachSubDir in rootlist:
@@ -167,7 +169,6 @@ def runCellProfiler(message):
     # See if this is a message you've already handled, if you've so chosen
     if CHECK_IF_DONE_BOOL.upper() == 'TRUE':
         try:
-            s3client=boto3.client('s3')
             bucketlist=s3client.list_objects(Bucket=DESTINATION_BUCKET,Prefix=f'{remoteOut}/')
             objectsizelist=[k['Size'] for k in bucketlist['Contents']]
             objectsizelist = [i for i in objectsizelist if i >= MIN_FILE_SIZE_BYTES]
@@ -187,7 +188,6 @@ def runCellProfiler(message):
         subfolders = '/'.join((csv_insubfolders)[:-1])
         if not os.path.exists(os.path.join(localIn,subfolders)):
             os.makedirs(os.path.join(localIn,subfolders), exist_ok=True)
-        s3client=boto3.client('s3')
        try:
            s3client.download_file(WORKSPACE_BUCKET, message['data_file'], data_file_path)
        except botocore.exceptions.ClientError:
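The cleanup in PATCH 3/3 hoists client creation to the top of runCellProfiler so a single client serves both the CHECK_IF_DONE listing and the workspace downloads. A minimal sketch of that reuse pattern follows; the bucket name, size floor, message field, and file count are hypothetical stand-ins for the worker's globals, not its actual values.

    import boto3

    DESTINATION_BUCKET = 'my-destination-bucket'   # hypothetical
    MIN_FILE_SIZE_BYTES = 1                        # hypothetical size floor

    def already_done(s3client, prefix, expected_files):
        # Same shape as the worker's CHECK_IF_DONE test: count output
        # objects under the prefix that meet the minimum size.
        listing = s3client.list_objects(Bucket=DESTINATION_BUCKET, Prefix=prefix)
        sizes = [obj['Size'] for obj in listing.get('Contents', [])]
        return len([s for s in sizes if s >= MIN_FILE_SIZE_BYTES]) >= expected_files

    def run_job(message):
        s3client = boto3.client('s3')   # created once at the top of the function
        if already_done(s3client, f"output/{message['group']}/", expected_files=5):
            return 'SUCCESS'
        # ...the same client is then reused for the workspace downloads.

Reusing one client also avoids repeating credential resolution and connection setup on every S3 call within the job.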