From a1ed59f2b2a7ba5b941cb8899451779e960f99fb Mon Sep 17 00:00:00 2001 From: Rocky Breslow <1774125+rbreslow@users.noreply.github.com> Date: Thu, 12 May 2022 15:24:16 -0400 Subject: [PATCH 1/4] Add Terraform configuration for AWS Batch - Create Batch resources like a job queue and compute environment. - Create a job definition for the ETL; thread through requisite environment variables. - Draft IAM resources that allow the ETL to read objects from S3. --- deployment/terraform/batch.tf | 109 ++++++++++++++++ .../cloud-config/batch-container-instance | 18 +++ deployment/terraform/database.tf | 6 +- deployment/terraform/firewall.tf | 62 +++++++++ deployment/terraform/iam.tf | 122 ++++++++++++++++++ .../job-definitions/image-deid-etl.json.tmpl | 51 ++++++++ deployment/terraform/variables.tf | 92 +++++++++++++ 7 files changed, 456 insertions(+), 4 deletions(-) create mode 100644 deployment/terraform/batch.tf create mode 100644 deployment/terraform/cloud-config/batch-container-instance create mode 100644 deployment/terraform/iam.tf create mode 100644 deployment/terraform/job-definitions/image-deid-etl.json.tmpl diff --git a/deployment/terraform/batch.tf b/deployment/terraform/batch.tf new file mode 100644 index 0000000..468fb18 --- /dev/null +++ b/deployment/terraform/batch.tf @@ -0,0 +1,109 @@ +# +# Security Group resources +# +resource "aws_security_group" "batch" { + name_prefix = "sgBatchContainerInstance-" + vpc_id = var.vpc_id + + tags = { + Name = "sgBatchContainerInstance" + } + + lifecycle { + create_before_destroy = true + } +} + +# +# Batch resources +# +resource "aws_launch_template" "default" { + name_prefix = "ltBatchContainerInstance-" + + block_device_mappings { + device_name = "/dev/xvda" + + ebs { + volume_size = var.batch_root_block_device_size + volume_type = var.batch_root_block_device_type + } + } + + user_data = base64encode(file("cloud-config/batch-container-instance")) +} + +resource "aws_batch_compute_environment" "default" { + compute_environment_name_prefix = "batch${local.short}-" + type = "MANAGED" + state = "ENABLED" + service_role = aws_iam_role.batch_service_role.arn + + compute_resources { + type = "SPOT" + allocation_strategy = var.batch_spot_fleet_allocation_strategy + bid_percentage = var.batch_spot_fleet_bid_percentage + + ec2_configuration { + image_type = "ECS_AL2" + } + + ec2_key_pair = aws_key_pair.bastion.key_name + + min_vcpus = var.batch_min_vcpus + max_vcpus = var.batch_max_vcpus + + launch_template { + launch_template_id = aws_launch_template.default.id + version = aws_launch_template.default.latest_version + } + + spot_iam_fleet_role = aws_iam_role.spot_fleet_service_role.arn + instance_role = aws_iam_instance_profile.ecs_instance_role.arn + + instance_type = var.batch_instance_types + + security_group_ids = [aws_security_group.batch.id] + subnets = var.vpc_private_subnet_ids + + tags = { + Name = "BatchWorker" + Project = var.project + Environment = var.environment + } + } + + depends_on = [aws_iam_role_policy_attachment.batch_service_role_policy] + + lifecycle { + create_before_destroy = true + } +} + +resource "aws_batch_job_queue" "default" { + name = "queue${local.short}" + priority = 1 + state = "ENABLED" + compute_environments = [aws_batch_compute_environment.default.arn] +} + +resource "aws_batch_job_definition" "default" { + name = "job${local.short}" + type = "container" + + container_properties = templatefile("${path.module}/job-definitions/image-deid-etl.json.tmpl", { + image_url = "${module.ecr.repository_url}:${var.image_tag}" + + image_deid_etl_vcpus = var.image_deid_etl_vcpus + image_deid_etl_memory = var.image_deid_etl_memory + + database_url = "postgresql://${var.rds_database_username}:${var.rds_database_password}@${module.database.hostname}:${module.database.port}/${var.rds_database_name}" + + flywheel_api_key = var.flywheel_api_key + flywheel_group = var.flywheel_group + orthanc_credentials = var.orthanc_credentials + orthanc_host = var.orthanc_host + orthanc_port = var.orthanc_port + + image_deid_etl_log_level = var.image_deid_etl_log_level + }) +} diff --git a/deployment/terraform/cloud-config/batch-container-instance b/deployment/terraform/cloud-config/batch-container-instance new file mode 100644 index 0000000..8e192d2 --- /dev/null +++ b/deployment/terraform/cloud-config/batch-container-instance @@ -0,0 +1,18 @@ +Content-Type: multipart/mixed; boundary="==BOUNDARY==" +MIME-Version: 1.0 + +--==BOUNDARY== +Content-Type: text/cloud-boothook; charset="us-ascii" + +# Manually mount unformatted instance store volumes. Mounting in a cloud-boothook +# makes it more likely the drive is mounted before the Docker daemon and ECS agent +# start, which helps mitigate potential race conditions. +# +# See: +# - https://docs.aws.amazon.com/AmazonECS/latest/developerguide/bootstrap_container_instance.html#bootstrap_docker_daemon +# - https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/amazon-linux-ami-basics.html#supported-user-data-formats +mkfs.ext4 -E nodiscard /dev/nvme1n1 +mkdir -p /media/ephemeral0 +mount -t ext4 -o defaults,nofail,discard /dev/nvme1n1 /media/ephemeral0 + +--==BOUNDARY== \ No newline at end of file diff --git a/deployment/terraform/database.tf b/deployment/terraform/database.tf index c46b979..fbc8019 100644 --- a/deployment/terraform/database.tf +++ b/deployment/terraform/database.tf @@ -7,7 +7,7 @@ resource "aws_db_subnet_group" "default" { subnet_ids = var.vpc_private_subnet_ids tags = { - Name = "dbsngDatabaseServer" + Name = "dbsngDatabaseServer" } } @@ -57,9 +57,7 @@ resource "aws_db_parameter_group" "default" { } tags = { - Name = "dbpgDatabaseServer" - Project = var.project - Environment = var.environment + Name = "dbpgDatabaseServer" } lifecycle { diff --git a/deployment/terraform/firewall.tf b/deployment/terraform/firewall.tf index 58d19a1..b789684 100644 --- a/deployment/terraform/firewall.tf +++ b/deployment/terraform/firewall.tf @@ -11,6 +11,16 @@ resource "aws_security_group_rule" "bastion_rds_egress" { source_security_group_id = module.database.database_security_group_id } +resource "aws_security_group_rule" "bastion_ssh_egress" { + type = "egress" + from_port = 22 + to_port = 22 + protocol = "tcp" + + security_group_id = aws_security_group.bastion.id + source_security_group_id = aws_security_group.batch.id +} + # # RDS security group resources # @@ -24,3 +34,55 @@ resource "aws_security_group_rule" "rds_bastion_ingress" { source_security_group_id = aws_security_group.bastion.id } +resource "aws_security_group_rule" "rds_batch_ingress" { + type = "ingress" + from_port = module.database.port + to_port = module.database.port + protocol = "tcp" + + security_group_id = module.database.database_security_group_id + source_security_group_id = aws_security_group.batch.id +} + +# +# Batch container instance security group resources +# +resource "aws_security_group_rule" "batch_http_egress" { + type = "egress" + from_port = 80 + to_port = 80 + protocol = "tcp" + cidr_blocks = ["0.0.0.0/0"] + + security_group_id = aws_security_group.batch.id +} + +resource "aws_security_group_rule" "batch_https_egress" { + type = "egress" + from_port = 443 + to_port = 443 + protocol = "tcp" + cidr_blocks = ["0.0.0.0/0"] + + security_group_id = aws_security_group.batch.id +} + +resource "aws_security_group_rule" "batch_rds_egress" { + type = "egress" + from_port = module.database.port + to_port = module.database.port + protocol = "tcp" + + security_group_id = aws_security_group.batch.id + source_security_group_id = module.database.database_security_group_id +} + +resource "aws_security_group_rule" "batch_bastion_ingress" { + type = "ingress" + from_port = 22 + to_port = 22 + protocol = "tcp" + + security_group_id = aws_security_group.batch.id + source_security_group_id = aws_security_group.bastion.id +} diff --git a/deployment/terraform/iam.tf b/deployment/terraform/iam.tf new file mode 100644 index 0000000..5cf9437 --- /dev/null +++ b/deployment/terraform/iam.tf @@ -0,0 +1,122 @@ +# +# Batch IAM resources +# +data "aws_iam_policy_document" "batch_assume_role" { + statement { + effect = "Allow" + + principals { + type = "Service" + identifiers = ["batch.amazonaws.com"] + } + + actions = ["sts:AssumeRole"] + } +} + +resource "aws_iam_role" "batch_service_role" { + name_prefix = "batch${local.short}ServiceRole-" + assume_role_policy = data.aws_iam_policy_document.batch_assume_role.json +} + +resource "aws_iam_role_policy_attachment" "batch_service_role_policy" { + role = aws_iam_role.batch_service_role.name + policy_arn = var.aws_batch_service_role_policy_arn +} + +# +# Spot Fleet IAM resources +# +data "aws_iam_policy_document" "spot_fleet_assume_role" { + statement { + effect = "Allow" + + principals { + type = "Service" + identifiers = ["spotfleet.amazonaws.com"] + } + + actions = ["sts:AssumeRole"] + } +} + +resource "aws_iam_role" "spot_fleet_service_role" { + name_prefix = "fleet${local.short}ServiceRole-" + assume_role_policy = data.aws_iam_policy_document.spot_fleet_assume_role.json +} + +resource "aws_iam_role_policy_attachment" "spot_fleet_service_role_policy" { + role = aws_iam_role.spot_fleet_service_role.name + policy_arn = var.aws_spot_fleet_service_role_policy_arn +} + +# +# EC2 IAM resources +# +data "aws_iam_policy_document" "ec2_assume_role" { + statement { + effect = "Allow" + + principals { + type = "Service" + identifiers = ["ec2.amazonaws.com"] + } + + actions = ["sts:AssumeRole"] + } +} + +resource "aws_iam_role" "ecs_instance_role" { + name_prefix = "ecs${local.short}InstanceRole-" + assume_role_policy = data.aws_iam_policy_document.ec2_assume_role.json +} + +resource "aws_iam_role_policy_attachment" "ec2_service_role_policy" { + role = aws_iam_role.ecs_instance_role.name + policy_arn = var.aws_ec2_service_role_policy_arn +} + +resource "aws_iam_instance_profile" "ecs_instance_role" { + name = aws_iam_role.ecs_instance_role.name + role = aws_iam_role.ecs_instance_role.name +} + +# We need to data references to get a handle on these resources because +# they are managed out of state. +data "aws_kms_key" "d3b_phi_data" { + key_id = var.d3b_phi_data_kms_key_arn +} + +data "aws_s3_bucket" "d3b_phi_data" { + bucket = var.d3b_phi_data_bucket_name +} + +data "aws_iam_policy_document" "scoped_etl_read" { + statement { + effect = "Allow" + + actions = [ + "kms:Decrypt" + ] + + resources = [data.aws_kms_key.d3b_phi_data.arn] + } + + statement { + effect = "Allow" + + actions = [ + "s3:GetObject", + ] + + resources = [ + "${data.aws_s3_bucket.d3b_phi_data.arn}/*", + ] + } +} + +resource "aws_iam_role_policy" "scoped_etl_read" { + name_prefix = "S3ScopedEtlReadPolicy-" + role = aws_iam_role.ecs_instance_role.id + policy = data.aws_iam_policy_document.scoped_etl_read.json +} diff --git a/deployment/terraform/job-definitions/image-deid-etl.json.tmpl b/deployment/terraform/job-definitions/image-deid-etl.json.tmpl new file mode 100644 index 0000000..e1ee71a --- /dev/null +++ b/deployment/terraform/job-definitions/image-deid-etl.json.tmpl @@ -0,0 +1,51 @@ +{ + "image": "${image_url}", + "vcpus": ${image_deid_etl_vcpus}, + "memory": ${image_deid_etl_memory}, + "environment": [ + { + "name": "DATABASE_URL", + "value": "${database_url}" + }, + { + "name": "FLYWHEEL_API_KEY", + "value": "${flywheel_api_key}" + }, + { + "name": "FLYWHEEL_GROUP", + "value": "${flywheel_group}" + }, + { + "name": "ORTHANC_CREDENTIALS", + "value": "${orthanc_credentials}" + }, + { + "name": "ORTHANC_HOST", + "value": "${orthanc_host}" + }, + { + "name": "ORTHANC_PORT", + "value": "${orthanc_port}" + }, + { + "name": "IMAGE_DEID_ETL_LOG_LEVEL", + "value": "${image_deid_etl_log_level}" + } + ], + "volumes": [ + { + "host": { + "sourcePath": "/media/ephemeral0" + }, + "name": "ephemeral0" + } + ], + "mountPoints": [ + { + "containerPath": "/tmp", + "readOnly": false, + "sourceVolume": "ephemeral0" + } + ], + "privileged": false +} \ No newline at end of file diff --git a/deployment/terraform/variables.tf b/deployment/terraform/variables.tf index 2abac57..6b156e0 100644 --- a/deployment/terraform/variables.tf +++ b/deployment/terraform/variables.tf @@ -43,6 +43,41 @@ variable "bastion_public_key" { type = string } +variable "batch_root_block_device_size" { + type = number + default = 32 +} + +variable "batch_root_block_device_type" { + type = string + default = "gp3" +} + +variable "batch_spot_fleet_allocation_strategy" { + type = string + default = "SPOT_CAPACITY_OPTIMIZED" +} + +variable "batch_spot_fleet_bid_percentage" { + type = number + default = 64 +} + +variable "batch_min_vcpus" { + type = number + default = 0 +} + +variable "batch_max_vcpus" { + type = number + default = 256 +} + +variable "batch_instance_types" { + type = list(string) + default = ["c5d", "m5d", "z1d"] +} + variable "rds_allocated_storage" { type = number default = 32 @@ -209,3 +244,60 @@ variable "image_tag" { default = "latest" } +variable "image_deid_etl_vcpus" { + type = number + default = 1 +} + +variable "image_deid_etl_memory" { + type = number + default = 1024 +} + +variable "flywheel_api_key" { + type = string +} + +variable "flywheel_group" { + type = string +} + +variable "orthanc_credentials" { + type = string +} + +variable "orthanc_host" { + type = string +} + +variable "orthanc_port" { + type = number +} + +variable "image_deid_etl_log_level" { + type = string + default = "INFO" +} + +variable "d3b_phi_data_kms_key_arn" { + type = string +} + +variable "d3b_phi_data_bucket_name" { + type = string +} + +variable "aws_batch_service_role_policy_arn" { + type = string + default = "arn:aws:iam::aws:policy/service-role/AWSBatchServiceRole" +} + +variable "aws_spot_fleet_service_role_policy_arn" { + type = string + default = "arn:aws:iam::aws:policy/service-role/AmazonEC2SpotFleetTaggingRole" +} + +variable "aws_ec2_service_role_policy_arn" { + type = string + default = "arn:aws:iam::aws:policy/service-role/AmazonEC2ContainerServiceforEC2Role" +} From bdcfc739aa9d6146bd10e601036d97352bcbb770 Mon Sep 17 00:00:00 2001 From: Rocky Breslow <1774125+rbreslow@users.noreply.github.com> Date: Thu, 12 May 2022 15:24:42 -0400 Subject: [PATCH 2/4] Write deployment documentation --- deployment/README.md | 65 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) create mode 100644 deployment/README.md diff --git a/deployment/README.md b/deployment/README.md new file mode 100644 index 0000000..363d7c0 --- /dev/null +++ b/deployment/README.md @@ -0,0 +1,65 @@ +# Deployment + +- [AWS Credentials](#aws-credentials) +- [Publish Container Images](#publish-container-images) +- [Terraform](#terraform) +- [Database Migrations](#database-migrations) + +## AWS Credentials + +Follow [these](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-sso.html#sso-configure-profile) instructions to configure a named AWS profile: + +- Use https://d-906762f877.awsapps.com/start as the SSO start URL. +- Use `us-east-1` as the SSO region. + +Use the `aws sso login` command to refresh your login if your credentials expire: + +```console +$ aws sso login --profile my-profile +``` + +## Publish Container Images + +Build a container image for the Python application (`cibuild`) and publish it to Amazon ECR (`cipublish`): + +```console +$ ./scripts/cibuild +... + => => naming to docker.io/library/image-deid-etl:da845bf +$ ./scripts/cipublish +``` + +## Terraform + +Launch an instance of the included Terraform container image: + +```console +$ docker-compose -f docker-compose.ci.yml run --rm terraform +bash-5.1# +``` + +Once inside the context of the container image, set `GIT_COMMIT` to the tag of a published container image (e.g., `da845bf`): + +```console +bash-5.1# export GIT_COMMIT=da845bf +``` + +Finally, use `infra` to generate and apply a Terraform plan: + +```console +bash-5.1# ./scripts/infra plan +bash-5.1# ./scripts/infra apply +``` + +## Database Migrations + +Execute database migrations by submitting a Batch job that invokes the application's `initdb` command: + +- Select the most recent job definition for [jobImageDeidEtl](https://console.aws.amazon.com/batch/home?region=us-east-1#job-definition). +- Select **Submit new job**. +- Select the following: + - **Name**: Any one-off description of the work you're performing, e.g.: `initialize-the-database`. + - **Job queue**: `queueImageDeidEtl`. + - **Command**: `image-deid-etl initdb`. +- Click **Submit**. +- Monitor the log output of the submitted job by viewing the job detail and clicking the link under **Log group name**. From 4b002fdc59db910ee1cb3d904782e477fc2931a3 Mon Sep 17 00:00:00 2001 From: Rocky Breslow <1774125+rbreslow@users.noreply.github.com> Date: Thu, 12 May 2022 15:27:01 -0400 Subject: [PATCH 3/4] Add flag to CLI that offloads processing to Batch --- docker-compose.yml | 2 + src/image_deid_etl/image_deid_etl/__main__.py | 45 +++++++++++++++++-- 2 files changed, 43 insertions(+), 4 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 1af5ba5..0556418 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -16,6 +16,8 @@ services: build: ./src/image_deid_etl env_file: .env environment: + - AWS_JOB_QUEUE=${AWS_JOB_QUEUE:-queueImageDeidEtl} + - AWS_JOB_DEFINITION=${AWS_JOB_DEFINITION:-jobImageDeidEtl} - DATABASE_URL=postgresql://image-deid-etl:image-deid-etl@database:5432/image-deid-etl - IMAGE_DEID_ETL_LOG_LEVEL=INFO volumes: diff --git a/src/image_deid_etl/image_deid_etl/__main__.py b/src/image_deid_etl/image_deid_etl/__main__.py index 5116233..0d86d0d 100644 --- a/src/image_deid_etl/image_deid_etl/__main__.py +++ b/src/image_deid_etl/image_deid_etl/__main__.py @@ -4,7 +4,9 @@ import os import sys import tempfile +import time +import boto3 import flywheel from sqlalchemy.exc import IntegrityError @@ -111,6 +113,35 @@ def validate(args) -> int: def run(args) -> int: + if args.batch: + batch = boto3.client("batch") + + aws_job_queue = os.getenv("AWS_JOB_QUEUE") + if aws_job_queue is None: + raise ImproperlyConfigured("You must supply a value for AWS_JOB_QUEUE.") + + aws_job_definition = os.getenv("AWS_JOB_DEFINITION") + if aws_job_definition is None: + raise ImproperlyConfigured( + "You must supply a value for AWS_JOB_DEFINITION." + ) + + for uuid in args.uuid: + response = batch.submit_job( + jobName=f"ProcessStudy_{uuid}", + jobQueue=aws_job_queue, + jobDefinition=aws_job_definition, + containerOverrides={"command": ["image-deid-etl", "run", uuid]}, + ) + + region = batch.meta.region_name + job_id = response["jobId"] + url = f"https://console.aws.amazon.com/batch/home?region={region}#jobs/detail/{job_id}" + + logger.info(f"Job started! View here:\n{url}") + + return 0 + local_path = f"{args.program}/{args.site}/" for uuid in args.uuid: @@ -159,8 +190,8 @@ def run(args) -> int: def upload2fw(args) -> int: - # This is a hack so that the Flywheel CLI can consume credentials - # from the environment. + # This is a hack so that the Flywheel CLI can consume credentials from the + # environment. with tempfile.TemporaryDirectory() as flywheel_user_home: logger.info(f"Writing fake Flywheel CLI credentials to {flywheel_user_home}...") # The Flywheel CLI will look for its config directory at this path. @@ -174,8 +205,9 @@ def upload2fw(args) -> int: source_path = f"{args.program}/{args.site}/NIfTIs/" if not os.path.exists(source_path): - # It appears that this can happen when sub_mapping is empty. - raise FileNotFoundError(f"{source_path} directory does not exist.") + raise FileNotFoundError( + f"{source_path} directory does not exist. Is sub_mapping empty?" + ) for fw_project in next(os.walk(source_path))[1]: # for each project dir proj_path = os.path.join(source_path, fw_project) @@ -264,6 +296,11 @@ def main() -> int: help="download images and run deidentification", formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) + parser_run.add_argument( + "--batch", + action="store_true", + help="skip local processing and submit job(s) to AWS Batch", + ) parser_run.add_argument( "--skip-modalities", nargs="*", From 5c27460d6e128d49da0bf04855cd997f1bc042da Mon Sep 17 00:00:00 2001 From: Rocky Breslow <1774125+rbreslow@users.noreply.github.com> Date: Tue, 17 May 2022 11:54:55 -0400 Subject: [PATCH 4/4] Read subject ID mapping via Pandas API This attempts to fix a "Connection reset by peer" error we're receiving that may be a result of the call to io.BytesIO. I'd like to simplify things to use the native Pandas API for reading from S3 to see if the error goes away. Also, we're now reading these paths in from the environment. --- .env.sample | 2 ++ README.md | 2 ++ deployment/terraform/batch.tf | 3 +++ .../job-definitions/image-deid-etl.json.tmpl | 8 +++++++ deployment/terraform/variables.tf | 4 ++++ src/image_deid_etl/image_deid_etl/__main__.py | 10 +++++++-- .../image_deid_etl/main_pipeline.py | 21 +++++++++---------- src/image_deid_etl/requirements.txt | 3 ++- src/image_deid_etl/setup.cfg | 3 ++- 9 files changed, 41 insertions(+), 15 deletions(-) diff --git a/.env.sample b/.env.sample index 418957b..fafb8f9 100644 --- a/.env.sample +++ b/.env.sample @@ -4,3 +4,5 @@ FLYWHEEL_GROUP="" ORTHANC_CREDENTIALS="" ORTHANC_HOST="" ORTHANC_PORT=80 +PHI_BUCKET_NAME="phi-data-bucket" +SUBJECT_ID_MAPPING_PATH="s3://phi-data-bucket/subject_id_mapping.csv" diff --git a/README.md b/README.md index b44488b..f41bd7c 100644 --- a/README.md +++ b/README.md @@ -31,6 +31,8 @@ Then, customize its contents with a text editor: - For `FLYWHEEL_GROUP`, specify either `d3b` or an alternative group created for testing (e.g., your name). - For `ORTHANC_CREDENTIALS`, use your Orthanc username and password specified like `username:password`. - For `ORTHANC_HOST`, specify the hostname (minus `http(s)://`) that you use to access Orthanc. +- For `PHI_DATA_BUCKET_NAME`, specify the bucket name where the ETL should backup NIfTI files. +- For `SUBJECT_ID_MAPPING_PATH`, specify the [path](https://pandas.pydata.org/docs/reference/api/pandas.read_csv.html) to the CSV file containing subject ID mappings. Next, run `update` to build the container image and initialize the database: diff --git a/deployment/terraform/batch.tf b/deployment/terraform/batch.tf index 468fb18..6760208 100644 --- a/deployment/terraform/batch.tf +++ b/deployment/terraform/batch.tf @@ -104,6 +104,9 @@ resource "aws_batch_job_definition" "default" { orthanc_host = var.orthanc_host orthanc_port = var.orthanc_port + phi_data_bucket_name = var.d3b_phi_data_bucket_name + subject_id_mapping_path = var.subject_id_mapping_path + image_deid_etl_log_level = var.image_deid_etl_log_level }) } diff --git a/deployment/terraform/job-definitions/image-deid-etl.json.tmpl b/deployment/terraform/job-definitions/image-deid-etl.json.tmpl index e1ee71a..185f297 100644 --- a/deployment/terraform/job-definitions/image-deid-etl.json.tmpl +++ b/deployment/terraform/job-definitions/image-deid-etl.json.tmpl @@ -27,6 +27,14 @@ "name": "ORTHANC_PORT", "value": "${orthanc_port}" }, + { + "name": "PHI_DATA_BUCKET_NAME", + "value": "${phi_data_bucket_name}" + }, + { + "name": "SUBJECT_ID_MAPPING_PATH", + "value": "${subject_id_mapping_path}" + }, { "name": "IMAGE_DEID_ETL_LOG_LEVEL", "value": "${image_deid_etl_log_level}" diff --git a/deployment/terraform/variables.tf b/deployment/terraform/variables.tf index 6b156e0..67dfbf7 100644 --- a/deployment/terraform/variables.tf +++ b/deployment/terraform/variables.tf @@ -274,6 +274,10 @@ variable "orthanc_port" { type = number } +variable "subject_id_mapping_path" { + type = string +} + variable "image_deid_etl_log_level" { type = string default = "INFO" diff --git a/src/image_deid_etl/image_deid_etl/__main__.py b/src/image_deid_etl/image_deid_etl/__main__.py index 0d86d0d..6a6808e 100644 --- a/src/image_deid_etl/image_deid_etl/__main__.py +++ b/src/image_deid_etl/image_deid_etl/__main__.py @@ -4,7 +4,6 @@ import os import sys import tempfile -import time import boto3 import flywheel @@ -31,6 +30,11 @@ "You must supply a valid Flywheel group in FLYWHEEL_GROUP." ) +PHI_DATA_BUCKET_NAME = os.getenv("PHI_DATA_BUCKET_NAME") +if PHI_DATA_BUCKET_NAME is None: + raise ImproperlyConfigured( + "You must supply a valid S3 bucket in PHI_DATA_BUCKET_NAME." + ) # Configure Python's logging module. The Django project does a fantastic job explaining how logging works: # https://docs.djangoproject.com/en/4.0/topics/logging/ @@ -229,7 +233,9 @@ def add_fw_metadata(args) -> int: def s3_backup_niftis(args) -> int: local_path = f"{args.program}/{args.site}/" - s3_path = f"s3://d3b-phi-data-prd/imaging/radiology/{args.program}/{args.site}/" + s3_path = ( + f"s3://{PHI_DATA_BUCKET_NAME}/imaging/radiology/{args.program}/{args.site}/" + ) return os.system("aws s3 sync " + local_path + "NIfTIs/ " + s3_path + "NIfTIs/") diff --git a/src/image_deid_etl/image_deid_etl/main_pipeline.py b/src/image_deid_etl/image_deid_etl/main_pipeline.py index 78f6b13..473a471 100644 --- a/src/image_deid_etl/image_deid_etl/main_pipeline.py +++ b/src/image_deid_etl/image_deid_etl/main_pipeline.py @@ -1,9 +1,8 @@ -# NOTE: cbtn-all is HARD-CODED in cbtn_subject_info, will need to change this when ADAPT sets up routine CSV updating import logging -import boto3 -import pandas as pd -import io +import pandas + +from image_deid_etl.exceptions import ImproperlyConfigured from image_deid_etl.external_data_handling import * from image_deid_etl.images_no_save import * @@ -11,12 +10,12 @@ todays_date = datetime.now().strftime('%Y-%m-%d') -# point to cbtn-all CSV file from s3 using boto3 & default AWS profile -table_fn = 'cbtn-all_identified_2022-03-17.csv' -bucket_name = 'd3b-phi-data-prd' -obj_path = f'imaging/{table_fn}' -s3_client = boto3.client('s3') -obj = s3_client.get_object(Bucket=bucket_name, Key=obj_path) +SUBJECT_ID_MAPPING_PATH = os.getenv("SUBJECT_ID_MAPPING_PATH") +if SUBJECT_ID_MAPPING_PATH is None: + raise ImproperlyConfigured( + "You must supply a valid string path in SUBJECT_ID_MAPPING_PATH." + ) + def subject_info(local_path, program, file_dir, validate=0): # site_name = local_path.split('/')[1] @@ -29,7 +28,7 @@ def subject_info(local_path, program, file_dir, validate=0): if program == 'cbtn': # get CBTN Subject IDs try: - cbtn_all_df = pd.read_csv(io.BytesIO(obj['Body'].read())) + cbtn_all_df = pandas.read_csv(SUBJECT_ID_MAPPING_PATH) except IndexError as error: logger.error("Missing CBTN subject ID .csv file from internal EIG database: %r", error) try: diff --git a/src/image_deid_etl/requirements.txt b/src/image_deid_etl/requirements.txt index b326088..2a04b16 100644 --- a/src/image_deid_etl/requirements.txt +++ b/src/image_deid_etl/requirements.txt @@ -1,3 +1,4 @@ +boto3==1.21.34 flywheel-sdk==15.8.0 nibabel==3.2.2 numpy==1.22.3 @@ -8,6 +9,6 @@ psycopg2==2.9.3 pydicom==2.2.2 python-magic==0.4.25 requests==2.27.1 +s3fs>=0.4.0 # Relaxing the version constraint to avoid botocore incompatibilities. sqlalchemy==1.4.32 urllib3==1.26.8 -boto3==1.21.34 diff --git a/src/image_deid_etl/setup.cfg b/src/image_deid_etl/setup.cfg index d1d8467..585ce0e 100644 --- a/src/image_deid_etl/setup.cfg +++ b/src/image_deid_etl/setup.cfg @@ -9,6 +9,7 @@ url = https://github.com/d3b-center/image-deid-etl packages = find: python_requires = >=3.9,<3.10 install_requires = + boto3 flywheel-sdk nibabel numpy @@ -19,9 +20,9 @@ install_requires = pydicom python-magic requests + s3fs SQLAlchemy urllib3 - boto3 [options.entry_points] console_scripts =