Commit
drop incomplete record before processing
williamputraintan committed Oct 9, 2024
1 parent 7e68a11 commit 3a8a110
Showing 9 changed files with 125 additions and 33 deletions.
42 changes: 23 additions & 19 deletions lib/workload/stateless/stacks/metadata-manager/README.md
@@ -104,22 +104,24 @@ In the near future, we might introduce different ways to load data into the appl
loading data
from the Google tracking sheet and mapping it to its respective model as follows.

| Sheet Header | Table | Field Name |
|-------------------|--------------|--------------------|
| SubjectID | `Individual` | individual_id |
| ExternalSubjectID | `Subject` | subject_id |
| SampleID | `Sample` | sample_id |
| ExternalSampleID | `Sample` | external_sample_id |
| Source | `Sample` | source |
| LibraryID | `Library` | library_id |
| Phenotype | `Library` | phenotype |
| Workflow | `Library` | workflow |
| Quality | `Library` | quality |
| Type | `Library` | type |
| Coverage (X) | `Library` | coverage |
| Assay | `Library` | assay |
| ProjectName | `Project` | project_id |
| ProjectOwner | `Contact` | contact_id |
| Sheet Header | Table | Field Name |
|--------------------|--------------|--------------------|
| *SubjectID | `Individual` | individual_id |
| *ExternalSubjectID | `Subject` | subject_id |
| *SampleID | `Sample` | sample_id |
| ExternalSampleID | `Sample` | external_sample_id |
| Source | `Sample` | source |
| *LibraryID | `Library` | library_id |
| Phenotype | `Library` | phenotype |
| Workflow | `Library` | workflow |
| Quality | `Library` | quality |
| Type | `Library` | type |
| Coverage (X) | `Library` | coverage |
| Assay | `Library` | assay |
| *ProjectName | `Project` | project_id |
| *ProjectOwner | `Contact` | contact_id |

All asterisked (*) headers are required fields for a record to be processed.

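Records missing any of the required fields are dropped before processing. As a rough illustration, a minimal pandas sketch of such a check over the raw sheet headers might look like the following (`drop_incomplete_rows` is a hypothetical helper; the actual check runs on the sanitized snake_case column names):

```python
import pandas as pd

# Headers marked with an asterisk in the table above
REQUIRED_SHEET_HEADERS = [
    "SubjectID", "ExternalSubjectID", "SampleID",
    "LibraryID", "ProjectName", "ProjectOwner",
]


def drop_incomplete_rows(df: pd.DataFrame) -> pd.DataFrame:
    """Drop rows where any required header is empty or missing (illustration only)."""
    df = df.replace("", pd.NA)  # treat empty cells the same as missing values
    return df.dropna(subset=REQUIRED_SHEET_HEADERS).reset_index(drop=True)
```
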
Some important notes on the sync:

@@ -144,12 +146,12 @@ The application also supports loading data from a custom CSV file. The CSV file
| Sheet Header | Table | Field Name |
|----------------------|--------------|--------------------|
| Individual_id | `Individual` | individual_id |
| individual_id_source | `Individual` | subject_id |
| subject_id | `Subject` | subject_id |
| individual_id_source | `Individual` | source |
| *subject_id | `Subject` | subject_id |
| sample_id | `Sample` | sample_id |
| external_sample_id | `Sample` | external_sample_id |
| source | `Sample` | source |
| library_id | `Library` | library_id |
| *library_id | `Library` | library_id |
| phenotype | `Library` | phenotype |
| workflow | `Library` | workflow |
| quality | `Library` | quality |
@@ -159,6 +161,8 @@ The application also supports loading data from a custom CSV file. The CSV file
| project_name | `Project` | project_id |
| project_owner | `Contact` | contact_id |

All asterisked (*) headers are required fields for a record to be processed.

The CSV file should be provided as a presigned URL, which the loader reads and inserts into the database.
To trigger the loader, see `./deploy/README.md` for more info.
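
As an illustration of the expected shape, the sketch below builds a tiny, made-up CSV in memory and shows which rows would survive the required-field check (only `subject_id` and `library_id` are mandatory; all values are hypothetical):

```python
import io

import pandas as pd

# Made-up example rows; header names follow the table above
csv_text = """subject_id,library_id,sample_id,source
SBJ000001,LIB000001,SAM000001,blood
SBJ000002,,SAM000002,blood
"""

df = pd.read_csv(io.StringIO(csv_text))

# The second row has no library_id, so it would be dropped before loading
complete = df.dropna(subset=["subject_id", "library_id"]).reset_index(drop=True)
print(complete)
```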

@@ -14,6 +14,7 @@ class Command(BaseCommand):
    def handle(self, *args, **options):
        event = {
            "url": "SOME_URL",
            "is_emit_eb_events": False
        }

        print(f"Trigger lambda handler for sync tracking sheet. Event {libjson.dumps(event)}")
@@ -13,7 +13,8 @@ class Command(BaseCommand):

    def handle(self, *args, **options):
        event = {
            "year": 2024
            "year": 2024,
            "is_emit_eb_events": False
        }

        print(f"Trigger lambda handler for sync tracking sheet. Event {libjson.dumps(event)}")
14 changes: 10 additions & 4 deletions lib/workload/stateless/stacks/metadata-manager/deploy/README.md
@@ -40,11 +40,14 @@ To query in a local terminal
gsheet_sync_lambda_arn=$(aws ssm get-parameter --name '/orcabus/metadata-manager/sync-gsheet-lambda-arn' --with-decryption | jq -r .Parameter.Value)
```

The lambda handler will accept a single year from which sheet to run from the GSheet workbook. If no year is specified, it will run the current year.
The lambda handler accepts a JSON payload with two parameters (a boto3 invocation sketch follows the example below):
- `year` - the year of the sheet to sync from the GSheet workbook. Defaults to the current year.
- `is_emit_eb_events` - whether to emit EventBridge events. Defaults to `true`.

```json
{
"year": "2024"
"year": "2024",
"is_emit_eb_events": false
}
```
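
For invoking from Python rather than the AWS CLI, a minimal boto3 sketch (reusing the SSM parameter name above; error handling omitted) could look like:

```python
import json

import boto3

ssm = boto3.client("ssm")
lambda_client = boto3.client("lambda")

# Resolve the lambda ARN from the same SSM parameter used above
arn = ssm.get_parameter(
    Name="/orcabus/metadata-manager/sync-gsheet-lambda-arn",
    WithDecryption=True,
)["Parameter"]["Value"]

response = lambda_client.invoke(
    FunctionName=arn,
    Payload=json.dumps({"year": "2024", "is_emit_eb_events": False}).encode("utf-8"),
)
print(json.loads(response["Payload"].read()))
```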

@@ -72,11 +75,14 @@ To query in a local terminal
load_custom_csv_lambda_arn=$(aws ssm get-parameter --name '/orcabus/metadata-manager/load-custom-csv-lambda-arn' --with-decryption | jq -r .Parameter.Value)
```

The lambda handler will accept a json which only accepts a single key `url` which is the presigned url of the csv file.
The lambda handler accepts a JSON payload with two parameters (a boto3 invocation sketch follows the example below):
- `url` - a presigned URL of the CSV file
- `is_emit_eb_events` - whether to emit EventBridge events. Defaults to `true`.

```json
{
"url": "https://example.com/csv"
"url": "https://example.com/csv",
"is_emit_eb_events": false
}
```
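
Similarly, a boto3 sketch for the CSV loader, generating a presigned URL for a hypothetical bucket and key and passing it in the payload, might be:

```python
import json

import boto3

s3 = boto3.client("s3")
ssm = boto3.client("ssm")
lambda_client = boto3.client("lambda")

# Hypothetical bucket/key holding the metadata CSV
url = s3.generate_presigned_url(
    "get_object",
    Params={"Bucket": "example-metadata-bucket", "Key": "custom-metadata.csv"},
    ExpiresIn=3600,
)

arn = ssm.get_parameter(
    Name="/orcabus/metadata-manager/load-custom-csv-lambda-arn",
    WithDecryption=True,
)["Parameter"]["Value"]

response = lambda_client.invoke(
    FunctionName=arn,
    Payload=json.dumps({"url": url, "is_emit_eb_events": False}).encode("utf-8"),
)
print(json.loads(response["Payload"].read()))
```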

@@ -8,7 +8,7 @@
django.setup()

from proc.service.utils import sanitize_lab_metadata_df, warn_drop_duplicated_library
from proc.service.load_csv_srv import load_metadata_csv, download_csv_to_pandas
from proc.service.load_csv_srv import load_metadata_csv, download_csv_to_pandas, drop_incomplete_csv_records

logger = logging.getLogger()
logger.setLevel(logging.INFO)
@@ -21,10 +21,14 @@ def handler(event, _context):
    if csv_url is None:
        raise ValueError("URL is required")

    is_emit_eb_events: bool = event.get('is_emit_eb_events', True)

    csv_df = download_csv_to_pandas(csv_url)
    sanitize_df = sanitize_lab_metadata_df(csv_df)
    duplicate_clean_df = warn_drop_duplicated_library(sanitize_df)
    result = load_metadata_csv(duplicate_clean_df)
    clean_df = drop_incomplete_csv_records(duplicate_clean_df)

    result = load_metadata_csv(clean_df, is_emit_eb_events)

    logger.info(f'persist report: {libjson.dumps(result)}')
    return result
@@ -8,7 +8,8 @@
django.setup()

from proc.service.utils import warn_drop_duplicated_library
from proc.service.tracking_sheet_srv import download_tracking_sheet, sanitize_lab_metadata_df, persist_lab_metadata
from proc.service.tracking_sheet_srv import download_tracking_sheet, sanitize_lab_metadata_df, persist_lab_metadata, \
    drop_incomplete_tracking_sheet_records

logger = logging.getLogger()
logger.setLevel(logging.INFO)
@@ -22,10 +23,14 @@ def handler(event, context):
    if isinstance(year, list):
        raise ValueError("Year cannot be an array")

    is_emit_eb_events: bool = event.get('is_emit_eb_events', True)

    tracking_sheet_df = download_tracking_sheet(year)
    sanitize_df = sanitize_lab_metadata_df(tracking_sheet_df)
    duplicate_clean_df = warn_drop_duplicated_library(sanitize_df)
    result = persist_lab_metadata(duplicate_clean_df, year)
    clean_df = drop_incomplete_tracking_sheet_records(duplicate_clean_df)

    result = persist_lab_metadata(clean_df, year, is_emit_eb_events)

    logger.info(f'persist report: {libjson.dumps(result)}')
    return result
@@ -3,6 +3,8 @@
import pandas as pd
from django.core.exceptions import ObjectDoesNotExist
from django.db import transaction
from libumccr import libjson
from libumccr.aws import libeb

from app.models import Subject, Sample, Library, Project, Contact, Individual
from app.models.library import Quality, LibraryType, Phenotype, WorkflowType, sanitize_library_coverage
@@ -17,12 +19,13 @@


@transaction.atomic
def load_metadata_csv(df: pd.DataFrame):
def load_metadata_csv(df: pd.DataFrame, is_emit_eb_events: bool = True):
    """
    Persist metadata records from a pandas dataframe into the db. No record deletion is performed in this method.
    Args:
        df (pd.DataFrame): The source of truth for the metadata in this particular year
        is_emit_eb_events: Emit event bridge events for update/create (only for library records for now)
    """
    logger.info(f"Start processing LabMetadata")
@@ -253,7 +256,13 @@ def load_metadata_csv(df: pd.DataFrame):
    # Only clean for the past 15 minutes as this is the maximum lambda cutoff
    clean_model_history(minutes=15)

    logger.warning(f"Invalid record: {invalid_data}")
    if len(invalid_data) > 0:
        logger.warning(f"Invalid record: {invalid_data}")

    if len(event_bus_entries) > 0 and is_emit_eb_events:
        logger.info(f'Dispatch event bridge entries: {libjson.dumps(event_bus_entries)}')
        libeb.dispatch_events(event_bus_entries)

    logger.info(f"Processed LabMetadata: {json.dumps(stats)}")
    return stats

@@ -263,3 +272,18 @@ def download_csv_to_pandas(url: str) -> pd.DataFrame:
    Download csv file from a given url and return it as a pandas dataframe
    """
    return pd.read_csv(url)


def drop_incomplete_csv_records(df: pd.DataFrame):
    """
    For the custom CSV, drop records that are empty in any of the columns defined below.
    CSV headers: subject_id, library_id
    """

    # The fields are sanitized to snake_case in sanitize_lab_metadata_df
    df = df.drop(df[df.library_id.isnull()].index, errors='ignore')
    df = df.drop(df[df.subject_id.isnull()].index, errors='ignore')

    df = df.reset_index(drop=True)
    return df
@@ -26,13 +26,14 @@


@transaction.atomic
def persist_lab_metadata(df: pd.DataFrame, sheet_year: str):
def persist_lab_metadata(df: pd.DataFrame, sheet_year: str, is_emit_eb_events: bool = True):
    """
    Persist metadata records from a pandas dataframe into the db
    Args:
        df (pd.DataFrame): The source of truth for the metadata in this particular year
        sheet_year (type): The year for the metadata df supplied
        is_emit_eb_events: Emit event bridge events for update/create (only for library records for now)
    """
    logger.info(f"Start processing LabMetadata")
@@ -268,7 +269,7 @@ def persist_lab_metadata(df: pd.DataFrame, sheet_year: str):
    if len(invalid_data) > 0:
        logger.warning(f"Invalid record: {invalid_data}")

    if len(event_bus_entries) > 0:
    if len(event_bus_entries) > 0 and is_emit_eb_events:
        logger.info(f'Dispatch event bridge entries: {libjson.dumps(event_bus_entries)}')
        libeb.dispatch_events(event_bus_entries)

@@ -293,3 +294,21 @@ def download_tracking_sheet(year: str) -> pd.DataFrame:
    df: pd.DataFrame = pd.concat(frames)
    return df


def drop_incomplete_tracking_sheet_records(df: pd.DataFrame):
    """
    For loading from the tracking sheet, drop records that are empty in any of the fields defined below.
    Tracking sheet headers: ExternalSubjectID, SubjectID, SampleID, LibraryID, ProjectOwner, ProjectName
    """

    # The fields are sanitized to snake_case in sanitize_lab_metadata_df
    df = df.drop(df[df.library_id.isnull()].index, errors='ignore')
    df = df.drop(df[df.external_subject_id.isnull()].index, errors='ignore')
    df = df.drop(df[df.subject_id.isnull()].index, errors='ignore')
    df = df.drop(df[df.sample_id.isnull()].index, errors='ignore')
    df = df.drop(df[df.project_owner.isnull()].index, errors='ignore')
    df = df.drop(df[df.project_name.isnull()].index, errors='ignore')

    df = df.reset_index(drop=True)
    return df
@@ -5,9 +5,11 @@

from unittest.mock import MagicMock
from django.test import TestCase
from django.core.exceptions import ObjectDoesNotExist

from app.models import Library, Sample, Subject, Project, Contact, Individual
from proc.service.tracking_sheet_srv import sanitize_lab_metadata_df, persist_lab_metadata
from proc.service.tracking_sheet_srv import sanitize_lab_metadata_df, persist_lab_metadata, \
    drop_incomplete_tracking_sheet_records
from .utils import check_put_event_entries_format, check_put_event_value, is_expected_event_in_output

TEST_EVENT_BUS_NAME = "TEST_BUS"
@@ -293,6 +295,32 @@ def test_with_deleted_model(self) -> None:
        self.assertEqual(deleted_lib.count(), 0, 'these library query should all be deleted')
        self.assertEqual(result.get("library").get("delete_count"), 2, "2 library should be deleted")


    def test_skip_incomplete_records(self) -> None:
        """
        python manage.py test \
            proc.tests.test_tracking_sheet_srv.TrackingSheetSrvUnitTests.test_skip_incomplete_records
        """

        mock_record = RECORD_1.copy()
        mock_record['SubjectID'] = ''
        mock_sheet_data = [mock_record]

        metadata_pd = pd.json_normalize(mock_sheet_data)
        metadata_pd = sanitize_lab_metadata_df(metadata_pd)
        metadata_pd = drop_incomplete_tracking_sheet_records(metadata_pd)
        persist_lab_metadata(metadata_pd, SHEET_YEAR)

        def is_library_exists(library_id):
            try:
                Library.objects.get(library_id=library_id)
                return True
            except ObjectDoesNotExist:
                return False

        self.assertFalse(is_library_exists(RECORD_1.get("LibraryID")), "library should not be created")


    def test_save_choice_from_human_readable_label(self) -> None:
        """
        python manage.py test \