diff --git a/lib/workload/stateless/stacks/metadata-manager/README.md b/lib/workload/stateless/stacks/metadata-manager/README.md index 77242c0e0..1d4ccf5db 100644 --- a/lib/workload/stateless/stacks/metadata-manager/README.md +++ b/lib/workload/stateless/stacks/metadata-manager/README.md @@ -43,8 +43,10 @@ This is the current (WIP) schema that reflects the current implementation. To modify the diagram, open the `docs/schema.drawio.svg` with [diagrams.net](https://app.diagrams.net/?src=about). -`orcabus_id` is the unique identifier for each record in the database. It is generated by the application where the first 3 characters are the model prefix followed by [ULID](https://pypi.org/project/ulid-py/) separated by a dot (.). +`orcabus_id` is the unique identifier for each record in the database. It is generated by the application where the +first 3 characters are the model prefix followed by [ULID](https://pypi.org/project/ulid-py/) separated by a dot (.). The prefix is as follows: + - Library model are `lib` - Specimen model are `spc` - Subject model are `sbj` @@ -72,13 +74,13 @@ from the Google tracking sheet and mapping it to its respective model as follows | ProjectOwner | `Library` | project_owner | | ProjectName | `Library` | project_name | - Some important notes of the sync: 1. The sync will only run from the current year. -2. The tracking sheet is the single source of truth, any deletion/update on any record (including the record that has - been - loaded) will also apply to the existing data. +2. The tracking sheet is the single source of truth for the current year. Any deletion or update to existing records + will be applied based on their internal IDs (`library_id`, `specimen_id`, and `subject_id`). For the library + model, the deletion will only occur based on the current year's prefix. For example, syncing the 2024 tracking + sheet will only query libraries with `library_id` starting with `L24` to determine whether to delete it. 3. `LibraryId` is treated as a unique value in the tracking sheet, so for any duplicated value (including from other tabs) it will only recognize the last appearance. 4. In cases where multiple records share the same unique identifier (such as SampleId), only the data from the most diff --git a/lib/workload/stateless/stacks/metadata-manager/app/management/commands/trigger_sync_handler.py b/lib/workload/stateless/stacks/metadata-manager/app/management/commands/trigger_sync_handler.py index ccad1704f..72feb0d52 100644 --- a/lib/workload/stateless/stacks/metadata-manager/app/management/commands/trigger_sync_handler.py +++ b/lib/workload/stateless/stacks/metadata-manager/app/management/commands/trigger_sync_handler.py @@ -13,7 +13,7 @@ class Command(BaseCommand): def handle(self, *args, **options): event = { - "years": [2024] + "year": 2024 } print(f"Trigger lambda handler for sync tracking sheet. Event {libjson.dumps(event)}") diff --git a/lib/workload/stateless/stacks/metadata-manager/app/models/base.py b/lib/workload/stateless/stacks/metadata-manager/app/models/base.py index 10327fb84..658fabd78 100644 --- a/lib/workload/stateless/stacks/metadata-manager/app/models/base.py +++ b/lib/workload/stateless/stacks/metadata-manager/app/models/base.py @@ -79,6 +79,33 @@ def exclude_params(params): return qs + def update_or_create_if_needed(self, search_key: dict, data: dict) -> tuple[models.Model, bool, bool]: + """ + The regular django update_or_create method will always update the record even if there is no change. This + method is a wrapper that will check and only update or create when necessary. + + Args: + search_key (dict): The search key to find the object + data (dict): The latest data to update or create if needed + + Returns: + tuple: A tuple containing: + - obj (Model): The object that is updated or created + - is_created (bool): A boolean if the object is created + - is_updated (bool): A boolean if the object is updated + """ + + try: + # We wanted the exact match of the data, else we need to update this + obj = self.get(**data) + return obj, False, False + except self.model.DoesNotExist: + # If the search key doesn't exist it will create a new one, else it will update the record no matter what + obj, is_created = self.update_or_create(**search_key, defaults=data) + + # obj, is_created, is_updated (if the object is created, it is not updated) + return obj, is_created, not is_created + class BaseModel(models.Model): class Meta: diff --git a/lib/workload/stateless/stacks/metadata-manager/deploy/README.md b/lib/workload/stateless/stacks/metadata-manager/deploy/README.md index 32fefc5c8..554400d13 100644 --- a/lib/workload/stateless/stacks/metadata-manager/deploy/README.md +++ b/lib/workload/stateless/stacks/metadata-manager/deploy/README.md @@ -40,22 +40,21 @@ To query in a local terminal gsheet_sync_lambda_arn=$(aws ssm get-parameter --name '/orcabus/metadata-manager/sync-gsheet-lambda-arn' --with-decryption | jq -r .Parameter.Value) ``` -The lambda handler will accept an array of years from which sheet to run from the GSheet workbook. If no year is specified, it will run the current year. +The lambda handler will accept a single year from which sheet to run from the GSheet workbook. If no year is specified, it will run the current year. ```json { - "year":["2024"] + "year": "2024" } ``` -Note that if you specify more than one year at a single invoke (e.g. `["2020", "2021"]`), there are high chances that lambda -would timeout and the sync is not completed properly. +Invoking lambda cmd: ```sh aws lambda invoke \ --function-name $gsheet_sync_lambda_arn \ --invocation-type Event \ - --payload '{ "year": ["2024"] }' \ + --payload '{ "year": "2024" }' \ --cli-binary-format raw-in-base64-out \ res.json ``` diff --git a/lib/workload/stateless/stacks/metadata-manager/handler/sync_tracking_sheet.py b/lib/workload/stateless/stacks/metadata-manager/handler/sync_tracking_sheet.py index 46e0908c2..f6ad69361 100644 --- a/lib/workload/stateless/stacks/metadata-manager/handler/sync_tracking_sheet.py +++ b/lib/workload/stateless/stacks/metadata-manager/handler/sync_tracking_sheet.py @@ -18,12 +18,14 @@ def handler(event, context): logger.info("Start processing update from google tracking sheet") logger.info(f'event: {libjson.dumps(event)}') - year_array = event.get('years', [datetime.date.today().year]) + year = event.get('year', datetime.date.today().year) + if isinstance(year, list): + raise ValueError("Year cannot be an array") - tracking_sheet_df = download_tracking_sheet(year_array) + tracking_sheet_df = download_tracking_sheet(year) sanitize_df = sanitize_lab_metadata_df(tracking_sheet_df) duplicate_clean_df = warn_drop_duplicated_library(sanitize_df) - result = persist_lab_metadata(duplicate_clean_df) + result = persist_lab_metadata(duplicate_clean_df, year) logger.info(f'persist report: {libjson.dumps(result)}') return result diff --git a/lib/workload/stateless/stacks/metadata-manager/proc/service/tracking_sheet_srv.py b/lib/workload/stateless/stacks/metadata-manager/proc/service/tracking_sheet_srv.py index c7bb8745c..0dec34af6 100644 --- a/lib/workload/stateless/stacks/metadata-manager/proc/service/tracking_sheet_srv.py +++ b/lib/workload/stateless/stacks/metadata-manager/proc/service/tracking_sheet_srv.py @@ -1,5 +1,6 @@ import os import re +import json from typing import List import pandas as pd @@ -25,12 +26,14 @@ @transaction.atomic -def persist_lab_metadata(df: pd.DataFrame): +def persist_lab_metadata(df: pd.DataFrame, sheet_year: str): """ Persist metadata records from a pandas dataframe into the db - :param df: dataframe to persist - :return: result statistics - count of LabMetadata rows created + Args: + df (pd.DataFrame): The source of truth for the metadata in this particular year + sheet_year (type): The year for the metadata df supplied + """ logger.info(f"Start processing LabMetadata") @@ -47,19 +50,18 @@ def persist_lab_metadata(df: pd.DataFrame): rows_invalid = list() - # If the df do not contain to what has existed in the db, it will be deleted - for lib in Library.objects.exclude(library_id__in=df['library_id'].tolist()).iterator(): + # The data frame is to be the source of truth for the particular year + # So we need to remove db records which are not in the data frame + # Only doing this for library records and (dangling) specimen/subject may be removed on a separate process + + # For the library_id we need craft the library_id prefix to match the year + # E.g. year 2024, library_id prefix is 'L24' as what the Lab tracking sheet convention + library_prefix = f'L{sheet_year[-2:]}' + for lib in Library.objects.filter(library_id__startswith=library_prefix).exclude( + library_id__in=df['library_id'].tolist()).iterator(): library_deleted.append(lib) lib.delete() - for spc in Specimen.objects.exclude(specimen_id__in=df['sample_id'].tolist()).iterator(): - specimen_deleted.append(spc) - spc.delete() - - for sbj in Subject.objects.exclude(subject_id__in=df['subject_id'].tolist()).iterator(): - subject_deleted.append(sbj) - sbj.delete() - # Update: 12/07/2024. 'Subject' -> 'Specimen' is now ONE to Many, therefore the process of unliking the many to # many is not needed. The following code is commented for future reference when the 'Individual' concept is # introduced (which will have many-to-many as 'Individual' <-> 'Subject'). @@ -98,20 +100,20 @@ def persist_lab_metadata(df: pd.DataFrame): for record in df.to_dict('records'): try: # 1. update or create all data in the model from the given record - subject, is_sub_created = Subject.objects.update_or_create( - subject_id=record.get('subject_id'), - defaults={ + subject, is_sub_created, is_sub_updated = Subject.objects.update_or_create_if_needed( + search_key={"subject_id": record.get('subject_id')}, + data={ "subject_id": record.get('subject_id') } ) if is_sub_created: subject_created.append(subject) - else: + if is_sub_updated: subject_updated.append(subject) - specimen, is_spc_created = Specimen.objects.update_or_create( - specimen_id=record.get('sample_id'), - defaults={ + specimen, is_spc_created, is_spc_updated = Specimen.objects.update_or_create_if_needed( + search_key={"specimen_id": record.get('sample_id')}, + data={ "specimen_id": record.get('sample_id'), "source": get_value_from_human_readable_label(Source.choices, record.get('source')), 'subject_id': subject.orcabus_id @@ -119,11 +121,12 @@ def persist_lab_metadata(df: pd.DataFrame): ) if is_spc_created: specimen_created.append(specimen) - else: + if is_spc_updated: specimen_updated.append(specimen) - library, is_lib_created = Library.objects.update_or_create( - library_id=record.get('library_id'), - defaults={ + + library, is_lib_created, is_lib_updated = Library.objects.update_or_create_if_needed( + search_key={"library_id": record.get('library_id')}, + data={ 'library_id': record.get('library_id'), 'phenotype': get_value_from_human_readable_label(Phenotype.choices, record.get('phenotype')), 'workflow': get_value_from_human_readable_label(WorkflowType.choices, record.get('workflow')), @@ -138,7 +141,7 @@ def persist_lab_metadata(df: pd.DataFrame): ) if is_lib_created: library_created.append(library) - else: + if is_lib_updated: library_updated.append(library) # 2. linking or updating model to each other based on the record (update if it does not match) @@ -155,10 +158,11 @@ def persist_lab_metadata(df: pd.DataFrame): except Exception as e: if any(record.values()): # silent off blank row - logger.warning(f"Invalid record: {libjson.dumps(record)} Exception: {e}") + logger.warning(f"Invalid record ({e}): {json.dumps(record, indent=2)}") rows_invalid.append(record) continue + # clean up history for django-simple-history model if any clean_model_history() return { @@ -183,7 +187,7 @@ def persist_lab_metadata(df: pd.DataFrame): } -def download_tracking_sheet(year_array: List[int]) -> pd.DataFrame: +def download_tracking_sheet(year: int) -> pd.DataFrame: """ Download the full original metadata from Google tracking sheet """ @@ -191,17 +195,12 @@ def download_tracking_sheet(year_array: List[int]) -> pd.DataFrame: account_info = libssm.get_secret(SSM_NAME_GDRIVE_ACCOUNT) frames = [] - for i in year_array: - year_str = str(i) - logger.info(f"Downloading {year_str} sheet") - sheet_df = libgdrive.download_sheet(account_info, sheet_id, year_str) - sheet_df = sanitize_lab_metadata_df(sheet_df) - - # the year might be in the future therefore it does not exist - if sheet_df.empty: - break + year_str = str(year) + logger.info(f"Downloading {year_str} sheet") + sheet_df = libgdrive.download_sheet(account_info, sheet_id, year_str) + sheet_df = sanitize_lab_metadata_df(sheet_df) - frames.append(sheet_df) + frames.append(sheet_df) df: pd.DataFrame = pd.concat(frames) return df diff --git a/lib/workload/stateless/stacks/metadata-manager/proc/tests/test_tracking_sheet_srv.py b/lib/workload/stateless/stacks/metadata-manager/proc/tests/test_tracking_sheet_srv.py index fb70cb430..d18e03cd9 100644 --- a/lib/workload/stateless/stacks/metadata-manager/proc/tests/test_tracking_sheet_srv.py +++ b/lib/workload/stateless/stacks/metadata-manager/proc/tests/test_tracking_sheet_srv.py @@ -5,6 +5,8 @@ from proc.service.tracking_sheet_srv import sanitize_lab_metadata_df, persist_lab_metadata +SHEET_YEAR = "2010" + RECORD_1 = { "LibraryID": "L10001", "SampleID": "PRJ10001", @@ -106,7 +108,7 @@ def test_persist_lab_metadata(self): metadata_pd = pd.json_normalize(mock_sheet_data) metadata_pd = sanitize_lab_metadata_df(metadata_pd) - result = persist_lab_metadata(metadata_pd) + result = persist_lab_metadata(metadata_pd, SHEET_YEAR) self.assertEqual(result.get("invalid_record_count"), 0, "non invalid record should exist") @@ -114,10 +116,10 @@ def test_persist_lab_metadata(self): self.assertEqual(result.get("library").get("update_count"), 0, "0 update in library") self.assertEqual(result.get("specimen").get("new_count"), 2, "2 new specimen should be created") - self.assertEqual(result.get("specimen").get("update_count"), 1, "1 update in specimen") + self.assertEqual(result.get("specimen").get("update_count"), 0, "no update in specimen") self.assertEqual(result.get("subject").get("new_count"), 1, "1 new subject should be created") - self.assertEqual(result.get("subject").get("update_count"), 2, "2 update in subject") + self.assertEqual(result.get("subject").get("update_count"), 0, "no update in subject") lib_1 = Library.objects.get(library_id=RECORD_1.get("LibraryID")) self.assertEqual(lib_1.type, RECORD_1.get("Type"), "incorrect value (Type) stored") @@ -125,7 +127,7 @@ def test_persist_lab_metadata(self): self.assertEqual(lib_1.assay, RECORD_1.get("Assay"), "incorrect value (Assay) stored") self.assertEqual(lib_1.workflow, RECORD_1.get("Workflow"), "incorrect value (Workflow) stored") self.assertEqual(lib_1.project_owner, RECORD_1.get("ProjectOwner"), "incorrect value (ProjectOwner) stored") - self.assertEqual(lib_1.project_name, RECORD_1.get("ProjectName"),"incorrect value (ProjectName) stored") + self.assertEqual(lib_1.project_name, RECORD_1.get("ProjectName"), "incorrect value (ProjectName) stored") self.assertEqual(lib_1.specimen.specimen_id, RECORD_1.get("SampleID"), "incorrect specimen linked") spc_1 = Specimen.objects.get(specimen_id=RECORD_1.get("SampleID")) @@ -148,6 +150,32 @@ def test_persist_lab_metadata(self): self.assertEqual(lib.specimen.subject.subject_id, RECORD_1.get("SubjectID"), "library is not linked to the same subject") + def test_new_df_in_different_year(self) -> None: + """ + python manage.py test proc.tests.test_tracking_sheet_srv.TrackingSheetSrvUnitTests.test_new_df_in_different_year + """ + + metadata_pd = pd.json_normalize([RECORD_1]) + metadata_pd = sanitize_lab_metadata_df(metadata_pd) + persist_lab_metadata(metadata_pd, SHEET_YEAR) + + new_lib_id = 'L24001' + mock_record = RECORD_1.copy() + mock_record['LibraryID'] = new_lib_id + metadata_pd = pd.json_normalize([mock_record]) + metadata_pd = sanitize_lab_metadata_df(metadata_pd) + persist_lab_metadata(metadata_pd, '2024') + + lib_all = Library.objects.all() + self.assertEqual(lib_all.count(), 2, "2 library should be created") + + lib_1 = Library.objects.get(library_id=RECORD_1.get("LibraryID")) + self.assertIsNotNone(lib_1) + + lib_change = Library.objects.get(library_id=new_lib_id) + self.assertIsNotNone(lib_change) + + def test_persist_lab_metadata_alter_sbj(self): """ test where lib moved to different spc, and spc to different sbj @@ -158,11 +186,11 @@ def test_persist_lab_metadata_alter_sbj(self): metadata_pd = pd.json_normalize([RECORD_3]) metadata_pd = sanitize_lab_metadata_df(metadata_pd) - persist_lab_metadata(metadata_pd) + persist_lab_metadata(metadata_pd, SHEET_YEAR) metadata_pd = pd.json_normalize([RECORD_3_DIFF_SBJ]) metadata_pd = sanitize_lab_metadata_df(metadata_pd) - persist_lab_metadata(metadata_pd) + persist_lab_metadata(metadata_pd, SHEET_YEAR) sbj_4 = Subject.objects.get(subject_id=RECORD_3_DIFF_SBJ['SubjectID']) self.assertIsNotNone(sbj_4) @@ -172,7 +200,7 @@ def test_persist_lab_metadata_alter_sbj(self): metadata_pd = pd.json_normalize([RECORD_3_DIFF_SPC]) metadata_pd = sanitize_lab_metadata_df(metadata_pd) - persist_lab_metadata(metadata_pd) + persist_lab_metadata(metadata_pd, SHEET_YEAR) lib_3 = Library.objects.get(library_id=RECORD_3['LibraryID']) self.assertEqual(lib_3.specimen.specimen_id, RECORD_3_DIFF_SPC['SampleID'], @@ -186,13 +214,13 @@ def test_with_deleted_model(self) -> None: metadata_pd = pd.json_normalize(mock_sheet_data) metadata_pd = sanitize_lab_metadata_df(metadata_pd) - persist_lab_metadata(metadata_pd) + persist_lab_metadata(metadata_pd, SHEET_YEAR) mock_sheet_data = [RECORD_3] metadata_pd = pd.json_normalize(mock_sheet_data) metadata_pd = sanitize_lab_metadata_df(metadata_pd) - result = persist_lab_metadata(metadata_pd) + result = persist_lab_metadata(metadata_pd, SHEET_YEAR) deleted_lib = Library.objects.filter(library_id__in=[RECORD_1.get('LibraryID'), RECORD_2.get('LibraryID')]) self.assertEqual(deleted_lib.count(), 0, 'these library query should all be deleted') @@ -210,7 +238,7 @@ def test_save_choice_from_human_readable_label(self) -> None: metadata_pd = pd.json_normalize(mock_sheet_data) metadata_pd = sanitize_lab_metadata_df(metadata_pd) - persist_lab_metadata(metadata_pd) + persist_lab_metadata(metadata_pd, SHEET_YEAR) spc = Specimen.objects.get(specimen_id=mock_record.get("SampleID")) self.assertIsNotNone(spc)