diff --git a/flepimop/gempyor_pkg/setup.cfg b/flepimop/gempyor_pkg/setup.cfg index e5fb0902a..abb72a716 100644 --- a/flepimop/gempyor_pkg/setup.cfg +++ b/flepimop/gempyor_pkg/setup.cfg @@ -53,6 +53,8 @@ console_scripts = gempyor-seir = gempyor.simulate_seir:simulate gempyor-simulate = gempyor.simulate:simulate flepimop-calibrate = gempyor.calibrate:calibrate + flepimop-pull = gempyor.resume_pull:fetching_resume_files + flepimop-push = gempyor.flepimop_push:flepimop_push [options.packages.find] where = src diff --git a/flepimop/gempyor_pkg/src/gempyor/file_paths.py b/flepimop/gempyor_pkg/src/gempyor/file_paths.py index 627e8188a..08e420051 100644 --- a/flepimop/gempyor_pkg/src/gempyor/file_paths.py +++ b/flepimop/gempyor_pkg/src/gempyor/file_paths.py @@ -58,13 +58,7 @@ def create_file_name( """ if create_directory: os.makedirs( - create_dir_name( - run_id, - prefix, - ftype, - inference_filepath_suffix, - inference_filename_prefix, - ), + create_dir_name(run_id, prefix, ftype, inference_filepath_suffix, inference_filename_prefix,), exist_ok=True, ) @@ -123,13 +117,7 @@ def create_file_name_without_extension( """ if create_directory: os.makedirs( - create_dir_name( - run_id, - prefix, - ftype, - inference_filepath_suffix, - inference_filename_prefix, - ), + create_dir_name(run_id, prefix, ftype, inference_filepath_suffix, inference_filename_prefix,), exist_ok=True, ) filename = Path( @@ -169,11 +157,7 @@ def run_id(timestamp: None | datetime = None) -> str: def create_dir_name( - run_id: str, - prefix: str, - ftype: str, - inference_filepath_suffix: str, - inference_filename_prefix: str, + run_id: str, prefix: str, ftype: str, inference_filepath_suffix: str, inference_filename_prefix: str, ) -> str: """ Generate a directory name based on the given parameters. @@ -198,12 +182,52 @@ def create_dir_name( """ return os.path.dirname( create_file_name_without_extension( - run_id, - prefix, - 1, - ftype, - inference_filepath_suffix, - inference_filename_prefix, - create_directory=False, + run_id, prefix, 1, ftype, inference_filepath_suffix, inference_filename_prefix, create_directory=False, ) ) + + +def create_file_name_for_push( + flepi_run_index: str, prefix: str, flepi_slot_index: str, flepi_block_index: str +) -> list[str]: + """ + Generate a list of file names for different types of inference results. + + This function generates a list of file names based on the provided run index, prefix, slot index, + and block index. Each file name corresponds to a different type of inference result, such as + "seir", "hosp", "llik", etc. The file names are generated using the `create_file_name` function, + with specific extensions based on the type: "csv" for "seed" and "parquet" for all other types. + + Args: + flepi_run_index : + The index of the run. This is used to uniquely identify the run. + prefix : + A prefix string to be included in the file names. This is typically used to categorize or + identify the files. + flepi_slot_index : + The slot index used in the filename. This is formatted as a zero-padded nine-digit number. + flepi_block_index : + The block index used in the filename. This typically indicates a specific block or segment + of the data being processed. + + Returns: + list + A list of generated file names, each corresponding to a different type of inference result. + The file names include the provided prefix, run index, slot index, block index, type, and + the appropriate file extension (either "csv" or "parquet"). + """ + type_list = ["seir", "hosp", "llik", "spar", "snpi", "hnpi", "hpar", "init", "seed"] + extension_map = {type_name: "csv" if type_name == "seed" else "parquet" for type_name in type_list} + name_list = [] + for type_name, extension in extension_map.items(): + file_name = create_file_name( + run_id=flepi_run_index, + prefix=prefix, + inference_filename_prefix="{:09d}.".format(int(flepi_slot_index)), + inference_filepath_suffix="chimeric/intermediate", + index=flepi_block_index, + ftype=type_name, + extension=extension, + ) + name_list.append(file_name) + return name_list diff --git a/flepimop/gempyor_pkg/src/gempyor/flepimop_push.py b/flepimop/gempyor_pkg/src/gempyor/flepimop_push.py new file mode 100644 index 000000000..600bc757f --- /dev/null +++ b/flepimop/gempyor_pkg/src/gempyor/flepimop_push.py @@ -0,0 +1,206 @@ +import os +import click +import shutil +from .file_paths import create_file_name_for_push + + +@click.command() +@click.option( + "--s3_upload", + "s3_upload", + envvar="S3_UPLOAD", + help="push files to aws", + required=True, +) +@click.option( + "--data-path", + "data_path", + envvar="PROJECT_PATH", + type=click.Path(exists=True), + required=True, +) +@click.option( + "--flepi_run_index", + "flepi_run_index", + envvar="FLEPI_RUN_INDEX", + type=click.STRING, + required=True, +) +@click.option( + "--flepi_prefix", + "flepi_prefix", + envvar="FLEPI_PREFIX", + type=click.STRING, + required=True, +) +@click.option( + "--flepi_block_index", + "flepi_block_index", + envvar="FLEPI_BLOCK_INDEX", + type=click.STRING, + required=True, +) +@click.option( + "--flepi_slot_index", + "flepi_slot_index", + envvar="FLEPI_SLOT_INDEX", + type=click.STRING, + required=True, +) +@click.option( + "--s3_results_path", + "s3_results_path", + envvar="S3_RESULTS_PATH", + type=click.STRING, + default="", + required=False, +) +@click.option( + "--fs_results_path", + "fs_results_path", + envvar="FS_RESULTS_PATH", + type=click.Path(), + default="", + required=False, +) +def flepimop_push( + s3_upload: str, + data_path: str, + flepi_run_index: str, + flepi_prefix: str, + flepi_slot_index: str, + flepi_block_index: str, + s3_results_path: str = "", + fs_results_path: str = "", +) -> None: + """ + Push files to either AWS S3 or the local filesystem. + + This function generates a list of file names based on the provided parameters, checks which files + exist locally, and uploads or copies these files to either AWS S3 or the local filesystem based on + the specified options. + + Parameters: + ---------- + s3_upload : str + String indicating whether to push files to AWS S3. If set to true, files will be uploaded to S3. + If set to False, files will be copied to the local filesystem as specified by `fs_results_path`. + + data_path : str + The local directory path where the data files are stored. + + flepi_run_index : str + The index of the FLEPI run. This is used to uniquely identify the run and generate the corresponding file names. + + flepi_prefix : str + A prefix string to be included in the file names. This is typically used to categorize or identify the files. + + flepi_slot_index : str + The slot index used in the filename. This is formatted as a zero-padded nine-digit number, which helps in + distinguishing different slots of data processing. + + flepi_block_index : str + The block index used in the filename. This typically indicates a specific block or segment of the data being processed. + + s3_results_path : str, optional + The S3 path where the results should be uploaded. This parameter is required if `s3_upload` is set to true. + Default is an empty string, which will raise an error if `s3_upload` is True. + + fs_results_path : str, optional + The local filesystem path where the results should be copied. + Default is an empty string, which means no files will be copied locally unless specified. + + Raises: + ------ + ValueError + If `s3_upload` is set to True and `s3_results_path` is not provided. + + ModuleNotFoundError + If `boto3` is not installed when `s3_upload` is set to True. + + Notes: + ----- + - This function first checks for the existence of the files generated by `create_file_name_for_push` + in the `data_path` directory. Only the files that exist will be pushed to AWS S3 or copied to the local filesystem. + + - When uploading to AWS S3, the function attempts to create the specified path in the S3 bucket if it does not exist. + + - Local directories specified by `fs_results_path` are created if they do not already exist. + + Example Usage: + -------------- + ```bash + flepimop-push --s3_upload true --data-path /path/to/data --flepi_run_index run_01 --flepi_prefix prefix_01 \ + --flepi_slot_index 1 --flepi_block_index 1 --s3_results_path s3://my-bucket/results/ + ``` + + This would push the existing files generated by the `create_file_name_for_push` function to the specified S3 bucket. + """ + file_name_list = create_file_name_for_push( + flepi_run_index=flepi_run_index, + prefix=flepi_prefix, + flepi_slot_index=flepi_slot_index, + flepi_block_index=flepi_block_index, + ) + exist_files = [] + for file_name in file_name_list: + file_path = os.path.join(data_path, file_name) + if os.path.exists(file_path): + exist_files.append(file_name) + print("flepimos-push find these existing files: " + " ".join(exist_files)) + # Track failed uploads/copies separately + failed_s3_uploads = [] + failed_fs_copies = [] + if s3_upload == "true": + try: + import boto3 + from botocore.exceptions import ClientError + except ModuleNotFoundError: + raise ModuleNotFoundError( + ( + "No module named 'boto3', which is required for " + "gempyor.flepimop_push.flepimop_push. Please install the aws target." + ) + ) + if s3_results_path == "": + raise ValueError( + "argument aws is setted to True, you must use --s3_results_path or environment variable S3_RESULTS_PATH." + ) + s3 = boto3.client("s3") + for file in exist_files: + s3_path = os.path.join(s3_results_path, file) + bucket = s3_path.split("/")[2] + object_name = s3_path[len(bucket) + 6 :] + try: + s3.upload_file(os.path.join(data_path, file), bucket, object_name) + print(f"Uploaded {file} to S3 successfully.") + except ClientError as e: + print(f"Failed to upload {file} to S3: {e}") + failed_s3_uploads.append(file) + + if fs_results_path != "": + for file in exist_files: + dst = os.path.join(fs_results_path, file) + os.makedirs(os.path.dirname(dst), exist_ok=True) + try: + shutil.copy(os.path.join(data_path, file), dst) + print(f"Copied {file} to local filesystem successfully.") + except IOError as e: + print(f"Failed to copy {file} to local filesystem: {e}") + failed_fs_copies.append(file) + + # Print failed files for S3 uploads + if failed_s3_uploads: + print("The following files failed to upload to S3:") + for file in failed_s3_uploads: + print(file) + + # Print failed files for local filesystem copies + if failed_fs_copies: + print("The following files failed to copy to the local filesystem:") + for file in failed_fs_copies: + print(file) + + # Success message if no failures + if not failed_s3_uploads and not failed_fs_copies: + print("flepimop-push successfully pushed all existing files.") diff --git a/flepimop/gempyor_pkg/src/gempyor/resume_pull.py b/flepimop/gempyor_pkg/src/gempyor/resume_pull.py new file mode 100644 index 000000000..2118d9db8 --- /dev/null +++ b/flepimop/gempyor_pkg/src/gempyor/resume_pull.py @@ -0,0 +1,236 @@ +#!/usr/bin/env python +""" +Script for fetching resume files based on various input parameters. + +Overview: +This script is designed to fetch resume files based on various input parameters. +It uses Click for command-line interface (CLI) options and handles the fetching process either by downloading from an S3 bucket or moving files locally. + +Dependencies: +- click: A package for creating command-line interfaces. +- os: A module for interacting with the operating system. +- gempyor.utils: A module containing utility functions create_resume_file_names_map, download_file_from_s3, and move_file_at_local. + +CLI Options: +The script uses Click to define the following command-line options: + +--resume_location: +- Environment Variables: LAST_JOB_OUTPUT, RESUME_LOCATION +- Type: STRING +- Required: Yes +- Description: The path for the last run's output. + +--discard_seeding: +- Environment Variable: RESUME_DISCARD_SEEDING +- Type: BOOL +- Required: Yes +- Description: Boolean value indicating whether to discard seeding or not. +- valid values: true, 1, y, yes, True, False, false, f, 0, no, n + +--block_index: +- Environment Variable: FLEPI_BLOCK_INDEX +- Type: STRING +- Required: Yes +- Description: The block index for the FLEPI. + +--resume_run_index: +- Environment Variable: RESUME_RUN_INDEX +- Type: STRING +- Required: Yes +- Description: The run index for resuming. + +--flepi_run_index: +- Environment Variable: FLEPI_RUN_INDEX +- Type: STRING +- Required: Yes +- Description: The run index for the FLEPI. + +--flepi_prefix: +- Environment Variable: FLEPI_PREFIX +- Type: STRING +- Required: Yes +- Description: The prefix for the FLEPI. + +Function: fetching_resume_files + +Parameters: +- resume_location (str): Path to the last run's output. +- discard_seeding (bool): Whether to discard seeding. +- flepi_block_index (str): Block index for FLEPI. +- resume_run_index (str): Run index for resuming. +- flepi_run_index (str): Run index for FLEPI. +- flepi_prefix (str): Prefix for FLEPI. + +Description: +The function fetching_resume_files fetches resume files based on the provided parameters. It checks if the resume_location is an S3 path and decides to download from S3 or move files locally accordingly. + +Workflow: +1. Retrieves the environment variable SLURM_ARRAY_TASK_ID for the slot index. +2. Converts the discard_seeding boolean to a string "true" if it is True. +3. Creates a resume file name map using the create_resume_file_names_map function. +4. Checks if resume_location starts with "s3://": + - If yes, downloads the file from S3 using download_file_from_s3. + - If no, moves the file locally using move_file_at_local. +5. After pulling the input files, it will do a check. If input src_file does not exist, it will output these files. + If the input files exists, it is not pulled or copied to destination. It will raise FileExistsErrors. + +Example Usage: +To use this script, you can run it from the command line with the required options: +```sh +python script_name.py --resume_location "path/to/resume" --discard_seeding True --block_index "block123" --resume_run_index "run456" --flepi_run_index "run789" --flepi_prefix "prefix" +""" +import click +import os +from .utils import create_resume_file_names_map, download_file_from_s3, move_file_at_local + + +@click.command() +@click.option( + "--resume_location", + "resume_location", + envvar=["LAST_JOB_OUTPUT", "RESUME_LOCATION"], + type=click.STRING, + required=True, + help="the path for the last run's output", +) +@click.option( + "--discard_seeding", + "discard_seeding", + envvar="RESUME_DISCARD_SEEDING", + type=click.BOOL, + required=True, + help="required bool value for discarding seeding or not", +) +@click.option("--block_index", "flepi_block_index", envvar="FLEPI_BLOCK_INDEX", type=click.INT, required=True) +@click.option( + "--resume_run_index", "resume_run_index", envvar="RESUME_RUN_INDEX", type=click.STRING, required=True, +) +@click.option("--flepi_run_index", "flepi_run_index", envvar="FLEPI_RUN_INDEX", type=click.STRING, required=True) +@click.option("--flepi_prefix", "flepi_prefix", envvar="FLEPI_PREFIX", type=click.STRING, required=True) +def fetching_resume_files( + resume_location, discard_seeding, flepi_block_index, resume_run_index, flepi_run_index, flepi_prefix +): + flepi_slot_index = os.environ["SLURM_ARRAY_TASK_ID"] + if discard_seeding is True: + discard_seeding = "true" + + resume_file_name_map = create_resume_file_names_map( + resume_discard_seeding=discard_seeding, + flepi_block_index=str(flepi_block_index), + resume_run_index=resume_run_index, + flepi_prefix=flepi_prefix, + flepi_slot_index=flepi_slot_index, + flepi_run_index=flepi_run_index, + last_job_output=resume_location, + ) + if resume_location.startswith("s3://"): + download_file_from_s3(resume_file_name_map) + pull_check_for_s3(resume_file_name_map) + else: + move_file_at_local(resume_file_name_map) + pull_check(resume_file_name_map) + + +# Todo: Unit test +def pull_check_for_s3(file_name_map: dict[str, str]) -> None: + """ + Verifies the existence of specified files in an S3 bucket and checks if corresponding local files are present. + If a file in the S3 bucket does not exist or the local file is missing, it raises appropriate errors or prints a message. + + Parameters: + file_name_map (dict[str, str]): A dictionary where the keys are S3 URIs (Uniform Resource Identifiers) and the values are the corresponding local file paths. + + Dependencies: + - boto3: The AWS SDK for Python, used to interact with AWS services such as S3. + - botocore: The low-level core functionality of boto3. + - os: The standard library module for interacting with the operating system, used here to check for file existence. + + Functionality: + 1. Initialize S3 Client: The function initializes an S3 client using `boto3.client('s3')`. + 2. Iterate through S3 URIs: For each S3 URI in the `file_name_map` dictionary: + - Parse the Bucket and Object Key: Extracts the bucket name and object key from the S3 URI. + - Check if Object Exists in S3: Uses the `head_object` method to check if the object exists in the specified S3 bucket. + - Check Local File Existence: If the object exists in S3, it checks if the corresponding local file exists using `os.path.exists`. + - Handle Errors: + - If the object does not exist in S3, it catches the `ClientError` and prints a message indicating the missing S3 object. + - If the local file does not exist, it raises a `FileExistsError` indicating the local file is missing. + + Example Usage: + file_name_map = { + "s3://my-bucket/path/to/file1.txt": "/local/path/to/file1.txt", + "s3://my-bucket/path/to/file2.txt": "/local/path/to/file2.txt" + } + + pull_check_for_s3(file_name_map) + + Exceptions: + - FileExistsError: Raised if the corresponding local file for an existing S3 object is missing. + - botocore.exceptions.ClientError: Caught and handled to print a message if the S3 object does not exist. Other client errors are re-raised. + + Notes: + - Ensure that AWS credentials are configured properly for boto3 to access the S3 bucket. + - This function assumes that the S3 URIs provided are in the format `s3://bucket-name/path/to/object`. + """ + try: + import boto3 + from botocore.exceptions import ClientError + except ModuleNotFoundError: + raise ModuleNotFoundError(( + "No module named 'boto3', which is required for " + "gempyor.utils.download_file_from_s3. Please install the aws target." + )) + s3 = boto3.client("s3") + for s3_uri in file_name_map: + bucket = s3_uri.split("/")[2] + object = s3_uri[len(bucket) + 6 :] + try: + s3.head_object(Bucket=bucket, Key=object) + if os.path.exists(file_name_map[s3_uri]) is False: + raise FileExistsError(f"For {s3_uri}, it is not copied to {file_name_map[s3_uri]}.") + except botocore.exceptions.ClientError as e: + if e.response["Error"]["Code"] == "404": + print(f"Input {s3_uri} does not exist.") + else: + raise + + +# Todo: Unit Test +def pull_check(file_name_map: dict[str, str]) -> None: + """ + Verifies the existence of specified source files and checks if corresponding destination files are present. + If a source file does not exist or the destination file is missing, it raises appropriate errors or prints a message. + + Parameters: + file_name_map (dict[str, str]): A dictionary where the keys are source file paths and the values are the corresponding destination file paths. + + Dependencies: + - os: The standard library module for interacting with the operating system, used here to check for file existence. + + Functionality: + 1. Iterate through Source Files: For each source file path in the `file_name_map` dictionary: + - Check if Source File Exists: Uses `os.path.exists` to check if the source file exists. + - Check Destination File Existence: If the source file exists, it checks if the corresponding destination file exists using `os.path.exists`. + - Handle Errors: + - If the source file does not exist, it prints a message indicating the missing source file. + - If the destination file does not exist, it raises a `FileExistsError` indicating the destination file is missing. + + Example Usage: + file_name_map = { + "/path/to/source1.txt": "/path/to/destination1.txt", + "/path/to/source2.txt": "/path/to/destination2.txt" + } + + pull_check(file_name_map) + + Exceptions: + - FileExistsError: Raised if the corresponding destination file for an existing source file is missing. + + Notes: + - Ensure that the paths provided are valid and accessible on the file system. + """ + for src_file in file_name_map: + if os.path.exists(src_file): + if os.path.exists(file_name_map[src_file]) is False: + raise FileExistsError(f"For {src_file}, it is not copied to {file_name_map[src_file]}.") + else: + print(f"Input {src_file} does not exist.") diff --git a/flepimop/gempyor_pkg/src/gempyor/utils.py b/flepimop/gempyor_pkg/src/gempyor/utils.py index 25c33d574..e743126c4 100644 --- a/flepimop/gempyor_pkg/src/gempyor/utils.py +++ b/flepimop/gempyor_pkg/src/gempyor/utils.py @@ -909,7 +909,7 @@ def create_resume_file_names_map( liketype=liketype, ) input_file_name = output_file_name - if os.environ.get("FLEPI_BLOCK_INDEX") == "1": + if flepi_block_index == "1": input_file_name = create_resume_input_filename( resume_run_index=resume_run_index, flepi_prefix=flepi_prefix, diff --git a/flepimop/gempyor_pkg/tests/file_paths/test_create_file_name_for_push.py b/flepimop/gempyor_pkg/tests/file_paths/test_create_file_name_for_push.py new file mode 100644 index 000000000..42b73f68f --- /dev/null +++ b/flepimop/gempyor_pkg/tests/file_paths/test_create_file_name_for_push.py @@ -0,0 +1,52 @@ +from unittest.mock import patch +from gempyor.file_paths import create_file_name_for_push + + +class TestCreateFileNameForPush: + # Mock implementation of create_file_name for testing + def mocked_create_file_name( + self, + run_id, + prefix, + inference_filename_prefix, + inference_filepath_suffix, + index, + ftype, + extension, + ): + return f"{prefix}_{run_id}_{inference_filename_prefix}_{inference_filepath_suffix}_{index}_{ftype}.{extension}" + + # Test method for create_file_name_for_push + @patch("gempyor.file_paths.create_file_name") + def test_create_file_name_for_push(self, mock_create_file_name): + mock_create_file_name.side_effect = self.mocked_create_file_name + + flepi_run_index = "run123" + prefix = "testprefix" + flepi_slot_index = "42" + flepi_block_index = "3" + + expected_file_names = [ + f"testprefix_run123_000000042._chimeric/intermediate_3_seir.parquet", + f"testprefix_run123_000000042._chimeric/intermediate_3_hosp.parquet", + f"testprefix_run123_000000042._chimeric/intermediate_3_llik.parquet", + f"testprefix_run123_000000042._chimeric/intermediate_3_spar.parquet", + f"testprefix_run123_000000042._chimeric/intermediate_3_snpi.parquet", + f"testprefix_run123_000000042._chimeric/intermediate_3_hnpi.parquet", + f"testprefix_run123_000000042._chimeric/intermediate_3_hpar.parquet", + f"testprefix_run123_000000042._chimeric/intermediate_3_init.parquet", + f"testprefix_run123_000000042._chimeric/intermediate_3_seed.csv", + ] + + result = create_file_name_for_push( + flepi_run_index=flepi_run_index, + prefix=prefix, + flepi_slot_index=flepi_slot_index, + flepi_block_index=flepi_block_index, + ) + + # Assert the result matches the expected file names + assert result == expected_file_names + + # Assert that create_file_name was called the expected number of times + assert mock_create_file_name.call_count == 9 diff --git a/flepimop/gempyor_pkg/tests/resume_pull/test_flepimop_pull.py b/flepimop/gempyor_pkg/tests/resume_pull/test_flepimop_pull.py new file mode 100644 index 000000000..f27d5ef15 --- /dev/null +++ b/flepimop/gempyor_pkg/tests/resume_pull/test_flepimop_pull.py @@ -0,0 +1,68 @@ +import os +import pytest +from click.testing import CliRunner +from unittest.mock import patch +from gempyor.resume_pull import fetching_resume_files + + +@pytest.fixture +def runner(): + return CliRunner() + + +class TestFetchingResumeFiles: + @pytest.fixture(autouse=True) + def set_env(self): + with patch.dict(os.environ, {"SLURM_ARRAY_TASK_ID": "1"}): + yield + + def test_s3_resume_location(self, runner): + with patch("gempyor.resume_pull.download_file_from_s3") as mock_download, patch( + "gempyor.resume_pull.pull_check_for_s3" + ) as mock_pull_check_for_s3: + result = runner.invoke( + fetching_resume_files, + [ + "--resume_location", + "s3://some/location", + "--discard_seeding", + "true", + "--block_index", + 1, + "--resume_run_index", + "1", + "--flepi_run_index", + "1", + "--flepi_prefix", + "prefix123", + ], + ) + assert result.exit_code == 0 + mock_download.assert_called_once() + mock_pull_check_for_s3.assert_called_once() + + def test_local_resume_location(self, runner): + with patch("gempyor.resume_pull.move_file_at_local") as mock_move: + result = runner.invoke( + fetching_resume_files, + [ + "--resume_location", + "local/path", + "--discard_seeding", + "true", + "--block_index", + 1, + "--resume_run_index", + "run123", + "--flepi_run_index", + "run123", + "--flepi_prefix", + "prefix123", + ], + ) + assert result.exit_code == 0 + mock_move.assert_called_once() + + +if __name__ == "__main__": + pytest.main()