diff --git a/lib/workload/stateless/stacks/metadata-manager/app/models/base.py b/lib/workload/stateless/stacks/metadata-manager/app/models/base.py
index 10327fb84..658fabd78 100644
--- a/lib/workload/stateless/stacks/metadata-manager/app/models/base.py
+++ b/lib/workload/stateless/stacks/metadata-manager/app/models/base.py
@@ -79,6 +79,33 @@ def exclude_params(params):
 
         return qs
 
+    def update_or_create_if_needed(self, search_key: dict, data: dict) -> tuple[models.Model, bool, bool]:
+        """
+        The regular Django update_or_create method always writes to the record, even when nothing has changed. This
+        method is a wrapper that checks first and only updates or creates when necessary.
+
+        Args:
+            search_key (dict): The search key to find the object
+            data (dict): The latest data to update or create if needed
+
+        Returns:
+            tuple: A tuple containing:
+                - obj (Model): The object that was updated or created
+                - is_created (bool): True if the object was created
+                - is_updated (bool): True if the object was updated
+        """
+
+        try:
+            # Look for an exact match of the data; if one exists, nothing needs updating
+            obj = self.get(**data)
+            return obj, False, False
+        except self.model.DoesNotExist:
+            # No exact match: create a new record if the search key is not found, otherwise update the existing record
+            obj, is_created = self.update_or_create(**search_key, defaults=data)
+
+        # obj, is_created, is_updated (if the object is created, it is not updated)
+        return obj, is_created, not is_created
+
 
 class BaseModel(models.Model):
     class Meta:
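
A minimal sketch of how callers consume the new manager method, assuming it is exposed on the model managers as in the diff above (the Library model and its library_id field appear elsewhere in this PR; the assay value is illustrative):

    # First sync: nothing matches the search key, so the record is created
    lib, is_created, is_updated = Library.objects.update_or_create_if_needed(
        search_key={"library_id": "L10001"},
        data={"library_id": "L10001", "assay": "TsqNano"},
    )
    # -> is_created=True, is_updated=False

    # Re-run with identical data: the exact-match get() returns early, so no
    # write happens (and no django-simple-history entry is produced)
    # -> is_created=False, is_updated=False

    # Re-run with changed data: falls through to update_or_create()
    # -> is_created=False, is_updated=True
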
diff --git a/lib/workload/stateless/stacks/metadata-manager/handler/sync_tracking_sheet.py b/lib/workload/stateless/stacks/metadata-manager/handler/sync_tracking_sheet.py
index 46e0908c2..f6ad69361 100644
--- a/lib/workload/stateless/stacks/metadata-manager/handler/sync_tracking_sheet.py
+++ b/lib/workload/stateless/stacks/metadata-manager/handler/sync_tracking_sheet.py
@@ -18,12 +18,14 @@ def handler(event, context):
     logger.info("Start processing update from google tracking sheet")
     logger.info(f'event: {libjson.dumps(event)}')
 
-    year_array = event.get('years', [datetime.date.today().year])
+    year = event.get('year', datetime.date.today().year)
+    if isinstance(year, list):
+        raise ValueError("Year cannot be an array")
 
-    tracking_sheet_df = download_tracking_sheet(year_array)
+    tracking_sheet_df = download_tracking_sheet(year)
     sanitize_df = sanitize_lab_metadata_df(tracking_sheet_df)
     duplicate_clean_df = warn_drop_duplicated_library(sanitize_df)
-    result = persist_lab_metadata(duplicate_clean_df)
+    result = persist_lab_metadata(duplicate_clean_df, str(year))
 
     logger.info(f'persist report: {libjson.dumps(result)}')
     return result
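
For reference, a sketch of how the reworked handler is now invoked (the payload key follows the event.get('year', ...) lookup above; the year values are illustrative):

    # Sync a single year's sheet; 'year' defaults to the current year when omitted
    handler({"year": 2024}, None)

    # The old multi-year payload no longer works
    handler({"years": [2024, 2023]}, None)  # 'years' is ignored; falls back to the current year
    handler({"year": [2024, 2023]}, None)   # raises ValueError("Year cannot be an array")
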
diff --git a/lib/workload/stateless/stacks/metadata-manager/proc/service/tracking_sheet_srv.py b/lib/workload/stateless/stacks/metadata-manager/proc/service/tracking_sheet_srv.py
index c7bb8745c..0991a09a4 100644
--- a/lib/workload/stateless/stacks/metadata-manager/proc/service/tracking_sheet_srv.py
+++ b/lib/workload/stateless/stacks/metadata-manager/proc/service/tracking_sheet_srv.py
@@ -1,5 +1,6 @@
 import os
 import re
+import json
 from typing import List
 
 import pandas as pd
@@ -25,12 +26,14 @@
 
 
 @transaction.atomic
-def persist_lab_metadata(df: pd.DataFrame):
+def persist_lab_metadata(df: pd.DataFrame, sheet_year: str):
     """
     Persist metadata records from a pandas dataframe into the db
 
-    :param df: dataframe to persist
-    :return: result statistics - count of LabMetadata rows created
+    Args:
+        df (pd.DataFrame): The source of truth for the metadata in this particular year
+        sheet_year (str): The year of the supplied metadata df
+
     """
     logger.info(f"Start processing LabMetadata")
 
@@ -47,19 +50,17 @@ def persist_lab_metadata(df: pd.DataFrame):
 
     rows_invalid = list()
 
-    # If the df do not contain to what has existed in the db, it will be deleted
-    for lib in Library.objects.exclude(library_id__in=df['library_id'].tolist()).iterator():
+    # The data frame is the source of truth for the particular year,
+    # so we need to remove db records which are not in the data frame.
+    # Only doing this for library records; (dangling) specimen/subject records may be removed in a separate process.
+
+    # For the library_id we need to craft the library_id prefix that matches the year
+    library_prefix = f'L{sheet_year[-2:]}'
+    for lib in Library.objects.filter(library_id__startswith=library_prefix).exclude(
+            library_id__in=df['library_id'].tolist()).iterator():
         library_deleted.append(lib)
         lib.delete()
 
-    for spc in Specimen.objects.exclude(specimen_id__in=df['sample_id'].tolist()).iterator():
-        specimen_deleted.append(spc)
-        spc.delete()
-
-    for sbj in Subject.objects.exclude(subject_id__in=df['subject_id'].tolist()).iterator():
-        subject_deleted.append(sbj)
-        sbj.delete()
-
     # Update: 12/07/2024. 'Subject' -> 'Specimen' is now ONE to Many, therefore the process of unliking the many to
     # many is not needed. The following code is commented for future reference when the 'Individual' concept is
     # introduced (which will have many-to-many as 'Individual' <-> 'Subject').
@@ -98,20 +99,20 @@ def persist_lab_metadata(df: pd.DataFrame):
     for record in df.to_dict('records'):
         try:
             # 1. update or create all data in the model from the given record
-            subject, is_sub_created = Subject.objects.update_or_create(
-                subject_id=record.get('subject_id'),
-                defaults={
+            subject, is_sub_created, is_sub_updated = Subject.objects.update_or_create_if_needed(
+                search_key={"subject_id": record.get('subject_id')},
+                data={
                     "subject_id": record.get('subject_id')
                 }
             )
             if is_sub_created:
                 subject_created.append(subject)
-            else:
+            if is_sub_updated:
                 subject_updated.append(subject)
 
-            specimen, is_spc_created = Specimen.objects.update_or_create(
-                specimen_id=record.get('sample_id'),
-                defaults={
+            specimen, is_spc_created, is_spc_updated = Specimen.objects.update_or_create_if_needed(
+                search_key={"specimen_id": record.get('sample_id')},
+                data={
                     "specimen_id": record.get('sample_id'),
                     "source": get_value_from_human_readable_label(Source.choices, record.get('source')),
                     'subject_id': subject.orcabus_id
@@ -119,11 +120,12 @@ def persist_lab_metadata(df: pd.DataFrame):
             )
             if is_spc_created:
                 specimen_created.append(specimen)
-            else:
+            if is_spc_updated:
                 specimen_updated.append(specimen)
-            library, is_lib_created = Library.objects.update_or_create(
-                library_id=record.get('library_id'),
-                defaults={
+
+            library, is_lib_created, is_lib_updated = Library.objects.update_or_create_if_needed(
+                search_key={"library_id": record.get('library_id')},
+                data={
                     'library_id': record.get('library_id'),
                     'phenotype': get_value_from_human_readable_label(Phenotype.choices, record.get('phenotype')),
                     'workflow': get_value_from_human_readable_label(WorkflowType.choices, record.get('workflow')),
@@ -138,7 +140,7 @@ def persist_lab_metadata(df: pd.DataFrame):
             )
             if is_lib_created:
                 library_created.append(library)
-            else:
+            if is_lib_updated:
                 library_updated.append(library)
 
             # 2. linking or updating model to each other based on the record (update if it does not match)
@@ -155,10 +157,11 @@ def persist_lab_metadata(df: pd.DataFrame):
         except Exception as e:
             if any(record.values()):  # silent off blank row
-                logger.warning(f"Invalid record: {libjson.dumps(record)} Exception: {e}")
+                logger.warning(f"Invalid record ({e}): {json.dumps(record, indent=2)}")
                 rows_invalid.append(record)
             continue
 
+    # clean up history for django-simple-history model if any
     clean_model_history()
 
     return {
@@ -183,7 +186,7 @@ def persist_lab_metadata(df: pd.DataFrame):
     }
 
 
-def download_tracking_sheet(year_array: List[int]) -> pd.DataFrame:
+def download_tracking_sheet(year: int) -> pd.DataFrame:
     """
     Download the full original metadata from Google tracking sheet
     """
@@ -191,17 +194,12 @@ def download_tracking_sheet(year_array: List[int]) -> pd.DataFrame:
     account_info = libssm.get_secret(SSM_NAME_GDRIVE_ACCOUNT)
 
     frames = []
-    for i in year_array:
-        year_str = str(i)
-        logger.info(f"Downloading {year_str} sheet")
-        sheet_df = libgdrive.download_sheet(account_info, sheet_id, year_str)
-        sheet_df = sanitize_lab_metadata_df(sheet_df)
-
-        # the year might be in the future therefore it does not exist
-        if sheet_df.empty:
-            break
+    year_str = str(year)
+    logger.info(f"Downloading {year_str} sheet")
+    sheet_df = libgdrive.download_sheet(account_info, sheet_id, year_str)
+    sheet_df = sanitize_lab_metadata_df(sheet_df)
 
-        frames.append(sheet_df)
+    frames.append(sheet_df)
 
     df: pd.DataFrame = pd.concat(frames)
     return df
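
A worked example of the year-scoped deletion introduced above, since the prefix logic is easy to misread: library IDs embed a two-digit year (the L10001 / L24001 pattern used in the tests below), so only the sheet's own year is eligible for cleanup. The df here stands for the sanitized tracking-sheet dataframe:

    sheet_year = '2024'
    library_prefix = f'L{sheet_year[-2:]}'  # -> 'L24'

    # Deletion candidates: this year's libraries that are absent from the sheet.
    # A 2010 library such as 'L10001' never matches the 'L24' prefix, so records
    # from other years are left untouched.
    stale = Library.objects.filter(library_id__startswith=library_prefix) \
                           .exclude(library_id__in=df['library_id'].tolist())
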
diff --git a/lib/workload/stateless/stacks/metadata-manager/proc/tests/test_tracking_sheet_srv.py b/lib/workload/stateless/stacks/metadata-manager/proc/tests/test_tracking_sheet_srv.py
index fb70cb430..d18e03cd9 100644
--- a/lib/workload/stateless/stacks/metadata-manager/proc/tests/test_tracking_sheet_srv.py
+++ b/lib/workload/stateless/stacks/metadata-manager/proc/tests/test_tracking_sheet_srv.py
@@ -5,6 +5,8 @@
 
 from proc.service.tracking_sheet_srv import sanitize_lab_metadata_df, persist_lab_metadata
 
+SHEET_YEAR = "2010"
+
 RECORD_1 = {
     "LibraryID": "L10001",
     "SampleID": "PRJ10001",
@@ -106,7 +108,7 @@ def test_persist_lab_metadata(self):
         metadata_pd = pd.json_normalize(mock_sheet_data)
         metadata_pd = sanitize_lab_metadata_df(metadata_pd)
 
-        result = persist_lab_metadata(metadata_pd)
+        result = persist_lab_metadata(metadata_pd, SHEET_YEAR)
 
         self.assertEqual(result.get("invalid_record_count"), 0, "non invalid record should exist")
 
@@ -114,10 +116,10 @@ def test_persist_lab_metadata(self):
         self.assertEqual(result.get("library").get("update_count"), 0, "0 update in library")
 
         self.assertEqual(result.get("specimen").get("new_count"), 2, "2 new specimen should be created")
-        self.assertEqual(result.get("specimen").get("update_count"), 1, "1 update in specimen")
+        self.assertEqual(result.get("specimen").get("update_count"), 0, "no update in specimen")
 
         self.assertEqual(result.get("subject").get("new_count"), 1, "1 new subject should be created")
-        self.assertEqual(result.get("subject").get("update_count"), 2, "2 update in subject")
+        self.assertEqual(result.get("subject").get("update_count"), 0, "no update in subject")
 
         lib_1 = Library.objects.get(library_id=RECORD_1.get("LibraryID"))
         self.assertEqual(lib_1.type, RECORD_1.get("Type"), "incorrect value (Type) stored")
@@ -125,7 +127,7 @@ def test_persist_lab_metadata(self):
         self.assertEqual(lib_1.assay, RECORD_1.get("Assay"), "incorrect value (Assay) stored")
         self.assertEqual(lib_1.workflow, RECORD_1.get("Workflow"), "incorrect value (Workflow) stored")
         self.assertEqual(lib_1.project_owner, RECORD_1.get("ProjectOwner"), "incorrect value (ProjectOwner) stored")
-        self.assertEqual(lib_1.project_name, RECORD_1.get("ProjectName"),"incorrect value (ProjectName) stored")
+        self.assertEqual(lib_1.project_name, RECORD_1.get("ProjectName"), "incorrect value (ProjectName) stored")
         self.assertEqual(lib_1.specimen.specimen_id, RECORD_1.get("SampleID"), "incorrect specimen linked")
 
         spc_1 = Specimen.objects.get(specimen_id=RECORD_1.get("SampleID"))
@@ -148,6 +150,32 @@ def test_persist_lab_metadata(self):
             self.assertEqual(lib.specimen.subject.subject_id, RECORD_1.get("SubjectID"),
                              "library is not linked to the same subject")
 
+    def test_new_df_in_different_year(self) -> None:
+        """
+        python manage.py test proc.tests.test_tracking_sheet_srv.TrackingSheetSrvUnitTests.test_new_df_in_different_year
+        """
+
+        metadata_pd = pd.json_normalize([RECORD_1])
+        metadata_pd = sanitize_lab_metadata_df(metadata_pd)
+        persist_lab_metadata(metadata_pd, SHEET_YEAR)
+
+        new_lib_id = 'L24001'
+        mock_record = RECORD_1.copy()
+        mock_record['LibraryID'] = new_lib_id
+        metadata_pd = pd.json_normalize([mock_record])
+        metadata_pd = sanitize_lab_metadata_df(metadata_pd)
+        persist_lab_metadata(metadata_pd, '2024')
+
+        lib_all = Library.objects.all()
+        self.assertEqual(lib_all.count(), 2, "2 libraries should be created")
+
+        lib_1 = Library.objects.get(library_id=RECORD_1.get("LibraryID"))
+        self.assertIsNotNone(lib_1)
+
+        lib_change = Library.objects.get(library_id=new_lib_id)
+        self.assertIsNotNone(lib_change)
+
+
     def test_persist_lab_metadata_alter_sbj(self):
         """
         test where lib moved to different spc, and spc to different sbj
@@ -158,11 +186,11 @@ def test_persist_lab_metadata_alter_sbj(self):
 
         metadata_pd = pd.json_normalize([RECORD_3])
         metadata_pd = sanitize_lab_metadata_df(metadata_pd)
-        persist_lab_metadata(metadata_pd)
+        persist_lab_metadata(metadata_pd, SHEET_YEAR)
 
         metadata_pd = pd.json_normalize([RECORD_3_DIFF_SBJ])
         metadata_pd = sanitize_lab_metadata_df(metadata_pd)
-        persist_lab_metadata(metadata_pd)
+        persist_lab_metadata(metadata_pd, SHEET_YEAR)
 
         sbj_4 = Subject.objects.get(subject_id=RECORD_3_DIFF_SBJ['SubjectID'])
         self.assertIsNotNone(sbj_4)
@@ -172,7 +200,7 @@ def test_persist_lab_metadata_alter_sbj(self):
 
         metadata_pd = pd.json_normalize([RECORD_3_DIFF_SPC])
         metadata_pd = sanitize_lab_metadata_df(metadata_pd)
-        persist_lab_metadata(metadata_pd)
+        persist_lab_metadata(metadata_pd, SHEET_YEAR)
 
         lib_3 = Library.objects.get(library_id=RECORD_3['LibraryID'])
         self.assertEqual(lib_3.specimen.specimen_id, RECORD_3_DIFF_SPC['SampleID'],
@@ -186,13 +214,13 @@ def test_with_deleted_model(self) -> None:
 
         metadata_pd = pd.json_normalize(mock_sheet_data)
         metadata_pd = sanitize_lab_metadata_df(metadata_pd)
-        persist_lab_metadata(metadata_pd)
+        persist_lab_metadata(metadata_pd, SHEET_YEAR)
 
         mock_sheet_data = [RECORD_3]
 
         metadata_pd = pd.json_normalize(mock_sheet_data)
         metadata_pd = sanitize_lab_metadata_df(metadata_pd)
-        result = persist_lab_metadata(metadata_pd)
+        result = persist_lab_metadata(metadata_pd, SHEET_YEAR)
 
         deleted_lib = Library.objects.filter(library_id__in=[RECORD_1.get('LibraryID'), RECORD_2.get('LibraryID')])
         self.assertEqual(deleted_lib.count(), 0, 'these library query should all be deleted')
@@ -210,7 +238,7 @@ def test_save_choice_from_human_readable_label(self) -> None:
 
         metadata_pd = pd.json_normalize(mock_sheet_data)
         metadata_pd = sanitize_lab_metadata_df(metadata_pd)
-        persist_lab_metadata(metadata_pd)
+        persist_lab_metadata(metadata_pd, SHEET_YEAR)
 
         spc = Specimen.objects.get(specimen_id=mock_record.get("SampleID"))
         self.assertIsNotNone(spc)
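
For completeness, a sketch of the persist report shape these assertions exercise (only the keys visible in the tests above are shown; the full return value of persist_lab_metadata also carries deletion statistics, and the counts here are illustrative):

    {
        "invalid_record_count": 0,
        "library": {"new_count": 2, "update_count": 0},
        "specimen": {"new_count": 2, "update_count": 0},
        "subject": {"new_count": 1, "update_count": 0},
    }
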