From 3a8a1105a183547b1720edf351c7b9fa52ffee4b Mon Sep 17 00:00:00 2001
From: william
Date: Wed, 9 Oct 2024 11:29:09 +1100
Subject: [PATCH] drop incomplete records before processing

---
 .../stacks/metadata-manager/README.md         | 42 ++++++++++---------
 .../app/management/commands/load_from_csv.py  |  1 +
 .../commands/trigger_sync_handler.py          |  3 +-
 .../stacks/metadata-manager/deploy/README.md  | 14 +++++--
 .../handler/load_custom_metadata_csv.py       |  8 +++-
 .../handler/sync_tracking_sheet.py            |  9 +++-
 .../proc/service/load_csv_srv.py              | 28 ++++++++++++-
 .../proc/service/tracking_sheet_srv.py        | 23 +++++++++-
 .../proc/tests/test_tracking_sheet_srv.py     | 30 ++++++++++++-
 9 files changed, 125 insertions(+), 33 deletions(-)

diff --git a/lib/workload/stateless/stacks/metadata-manager/README.md b/lib/workload/stateless/stacks/metadata-manager/README.md
index fc84b820f..7018fde4b 100644
--- a/lib/workload/stateless/stacks/metadata-manager/README.md
+++ b/lib/workload/stateless/stacks/metadata-manager/README.md
@@ -104,22 +104,24 @@ In the near future, we might introduce different ways to load data into the appl
 loading data from the Google tracking sheet and mapping it to its respective model as follows.
 
-| Sheet Header      | Table        | Field Name         |
-|-------------------|--------------|--------------------|
-| SubjectID         | `Individual` | individual_id      |
-| ExternalSubjectID | `Subject`    | subject_id         |
-| SampleID          | `Sample`     | sample_id          |
-| ExternalSampleID  | `Sample`     | external_sample_id |
-| Source            | `Sample`     | source             |
-| LibraryID         | `Library`    | library_id         |
-| Phenotype         | `Library`    | phenotype          |
-| Workflow          | `Library`    | workflow           |
-| Quality           | `Library`    | quality            |
-| Type              | `Library`    | type               |
-| Coverage (X)      | `Library`    | coverage           |
-| Assay             | `Library`    | assay              |
-| ProjectName       | `Project`    | project_id         |
-| ProjectOwner      | `Contact`    | contact_id         |
+| Sheet Header       | Table        | Field Name         |
+|--------------------|--------------|--------------------|
+| *SubjectID         | `Individual` | individual_id      |
+| *ExternalSubjectID | `Subject`    | subject_id         |
+| *SampleID          | `Sample`     | sample_id          |
+| ExternalSampleID   | `Sample`     | external_sample_id |
+| Source             | `Sample`     | source             |
+| *LibraryID         | `Library`    | library_id         |
+| Phenotype          | `Library`    | phenotype          |
+| Workflow           | `Library`    | workflow           |
+| Quality            | `Library`    | quality            |
+| Type               | `Library`    | type               |
+| Coverage (X)       | `Library`    | coverage           |
+| Assay              | `Library`    | assay              |
+| *ProjectName       | `Project`    | project_id         |
+| *ProjectOwner      | `Contact`    | contact_id         |
+
+All asterisked (*) headers are required fields for a record to be processed.
 
 Some important notes of the sync:
 
@@ -144,12 +146,12 @@ The application also supports loading data from a custom CSV file. The CSV file
 | Sheet Header         | Table        | Field Name         |
 |----------------------|--------------|--------------------|
 | Individual_id        | `Individual` | individual_id      |
-| individual_id_source | `Individual` | subject_id         |
-| subject_id           | `Subject`    | subject_id         |
+| individual_id_source | `Individual` | source             |
+| *subject_id          | `Subject`    | subject_id         |
 | sample_id            | `Sample`     | sample_id          |
 | external_sample_id   | `Sample`     | external_sample_id |
 | source               | `Sample`     | source             |
-| library_id           | `Library`    | library_id         |
+| *library_id          | `Library`    | library_id         |
 | phenotype            | `Library`    | phenotype          |
 | workflow             | `Library`    | workflow           |
 | quality              | `Library`    | quality            |
@@ -159,6 +161,8 @@ The application also supports loading data from a custom CSV file. The CSV file
 | project_name         | `Project`    | project_id         |
 | project_owner        | `Contact`    | contact_id         |
 
+All asterisked (*) headers are required fields for a record to be processed.
+
 The CSV file should be in a presigned URL format, where the loader will read and insert to the database. To
 trigger the loader please look at `./deploy/README.md` for more info.
 
diff --git a/lib/workload/stateless/stacks/metadata-manager/app/management/commands/load_from_csv.py b/lib/workload/stateless/stacks/metadata-manager/app/management/commands/load_from_csv.py
index 774ab4e3b..6374a70bf 100644
--- a/lib/workload/stateless/stacks/metadata-manager/app/management/commands/load_from_csv.py
+++ b/lib/workload/stateless/stacks/metadata-manager/app/management/commands/load_from_csv.py
@@ -14,6 +14,7 @@ class Command(BaseCommand):
     def handle(self, *args, **options):
         event = {
             "url": "SOME_URL",
+            "is_emit_eb_events": False
         }
 
         print(f"Trigger lambda handler for sync tracking sheet. Event {libjson.dumps(event)}")
diff --git a/lib/workload/stateless/stacks/metadata-manager/app/management/commands/trigger_sync_handler.py b/lib/workload/stateless/stacks/metadata-manager/app/management/commands/trigger_sync_handler.py
index 72feb0d52..c00bf6af5 100644
--- a/lib/workload/stateless/stacks/metadata-manager/app/management/commands/trigger_sync_handler.py
+++ b/lib/workload/stateless/stacks/metadata-manager/app/management/commands/trigger_sync_handler.py
@@ -13,7 +13,8 @@ class Command(BaseCommand):
 
     def handle(self, *args, **options):
         event = {
-            "year": 2024
+            "year": 2024,
+            "is_emit_eb_events": False
         }
 
         print(f"Trigger lambda handler for sync tracking sheet. Event {libjson.dumps(event)}")
diff --git a/lib/workload/stateless/stacks/metadata-manager/deploy/README.md b/lib/workload/stateless/stacks/metadata-manager/deploy/README.md
index ad5a1de9e..136f09e95 100644
--- a/lib/workload/stateless/stacks/metadata-manager/deploy/README.md
+++ b/lib/workload/stateless/stacks/metadata-manager/deploy/README.md
@@ -40,11 +40,14 @@ To query in a local terminal
 gsheet_sync_lambda_arn=$(aws ssm get-parameter --name '/orcabus/metadata-manager/sync-gsheet-lambda-arn' --with-decryption | jq -r .Parameter.Value)
 ```
 
-The lambda handler will accept a single year from which sheet to run from the GSheet workbook. If no year is specified, it will run the current year.
+The lambda handler accepts a JSON payload with two parameters:
+- `year` - the year of the sheet to sync from the GSheet workbook. Defaults to the current year.
+- `is_emit_eb_events` - whether to emit EventBridge events. Defaults to `true`.
 
 ```json
 {
-  "year": "2024"
+  "year": "2024",
+  "is_emit_eb_events": false
 }
 ```
 
@@ -72,11 +75,14 @@ To query in a local terminal
 load_custom_csv_lambda_arn=$(aws ssm get-parameter --name '/orcabus/metadata-manager/load-custom-csv-lambda-arn' --with-decryption | jq -r .Parameter.Value)
 ```
 
-The lambda handler will accept a json which only accepts a single key `url` which is the presigned url of the csv file.
+The lambda handler accepts a JSON payload with two parameters:
+- `url` - the presigned URL of the CSV file.
+- `is_emit_eb_events` - whether to emit EventBridge events. Defaults to `true`.
 
 ```json
 {
-  "url": "https://example.com/csv"
+  "url": "https://example.com/csv",
+  "is_emit_eb_events": false
 }
 ```
diff --git a/lib/workload/stateless/stacks/metadata-manager/handler/load_custom_metadata_csv.py b/lib/workload/stateless/stacks/metadata-manager/handler/load_custom_metadata_csv.py
index b35b969d3..fdbf16a1c 100644
--- a/lib/workload/stateless/stacks/metadata-manager/handler/load_custom_metadata_csv.py
+++ b/lib/workload/stateless/stacks/metadata-manager/handler/load_custom_metadata_csv.py
@@ -8,7 +8,7 @@
 django.setup()
 
 from proc.service.utils import sanitize_lab_metadata_df, warn_drop_duplicated_library
-from proc.service.load_csv_srv import load_metadata_csv, download_csv_to_pandas
+from proc.service.load_csv_srv import load_metadata_csv, download_csv_to_pandas, drop_incomplete_csv_records
 
 logger = logging.getLogger()
 logger.setLevel(logging.INFO)
@@ -21,10 +21,14 @@ def handler(event, _context):
     if csv_url is None:
         raise ValueError("URL is required")
 
+    is_emit_eb_events: bool = event.get('is_emit_eb_events', True)
+
     csv_df = download_csv_to_pandas(csv_url)
     sanitize_df = sanitize_lab_metadata_df(csv_df)
     duplicate_clean_df = warn_drop_duplicated_library(sanitize_df)
-    result = load_metadata_csv(duplicate_clean_df)
+    clean_df = drop_incomplete_csv_records(duplicate_clean_df)
+
+    result = load_metadata_csv(clean_df, is_emit_eb_events)
 
     logger.info(f'persist report: {libjson.dumps(result)}')
     return result
diff --git a/lib/workload/stateless/stacks/metadata-manager/handler/sync_tracking_sheet.py b/lib/workload/stateless/stacks/metadata-manager/handler/sync_tracking_sheet.py
index 5df53a0c3..9b83db632 100644
--- a/lib/workload/stateless/stacks/metadata-manager/handler/sync_tracking_sheet.py
+++ b/lib/workload/stateless/stacks/metadata-manager/handler/sync_tracking_sheet.py
@@ -8,7 +8,8 @@
 django.setup()
 
 from proc.service.utils import warn_drop_duplicated_library
-from proc.service.tracking_sheet_srv import download_tracking_sheet, sanitize_lab_metadata_df, persist_lab_metadata
+from proc.service.tracking_sheet_srv import download_tracking_sheet, sanitize_lab_metadata_df, persist_lab_metadata, \
+    drop_incomplete_tracking_sheet_records
 
 logger = logging.getLogger()
 logger.setLevel(logging.INFO)
@@ -22,10 +23,14 @@ def handler(event, context):
     if isinstance(year, list):
         raise ValueError("Year cannot be an array")
 
+    is_emit_eb_events: bool = event.get('is_emit_eb_events', True)
+
     tracking_sheet_df = download_tracking_sheet(year)
     sanitize_df = sanitize_lab_metadata_df(tracking_sheet_df)
     duplicate_clean_df = warn_drop_duplicated_library(sanitize_df)
-    result = persist_lab_metadata(duplicate_clean_df, year)
+    clean_df = drop_incomplete_tracking_sheet_records(duplicate_clean_df)
+
+    result = persist_lab_metadata(clean_df, year, is_emit_eb_events)
 
     logger.info(f'persist report: {libjson.dumps(result)}')
     return result
diff --git a/lib/workload/stateless/stacks/metadata-manager/proc/service/load_csv_srv.py b/lib/workload/stateless/stacks/metadata-manager/proc/service/load_csv_srv.py
index 8bd75d16a..cb24bb5db 100644
--- a/lib/workload/stateless/stacks/metadata-manager/proc/service/load_csv_srv.py
+++ b/lib/workload/stateless/stacks/metadata-manager/proc/service/load_csv_srv.py
@@ -3,6 +3,8 @@
 import pandas as pd
 from django.core.exceptions import ObjectDoesNotExist
 from django.db import transaction
+from libumccr import libjson
+from libumccr.aws import libeb
 
 from app.models import Subject, Sample, Library, Project, Contact, Individual
 from app.models.library import Quality, LibraryType, Phenotype, WorkflowType, sanitize_library_coverage
@@ -17,12 +19,13 @@
 
 
 @transaction.atomic
-def load_metadata_csv(df: pd.DataFrame):
+def load_metadata_csv(df: pd.DataFrame, is_emit_eb_events: bool = True):
     """
     Persist metadata records from a pandas dataframe into the db. No record deletion is performed in this method.
 
     Args:
         df (pd.DataFrame): The source of truth for the metadata in this particular year
+        is_emit_eb_events: Whether to emit EventBridge events for update/create (only for library records for now)
 
     """
     logger.info(f"Start processing LabMetadata")
@@ -253,7 +256,13 @@ def load_metadata_csv(df: pd.DataFrame):
 
     # Only clean for the past 15 minutes as this is what the maximum lambda cutoff
     clean_model_history(minutes=15)
 
-    logger.warning(f"Invalid record: {invalid_data}")
+    if len(invalid_data) > 0:
+        logger.warning(f"Invalid record: {invalid_data}")
+
+    if len(event_bus_entries) > 0 and is_emit_eb_events:
+        logger.info(f'Dispatch event bridge entries: {libjson.dumps(event_bus_entries)}')
+        libeb.dispatch_events(event_bus_entries)
+
     logger.info(f"Processed LabMetadata: {json.dumps(stats)}")
     return stats
@@ -263,3 +272,18 @@ def download_csv_to_pandas(url: str) -> pd.DataFrame:
     Download csv file from a given url and return it as a pandas dataframe
     """
     return pd.read_csv(url)
+
+
+def drop_incomplete_csv_records(df: pd.DataFrame):
+    """
+    For the custom CSV, drop records that have an empty value in any of the columns defined below.
+
+    CSV header: subject_id, library_id
+    """
+
+    # The fields are sanitized to snake_case in the sanitize_lab_metadata_df
+    df = df.drop(df[df.library_id.isnull()].index, errors='ignore')
+    df = df.drop(df[df.subject_id.isnull()].index, errors='ignore')
+
+    df = df.reset_index(drop=True)
+    return df
diff --git a/lib/workload/stateless/stacks/metadata-manager/proc/service/tracking_sheet_srv.py b/lib/workload/stateless/stacks/metadata-manager/proc/service/tracking_sheet_srv.py
index ff439c2ac..8c3c35dee 100644
--- a/lib/workload/stateless/stacks/metadata-manager/proc/service/tracking_sheet_srv.py
+++ b/lib/workload/stateless/stacks/metadata-manager/proc/service/tracking_sheet_srv.py
@@ -26,13 +26,14 @@
 
 
 @transaction.atomic
-def persist_lab_metadata(df: pd.DataFrame, sheet_year: str):
+def persist_lab_metadata(df: pd.DataFrame, sheet_year: str, is_emit_eb_events: bool = True):
     """
     Persist metadata records from a pandas dataframe into the db
 
     Args:
         df (pd.DataFrame): The source of truth for the metadata in this particular year
         sheet_year (type): The year for the metadata df supplied
+        is_emit_eb_events: Whether to emit EventBridge events for update/create (only for library records for now)
 
     """
     logger.info(f"Start processing LabMetadata")
@@ -268,7 +269,7 @@ def persist_lab_metadata(df: pd.DataFrame, sheet_year: str):
     if len(invalid_data) > 0:
         logger.warning(f"Invalid record: {invalid_data}")
 
-    if len(event_bus_entries) > 0:
+    if len(event_bus_entries) > 0 and is_emit_eb_events:
         logger.info(f'Dispatch event bridge entries: {libjson.dumps(event_bus_entries)}')
         libeb.dispatch_events(event_bus_entries)
 
@@ -293,3 +294,21 @@ def download_tracking_sheet(year: str) -> pd.DataFrame:
 
     df: pd.DataFrame = pd.concat(frames)
     return df
+
+def drop_incomplete_tracking_sheet_records(df: pd.DataFrame):
+    """
+    For loading from the tracking sheet, drop records that have an empty value in any of the fields defined below.
+
+    Tracking sheet header: ExternalSubjectID, SubjectID, SampleID, LibraryID, ProjectOwner, ProjectName
+    """
+
+    # The fields are sanitized to snake_case in the sanitize_lab_metadata_df
+    df = df.drop(df[df.library_id.isnull()].index, errors='ignore')
+    df = df.drop(df[df.external_subject_id.isnull()].index, errors='ignore')
+    df = df.drop(df[df.subject_id.isnull()].index, errors='ignore')
+    df = df.drop(df[df.sample_id.isnull()].index, errors='ignore')
+    df = df.drop(df[df.project_owner.isnull()].index, errors='ignore')
+    df = df.drop(df[df.project_name.isnull()].index, errors='ignore')
+
+    df = df.reset_index(drop=True)
+    return df
diff --git a/lib/workload/stateless/stacks/metadata-manager/proc/tests/test_tracking_sheet_srv.py b/lib/workload/stateless/stacks/metadata-manager/proc/tests/test_tracking_sheet_srv.py
index 9eba5ffb0..ecc462584 100644
--- a/lib/workload/stateless/stacks/metadata-manager/proc/tests/test_tracking_sheet_srv.py
+++ b/lib/workload/stateless/stacks/metadata-manager/proc/tests/test_tracking_sheet_srv.py
@@ -5,9 +5,11 @@
 from unittest.mock import MagicMock
 
 from django.test import TestCase
+from django.core.exceptions import ObjectDoesNotExist
 
 from app.models import Library, Sample, Subject, Project, Contact, Individual
-from proc.service.tracking_sheet_srv import sanitize_lab_metadata_df, persist_lab_metadata
+from proc.service.tracking_sheet_srv import sanitize_lab_metadata_df, persist_lab_metadata, \
+    drop_incomplete_tracking_sheet_records
 from .utils import check_put_event_entries_format, check_put_event_value, is_expected_event_in_output
 
 TEST_EVENT_BUS_NAME = "TEST_BUS"
@@ -293,6 +295,32 @@ def test_with_deleted_model(self) -> None:
         self.assertEqual(deleted_lib.count(), 0, 'these library query should all be deleted')
 
         self.assertEqual(result.get("library").get("delete_count"), 2, "2 library should be deleted")
 
+    def test_skip_incomplete_records(self) -> None:
+        """
+        python manage.py test \
+        proc.tests.test_tracking_sheet_srv.TrackingSheetSrvUnitTests.test_skip_incomplete_records
+        """
+
+        mock_record = RECORD_1.copy()
+        mock_record['SubjectID'] = ''
+        mock_sheet_data = [mock_record]
+
+        metadata_pd = pd.json_normalize(mock_sheet_data)
+        metadata_pd = sanitize_lab_metadata_df(metadata_pd)
+        metadata_pd = drop_incomplete_tracking_sheet_records(metadata_pd)
+        persist_lab_metadata(metadata_pd, SHEET_YEAR)
+
+        def library_exists(library_id):
+            try:
+                Library.objects.get(library_id=library_id)
+                return True
+            except ObjectDoesNotExist:
+                return False
+
+        self.assertFalse(library_exists(RECORD_1.get("LibraryID")), "library should not be created")
+
     def test_save_choice_from_human_readable_label(self) -> None:
         """
         python manage.py test \
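
Both new `drop_incomplete_*_records` helpers repeat one `df.drop(df[df.<column>.isnull()].index, errors='ignore')` line per required column. The same behaviour can be expressed with a single `DataFrame.dropna(subset=...)` call. Below is a minimal sketch, not part of the patch: the helper name `drop_incomplete_records` and the `REQUIRED_TRACKING_SHEET_COLUMNS` constant are illustrative, and it assumes blank cells have already been normalised to NaN upstream (e.g. by `sanitize_lab_metadata_df`).

```python
import pandas as pd

# Illustrative constant: the snake_case column names that must be non-empty,
# mirroring drop_incomplete_tracking_sheet_records in the patch above.
REQUIRED_TRACKING_SHEET_COLUMNS = [
    "library_id",
    "external_subject_id",
    "subject_id",
    "sample_id",
    "project_owner",
    "project_name",
]


def drop_incomplete_records(df: pd.DataFrame, required_columns) -> pd.DataFrame:
    """Drop rows missing a value in any required column, then re-index."""
    # Only consider columns that actually exist in the dataframe: dropna raises
    # KeyError for subset entries that are not columns, and the attribute access
    # used in the patch (df.library_id) would raise AttributeError similarly.
    present = [col for col in required_columns if col in df.columns]
    return df.dropna(subset=present).reset_index(drop=True)


# Example call site, matching the handler's variable names:
# clean_df = drop_incomplete_records(duplicate_clean_df, REQUIRED_TRACKING_SHEET_COLUMNS)
```

One caveat on the patch as written: `errors='ignore'` on `DataFrame.drop` only suppresses missing *index labels*; it does not guard against an absent column, so filtering the `subset` by `df.columns` as above is the more defensive check if the input CSV may omit a column entirely.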