diff --git a/lib/workload/stateless/stacks/metadata-manager/README.md b/lib/workload/stateless/stacks/metadata-manager/README.md
index 559423a14..55bf6f3f1 100644
--- a/lib/workload/stateless/stacks/metadata-manager/README.md
+++ b/lib/workload/stateless/stacks/metadata-manager/README.md
@@ -61,7 +61,7 @@ on the model of the record.
 
 ## How things work
 
-### How Syncing The Data Works
+### How Tracking Sheet Syncing Works
 
 In the near future, we might introduce different ways to load data
 into the application. For the time being, we are loading data
diff --git a/lib/workload/stateless/stacks/metadata-manager/app/management/commands/load_from_csv.py b/lib/workload/stateless/stacks/metadata-manager/app/management/commands/load_from_csv.py
new file mode 100644
index 000000000..99017aa1c
--- /dev/null
+++ b/lib/workload/stateless/stacks/metadata-manager/app/management/commands/load_from_csv.py
@@ -0,0 +1,22 @@
+import logging
+from django.core.management import BaseCommand
+from libumccr import libjson
+
+from handler.load_custom_metadata_csv import handler
+
+logger = logging.getLogger()
+logger.setLevel(logging.INFO)
+
+
+class Command(BaseCommand):
+    help = "Trigger the lambda handler to sync metadata from a csv url"
+
+    def handle(self, *args, **options):
+        event = {
+            "url": "SOME_URL",
+        }
+
+        print(f"Triggering lambda handler to load metadata from csv. Event: {libjson.dumps(event)}")
+        result = handler(event, {})
+
+        print(f"result: {libjson.dumps(result)}")
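+
+# Usage note (a sketch; assumes local Django settings and a reachable database,
+# and that SOME_URL above has been replaced with a csv presigned url):
+#
+#   python manage.py load_from_csv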
diff --git a/lib/workload/stateless/stacks/metadata-manager/app/migrations/0001_initial.py b/lib/workload/stateless/stacks/metadata-manager/app/migrations/0001_initial.py
index 2f92ab840..977c8b52d 100644
--- a/lib/workload/stateless/stacks/metadata-manager/app/migrations/0001_initial.py
+++ b/lib/workload/stateless/stacks/metadata-manager/app/migrations/0001_initial.py
@@ -81,14 +81,106 @@ class Migration(migrations.Migration):
             },
         ),
         migrations.CreateModel(
-            name='Subject',
+            name='HistoricalContact',
             fields=[
                 ('orcabus_id', models.CharField(editable=False, primary_key=True, serialize=False, unique=True, validators=[django.core.validators.RegexValidator(code='invalid_orcabus_id', message='ULID is expected to be 26 characters long', regex='[\\w]{26}$')])),
                 ('subject_id', models.CharField(blank=True, null=True, unique=True)),
             ],
             options={
-                'abstract': False,
+                'verbose_name': 'historical contact',
+                'verbose_name_plural': 'historical contacts',
+                'ordering': ('-history_date', '-history_id'),
+                'get_latest_by': ('history_date', 'history_id'),
+            },
+            bases=(simple_history.models.HistoricalChanges, models.Model),
+        ),
+        migrations.CreateModel(
+            name='HistoricalIndividual',
+            fields=[
+                ('orcabus_id', models.CharField(db_index=True, editable=False, validators=[django.core.validators.RegexValidator(code='invalid_orcabus_id', message='ULID is expected to be 26 characters long', regex='[\\w]{26}$')])),
+                ('individual_id', models.CharField(blank=True, db_index=True, null=True)),
+                ('source', models.CharField(blank=True, null=True)),
+                ('history_id', models.AutoField(primary_key=True, serialize=False)),
+                ('history_date', models.DateTimeField(db_index=True)),
+                ('history_change_reason', models.CharField(max_length=100, null=True)),
+                ('history_type', models.CharField(choices=[('+', 'Created'), ('~', 'Changed'), ('-', 'Deleted')], max_length=1)),
+                ('history_user', models.ForeignKey(null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='+', to=settings.AUTH_USER_MODEL)),
+            ],
+            options={
+                'verbose_name': 'historical individual',
+                'verbose_name_plural': 'historical individuals',
+                'ordering': ('-history_date', '-history_id'),
+                'get_latest_by': ('history_date', 'history_id'),
+            },
+            bases=(simple_history.models.HistoricalChanges, models.Model),
+        ),
+        migrations.CreateModel(
+            name='HistoricalLibrary',
+            fields=[
+                ('orcabus_id', models.CharField(db_index=True, editable=False, validators=[django.core.validators.RegexValidator(code='invalid_orcabus_id', message='ULID is expected to be 26 characters long', regex='[\\w]{26}$')])),
+                ('library_id', models.CharField(blank=True, db_index=True, null=True)),
+                ('phenotype', models.CharField(blank=True, choices=[('normal', 'Normal'), ('tumor', 'Tumor'), ('negative-control', 'Negative Control')], null=True)),
+                ('workflow', models.CharField(blank=True, choices=[('clinical', 'Clinical'), ('research', 'Research'), ('qc', 'Qc'), ('control', 'Control'), ('bcl', 'Bcl'), ('manual', 'Manual')], null=True)),
+                ('quality', models.CharField(blank=True, choices=[('very-poor', 'VeryPoor'), ('poor', 'Poor'), ('good', 'Good'), ('borderline', 'Borderline')], null=True)),
+                ('type', models.CharField(blank=True, choices=[('10X', 'Ten X'), ('BiModal', 'Bimodal'), ('ctDNA', 'Ct Dna'), ('ctTSO', 'Ct Tso'), ('exome', 'Exome'), ('MeDIP', 'Me Dip'), ('Metagenm', 'Metagenm'), ('MethylSeq', 'Methyl Seq'), ('TSO-DNA', 'TSO_DNA'), ('TSO-RNA', 'TSO_RNA'), ('WGS', 'Wgs'), ('WTS', 'Wts'), ('other', 'Other')], null=True)),
+                ('assay', models.CharField(blank=True, null=True)),
+                ('coverage', models.FloatField(blank=True, null=True)),
+                ('history_id', models.AutoField(primary_key=True, serialize=False)),
+                ('history_date', models.DateTimeField(db_index=True)),
+                ('history_change_reason', models.CharField(max_length=100, null=True)),
+                ('history_type', models.CharField(choices=[('+', 'Created'), ('~', 'Changed'), ('-', 'Deleted')], max_length=1)),
+                ('history_user', models.ForeignKey(null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='+', to=settings.AUTH_USER_MODEL)),
+                ('sample', models.ForeignKey(blank=True, db_constraint=False, null=True, on_delete=django.db.models.deletion.DO_NOTHING, related_name='+', to='app.sample')),
+            ],
+            options={
+                'verbose_name': 'historical library',
+                'verbose_name_plural': 'historical librarys',
+                'ordering': ('-history_date', '-history_id'),
+                'get_latest_by': ('history_date', 'history_id'),
             },
+            bases=(simple_history.models.HistoricalChanges, models.Model),
+        ),
+        migrations.CreateModel(
+            name='HistoricalProject',
+            fields=[
+                ('orcabus_id', models.CharField(db_index=True, editable=False, validators=[django.core.validators.RegexValidator(code='invalid_orcabus_id', message='ULID is expected to be 26 characters long', regex='[\\w]{26}$')])),
+                ('project_id', models.CharField(blank=True, db_index=True, null=True)),
+                ('name', models.CharField(blank=True, null=True)),
+                ('description', models.CharField(blank=True, null=True)),
+                ('history_id', models.AutoField(primary_key=True, serialize=False)),
+                ('history_date', models.DateTimeField(db_index=True)),
+                ('history_change_reason', models.CharField(max_length=100, null=True)),
+                ('history_type', models.CharField(choices=[('+', 'Created'), ('~', 'Changed'), ('-', 'Deleted')], max_length=1)),
+                ('history_user', models.ForeignKey(null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='+', to=settings.AUTH_USER_MODEL)),
+            ],
+            options={
+                'verbose_name': 'historical project',
+                'verbose_name_plural': 'historical projects',
+                'ordering': ('-history_date', '-history_id'),
+                'get_latest_by': ('history_date', 'history_id'),
             },
+            bases=(simple_history.models.HistoricalChanges, models.Model),
+        ),
+        migrations.CreateModel(
+            name='HistoricalSample',
+            fields=[
+                ('orcabus_id', models.CharField(db_index=True, editable=False, validators=[django.core.validators.RegexValidator(code='invalid_orcabus_id', message='ULID is expected to be 26 characters long', regex='[\\w]{26}$')])),
+                ('sample_id', models.CharField(blank=True, db_index=True, null=True)),
+                ('external_sample_id', models.CharField(blank=True, null=True)),
+                ('source', models.CharField(blank=True, choices=[('ascites', 'Ascites'), ('blood', 'Blood'), ('bone-marrow', 'BoneMarrow'), ('buccal', 'Buccal'), ('cell-line', 'Cell_line'), ('cfDNA', 'Cfdna'), ('cyst-fluid', 'Cyst Fluid'), ('DNA', 'Dna'), ('eyebrow-hair', 'Eyebrow Hair'), ('FFPE', 'Ffpe'), ('FNA', 'Fna'), ('OCT', 'Oct'), ('organoid', 'Organoid'), ('PDX-tissue', 'Pdx Tissue'), ('plasma-serum', 'Plasma Serum'), ('RNA', 'Rna'), ('tissue', 'Tissue'), ('skin', 'Skin'), ('water', 'Water')], null=True)),
+                ('history_id', models.AutoField(primary_key=True, serialize=False)),
+                ('history_date', models.DateTimeField(db_index=True)),
+                ('history_change_reason', models.CharField(max_length=100, null=True)),
+                ('history_type', models.CharField(choices=[('+', 'Created'), ('~', 'Changed'), ('-', 'Deleted')], max_length=1)),
+                ('history_user', models.ForeignKey(null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='+', to=settings.AUTH_USER_MODEL)),
+            ],
+            options={
+                'verbose_name': 'historical sample',
+                'verbose_name_plural': 'historical samples',
+                'ordering': ('-history_date', '-history_id'),
+                'get_latest_by': ('history_date', 'history_id'),
+            },
+            bases=(simple_history.models.HistoricalChanges, models.Model),
         ),
         migrations.CreateModel(
             name='HistoricalContact',
@@ -228,6 +320,31 @@ class Migration(migrations.Migration):
                 ('project', models.ForeignKey(db_column='project_orcabus_id', on_delete=django.db.models.deletion.CASCADE, to='app.project')),
             ],
         ),
+        migrations.CreateModel(
+            name='HistoricalLibrary_project_set',
+            fields=[
+                ('id', models.BigIntegerField(auto_created=True, blank=True, db_index=True, verbose_name='ID')),
+                ('m2m_history_id', models.AutoField(primary_key=True, serialize=False)),
+                ('history', models.ForeignKey(db_constraint=False, on_delete=django.db.models.deletion.DO_NOTHING, to='app.historicallibrary')),
+                ('library', models.ForeignKey(blank=True, db_constraint=False, db_tablespace='', null=True, on_delete=django.db.models.deletion.DO_NOTHING, related_name='+', to='app.library')),
+                ('project', models.ForeignKey(blank=True, db_constraint=False, db_tablespace='', null=True, on_delete=django.db.models.deletion.DO_NOTHING, related_name='+', to='app.project')),
+            ],
+            options={
+                'verbose_name': 'HistoricalLibrary_project_set',
+            },
+            bases=(simple_history.models.HistoricalChanges, models.Model),
+        ),
+        migrations.CreateModel(
+            name='Subject',
+            fields=[
+                ('orcabus_id', models.CharField(editable=False, primary_key=True, serialize=False, unique=True, validators=[django.core.validators.RegexValidator(code='invalid_orcabus_id', message='ULID is expected to be 26 characters long', regex='[\\w]{26}$')])),
+                ('subject_id', models.CharField(blank=True, null=True, unique=True)),
+                ('individual_set', models.ManyToManyField(blank=True, related_name='subject_set', to='app.individual')),
+            ],
+            options={
+                'abstract': False,
+            },
+        ),
         migrations.AddField(
             model_name='library',
             name='project_set',
diff --git a/lib/workload/stateless/stacks/metadata-manager/deploy/README.md b/lib/workload/stateless/stacks/metadata-manager/deploy/README.md
index 554400d13..502113b24 100644
--- a/lib/workload/stateless/stacks/metadata-manager/deploy/README.md
+++ b/lib/workload/stateless/stacks/metadata-manager/deploy/README.md
@@ -58,3 +58,35 @@ aws lambda invoke \
   --cli-binary-format raw-in-base64-out \
   res.json
 ```
+
+### CustomCsvLambda
+
+- Loads tracking sheet data from a csv presigned url
+
+To trigger the sync manually, use the lambda ARN stored in the SSM Parameter Store under
+`/orcabus/metadata-manager/load-custom-csv-lambda-arn`.
+
+To fetch it from a local terminal:
+
+```sh
+load_custom_csv_lambda_arn=$(aws ssm get-parameter --name '/orcabus/metadata-manager/load-custom-csv-lambda-arn' --with-decryption | jq -r .Parameter.Value)
+```
+
+The lambda handler accepts a JSON payload with a single key, `url`, which is the presigned url of the csv file.
+
+```json
+{
+  "url": "https://example.com/csv"
+}
+```
+
+To invoke the lambda:
+
+```sh
+aws lambda invoke \
+  --function-name "$load_custom_csv_lambda_arn" \
+  --invocation-type Event \
+  --payload '{ "url": "https://the.url.csv" }' \
+  --cli-binary-format raw-in-base64-out \
+  res.json
+```
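+
+Note that the `Event` invocation type is asynchronous, so the command returns as soon as
+the request is accepted and the persist report is not written to `res.json`. A sketch of a
+synchronous invocation (same payload) that waits for the report the handler returns:
+
+```sh
+aws lambda invoke \
+  --function-name "$load_custom_csv_lambda_arn" \
+  --invocation-type RequestResponse \
+  --payload '{ "url": "https://the.url.csv" }' \
+  --cli-binary-format raw-in-base64-out \
+  res.json && cat res.json
+```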
diff --git a/lib/workload/stateless/stacks/metadata-manager/deploy/construct/lambda-load-custom-csv/index.ts b/lib/workload/stateless/stacks/metadata-manager/deploy/construct/lambda-load-custom-csv/index.ts
new file mode 100644
index 000000000..46ccf74ef
--- /dev/null
+++ b/lib/workload/stateless/stacks/metadata-manager/deploy/construct/lambda-load-custom-csv/index.ts
@@ -0,0 +1,54 @@
+import path from 'path';
+import { Construct } from 'constructs';
+import { Duration } from 'aws-cdk-lib';
+import { ISecret } from 'aws-cdk-lib/aws-secretsmanager';
+import { StringParameter } from 'aws-cdk-lib/aws-ssm';
+import {
+  DockerImageFunction,
+  DockerImageFunctionProps,
+  DockerImageCode,
+} from 'aws-cdk-lib/aws-lambda';
+
+type LambdaProps = {
+  /**
+   * The basic common lambda properties that this construct inherits
+   */
+  basicLambdaConfig: Partial<DockerImageFunctionProps>;
+  /**
+   * The secret for the db connection that the lambda needs access to
+   */
+  dbConnectionSecret: ISecret;
+};
+
+export class LambdaLoadCustomCSVConstruct extends Construct {
+  private readonly lambda: DockerImageFunction;
+
+  constructor(scope: Construct, id: string, lambdaProps: LambdaProps) {
+    super(scope, id);
+
+    this.lambda = new DockerImageFunction(this, 'LoadCustomCSVLambda', {
+      environment: {
+        ...lambdaProps.basicLambdaConfig.environment,
+      },
+      securityGroups: lambdaProps.basicLambdaConfig.securityGroups,
+      vpc: lambdaProps.basicLambdaConfig.vpc,
+      vpcSubnets: lambdaProps.basicLambdaConfig.vpcSubnets,
+      architecture: lambdaProps.basicLambdaConfig.architecture,
+      code: DockerImageCode.fromImageAsset(path.join(__dirname, '../../../'), {
+        file: 'deploy/construct/lambda-load-custom-csv/lambda.Dockerfile',
+      }),
+      timeout: Duration.minutes(15),
+      memorySize: 4096,
+    });
+
+    lambdaProps.dbConnectionSecret.grantRead(this.lambda);
+
+    // Store the lambda ARN in SSM so it can be looked up when triggering the load manually
+    new StringParameter(this, 'LoadCustomCSVLambdaArnParameterStore', {
+      parameterName: '/orcabus/metadata-manager/load-custom-csv-lambda-arn',
+      description: 'The ARN of the lambda that loads metadata from a presigned URL CSV file',
+      stringValue: this.lambda.functionArn,
+    });
+  }
+}
diff --git a/lib/workload/stateless/stacks/metadata-manager/deploy/construct/lambda-load-custom-csv/lambda.Dockerfile b/lib/workload/stateless/stacks/metadata-manager/deploy/construct/lambda-load-custom-csv/lambda.Dockerfile
new file mode 100644
index 000000000..f880a59ed
--- /dev/null
+++ b/lib/workload/stateless/stacks/metadata-manager/deploy/construct/lambda-load-custom-csv/lambda.Dockerfile
@@ -0,0 +1,12 @@
+FROM public.ecr.aws/lambda/python:3.12
+
+WORKDIR ${LAMBDA_TASK_ROOT}
+
+# Copy all files from the build context
+COPY . .
+
+# Install the required packages
+RUN pip install -r deps/requirements-full.txt
+
+# Specify the handler
+CMD [ "handler.load_custom_metadata_csv.handler" ]
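+
+# NOTE: the image is built with the metadata-manager stack root as the docker build
+# context (see deploy/construct/lambda-load-custom-csv/index.ts), which is why
+# `deps/requirements-full.txt` and the `handler` module resolve from the copied root.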
diff --git a/lib/workload/stateless/stacks/metadata-manager/docs/architecture.drawio.svg b/lib/workload/stateless/stacks/metadata-manager/docs/architecture.drawio.svg
index 29207b691..c2fc82cb4 100644
[architecture.drawio.svg updated; SVG markup omitted. The previous diagram showed the API Lambda, sync-gsheet and migration lambdas, API Gateway, a Scheduled Event marked "(PLANNED)", the shared PostgreSQL instance, and Secrets Manager (holding the db connection string). The updated diagram adds a csv-loader lambda and a "Manual trigger" path, and the Scheduled Event is now marked "(PROD only)".]
diff --git a/lib/workload/stateless/stacks/metadata-manager/handler/load_custom_metadata_csv.py b/lib/workload/stateless/stacks/metadata-manager/handler/load_custom_metadata_csv.py
new file mode 100644
index 000000000..a54a7662a
--- /dev/null
+++ b/lib/workload/stateless/stacks/metadata-manager/handler/load_custom_metadata_csv.py
@@ -0,0 +1,35 @@
+import django
+import os
+import logging
+
+from libumccr import libjson
+
+os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'app.settings.base')
+django.setup()
+
+# These imports touch Django models, so they must come after django.setup()
+from proc.service.load_csv_srv import load_metadata_csv, download_csv_to_pandas
+from proc.service.utils import sanitize_lab_metadata_df, warn_drop_duplicated_library
+
+logger = logging.getLogger()
+logger.setLevel(logging.INFO)
+
+
+def handler(event, _context):
+    logger.info(f'event: {libjson.dumps(event)}')
+
+    csv_url = event.get('url', None)
+    if csv_url is None:
+        raise ValueError("URL is required")
+
+    csv_df = download_csv_to_pandas(csv_url)
+    sanitize_df = sanitize_lab_metadata_df(csv_df)
+    duplicate_clean_df = warn_drop_duplicated_library(sanitize_df)
+    result = load_metadata_csv(duplicate_clean_df)
+
+    logger.info(f'persist report: {libjson.dumps(result)}')
+    return result
+
+
+if __name__ == '__main__':
+    # Replace SOME_URL with a csv presigned url when running locally
+    handler({"url": "SOME_URL"}, {})
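+
+# Locally, this handler can also be exercised through the Django management command
+# added in this change: `python manage.py load_from_csv`
+# (see app/management/commands/load_from_csv.py).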
diff --git a/lib/workload/stateless/stacks/metadata-manager/handler/sync_tracking_sheet.py b/lib/workload/stateless/stacks/metadata-manager/handler/sync_tracking_sheet.py
index 4a32eca47..b12bec70e 100644
--- a/lib/workload/stateless/stacks/metadata-manager/handler/sync_tracking_sheet.py
+++ b/lib/workload/stateless/stacks/metadata-manager/handler/sync_tracking_sheet.py
@@ -7,8 +7,8 @@
 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'app.settings.base')
 django.setup()
 
-from proc.service.tracking_sheet_srv import download_tracking_sheet, sanitize_lab_metadata_df, persist_lab_metadata, \
-    warn_drop_duplicated_library
+from proc.service.tracking_sheet_srv import download_tracking_sheet, persist_lab_metadata
+from proc.service.utils import sanitize_lab_metadata_df, warn_drop_duplicated_library
 
 logger = logging.getLogger()
 logger.setLevel(logging.INFO)
diff --git a/lib/workload/stateless/stacks/metadata-manager/proc/service/load_csv_srv.py b/lib/workload/stateless/stacks/metadata-manager/proc/service/load_csv_srv.py
new file mode 100644
index 000000000..a62178f41
--- /dev/null
+++ b/lib/workload/stateless/stacks/metadata-manager/proc/service/load_csv_srv.py
@@ -0,0 +1,243 @@
+import json
+import logging
+import pandas as pd
+from django.core.exceptions import ObjectDoesNotExist
+from django.db import transaction
+
+from app.models import Subject, Sample, Library, Project, Contact, Individual
+from app.models.library import Quality, LibraryType, Phenotype, WorkflowType, sanitize_library_coverage
+from app.models.sample import Source
+from app.models.utils import get_value_from_human_readable_label
+from proc.service.utils import clean_model_history
+
+logger = logging.getLogger()
+logger.setLevel(logging.INFO)
+
+
+@transaction.atomic
+def load_metadata_csv(df: pd.DataFrame):
+    """
+    Persist metadata records from a pandas dataframe into the db. No record deletion is performed in this method.
+
+    Args:
+        df (pd.DataFrame): The source of truth for the metadata to be loaded
+
+    """
+    logger.info("Start processing LabMetadata")
+
+    # Used for statistics
+    invalid_data = []
+    stats = {
+        "library": {
+            "create_count": 0,
+            "update_count": 0,
+        },
+        "sample": {
+            "create_count": 0,
+            "update_count": 0,
+        },
+        "subject": {
+            "create_count": 0,
+            "update_count": 0,
+        },
+        "individual": {
+            "create_count": 0,
+            "update_count": 0,
+        },
+        "project": {
+            "create_count": 0,
+            "update_count": 0,
+        },
+        "contact": {
+            "create_count": 0,
+            "update_count": 0,
+        },
+        'invalid_record_count': 0,
+    }
+
+    # this is where records are updated, inserted, and linked based on library_id
+    for record in df.to_dict('records'):
+        try:
+            # 1. update or create all data in the model from the given record
+
+            # ------------------------------
+            # Individual
+            # ------------------------------
+            idv = None
+            individual_id = record.get('individual_id')
+            source = record.get('source')
+
+            if individual_id and source:
+                idv, is_idv_created, is_idv_updated = Individual.objects.update_or_create_if_needed(
+                    search_key={
+                        "individual_id": individual_id,
+                        "source": source
+                    },
+                    data={
+                        "individual_id": individual_id,
+                        "source": source
+                    }
+                )
+                if is_idv_created:
+                    stats['individual']['create_count'] += 1
+                if is_idv_updated:
+                    stats['individual']['update_count'] += 1
+
+            # ------------------------------
+            # Subject
+            # ------------------------------
+            subject_id = record.get('subject_id')
+            subject, is_sub_created, is_sub_updated = Subject.objects.update_or_create_if_needed(
+                search_key={"subject_id": subject_id},
+                data={
+                    "subject_id": subject_id,
+                }
+            )
+
+            if is_sub_created:
+                stats['subject']['create_count'] += 1
+            if is_sub_updated:
+                stats['subject']['update_count'] += 1
+
+            if idv:
+                # link the individual to the subject
+                try:
+                    subject.individual_set.get(orcabus_id=idv.orcabus_id)
+                except ObjectDoesNotExist:
+                    subject.individual_set.add(idv)
+
+                    # Count this as a subject update only when a new idv is linked to the sbj
+                    # and the upsert above did not already record a create/update
+                    if not is_sub_created and not is_sub_updated:
+                        stats['subject']['update_count'] += 1
+
+            # ------------------------------
+            # Sample
+            # ------------------------------
+            sample = None
+            sample_id = record.get('sample_id')
+            if sample_id:
+                sample, is_smp_created, is_smp_updated = Sample.objects.update_or_create_if_needed(
+                    search_key={"sample_id": sample_id},
+                    data={
+                        "sample_id": record.get('sample_id'),
+                        "external_sample_id": record.get('external_sample_id'),
+                        "source": get_value_from_human_readable_label(Source.choices, record.get('source')),
+                    }
+                )
+                if is_smp_created:
+                    stats['sample']['create_count'] += 1
+                if is_smp_updated:
+                    stats['sample']['update_count'] += 1
+
+            # ------------------------------
+            # Contact
+            # ------------------------------
+            contact = None
+            contact_id = record.get('project_owner')
+
+            if contact_id:
+                contact, is_ctc_created, is_ctc_updated = Contact.objects.update_or_create_if_needed(
+                    search_key={"contact_id": record.get('project_owner')},
+                    data={
+                        "contact_id": record.get('project_owner'),
+                    }
+                )
+                if is_ctc_created:
+                    stats['contact']['create_count'] += 1
+                if is_ctc_updated:
+                    stats['contact']['update_count'] += 1
+
+            # ------------------------------
+            # Project: Upsert project with contact as part of the project
+            # ------------------------------
+            project = None
+            project_id = record.get('project_name')
+            if project_id:
+                project, is_prj_created, is_prj_updated = Project.objects.update_or_create_if_needed(
+                    search_key={"project_id": record.get('project_name')},
+                    data={
+                        "project_id": record.get('project_name'),
+                    }
+                )
+                if is_prj_created:
+                    stats['project']['create_count'] += 1
+                if is_prj_updated:
+                    stats['project']['update_count'] += 1
+
+                # link the project to its contact if it exists
+                if contact:
+                    try:
+                        project.contact_set.get(orcabus_id=contact.orcabus_id)
+                    except ObjectDoesNotExist:
+                        project.contact_set.add(contact)
+
+                        # Count this as a project update only when a new ctc is linked to the prj
+                        # and the upsert above did not already record a create/update
+                        if not is_prj_created and not is_prj_updated:
+                            stats['project']['update_count'] += 1
+
+            # ------------------------------
+            # Library: Upsert library record with related sample, subject, project
+            # ------------------------------
+            library, is_lib_created, is_lib_updated = Library.objects.update_or_create_if_needed(
+                search_key={"library_id": record.get('library_id')},
+                data={
+                    'library_id': record.get('library_id'),
+                    'phenotype': get_value_from_human_readable_label(Phenotype.choices, record.get('phenotype')),
+                    'workflow': get_value_from_human_readable_label(WorkflowType.choices, record.get('workflow')),
+                    'quality': get_value_from_human_readable_label(Quality.choices, record.get('quality')),
+                    'type': get_value_from_human_readable_label(LibraryType.choices, record.get('type')),
+                    'assay': record.get('assay'),
+                    'coverage': sanitize_library_coverage(record.get('coverage')),
+
+                    # relationships (sample is optional, so guard against it being None)
+                    'sample_id': sample.orcabus_id if sample else None,
+                    'subject_id': subject.orcabus_id,
+                }
+            )
+            if is_lib_created:
+                stats['library']['create_count'] += 1
+            if is_lib_updated:
+                stats['library']['update_count'] += 1
+
+            # link the library to its project
+            if project:
+                try:
+                    library.project_set.get(orcabus_id=project.orcabus_id)
+                except ObjectDoesNotExist:
+                    library.project_set.add(project)
+
+                    # Count this as a library update only when a new project is linked to the library
+                    # and the upsert above did not already record a create/update
+                    if not is_lib_created and not is_lib_updated:
+                        stats['library']['update_count'] += 1
+
+        except Exception as e:
+            if any(record.values()):
+                stats['invalid_record_count'] += 1
+                invalid_data.append({
+                    "reason": str(e),
+                    "data": record
+                })
+            continue
+
+    # clean up history for django-simple-history models if any
+    # Only clean history from the past 15 minutes, as this is the maximum lambda timeout
+    clean_model_history(minutes=15)
+
+    if invalid_data:
+        logger.warning(f"Invalid records: {invalid_data}")
+    logger.info(f"Processed LabMetadata: {json.dumps(stats)}")
+    return stats
+
+
+def download_csv_to_pandas(url: str) -> pd.DataFrame:
+    """
+    Download csv file from a given url and return it as a pandas dataframe
+    (pandas can read directly from an http(s) url)
+    """
+    return pd.read_csv(url)
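+
+# Shape of the stats dict returned by load_metadata_csv (illustrative counts):
+#   {
+#     "library": {"create_count": 2, "update_count": 1},
+#     "sample": {"create_count": 2, "update_count": 0},
+#     ...
+#     "invalid_record_count": 0
+#   }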
diff --git a/lib/workload/stateless/stacks/metadata-manager/proc/service/tracking_sheet_srv.py b/lib/workload/stateless/stacks/metadata-manager/proc/service/tracking_sheet_srv.py
index 285838dbd..5ac89384f 100644
--- a/lib/workload/stateless/stacks/metadata-manager/proc/service/tracking_sheet_srv.py
+++ b/lib/workload/stateless/stacks/metadata-manager/proc/service/tracking_sheet_srv.py
@@ -1,5 +1,4 @@
 import os
-import re
 import json
 
 import pandas as pd
@@ -266,73 +265,3 @@ def download_tracking_sheet(year: str) -> pd.DataFrame:
 
     return df
 
-
-def sanitize_lab_metadata_df(df: pd.DataFrame):
-    """
-    sanitize record by renaming columns, and clean df cells
-    """
-
-    df = clean_columns(df)
-    df = df.map(_clean_data_cell)
-
-    # dropping any rows that library_id == ''
-    df = df.drop(df[df.library_id.isnull()].index, errors='ignore')
-
-    # dropping column that has empty column heading
-    df = df.drop('', axis='columns', errors='ignore')
-
-    df = df.reset_index(drop=True)
-    return df
-
-
-def warn_drop_duplicated_library(df: pd.DataFrame) -> pd.DataFrame:
-    """
-    log warning messages if duplicated library_id found
-    """
-    # some warning for duplicates
-    dup_lib_list = df[df.duplicated(subset=['library_id'], keep='last')]["library_id"].tolist()
-    if len(dup_lib_list) > 0:
-        logger.warning(f"data contain duplicate libraries: {', '.join(dup_lib_list)}")
-
-    return df.drop_duplicates(subset=['library_id'], keep='last')
-
-
-def clean_columns(df: pd.DataFrame) -> pd.DataFrame:
-    """
-    clean a dataframe of labmetadata from a tracking sheet to correspond to the django object model
-    we do this by editing the columns to match the django object
-    """
-    # remove unnamed
-    df = df.loc[:, ~df.columns.str.contains('^Unnamed')]
-
-    # simplify verbose column names
-    df = df.rename(columns={'Coverage (X)': 'coverage', "TruSeq Index, unless stated": "truseqindex"})
-
-    # convert PascalCase headers to snake_case and fix ID going to _i_d
-    pattern = re.compile(r'(?<!^)(?=[A-Z])')
-    df.columns = [pattern.sub('_', x).lower().replace('_i_d', '_id') for x in df.columns]
-
-    return df
diff --git a/lib/workload/stateless/stacks/metadata-manager/proc/service/utils.py b/lib/workload/stateless/stacks/metadata-manager/proc/service/utils.py
new file mode 100644
--- /dev/null
+++ b/lib/workload/stateless/stacks/metadata-manager/proc/service/utils.py
+import re
+import logging
+
+import pandas as pd
+
+logger = logging.getLogger()
+logger.setLevel(logging.INFO)
+
+
+def sanitize_lab_metadata_df(df: pd.DataFrame):
+    """
+    sanitize record by renaming columns, and clean df cells
+    """
+
+    df = clean_columns(df)
+    df = df.map(_clean_data_cell)
+
+    # drop any rows where library_id is null
+    df = df.drop(df[df.library_id.isnull()].index, errors='ignore')
+
+    # drop any column that has an empty column heading
+    df = df.drop('', axis='columns', errors='ignore')
+
+    df = df.reset_index(drop=True)
+    return df
+
+
+def warn_drop_duplicated_library(df: pd.DataFrame) -> pd.DataFrame:
+    """
+    log warning messages if duplicated library_id found
+    """
+    # some warning for duplicates
+    dup_lib_list = df[df.duplicated(subset=['library_id'], keep='last')]["library_id"].tolist()
+    if len(dup_lib_list) > 0:
+        logger.warning(f"data contain duplicate libraries: {', '.join(dup_lib_list)}")
+
+    return df.drop_duplicates(subset=['library_id'], keep='last')
+
+
+def clean_columns(df: pd.DataFrame) -> pd.DataFrame:
+    """
+    clean a dataframe from a tracking sheet to correspond to the django object model
+    we do this by editing the columns to match the django object
+    """
+    # remove unnamed
+    df = df.loc[:, ~df.columns.str.contains('^Unnamed')]
+
+    # simplify verbose column names
+    df = df.rename(columns={'Coverage (X)': 'coverage', "TruSeq Index, unless stated": "truseqindex"})
+
+    # convert PascalCase headers to snake_case and fix ID going to _i_d
+    pattern = re.compile(r'(?<!^)(?=[A-Z])')
+    df.columns = [pattern.sub('_', x).lower().replace('_i_d', '_id') for x in df.columns]
+
+    return df
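+
+# Worked example of the header conversion above (illustrative): a column named
+# 'LibraryID' becomes 'library_i_d' after the snake_case split, and the
+# '_i_d' -> '_id' replacement then yields 'library_id'.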