From 88426210d6d3141e7f94a99f8a929c24e41b66cb Mon Sep 17 00:00:00 2001 From: William Putra Intan <61998484+williamputraintan@users.noreply.github.com> Date: Mon, 23 Sep 2024 14:22:56 +1000 Subject: [PATCH] Feat (MM): Custom CSV Importer for MM (#566) --- .../stacks/metadata-manager/README.md | 27 ++ .../app/management/commands/load_from_csv.py | 22 ++ .../metadata-manager/app/models/library.py | 13 + .../stacks/metadata-manager/deploy/README.md | 32 +++ .../construct/lambda-load-custom-csv/index.ts | 54 ++++ .../lambda-load-custom-csv/lambda.Dockerfile | 12 + .../stacks/metadata-manager/deploy/stack.ts | 8 + .../docs/architecture.drawio.svg | 2 +- .../handler/load_custom_metadata_csv.py | 35 +++ .../proc/service/load_csv_srv.py | 243 ++++++++++++++++++ .../proc/service/tracking_sheet_srv.py | 78 +----- .../metadata-manager/proc/service/utils.py | 62 +++++ 12 files changed, 511 insertions(+), 77 deletions(-) create mode 100644 lib/workload/stateless/stacks/metadata-manager/app/management/commands/load_from_csv.py create mode 100644 lib/workload/stateless/stacks/metadata-manager/deploy/construct/lambda-load-custom-csv/index.ts create mode 100644 lib/workload/stateless/stacks/metadata-manager/deploy/construct/lambda-load-custom-csv/lambda.Dockerfile create mode 100644 lib/workload/stateless/stacks/metadata-manager/handler/load_custom_metadata_csv.py create mode 100644 lib/workload/stateless/stacks/metadata-manager/proc/service/load_csv_srv.py diff --git a/lib/workload/stateless/stacks/metadata-manager/README.md b/lib/workload/stateless/stacks/metadata-manager/README.md index 559423a14..e1dbf3ff8 100644 --- a/lib/workload/stateless/stacks/metadata-manager/README.md +++ b/lib/workload/stateless/stacks/metadata-manager/README.md @@ -100,6 +100,33 @@ Some important notes of the sync: Please refer to the [tracking-sheet-service](proc/service/tracking_sheet_srv.py) implementation. +### Custom CSV File Loader + +The application also supports loading data from a custom CSV file. The CSV file should have the following columns: + +| Sheet Header | Table | Field Name | +|----------------------|--------------|--------------------| +| Individual_id | `Individual` | individual_id | +| individual_id_source | `Individual` | subject_id | +| subject_id | `Subject` | subject_id | +| sample_id | `Sample` | sample_id | +| external_sample_id | `Sample` | external_sample_id | +| source | `Sample` | source | +| library_id | `Library` | library_id | +| phenotype | `Library` | phenotype | +| workflow | `Library` | workflow | +| quality | `Library` | quality | +| type | `Library` | type | +| coverage | `Library` | coverage | +| assay | `Library` | assay | +| project_name | `Project` | project_id | +| project_owner | `Contact` | contact_id | + +The CSV file should be in a presigned URL format, where the loader will read and insert to the database. +To trigger the loader please look at `./deploy/README.md` for more info. + +Please refer to the [load-csv-service](proc/service/load_csv_srv.py) implementation. + ### Audit Data The application is configured with [django-simple-history](https://django-simple-history.readthedocs.io/en/latest/) diff --git a/lib/workload/stateless/stacks/metadata-manager/app/management/commands/load_from_csv.py b/lib/workload/stateless/stacks/metadata-manager/app/management/commands/load_from_csv.py new file mode 100644 index 000000000..774ab4e3b --- /dev/null +++ b/lib/workload/stateless/stacks/metadata-manager/app/management/commands/load_from_csv.py @@ -0,0 +1,22 @@ +import logging +from django.core.management import BaseCommand +from libumccr import libjson + +from handler.load_custom_metadata_csv import handler + +logger = logging.getLogger() +logger.setLevel(logging.INFO) + + +class Command(BaseCommand): + help = "Trigger lambda handler for to sync metadata from csv url" + + def handle(self, *args, **options): + event = { + "url": "SOME_URL", + } + + print(f"Trigger lambda handler for sync tracking sheet. Event {libjson.dumps(event)}") + result = handler(event, {}) + + print(f"result: {libjson.dumps(result)}") diff --git a/lib/workload/stateless/stacks/metadata-manager/app/models/library.py b/lib/workload/stateless/stacks/metadata-manager/app/models/library.py index 4e77f834d..2868d617c 100644 --- a/lib/workload/stateless/stacks/metadata-manager/app/models/library.py +++ b/lib/workload/stateless/stacks/metadata-manager/app/models/library.py @@ -112,3 +112,16 @@ class Library(BaseModel): # history history = HistoricalRecords(m2m_fields=[project_set]) + + +def sanitize_library_coverage(value: str): + """ + convert value that is valid in the tracking sheet to return a value that is recognizable by the Django Model + """ + try: + # making coverage is float-able type + lib_coverage = float(value) + return f'{lib_coverage}' + + except (ValueError, TypeError): + return None diff --git a/lib/workload/stateless/stacks/metadata-manager/deploy/README.md b/lib/workload/stateless/stacks/metadata-manager/deploy/README.md index 554400d13..ad5a1de9e 100644 --- a/lib/workload/stateless/stacks/metadata-manager/deploy/README.md +++ b/lib/workload/stateless/stacks/metadata-manager/deploy/README.md @@ -58,3 +58,35 @@ aws lambda invoke \ --cli-binary-format raw-in-base64-out \ res.json ``` + +### CustomCsvLambda + +- Load tracking sheet data from csv presigned url + +To manually trigger the sync, the lambda ARN is stored in the SSM Parameter Store named +`/orcabus/metadata-manager/load-custom-csv-lambda-arn`. + +To query in a local terminal + +```sh +load_custom_csv_lambda_arn=$(aws ssm get-parameter --name '/orcabus/metadata-manager/load-custom-csv-lambda-arn' --with-decryption | jq -r .Parameter.Value) +``` + +The lambda handler will accept a json which only accepts a single key `url` which is the presigned url of the csv file. + +```json +{ + "url": "https://example.com/csv" +} +``` + +Invoking lambda cmd: + +```sh +aws lambda invoke \ + --function-name $load_custom_csv_lambda_arn \ + --invocation-type Event \ + --payload '{ "url": "https://the.url.csv" }' \ + --cli-binary-format raw-in-base64-out \ + res.json +``` diff --git a/lib/workload/stateless/stacks/metadata-manager/deploy/construct/lambda-load-custom-csv/index.ts b/lib/workload/stateless/stacks/metadata-manager/deploy/construct/lambda-load-custom-csv/index.ts new file mode 100644 index 000000000..46ccf74ef --- /dev/null +++ b/lib/workload/stateless/stacks/metadata-manager/deploy/construct/lambda-load-custom-csv/index.ts @@ -0,0 +1,54 @@ +import path from 'path'; +import { Construct } from 'constructs'; +import { Duration } from 'aws-cdk-lib'; +import { PythonFunction } from '@aws-cdk/aws-lambda-python-alpha'; +import { ISecret } from 'aws-cdk-lib/aws-secretsmanager'; +import { StringParameter } from 'aws-cdk-lib/aws-ssm'; +import { + DockerImageFunction, + DockerImageFunctionProps, + DockerImageCode, +} from 'aws-cdk-lib/aws-lambda'; + +type LambdaProps = { + /** + * The basic common lambda properties that it should inherit from + */ + basicLambdaConfig: Partial; + /** + * The secret for the db connection where the lambda will need access to + */ + dbConnectionSecret: ISecret; +}; + +export class LambdaLoadCustomCSVConstruct extends Construct { + private readonly lambda: PythonFunction; + + constructor(scope: Construct, id: string, lambdaProps: LambdaProps) { + super(scope, id); + + this.lambda = new DockerImageFunction(this, 'LoadCustomCSVLambda', { + environment: { + ...lambdaProps.basicLambdaConfig.environment, + }, + securityGroups: lambdaProps.basicLambdaConfig.securityGroups, + vpc: lambdaProps.basicLambdaConfig.vpc, + vpcSubnets: lambdaProps.basicLambdaConfig.vpcSubnets, + architecture: lambdaProps.basicLambdaConfig.architecture, + code: DockerImageCode.fromImageAsset(path.join(__dirname, '../../../'), { + file: 'deploy/construct/lambda-load-custom-csv/lambda.Dockerfile', + }), + timeout: Duration.minutes(15), + memorySize: 4096, + }); + + lambdaProps.dbConnectionSecret.grantRead(this.lambda); + + // We need to store this lambda ARN somewhere so that we could refer when need to load this manually + const ssmParameter = new StringParameter(this, 'LoadCustomCSVLambdaArnParameterStore', { + parameterName: '/orcabus/metadata-manager/load-custom-csv-lambda-arn', + description: 'The ARN of the lambda that load metadata from a presigned URL CSV file', + stringValue: this.lambda.functionArn, + }); + } +} diff --git a/lib/workload/stateless/stacks/metadata-manager/deploy/construct/lambda-load-custom-csv/lambda.Dockerfile b/lib/workload/stateless/stacks/metadata-manager/deploy/construct/lambda-load-custom-csv/lambda.Dockerfile new file mode 100644 index 000000000..f880a59ed --- /dev/null +++ b/lib/workload/stateless/stacks/metadata-manager/deploy/construct/lambda-load-custom-csv/lambda.Dockerfile @@ -0,0 +1,12 @@ +FROM public.ecr.aws/lambda/python:3.12 + +WORKDIR ${LAMBDA_TASK_ROOT} + +# COPY all files +COPY . . + +# Install the specified packages +RUN pip install -r deps/requirements-full.txt + +# Specify handler +CMD [ "handler.load_custom_metadata_csv.handler" ] diff --git a/lib/workload/stateless/stacks/metadata-manager/deploy/stack.ts b/lib/workload/stateless/stacks/metadata-manager/deploy/stack.ts index 7817f5bf9..c582a6aa4 100644 --- a/lib/workload/stateless/stacks/metadata-manager/deploy/stack.ts +++ b/lib/workload/stateless/stacks/metadata-manager/deploy/stack.ts @@ -11,6 +11,7 @@ import { LambdaMigrationConstruct } from './construct/lambda-migration'; import { LambdaAPIConstruct } from './construct/lambda-api'; import { ApiGatewayConstructProps } from '../../../../components/api-gateway'; import { PostgresManagerStack } from '../../../../stateful/stacks/postgres-manager/deploy/stack'; +import { LambdaLoadCustomCSVConstruct } from './construct/lambda-load-custom-csv'; export type MetadataManagerStackProps = { /** @@ -82,6 +83,7 @@ export class MetadataManagerStack extends Stack { // 1. To handle API calls // 2. To do migrations // 3. To sync db with external sources (e.g. metadata in gsheet) + // 4. To load-db from external csv presigned url file // (1) new LambdaAPIConstruct(this, 'APILambda', { @@ -103,5 +105,11 @@ export class MetadataManagerStack extends Stack { dbConnectionSecret: dbSecret, isDailySync: props.isDailySync, }); + + // (4) + new LambdaLoadCustomCSVConstruct(this, 'CustomCsvLoaderLambda', { + basicLambdaConfig: basicLambdaConfig, + dbConnectionSecret: dbSecret, + }); } } diff --git a/lib/workload/stateless/stacks/metadata-manager/docs/architecture.drawio.svg b/lib/workload/stateless/stacks/metadata-manager/docs/architecture.drawio.svg index 29207b691..c2fc82cb4 100644 --- a/lib/workload/stateless/stacks/metadata-manager/docs/architecture.drawio.svg +++ b/lib/workload/stateless/stacks/metadata-manager/docs/architecture.drawio.svg @@ -1,4 +1,4 @@ -
API Lambda
API Lambda
sync-ghseet
sync-ghseet
migration
migration
API Gateway
API Gateway
Scheduled Event
(PLANNED)
Scheduled Eve...
PostgresSQL Instance
(Shared with other microservices)
PostgresSQL I...
Secret ManagerĀ 
(Access db connection string)
Secret Manage...
Text is not SVG - cannot display
\ No newline at end of file +
API Lambda
API Lambda
sync-ghseet
sync-ghseet
csv-loader
csv-loader
API Gateway
API Gateway
Scheduled Event
(PROD only)
Scheduled Eve...
PostgresSQL Instance
(Shared with other microservices)
PostgresSQL I...
Secret ManagerĀ 
(Access db connection string)
Secret Manage...
Manual trigger
Manual tri...
migration
migration
Text is not SVG - cannot display
\ No newline at end of file diff --git a/lib/workload/stateless/stacks/metadata-manager/handler/load_custom_metadata_csv.py b/lib/workload/stateless/stacks/metadata-manager/handler/load_custom_metadata_csv.py new file mode 100644 index 000000000..a54a7662a --- /dev/null +++ b/lib/workload/stateless/stacks/metadata-manager/handler/load_custom_metadata_csv.py @@ -0,0 +1,35 @@ +import django +import os +import logging + +from libumccr import libjson + +from proc.service.utils import sanitize_lab_metadata_df, warn_drop_duplicated_library + +os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'app.settings.base') +django.setup() + +from proc.service.load_csv_srv import load_metadata_csv, download_csv_to_pandas + +logger = logging.getLogger() +logger.setLevel(logging.INFO) + + +def handler(event, _context): + logger.info(f'event: {libjson.dumps(event)}') + + csv_url = event.get('url', None) + if csv_url is None: + raise ValueError("URL is required") + + csv_df = download_csv_to_pandas(csv_url) + sanitize_df = sanitize_lab_metadata_df(csv_df) + duplicate_clean_df = warn_drop_duplicated_library(sanitize_df) + result = load_metadata_csv(duplicate_clean_df) + + logger.info(f'persist report: {libjson.dumps(result)}') + return result + + +if __name__ == '__main__': + handler({}, {}) diff --git a/lib/workload/stateless/stacks/metadata-manager/proc/service/load_csv_srv.py b/lib/workload/stateless/stacks/metadata-manager/proc/service/load_csv_srv.py new file mode 100644 index 000000000..f1a49164b --- /dev/null +++ b/lib/workload/stateless/stacks/metadata-manager/proc/service/load_csv_srv.py @@ -0,0 +1,243 @@ +import json +import logging +import pandas as pd +from django.core.exceptions import ObjectDoesNotExist +from django.db import transaction + +from app.models import Subject, Sample, Library, Project, Contact, Individual +from app.models.library import Quality, LibraryType, Phenotype, WorkflowType, sanitize_library_coverage +from app.models.sample import Source +from app.models.utils import get_value_from_human_readable_label +from proc.service.utils import clean_model_history + +logger = logging.getLogger() +logger.setLevel(logging.INFO) + + +@transaction.atomic +def load_metadata_csv(df: pd.DataFrame): + """ + Persist metadata records from a pandas dataframe into the db. No record deletion is performed in this method. + + Args: + df (pd.DataFrame): The source of truth for the metadata in this particular year + + """ + logger.info(f"Start processing LabMetadata") + + # Used for statistics + invalid_data = [] + stats = { + "library": { + "create_count": 0, + "update_count": 0, + }, + "sample": { + "create_count": 0, + "update_count": 0, + + }, + "subject": { + "create_count": 0, + "update_count": 0, + }, + "individual": { + "create_count": 0, + "update_count": 0, + }, + "project": { + "create_count": 0, + "update_count": 0, + }, + "contact": { + "create_count": 0, + "update_count": 0, + "delete_count": 0, + }, + 'invalid_record_count': 0, + } + + # this the where records are updated, inserted, linked based on library_id + for record in df.to_dict('records'): + try: + # 1. update or create all data in the model from the given record + + # ------------------------------ + # Individual + # ------------------------------ + idv = None + individual_id = record.get('individual_id') + idv_source = record.get('individual_id_source') + + if individual_id and idv_source: + + idv, is_idv_created, is_idv_updated = Individual.objects.update_or_create_if_needed( + search_key={ + "individual_id": individual_id, + "source": idv_source + }, + data={ + "individual_id": individual_id, + "source": idv_source + } + ) + if is_idv_created: + stats['individual']['create_count'] += 1 + if is_idv_updated: + stats['individual']['update_count'] += 1 + + # ------------------------------ + # Subject + # ------------------------------ + + subject_id = record.get('subject_id') + subject, is_sub_created, is_sub_updated = Subject.objects.update_or_create_if_needed( + search_key={"subject_id": subject_id}, + data={ + "subject_id": subject_id, + } + ) + + if is_sub_created: + stats['subject']['create_count'] += 1 + if is_sub_updated: + stats['subject']['update_count'] += 1 + + if idv: + # link individual to external subject + try: + subject.individual_set.get(orcabus_id=idv.orcabus_id) + except ObjectDoesNotExist: + subject.individual_set.add(idv) + + # We update the stats when new idv is linked to sbj, only if this is not recorded as + # update/create in previous upsert method + if not is_sub_created and not is_sub_updated: + stats['subject']['update_count'] += 1 + + # ------------------------------ + # Sample + # ------------------------------ + sample = None + sample_id = record.get('sample_id') + if sample_id: + sample, is_smp_created, is_smp_updated = Sample.objects.update_or_create_if_needed( + search_key={"sample_id": sample_id}, + data={ + "sample_id": sample_id, + "external_sample_id": record.get('external_sample_id'), + "source": get_value_from_human_readable_label(Source.choices, record.get('source')), + } + ) + if is_smp_created: + stats['sample']['create_count'] += 1 + if is_smp_updated: + stats['sample']['update_count'] += 1 + + # ------------------------------ + # Contact + # ------------------------------ + contact = None + contact_id = record.get('project_owner') + + if contact_id: + contact, is_ctc_created, is_ctc_updated = Contact.objects.update_or_create_if_needed( + search_key={"contact_id": contact_id}, + data={ + "contact_id": contact_id, + } + ) + if is_ctc_created: + stats['contact']['create_count'] += 1 + if is_ctc_updated: + stats['contact']['update_count'] += 1 + + # ------------------------------ + # Project: Upsert project with contact as part of the project + # ------------------------------ + project = None + + project_id = record.get('project_name') + if project_id: + project, is_prj_created, is_prj_updated = Project.objects.update_or_create_if_needed( + search_key={"project_id":project_id}, + data={ + "project_id": project_id, + } + ) + if is_prj_created: + stats['project']['create_count'] += 1 + if is_prj_updated: + stats['project']['update_count'] += 1 + + # link project to its contact of exist + if contact: + try: + project.contact_set.get(orcabus_id=contact.orcabus_id) + except ObjectDoesNotExist: + project.contact_set.add(contact) + + # We update the stats when new ctc is linked to prj, only if this is not recorded as + # update/create in previous upsert method + if not is_prj_created and not is_prj_updated: + stats['project']['update_count'] += 1 + + # ------------------------------ + # Library: Upsert library record with related sample, subject, project + # ------------------------------ + library, is_lib_created, is_lib_updated = Library.objects.update_or_create_if_needed( + search_key={"library_id": record.get('library_id')}, + data={ + 'library_id': record.get('library_id'), + 'phenotype': get_value_from_human_readable_label(Phenotype.choices, record.get('phenotype')), + 'workflow': get_value_from_human_readable_label(WorkflowType.choices, record.get('workflow')), + 'quality': get_value_from_human_readable_label(Quality.choices, record.get('quality')), + 'type': get_value_from_human_readable_label(LibraryType.choices, record.get('type')), + 'assay': record.get('assay'), + 'coverage': sanitize_library_coverage(record.get('coverage')), + + # relationships + 'sample_id': sample.orcabus_id, + 'subject_id': subject.orcabus_id, + } + ) + if is_lib_created: + stats['library']['create_count'] += 1 + if is_lib_updated: + stats['library']['update_count'] += 1 + + # link library to its project + if project: + try: + library.project_set.get(orcabus_id=project.orcabus_id) + except ObjectDoesNotExist: + library.project_set.add(project) + + # We update the stats when new project is linked to library, only if this is not recorded as + # update/create in previous upsert method + if not is_lib_created and not is_lib_updated: + stats['library']['update_count'] += 1 + + except Exception as e: + if any(record.values()): + stats['invalid_record_count'] += 1 + invalid_data.append({ + "reason": e, + "data": record + }) + continue + + # clean up history for django-simple-history model if any + # Only clean for the past 15 minutes as this is what the maximum lambda cutoff + clean_model_history(minutes=15) + + logger.warning(f"Invalid record: {invalid_data}") + logger.info(f"Processed LabMetadata: {json.dumps(stats)}") + return stats + + +def download_csv_to_pandas(url: str) -> pd.DataFrame: + """ + Download csv file from a given url and return it as a pandas dataframe + """ + return pd.read_csv(url) diff --git a/lib/workload/stateless/stacks/metadata-manager/proc/service/tracking_sheet_srv.py b/lib/workload/stateless/stacks/metadata-manager/proc/service/tracking_sheet_srv.py index 285838dbd..8805b3a74 100644 --- a/lib/workload/stateless/stacks/metadata-manager/proc/service/tracking_sheet_srv.py +++ b/lib/workload/stateless/stacks/metadata-manager/proc/service/tracking_sheet_srv.py @@ -1,9 +1,7 @@ import os -import re import json import pandas as pd -import numpy as np from django.core.exceptions import ObjectDoesNotExist from django.db import transaction @@ -13,10 +11,10 @@ import logging from app.models import Subject, Sample, Library, Project, Contact, Individual -from app.models.library import Quality, LibraryType, Phenotype, WorkflowType +from app.models.library import Quality, LibraryType, Phenotype, WorkflowType, sanitize_library_coverage from app.models.sample import Source from app.models.utils import get_value_from_human_readable_label -from proc.service.utils import clean_model_history +from proc.service.utils import clean_model_history, sanitize_lab_metadata_df logger = logging.getLogger() logger.setLevel(logging.INFO) @@ -243,7 +241,6 @@ def persist_lab_metadata(df: pd.DataFrame, sheet_year: str): if len(invalid_data) > 0: logger.warning(f"Invalid record: {invalid_data}") - logger.info(f"Processed LabMetadata: {json.dumps(stats)}") return stats @@ -265,74 +262,3 @@ def download_tracking_sheet(year: str) -> pd.DataFrame: df: pd.DataFrame = pd.concat(frames) return df - -def sanitize_lab_metadata_df(df: pd.DataFrame): - """ - sanitize record by renaming columns, and clean df cells - """ - - df = clean_columns(df) - df = df.map(_clean_data_cell) - - # dropping any rows that library_id == '' - df = df.drop(df[df.library_id.isnull()].index, errors='ignore') - - # dropping column that has empty column heading - df = df.drop('', axis='columns', errors='ignore') - - df = df.reset_index(drop=True) - return df - - -def warn_drop_duplicated_library(df: pd.DataFrame) -> pd.DataFrame: - """ - log warning messages if duplicated library_id found - """ - # some warning for duplicates - dup_lib_list = df[df.duplicated(subset=['library_id'], keep='last')]["library_id"].tolist() - if len(dup_lib_list) > 0: - logger.warning(f"data contain duplicate libraries: {', '.join(dup_lib_list)}") - - return df.drop_duplicates(subset=['library_id'], keep='last') - - -def clean_columns(df: pd.DataFrame) -> pd.DataFrame: - """ - clean a dataframe of labmetadata from a tracking sheet to correspond to the django object model - we do this by editing the columns to match the django object - """ - # remove unnamed - df = df.loc[:, ~df.columns.str.contains('^Unnamed')] - - # simplify verbose column names - df = df.rename(columns={'Coverage (X)': 'coverage', "TruSeq Index, unless stated": "truseqindex"}) - - # convert PascalCase headers to snake_case and fix ID going to _i_d - pattern = re.compile(r'(? pd.DataFrame: + """ + log warning messages if duplicated library_id found + """ + # some warning for duplicates + dup_lib_list = df[df.duplicated(subset=['library_id'], keep='last')]["library_id"].tolist() + if len(dup_lib_list) > 0: + logger.warning(f"data contain duplicate libraries: {', '.join(dup_lib_list)}") + + return df.drop_duplicates(subset=['library_id'], keep='last') + + +def clean_columns(df: pd.DataFrame) -> pd.DataFrame: + """ + clean a dataframe from a tracking sheet to correspond to the django object model + we do this by editing the columns to match the django object + """ + # remove unnamed + df = df.loc[:, ~df.columns.str.contains('^Unnamed')] + + # simplify verbose column names + df = df.rename(columns={'Coverage (X)': 'coverage', "TruSeq Index, unless stated": "truseqindex"}) + + # convert PascalCase headers to snake_case and fix ID going to _i_d + pattern = re.compile(r'(?