Fix (MM): Library synchronisation issue (#542)

williamputraintan authored Sep 13, 2024
1 parent d2bfd98 commit 0530e02

Showing 7 changed files with 118 additions and 61 deletions.
12 changes: 7 additions & 5 deletions lib/workload/stateless/stacks/metadata-manager/README.md
@@ -43,8 +43,10 @@ This is the current (WIP) schema that reflects the current implementation.

To modify the diagram, open the `docs/schema.drawio.svg` with [diagrams.net](https://app.diagrams.net/?src=about).

-`orcabus_id` is the unique identifier for each record in the database. It is generated by the application where the first 3 characters are the model prefix followed by [ULID](https://pypi.org/project/ulid-py/) separated by a dot (.).
+`orcabus_id` is the unique identifier for each record in the database. It is generated by the application where the
+first 3 characters are the model prefix, followed by a [ULID](https://pypi.org/project/ulid-py/), separated by a dot (.).
The prefix is as follows:

- Library model: `lib`
- Specimen model: `spc`
- Subject model: `sbj`
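
For illustration, a minimal sketch of how such an identifier could be composed with the `ulid-py` package linked above (the helper name here is illustrative, not part of this codebase):

```python
import ulid  # pip install ulid-py


def make_orcabus_id(prefix: str) -> str:
    """Compose an orcabus_id: a 3-char model prefix, a dot, then a ULID."""
    return f"{prefix}.{ulid.new()}"


# e.g. make_orcabus_id("lib") -> "lib.01J7KZ0FZM8Y0T4W7Q2R9XDVNB"
```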
@@ -72,13 +74,13 @@ from the Google tracking sheet and mapping it to its respective model as follows
| ProjectOwner | `Library` | project_owner |
| ProjectName | `Library` | project_name |


Some important notes on the sync:

1. The sync will only run from the current year.
-2. The tracking sheet is the single source of truth, any deletion/update on any record (including the record that has
-   been
-   loaded) will also apply to the existing data.
+2. The tracking sheet is the single source of truth for the current year. Any deletion or update to existing records
+   will be applied based on their internal IDs (`library_id`, `specimen_id`, and `subject_id`). For the library
+   model, the deletion will only occur based on the current year's prefix. For example, syncing the 2024 tracking
+   sheet will only query libraries with `library_id` starting with `L24` to determine whether to delete it.
3. `LibraryId` is treated as a unique value in the tracking sheet, so for any duplicated value (including from other
tabs) it will only recognize the last appearance.
4. In cases where multiple records share the same unique identifier (such as SampleId), only the data from the most …
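
To illustrate note 3's last-appearance-wins rule in plain pandas (a standalone sketch; the actual logic lives in the service's `warn_drop_duplicated_library` helper):

```python
import pandas as pd

df = pd.DataFrame([
    {"library_id": "L2400001", "assay": "TsqNano"},
    {"library_id": "L2400001", "assay": "NebRNA"},  # duplicated LibraryID
])

# keep="last" mirrors the sync rule: only the last appearance is recognised
deduped = df.drop_duplicates(subset="library_id", keep="last")
print(deduped)  # only the NebRNA row survives
```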
@@ -13,7 +13,7 @@ class Command(BaseCommand):

    def handle(self, *args, **options):
        event = {
-            "years": [2024]
+            "year": 2024
        }

        print(f"Trigger lambda handler for sync tracking sheet. Event {libjson.dumps(event)}")
27 changes: 27 additions & 0 deletions lib/workload/stateless/stacks/metadata-manager/app/models/base.py
@@ -79,6 +79,33 @@ def exclude_params(params):

return qs

+    def update_or_create_if_needed(self, search_key: dict, data: dict) -> tuple[models.Model, bool, bool]:
+        """
+        The regular Django update_or_create method will always update the record even if there is no change. This
+        method is a wrapper that checks first and only updates or creates when necessary.
+        Args:
+            search_key (dict): The search key to find the object
+            data (dict): The latest data to update or create if needed
+        Returns:
+            tuple: A tuple containing:
+                - obj (Model): The object that was updated or created
+                - is_created (bool): True if the object was created
+                - is_updated (bool): True if the object was updated
+        """
+
+        try:
+            # We want an exact match of the data; otherwise we need to update
+            obj = self.get(**data)
+            return obj, False, False
+        except self.model.DoesNotExist:
+            # If the search key does not exist a new record is created, else the record is updated regardless
+            obj, is_created = self.update_or_create(**search_key, defaults=data)
+
+            # obj, is_created, is_updated (if the object is created, it is not updated)
+            return obj, is_created, not is_created


class BaseModel(models.Model):
    class Meta:
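
For reference, a sketch of how the new helper is consumed (mirroring the calls in `tracking_sheet_srv.py` further down; the field values here are made up):

```python
# Assumes Library.objects is backed by the manager defining update_or_create_if_needed
library, is_created, is_updated = Library.objects.update_or_create_if_needed(
    search_key={"library_id": "L2400001"},
    data={"library_id": "L2400001", "project_name": "example"},
)

if is_created:
    ...  # a new row was inserted
elif is_updated:
    ...  # the row existed but differed, so a single write was issued
else:
    ...  # the row already matched `data` exactly; no write at all
```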
@@ -40,22 +40,21 @@ To query in a local terminal
gsheet_sync_lambda_arn=$(aws ssm get-parameter --name '/orcabus/metadata-manager/sync-gsheet-lambda-arn' --with-decryption | jq -r .Parameter.Value)
```

-The lambda handler will accept an array of years from which sheet to run from the GSheet workbook. If no year is specified, it will run the current year.
+The lambda handler accepts a single year, selecting which sheet of the GSheet workbook to sync. If no year is specified, it will run for the current year.

```json
{
"year":["2024"]
"year": "2024"
}
```

-Note that if you specify more than one year at a single invoke (e.g. `["2020", "2021"]`), there are high chances that lambda
-would timeout and the sync is not completed properly.
Invoking lambda cmd:

```sh
aws lambda invoke \
  --function-name $gsheet_sync_lambda_arn \
  --invocation-type Event \
-  --payload '{ "year": ["2024"] }' \
+  --payload '{ "year": "2024" }' \
  --cli-binary-format raw-in-base64-out \
  res.json
```
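
Since the handler now takes one year per invocation, syncing several years means one invoke per year. A possible loop (untested sketch; note that `--invocation-type Event` fires asynchronously, so the per-year runs are not serialized):

```sh
for y in 2022 2023 2024; do
  aws lambda invoke \
    --function-name "$gsheet_sync_lambda_arn" \
    --invocation-type Event \
    --payload "{ \"year\": \"$y\" }" \
    --cli-binary-format raw-in-base64-out \
    "res-$y.json"
done
```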
@@ -18,12 +18,14 @@ def handler(event, context):
logger.info("Start processing update from google tracking sheet")
logger.info(f'event: {libjson.dumps(event)}')

-    year_array = event.get('years', [datetime.date.today().year])
+    year = event.get('year', datetime.date.today().year)
+    if isinstance(year, list):
+        raise ValueError("Year cannot be an array")

-    tracking_sheet_df = download_tracking_sheet(year_array)
+    tracking_sheet_df = download_tracking_sheet(year)
    sanitize_df = sanitize_lab_metadata_df(tracking_sheet_df)
    duplicate_clean_df = warn_drop_duplicated_library(sanitize_df)
-    result = persist_lab_metadata(duplicate_clean_df)
+    result = persist_lab_metadata(duplicate_clean_df, year)

    logger.info(f'persist report: {libjson.dumps(result)}')
    return result
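
For a quick local smoke test, the handler can also be driven directly from Python; a sketch (the import path is an assumption — adjust it to wherever `handler` lives in this stack — and Django settings plus AWS credentials must already be configured):

```python
# Import path assumed for illustration; not taken from the commit itself
from proc.lambdas.sync_tracking_sheet import handler

handler({"year": 2024}, None)  # sync the 2024 sheet
handler({}, None)              # no year given: defaults to the current year
# handler({"year": [2024]}, None)  # would raise ValueError("Year cannot be an array")
```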
@@ -1,5 +1,6 @@
import os
import re
+import json
from typing import List

import pandas as pd
@@ -25,12 +26,14 @@


@transaction.atomic
-def persist_lab_metadata(df: pd.DataFrame):
+def persist_lab_metadata(df: pd.DataFrame, sheet_year: str):
    """
    Persist metadata records from a pandas dataframe into the db
-    :param df: dataframe to persist
-    :return: result statistics - count of LabMetadata rows created
+    Args:
+        df (pd.DataFrame): The source of truth for the metadata in this particular year
+        sheet_year (str): The year of the supplied metadata df
    """
    logger.info(f"Start processing LabMetadata")

Expand All @@ -47,19 +50,18 @@ def persist_lab_metadata(df: pd.DataFrame):

    rows_invalid = list()

-    # If the df do not contain to what has existed in the db, it will be deleted
-    for lib in Library.objects.exclude(library_id__in=df['library_id'].tolist()).iterator():
+    # The data frame is to be the source of truth for the particular year,
+    # so we need to remove db records which are not in the data frame.
+    # Only doing this for library records; (dangling) specimen/subject records may be removed in a separate process.
+
+    # For the library_id we need to craft the library_id prefix to match the year.
+    # E.g. for year 2024 the library_id prefix is 'L24', as per the lab tracking sheet convention.
+    library_prefix = f'L{sheet_year[-2:]}'
+    for lib in Library.objects.filter(library_id__startswith=library_prefix).exclude(
+            library_id__in=df['library_id'].tolist()).iterator():
        library_deleted.append(lib)
        lib.delete()

-    for spc in Specimen.objects.exclude(specimen_id__in=df['sample_id'].tolist()).iterator():
-        specimen_deleted.append(spc)
-        spc.delete()
-
-    for sbj in Subject.objects.exclude(subject_id__in=df['subject_id'].tolist()).iterator():
-        subject_deleted.append(sbj)
-        sbj.delete()

    # Update: 12/07/2024. 'Subject' -> 'Specimen' is now ONE to Many, therefore the process of unlinking the many-to-
    # many is not needed. The following code is commented for future reference when the 'Individual' concept is
    # introduced (which will have many-to-many as 'Individual' <-> 'Subject').

@@ -98,32 +100,33 @@ def persist_lab_metadata(df: pd.DataFrame):
    for record in df.to_dict('records'):
        try:
            # 1. update or create all data in the model from the given record
-            subject, is_sub_created = Subject.objects.update_or_create(
-                subject_id=record.get('subject_id'),
-                defaults={
+            subject, is_sub_created, is_sub_updated = Subject.objects.update_or_create_if_needed(
+                search_key={"subject_id": record.get('subject_id')},
+                data={
                    "subject_id": record.get('subject_id')
                }
            )
            if is_sub_created:
                subject_created.append(subject)
-            else:
+            if is_sub_updated:
                subject_updated.append(subject)

-            specimen, is_spc_created = Specimen.objects.update_or_create(
-                specimen_id=record.get('sample_id'),
-                defaults={
+            specimen, is_spc_created, is_spc_updated = Specimen.objects.update_or_create_if_needed(
+                search_key={"specimen_id": record.get('sample_id')},
+                data={
                    "specimen_id": record.get('sample_id'),
                    "source": get_value_from_human_readable_label(Source.choices, record.get('source')),
                    'subject_id': subject.orcabus_id
                }
            )
            if is_spc_created:
                specimen_created.append(specimen)
-            else:
+            if is_spc_updated:
                specimen_updated.append(specimen)
-            library, is_lib_created = Library.objects.update_or_create(
-                library_id=record.get('library_id'),
-                defaults={
+
+            library, is_lib_created, is_lib_updated = Library.objects.update_or_create_if_needed(
+                search_key={"library_id": record.get('library_id')},
+                data={
                    'library_id': record.get('library_id'),
                    'phenotype': get_value_from_human_readable_label(Phenotype.choices, record.get('phenotype')),
                    'workflow': get_value_from_human_readable_label(WorkflowType.choices, record.get('workflow')),

@@ -138,7 +141,7 @@ def persist_lab_metadata(df: pd.DataFrame):
            )
            if is_lib_created:
                library_created.append(library)
-            else:
+            if is_lib_updated:
                library_updated.append(library)

            # 2. linking or updating model to each other based on the record (update if it does not match)

@@ -155,10 +158,11 @@

        except Exception as e:
            if any(record.values()):  # silently skip blank rows
-                logger.warning(f"Invalid record: {libjson.dumps(record)} Exception: {e}")
+                logger.warning(f"Invalid record ({e}): {json.dumps(record, indent=2)}")
                rows_invalid.append(record)
                continue

+    # clean up history for django-simple-history models, if any
    clean_model_history()

return {
Expand All @@ -183,25 +187,20 @@ def persist_lab_metadata(df: pd.DataFrame):
}


-def download_tracking_sheet(year_array: List[int]) -> pd.DataFrame:
+def download_tracking_sheet(year: int) -> pd.DataFrame:
    """
    Download the full original metadata from Google tracking sheet
    """
    sheet_id = libssm.get_secret(SSM_NAME_TRACKING_SHEET_ID)
    account_info = libssm.get_secret(SSM_NAME_GDRIVE_ACCOUNT)

    frames = []
-    for i in year_array:
-        year_str = str(i)
-        logger.info(f"Downloading {year_str} sheet")
-        sheet_df = libgdrive.download_sheet(account_info, sheet_id, year_str)
-        sheet_df = sanitize_lab_metadata_df(sheet_df)
-
-        # the year might be in the future therefore it does not exist
-        if sheet_df.empty:
-            break
+    year_str = str(year)
+    logger.info(f"Downloading {year_str} sheet")
+    sheet_df = libgdrive.download_sheet(account_info, sheet_id, year_str)
+    sheet_df = sanitize_lab_metadata_df(sheet_df)

-        frames.append(sheet_df)
+    frames.append(sheet_df)

    df: pd.DataFrame = pd.concat(frames)
    return df
@@ -5,6 +5,8 @@

from proc.service.tracking_sheet_srv import sanitize_lab_metadata_df, persist_lab_metadata

SHEET_YEAR = "2010"

RECORD_1 = {
    "LibraryID": "L10001",
    "SampleID": "PRJ10001",
@@ -106,26 +108,26 @@ def test_persist_lab_metadata(self):

        metadata_pd = pd.json_normalize(mock_sheet_data)
        metadata_pd = sanitize_lab_metadata_df(metadata_pd)
-        result = persist_lab_metadata(metadata_pd)
+        result = persist_lab_metadata(metadata_pd, SHEET_YEAR)

self.assertEqual(result.get("invalid_record_count"), 0, "non invalid record should exist")

self.assertEqual(result.get("library").get("new_count"), 3, "3 new library should be created")
self.assertEqual(result.get("library").get("update_count"), 0, "0 update in library")

self.assertEqual(result.get("specimen").get("new_count"), 2, "2 new specimen should be created")
self.assertEqual(result.get("specimen").get("update_count"), 1, "1 update in specimen")
self.assertEqual(result.get("specimen").get("update_count"), 0, "no update in specimen")

self.assertEqual(result.get("subject").get("new_count"), 1, "1 new subject should be created")
self.assertEqual(result.get("subject").get("update_count"), 2, "2 update in subject")
self.assertEqual(result.get("subject").get("update_count"), 0, "no update in subject")

        lib_1 = Library.objects.get(library_id=RECORD_1.get("LibraryID"))
        self.assertEqual(lib_1.type, RECORD_1.get("Type"), "incorrect value (Type) stored")
        self.assertEqual(lib_1.phenotype, RECORD_1.get("Phenotype"), "incorrect value (Phenotype) stored")
        self.assertEqual(lib_1.assay, RECORD_1.get("Assay"), "incorrect value (Assay) stored")
        self.assertEqual(lib_1.workflow, RECORD_1.get("Workflow"), "incorrect value (Workflow) stored")
        self.assertEqual(lib_1.project_owner, RECORD_1.get("ProjectOwner"), "incorrect value (ProjectOwner) stored")
-        self.assertEqual(lib_1.project_name, RECORD_1.get("ProjectName"),"incorrect value (ProjectName) stored")
+        self.assertEqual(lib_1.project_name, RECORD_1.get("ProjectName"), "incorrect value (ProjectName) stored")
        self.assertEqual(lib_1.specimen.specimen_id, RECORD_1.get("SampleID"), "incorrect specimen linked")

        spc_1 = Specimen.objects.get(specimen_id=RECORD_1.get("SampleID"))

@@ -148,6 +150,32 @@ def test_persist_lab_metadata(self):
        self.assertEqual(lib.specimen.subject.subject_id, RECORD_1.get("SubjectID"),
                         "library is not linked to the same subject")

+    def test_new_df_in_different_year(self) -> None:
+        """
+        python manage.py test proc.tests.test_tracking_sheet_srv.TrackingSheetSrvUnitTests.test_new_df_in_different_year
+        """
+
+        metadata_pd = pd.json_normalize([RECORD_1])
+        metadata_pd = sanitize_lab_metadata_df(metadata_pd)
+        persist_lab_metadata(metadata_pd, SHEET_YEAR)
+
+        new_lib_id = 'L24001'
+        mock_record = RECORD_1.copy()
+        mock_record['LibraryID'] = new_lib_id
+        metadata_pd = pd.json_normalize([mock_record])
+        metadata_pd = sanitize_lab_metadata_df(metadata_pd)
+        persist_lab_metadata(metadata_pd, '2024')
+
+        lib_all = Library.objects.all()
+        self.assertEqual(lib_all.count(), 2, "2 library should be created")
+
+        lib_1 = Library.objects.get(library_id=RECORD_1.get("LibraryID"))
+        self.assertIsNotNone(lib_1)
+
+        lib_change = Library.objects.get(library_id=new_lib_id)
+        self.assertIsNotNone(lib_change)

    def test_persist_lab_metadata_alter_sbj(self):
        """
        test where lib moved to different spc, and spc to different sbj

@@ -158,11 +186,11 @@ def test_persist_lab_metadata_alter_sbj(self):

        metadata_pd = pd.json_normalize([RECORD_3])
        metadata_pd = sanitize_lab_metadata_df(metadata_pd)
-        persist_lab_metadata(metadata_pd)
+        persist_lab_metadata(metadata_pd, SHEET_YEAR)

        metadata_pd = pd.json_normalize([RECORD_3_DIFF_SBJ])
        metadata_pd = sanitize_lab_metadata_df(metadata_pd)
-        persist_lab_metadata(metadata_pd)
+        persist_lab_metadata(metadata_pd, SHEET_YEAR)

        sbj_4 = Subject.objects.get(subject_id=RECORD_3_DIFF_SBJ['SubjectID'])
        self.assertIsNotNone(sbj_4)

@@ -172,7 +200,7 @@ def test_persist_lab_metadata_alter_sbj(self):

        metadata_pd = pd.json_normalize([RECORD_3_DIFF_SPC])
        metadata_pd = sanitize_lab_metadata_df(metadata_pd)
-        persist_lab_metadata(metadata_pd)
+        persist_lab_metadata(metadata_pd, SHEET_YEAR)

        lib_3 = Library.objects.get(library_id=RECORD_3['LibraryID'])
        self.assertEqual(lib_3.specimen.specimen_id, RECORD_3_DIFF_SPC['SampleID'],

@@ -186,13 +214,13 @@ def test_with_deleted_model(self) -> None:

        metadata_pd = pd.json_normalize(mock_sheet_data)
        metadata_pd = sanitize_lab_metadata_df(metadata_pd)
-        persist_lab_metadata(metadata_pd)
+        persist_lab_metadata(metadata_pd, SHEET_YEAR)

        mock_sheet_data = [RECORD_3]

        metadata_pd = pd.json_normalize(mock_sheet_data)
        metadata_pd = sanitize_lab_metadata_df(metadata_pd)
-        result = persist_lab_metadata(metadata_pd)
+        result = persist_lab_metadata(metadata_pd, SHEET_YEAR)

        deleted_lib = Library.objects.filter(library_id__in=[RECORD_1.get('LibraryID'), RECORD_2.get('LibraryID')])
        self.assertEqual(deleted_lib.count(), 0, 'these library query should all be deleted')

@@ -210,7 +238,7 @@ def test_save_choice_from_human_readable_label(self) -> None:

        metadata_pd = pd.json_normalize(mock_sheet_data)
        metadata_pd = sanitize_lab_metadata_df(metadata_pd)
-        persist_lab_metadata(metadata_pd)
+        persist_lab_metadata(metadata_pd, SHEET_YEAR)

        spc = Specimen.objects.get(specimen_id=mock_record.get("SampleID"))
        self.assertIsNotNone(spc)