diff --git a/CHANGELOG.md b/CHANGELOG.md
index 619648b20..12ed1eaf9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,7 @@
 # Upcoming
 
+## v0.6.3
+
 ## Bug Fixes
 * Fixed a setup bug introduced in `v0.6.2` where installation process created a directory instead of a file for test configuration file [PR #1070](https://github.com/catalystneuro/neuroconv/pull/1070)
 * The method `get_extractor` now works for `MockImagingInterface` [PR #1076](https://github.com/catalystneuro/neuroconv/pull/1076)
@@ -7,6 +9,7 @@
 ## Deprecations
 
 ## Features
+* Added automated EFS volume creation and mounting to the `submit_aws_batch_job` helper function. [PR #1018](https://github.com/catalystneuro/neuroconv/pull/1018)
 
 ## Improvements
 
@@ -26,6 +29,7 @@
 * Added `get_stream_names` to `OpenEphysRecordingInterface`: [PR #1039](https://github.com/catalystneuro/neuroconv/pull/1039)
 * Most data interfaces and converters now use Pydantic to validate their inputs, including existence of file and folder paths. [PR #1022](https://github.com/catalystneuro/neuroconv/pull/1022)
 * All remaining data interfaces and converters now use Pydantic to validate their inputs, including existence of file and folder paths. [PR #1055](https://github.com/catalystneuro/neuroconv/pull/1055)
+* Added automated EFS volume creation and mounting to the `submit_aws_batch_job` helper function. [PR #1018](https://github.com/catalystneuro/neuroconv/pull/1018)
 
 ### Improvements
 
diff --git a/setup.py b/setup.py
index 5beebbf5f..53314e7e3 100644
--- a/setup.py
+++ b/setup.py
@@ -16,7 +16,7 @@ def read_requirements(file):
 
 
 extras_require = defaultdict(list)
-extras_require["full"] = ["dandi>=0.58.1", "hdf5plugin"]
+extras_require["full"] = ["dandi>=0.58.1", "hdf5plugin", "boto3"]
 
 for modality in ["ophys", "ecephys", "icephys", "behavior", "text"]:
     modality_path = root / "src" / "neuroconv" / "datainterfaces" / modality
diff --git a/src/neuroconv/tools/aws/_submit_aws_batch_job.py b/src/neuroconv/tools/aws/_submit_aws_batch_job.py
index 0d36bee7f..9e3ba0488 100644
--- a/src/neuroconv/tools/aws/_submit_aws_batch_job.py
+++ b/src/neuroconv/tools/aws/_submit_aws_batch_job.py
@@ -14,6 +14,7 @@ def submit_aws_batch_job(
     docker_image: str,
     commands: Optional[list[str]] = None,
     environment_variables: Optional[dict[str, str]] = None,
+    efs_volume_name: Optional[str] = None,
     job_dependencies: Optional[list[dict[str, str]]] = None,
     status_tracker_table_name: str = "neuroconv_batch_status_tracker",
     iam_role_name: str = "neuroconv_batch_role",
@@ -42,6 +43,9 @@ def submit_aws_batch_job(
         E.g., `commands=["echo", "'Hello, World!'"]`.
     environment_variables : dict, optional
         A dictionary of environment variables to pass to the Docker container.
+    efs_volume_name : str, optional
+        The name of an EFS volume to be created and attached to the job.
+        The path exposed to the container will always be `/mnt/efs`.
     job_dependencies : list of dict
         A list of job dependencies for this job to trigger. Structured as follows:
         [
@@ -88,6 +92,7 @@ def submit_aws_batch_job(
     import boto3
 
     region = region or "us-east-2"
+    subregion = region + "a"  # For anything that requires subregion, always default to "a"
 
     aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID", None)
     aws_secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY", None)
@@ -116,6 +121,12 @@ def submit_aws_batch_job(
         aws_access_key_id=aws_access_key_id,
         aws_secret_access_key=aws_secret_access_key,
     )
+    efs_client = boto3.client(
+        service_name="efs",
+        region_name=region,
+        aws_access_key_id=aws_access_key_id,
+        aws_secret_access_key=aws_secret_access_key,
+    )
 
     # Get the tracking table and IAM role
     table = _create_or_get_status_tracker_table(
@@ -131,10 +142,12 @@ def submit_aws_batch_job(
         job_queue_name=job_queue_name, compute_environment_name=compute_environment_name, batch_client=batch_client
     )
 
+    efs_id = _create_or_get_efs_id(efs_volume_name=efs_volume_name, efs_client=efs_client, region=region)
     job_definition_name = job_definition_name or _generate_job_definition_name(
         docker_image=docker_image,
         minimum_worker_ram_in_gib=minimum_worker_ram_in_gib,
         minimum_worker_cpus=minimum_worker_cpus,
+        efs_id=efs_id,
     )
     job_definition_arn = _ensure_job_definition_exists_and_get_arn(
         job_definition_name=job_definition_name,
@@ -143,6 +156,7 @@ def submit_aws_batch_job(
         minimum_worker_cpus=minimum_worker_cpus,
         role_info=iam_role_info,
         batch_client=batch_client,
+        efs_id=efs_id,
     )
 
     # Submit job and update status tracker
@@ -160,6 +174,7 @@ def submit_aws_batch_job(
         container_overrides["environment"] = [{key: value} for key, value in environment_variables.items()]
     if commands is not None:
         container_overrides["command"] = commands
+
     job_submission_info = batch_client.submit_job(
         jobName=job_name,
         dependsOn=job_dependencies,
@@ -180,6 +195,7 @@ def submit_aws_batch_job(
     table.put_item(Item=table_submission_info)
 
     info = dict(job_submission_info=job_submission_info, table_submission_info=table_submission_info)
+
     return info
 
 
@@ -305,8 +321,8 @@ def _ensure_compute_environment_exists(
             "type": "EC2",
             "allocationStrategy": "BEST_FIT",  # Note: not currently supporting spot due to interruptibility
             "instanceTypes": ["optimal"],
-            "minvCpus": 1,
-            "maxvCpus": 8,  # Not: not currently exposing control over this since these are mostly I/O intensive
+            "minvCpus": 0,  # Note: if not zero, will always keep an instance running in active state on standby
+            "maxvCpus": 8,  # Note: not currently exposing control over this since these are mostly I/O intensive
             "instanceRole": "ecsInstanceRole",
             # Security groups and subnets last updated on 8/4/2024
             "securityGroupIds": ["sg-001699e5b7496b226"],
@@ -391,9 +407,20 @@ def _ensure_job_queue_exists(
         computeEnvironmentOrder=[
             dict(order=1, computeEnvironment=compute_environment_name),
         ],
+        # Note: boto3 annotates the reason as a generic string
+        # But really it is Literal[
+        #     "MISCONFIGURATION:COMPUTE_ENVIRONMENT_MAX_RESOURCE", "MISCONFIGURATION:JOB_RESOURCE_REQUIREMENT"
+        # ]
+        # And we should have limits on both
         jobStateTimeLimitActions=[
             dict(
-                reason="Avoid zombie jobs.",
+                reason="MISCONFIGURATION:COMPUTE_ENVIRONMENT_MAX_RESOURCE",
+                state="RUNNABLE",
+                maxTimeSeconds=minimum_time_to_kill_in_seconds,
+                action="CANCEL",
+            ),
+            dict(
+                reason="MISCONFIGURATION:JOB_RESOURCE_REQUIREMENT",
                 state="RUNNABLE",
                 maxTimeSeconds=minimum_time_to_kill_in_seconds,
                 action="CANCEL",
@@ -418,11 +445,71 @@ def _ensure_job_queue_exists(
     return None
 
 
+def _create_or_get_efs_id(
+    efs_volume_name: Optional[str], efs_client: "boto3.client.efs", region: str = "us-east-2"
+) -> Optional[str]:  # pragma: no cover
+    if efs_volume_name is None:
+        return None
+
+    if region != "us-east-2":
+        raise NotImplementedError("EFS volumes are only supported in us-east-2 for now.")
+
+    available_efs_volumes = efs_client.describe_file_systems()
+    matching_efs_volumes = [
+        file_system
+        for file_system in available_efs_volumes["FileSystems"]
+        for tag in file_system["Tags"]
+        if tag["Key"] == "Name" and tag["Value"] == efs_volume_name
+    ]
+
+    if len(matching_efs_volumes) >= 1:
+        efs_volume = matching_efs_volumes[0]
+        efs_id = efs_volume["FileSystemId"]
+
+        return efs_id
+
+    # Existing volume not found - must create a fresh one and set mount targets on it
+    efs_volume = efs_client.create_file_system(
+        PerformanceMode="generalPurpose",  # Only type supported in one-zone
+        Encrypted=False,
+        ThroughputMode="elastic",
+        # TODO: figure out how to make job spawn only on subregion for OneZone discount
+        # AvailabilityZoneName=subregion,
+        Backup=False,
+        Tags=[{"Key": "Name", "Value": efs_volume_name}],
+    )
+    efs_id = efs_volume["FileSystemId"]
+
+    # Takes a while to spin up - cannot assign mount targets until it is ready
+    # TODO: in a follow-up replace with more robust checking mechanism
+    time.sleep(60)
+
+    # TODO: in follow-up, figure out how to fetch this automatically and from any region
+    # (might even resolve those previous OneZone issues)
+    region_to_subnet_id = {
+        "us-east-2a": "subnet-0890a93aedb42e73e",
+        "us-east-2b": "subnet-0e20bbcfb951b5387",
+        "us-east-2c": "subnet-0680e07980538b786",
+    }
+    for subnet_id in region_to_subnet_id.values():
+        efs_client.create_mount_target(
+            FileSystemId=efs_id,
+            SubnetId=subnet_id,
+            SecurityGroups=[
+                "sg-001699e5b7496b226",
+            ],
+        )
+    time.sleep(60)  # Also takes a while to create the mount targets so add some buffer time
+
+    return efs_id
+
+
 def _generate_job_definition_name(
     *,
     docker_image: str,
     minimum_worker_ram_in_gib: int,
     minimum_worker_cpus: int,
+    efs_id: Optional[str] = None,
 ) -> str:  # pragma: no cover
     """
     Generate a job definition name for the AWS Batch job.
@@ -449,6 +536,8 @@ def _generate_job_definition_name(
     job_definition_name += f"_{parsed_docker_image_name}-image"
     job_definition_name += f"_{minimum_worker_ram_in_gib}-GiB-RAM"
     job_definition_name += f"_{minimum_worker_cpus}-CPU"
+    if efs_id is not None:
+        job_definition_name += f"_{efs_id}"
     if docker_tag is None or docker_tag == "latest":
         date = datetime.now().strftime("%Y-%m-%d")
         job_definition_name += f"_created-on-{date}"
@@ -464,6 +553,7 @@ def _ensure_job_definition_exists_and_get_arn(
     minimum_worker_cpus: int,
     role_info: dict,
     batch_client: "boto3.client.Batch",
+    efs_id: Optional[str] = None,
     max_retries: int = 12,
 ) -> str:  # pragma: no cover
     """
@@ -494,6 +584,9 @@ def _ensure_job_definition_exists_and_get_arn(
         The IAM role information for the job.
     batch_client : boto3.client.Batch
         The AWS Batch client to use for the job.
+    efs_id : str, optional
+        The ID of the EFS volume (file system) to attach to the job.
+        The path exposed to the container will always be `/mnt/efs`.
     max_retries : int, default: 12
         If the job definition does not already exist, then this is the maximum number of times to synchronously
         check for its successful creation before erroring.
@@ -534,6 +627,20 @@ def _ensure_job_definition_exists_and_get_arn(
     minimum_time_to_kill_in_days = 1  # Note: eventually consider exposing this for very long jobs?
     minimum_time_to_kill_in_seconds = minimum_time_to_kill_in_days * 24 * 60 * 60
 
+    volumes = []
+    mountPoints = []
+    if efs_id is not None:
+        volumes = [
+            {
+                "name": "neuroconv_batch_efs_mounted",
+                "efsVolumeConfiguration": {
+                    "fileSystemId": efs_id,
+                    "transitEncryption": "DISABLED",
+                },
+            },
+        ]
+        mountPoints = [{"containerPath": "/mnt/efs/", "readOnly": False, "sourceVolume": "neuroconv_batch_efs_mounted"}]
+
     # batch_client.register_job_definition() is not synchronous and so we need to wait a bit afterwards
     batch_client.register_job_definition(
         jobDefinitionName=job_definition_name,
@@ -542,9 +649,13 @@ def _ensure_job_definition_exists_and_get_arn(
         containerProperties=dict(
             image=docker_image,
             resourceRequirements=resource_requirements,
-            jobRoleArn=role_info["Role"]["Arn"],
-            executionRoleArn=role_info["Role"]["Arn"],
+            # TODO: investigate if any IAM role is explicitly needed in conjunction with the credentials
+            # jobRoleArn=role_info["Role"]["Arn"],
+            # executionRoleArn=role_info["Role"]["Arn"],
+            volumes=volumes,
+            mountPoints=mountPoints,
         ),
+        platformCapabilities=["EC2"],
     )
 
     job_definition_request = batch_client.describe_job_definitions(jobDefinitions=[job_definition_with_revision])
diff --git a/tests/test_minimal/test_tools/aws_tools.py b/tests/test_minimal/test_tools/aws_tools.py
index 69de5d31a..2e7598178 100644
--- a/tests/test_minimal/test_tools/aws_tools.py
+++ b/tests/test_minimal/test_tools/aws_tools.py
@@ -1,3 +1,4 @@
+import datetime
 import os
 import time
 
@@ -5,6 +6,8 @@
 
 from neuroconv.tools.aws import submit_aws_batch_job
 
+_RETRY_STATES = ["RUNNABLE", "PENDING", "STARTING", "RUNNING"]
+
 
 def test_submit_aws_batch_job():
     region = "us-east-2"
@@ -35,14 +38,24 @@ def test_submit_aws_batch_job():
     time.sleep(60)
 
     job_id = info["job_submission_info"]["jobId"]
+    job = None
+    max_retries = 10
+    retry = 0
+    while retry < max_retries:
+        job_description_response = batch_client.describe_jobs(jobs=[job_id])
+        assert job_description_response["ResponseMetadata"]["HTTPStatusCode"] == 200
+
+        jobs = job_description_response["jobs"]
+        assert len(jobs) == 1
 
-    all_jobs_response = batch_client.describe_jobs(jobs=[job_id])
-    assert all_jobs_response["ResponseMetadata"]["HTTPStatusCode"] == 200
+        job = jobs[0]
 
-    jobs = all_jobs_response["jobs"]
-    assert len(jobs) == 1
+        if job["status"] in _RETRY_STATES:
+            retry += 1
+            time.sleep(60)
+        else:
+            break
 
-    job = jobs[0]
     assert job["jobName"] == job_name
     assert "neuroconv_batch_queue" in job["jobQueue"]
     assert "neuroconv_batch_ubuntu-latest-image_4-GiB-RAM_4-CPU" in job["jobDefinition"]
@@ -106,18 +119,29 @@ def test_submit_aws_batch_job_with_dependencies():
     )
 
     # Wait for AWS to process the jobs
-    time.sleep(120)
+    time.sleep(60)
 
     job_id_1 = job_info_1["job_submission_info"]["jobId"]
     job_id_2 = job_info_2["job_submission_info"]["jobId"]
+    job_1 = None
+    max_retries = 10
+    retry = 0
+    while retry < max_retries:
+        all_job_descriptions_response = batch_client.describe_jobs(jobs=[job_id_1, job_id_2])
+        assert all_job_descriptions_response["ResponseMetadata"]["HTTPStatusCode"] == 200
+
+        jobs_by_id = {job["jobId"]: job for job in all_job_descriptions_response["jobs"]}
+        assert len(jobs_by_id) == 2
 
-    all_jobs_response = batch_client.describe_jobs(jobs=[job_id_1, job_id_2])
-    assert all_jobs_response["ResponseMetadata"]["HTTPStatusCode"] == 200
+        job_1 = jobs_by_id[job_id_1]
+        job_2 = jobs_by_id[job_id_2]
 
-    jobs_by_id = {job["jobId"]: job for job in all_jobs_response["jobs"]}
-    assert len(jobs_by_id) == 2
+        if job_1["status"] in _RETRY_STATES or job_2["status"] in _RETRY_STATES:
+            retry += 1
+            time.sleep(60)
+        else:
+            break
 
-    job_1 = jobs_by_id[job_id_1]
     assert job_1["jobName"] == job_name_1
     assert "neuroconv_batch_queue" in job_1["jobQueue"]
     assert "neuroconv_batch_ubuntu-latest-image_4-GiB-RAM_4-CPU" in job_1["jobDefinition"]
@@ -156,3 +180,120 @@ def test_submit_aws_batch_job_with_dependencies():
     table.update_item(
         Key={"id": table_submission_id_2}, AttributeUpdates={"status": {"Action": "PUT", "Value": "Test passed."}}
     )
+
+
+def test_submit_aws_batch_job_with_efs_mount():
+    """
+    It was confirmed manually that a job using this definition will fail if the /mnt/efs/ directory does not exist.
+
+    It is, however, prohibitively difficult to automatically check if the file exists on the EFS volume.
+
+    If desired, you can manually check the EFS volume by following these instructions:
+    https://repost.aws/knowledge-center/efs-mount-automount-unmount-steps
+    """
+    region = "us-east-2"
+    aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID", None)
+    aws_secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY", None)
+
+    dynamodb_resource = boto3.resource(
+        service_name="dynamodb",
+        region_name=region,
+        aws_access_key_id=aws_access_key_id,
+        aws_secret_access_key=aws_secret_access_key,
+    )
+    batch_client = boto3.client(
+        service_name="batch",
+        region_name=region,
+        aws_access_key_id=aws_access_key_id,
+        aws_secret_access_key=aws_secret_access_key,
+    )
+    efs_client = boto3.client(
+        service_name="efs",
+        region_name=region,
+        aws_access_key_id=aws_access_key_id,
+        aws_secret_access_key=aws_secret_access_key,
+    )
+
+    job_name = "test_submit_aws_batch_job_with_efs"
+    docker_image = "ubuntu:latest"
+    date = datetime.datetime.now().date().strftime("%y%m%d")
+    commands = ["touch", f"/mnt/efs/test_{date}.txt"]
+
+    # TODO: to reduce costs even more, find a good combination of memory/CPU to minimize size of instance
+    efs_volume_name = f"test_neuroconv_batch_with_efs_{date}"
+    info = submit_aws_batch_job(
+        job_name=job_name,
+        docker_image=docker_image,
+        commands=commands,
+        efs_volume_name=efs_volume_name,
+    )
+
+    # Wait for AWS to process the job
+    time.sleep(60)
+
+    job_id = info["job_submission_info"]["jobId"]
+    job = None
+    max_retries = 10
+    retry = 0
+    while retry < max_retries:
+        job_description_response = batch_client.describe_jobs(jobs=[job_id])
+        assert job_description_response["ResponseMetadata"]["HTTPStatusCode"] == 200
+
+        jobs = job_description_response["jobs"]
+        assert len(jobs) == 1
+
+        job = jobs[0]
+
+        if job["status"] in _RETRY_STATES:
+            retry += 1
+            time.sleep(60)
+        else:
+            break
+
+    # Check EFS specific details
+    efs_volumes = efs_client.describe_file_systems()
+    matching_efs_volumes = [
+        file_system
+        for file_system in efs_volumes["FileSystems"]
+        for tag in file_system["Tags"]
+        if tag["Key"] == "Name" and tag["Value"] == efs_volume_name
+    ]
+    assert len(matching_efs_volumes) == 1
+    efs_volume = matching_efs_volumes[0]
+    efs_id = efs_volume["FileSystemId"]
+
+    # Check normal job completion
+    assert job["jobName"] == job_name
+    assert "neuroconv_batch_queue" in job["jobQueue"]
+    assert "fs-" in job["jobDefinition"]
+    assert job["status"] == "SUCCEEDED"
+
+    status_tracker_table_name = "neuroconv_batch_status_tracker"
+    table = dynamodb_resource.Table(name=status_tracker_table_name)
+    table_submission_id = info["table_submission_info"]["id"]
+
+    table_item_response = table.get_item(Key={"id": table_submission_id})
+    assert table_item_response["ResponseMetadata"]["HTTPStatusCode"] == 200
+
+    table_item = table_item_response["Item"]
+    assert table_item["job_name"] == job_name
+    assert table_item["job_id"] == job_id
+    assert table_item["status"] == "Job submitted..."
+
+    table.update_item(
+        Key={"id": table_submission_id},
+        AttributeUpdates={"status": {"Action": "PUT", "Value": "Test passed - cleaning up..."}},
+    )
+
+    # Cleanup EFS after testing is complete - must clear mount targets first, then wait before deleting the volume
+    # TODO: cleanup job definitions? (since built daily)
+    mount_targets = efs_client.describe_mount_targets(FileSystemId=efs_id)
+    for mount_target in mount_targets["MountTargets"]:
+        efs_client.delete_mount_target(MountTargetId=mount_target["MountTargetId"])
+
+    time.sleep(60)
+    efs_client.delete_file_system(FileSystemId=efs_id)
+
+    table.update_item(
+        Key={"id": table_submission_id}, AttributeUpdates={"status": {"Action": "PUT", "Value": "Test passed."}}
+    )
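
For reviewers, a minimal usage sketch of the new `efs_volume_name` option follows. The job name, image, file name, and volume name are illustrative placeholders rather than values taken from this PR, and the call assumes `AWS_ACCESS_KEY_ID`/`AWS_SECRET_ACCESS_KEY` are set in the environment, since `submit_aws_batch_job` reads credentials from there.

```python
# Hypothetical usage sketch of the new efs_volume_name parameter (all names are placeholders).
from neuroconv.tools.aws import submit_aws_batch_job

info = submit_aws_batch_job(
    job_name="example_efs_job",                  # placeholder job name
    docker_image="ubuntu:latest",
    commands=["touch", "/mnt/efs/example.txt"],  # the EFS volume is always exposed at /mnt/efs
    efs_volume_name="example_efs_volume",        # created on first use; reused if it already exists
)
print(info["job_submission_info"]["jobId"])      # job ID returned by AWS Batch
```

The `/mnt/efs` mount path is fixed by the generated job definition, so any file the container writes there lands on the named EFS volume.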
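
The fixed `time.sleep(60)` buffers in `_create_or_get_efs_id` are marked with a TODO about a more robust readiness check. One possible follow-up, sketched here under the assumption that polling the volume's `LifeCycleState` via `describe_file_systems` is acceptable, would wait until the file system reports `available` before creating mount targets; the helper name and timeout below are hypothetical.

```python
import time


def wait_for_efs_available(efs_client, efs_id: str, timeout_in_seconds: int = 300) -> None:
    """Hypothetical helper: poll an EFS file system until it reports an 'available' LifeCycleState."""
    start = time.time()
    while time.time() - start < timeout_in_seconds:
        # efs_client is expected to be a boto3.client("efs") instance
        response = efs_client.describe_file_systems(FileSystemId=efs_id)
        if response["FileSystems"][0]["LifeCycleState"] == "available":
            return
        time.sleep(5)  # brief pause between polls
    raise TimeoutError(f"EFS volume {efs_id} did not become available within {timeout_in_seconds} seconds.")
```

The same pattern could replace the second sleep by polling `describe_mount_targets` until every mount target also reports `available`.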