From 61e50e6d28673cf89db927399c2ec263a4823858 Mon Sep 17 00:00:00 2001 From: William Putra Intan <61998484+williamputraintan@users.noreply.github.com> Date: Tue, 24 Sep 2024 11:37:48 +1000 Subject: [PATCH] Feat (MM): State Change Event (#569) --- config/stacks/metadataManager.ts | 2 + config/stacks/schema/events.ts | 9 + docs/schemas/events/metadatamanager/Makefile | 3 + .../MetadataStateChange.schema.json | 78 +++++++ .../example/MSC__example1.json | 27 +++ .../example/MSC__example2.json | 27 +++ .../stacks/metadata-manager/Makefile | 2 +- .../stacks/metadata-manager/README.md | 37 ++++ .../app/serializers/library.py | 9 +- .../metadata-manager/app/serializers/utils.py | 17 ++ .../metadata-manager/app/tests/test_models.py | 50 +++++ .../construct/lambda-load-custom-csv/index.ts | 13 +- .../construct/lambda-sync-gsheet/index.ts | 11 +- .../stacks/metadata-manager/deploy/stack.ts | 6 + .../handler/load_custom_metadata_csv.py | 3 +- .../handler/sync_tracking_sheet.py | 4 +- .../metadata-manager/proc/aws/event/event.py | 36 +++ .../metadatastatechange/Event.py | 205 ++++++++++++++++++ .../MetadataStateChange.py | 124 +++++++++++ .../metadatastatechange/__init__.py | 7 + .../metadatastatechange/marshaller.py | 138 ++++++++++++ .../proc/service/load_csv_srv.py | 24 +- .../proc/service/tracking_sheet_srv.py | 45 +++- .../proc/tests/test_tracking_sheet_srv.py | 114 +++++++++- .../metadata-manager/proc/tests/utils.py | 42 ++++ 25 files changed, 1014 insertions(+), 19 deletions(-) create mode 100644 docs/schemas/events/metadatamanager/Makefile create mode 100644 docs/schemas/events/metadatamanager/MetadataStateChange.schema.json create mode 100644 docs/schemas/events/metadatamanager/example/MSC__example1.json create mode 100644 docs/schemas/events/metadatamanager/example/MSC__example2.json create mode 100644 lib/workload/stateless/stacks/metadata-manager/app/serializers/utils.py create mode 100644 lib/workload/stateless/stacks/metadata-manager/proc/aws/event/event.py create 
mode 100644 lib/workload/stateless/stacks/metadata-manager/proc/aws/event/schema/orcabus_metadatamanager/metadatastatechange/Event.py create mode 100644 lib/workload/stateless/stacks/metadata-manager/proc/aws/event/schema/orcabus_metadatamanager/metadatastatechange/MetadataStateChange.py create mode 100644 lib/workload/stateless/stacks/metadata-manager/proc/aws/event/schema/orcabus_metadatamanager/metadatastatechange/__init__.py create mode 100644 lib/workload/stateless/stacks/metadata-manager/proc/aws/event/schema/orcabus_metadatamanager/metadatastatechange/marshaller.py create mode 100644 lib/workload/stateless/stacks/metadata-manager/proc/tests/utils.py diff --git a/config/stacks/metadataManager.ts b/config/stacks/metadataManager.ts index babfe63b8..6a75a0f7f 100644 --- a/config/stacks/metadataManager.ts +++ b/config/stacks/metadataManager.ts @@ -5,6 +5,7 @@ import { corsAllowOrigins, logsApiGatewayConfig, vpcProps, + eventBusName, } from '../constants'; import { MetadataManagerStackProps } from '../../lib/workload/stateless/stacks/metadata-manager/deploy/stack'; @@ -15,6 +16,7 @@ export const getMetadataManagerStackProps = (stage: AppStage): MetadataManagerSt vpcProps, isDailySync: isDailySync, lambdaSecurityGroupName: computeSecurityGroupName, + eventBusName: eventBusName, apiGatewayCognitoProps: { ...cognitoApiGatewayConfig, corsAllowOrigins: corsAllowOrigins[stage], diff --git a/config/stacks/schema/events.ts b/config/stacks/schema/events.ts index 536c2806a..7daf1a6d1 100644 --- a/config/stacks/schema/events.ts +++ b/config/stacks/schema/events.ts @@ -40,6 +40,15 @@ export const getEventSchemaStackProps = (): SchemaStackProps => { docBase + '/executionservice/WorkflowRunStateChange.schema.json' ), }, + { + ...defaultProps, + schemaName: 'orcabus.metadatamanager@MetadataStateChange', + schemaDescription: 'State change event for lab metadata changes', + schemaLocation: path.join( + __dirname, + docBase + '/metadatamanager/MetadataStateChange.schema.json' + ), 
+ }, ], }; }; diff --git a/docs/schemas/events/metadatamanager/Makefile b/docs/schemas/events/metadatamanager/Makefile new file mode 100644 index 000000000..8cf03dc27 --- /dev/null +++ b/docs/schemas/events/metadatamanager/Makefile @@ -0,0 +1,3 @@ +test: + @check-jsonschema --schemafile MetadataStateChange.schema.json example/MSC__example1.json + @check-jsonschema --schemafile MetadataStateChange.schema.json example/MSC__example2.json diff --git a/docs/schemas/events/metadatamanager/MetadataStateChange.schema.json b/docs/schemas/events/metadatamanager/MetadataStateChange.schema.json new file mode 100644 index 000000000..fd13cb3f6 --- /dev/null +++ b/docs/schemas/events/metadatamanager/MetadataStateChange.schema.json @@ -0,0 +1,78 @@ +{ + "$schema": "http://json-schema.org/draft-04/schema#", + "$id": "https://raw.githubusercontent.com/umccr/orcabus/main/docs/schemas/events/metadatamanager/MetadataStateChange.schema.json", + "description": "EventBridge custom event schema for orcabus.metadatamanager@MetadataStateChange", + "title": "AWSEvent", + "type": "object", + "required": [ + "detail-type", + "detail", + "source" + ], + "properties": { + "id": { + "type": "string" + }, + "region": { + "type": "string" + }, + "resources": { + "type": "array", + "items": { + "type": "string" + } + }, + "source": { + "enum": ["orcabus.metadatamanager"] + }, + "time": { + "type": "string", + "format": "date-time" + }, + "version": { + "type": "string" + }, + "account": { + "type": "string" + }, + "detail-type": { + "enum": ["MetadataStateChange"] + }, + "detail": { + "$ref": "#/definitions/MetadataStateChange" + } + }, + "definitions": { + "MetadataStateChange": { + "type": "object", + "required": [ + "model", + "action", + "data", + "refId" + ], + "properties": { + "model": { + "type": "string", + "enum": [ + "LIBRARY" + ] + }, + "action": { + "type": "string", + "enum": [ + "CREATE", + "UPDATE", + "DELETE" + ] + }, + "refId": { + "type": "string" + }, + "data": { + "type": 
"object" + } + } + } + } +} diff --git a/docs/schemas/events/metadatamanager/example/MSC__example1.json b/docs/schemas/events/metadatamanager/example/MSC__example1.json new file mode 100644 index 000000000..872b4b28e --- /dev/null +++ b/docs/schemas/events/metadatamanager/example/MSC__example1.json @@ -0,0 +1,27 @@ +{ + "version": "0", + "id": "f71bbbbb-5b36-40c2-f7dc-804ca6270cd6", + "detail-type": "MetadataStateChange", + "source": "orcabus.metadatamanager", + "account": "123456789012", + "time": "2024-05-01T09:25:44Z", + "region": "ap-southeast-2", + "resources": [], + "detail": { + "action": "CREATE", + "model": "LIBRARY", + "refId": "lib.01J8GMF3XCHW9CV8ZFS8F1P1RF", + "data": { + "orcabusId": "lib.01J8GMF3XCHW9CV8ZFS8F1P1RF", + "libraryId": "L10001", + "phenotype": "normal", + "workflow": "research", + "quality": "good", + "type": "WTS", + "assay": "ctTSO", + "coverage": 120.0, + "sample": "smp.01J8GMF3WD6TD5Y491EEBARYBE", + "subject": "sbj.01J8GMF3VZRGYQG1GYDJC6E9MV" + } + } +} diff --git a/docs/schemas/events/metadatamanager/example/MSC__example2.json b/docs/schemas/events/metadatamanager/example/MSC__example2.json new file mode 100644 index 000000000..67c9baaef --- /dev/null +++ b/docs/schemas/events/metadatamanager/example/MSC__example2.json @@ -0,0 +1,27 @@ +{ + "version": "0", + "id": "f71bbbbb-5b36-40c2-f7dc-804ca6270cd6", + "detail-type": "MetadataStateChange", + "source": "orcabus.metadatamanager", + "account": "123456789012", + "time": "2024-05-01T09:25:44Z", + "region": "ap-southeast-2", + "resources": [], + "detail": { + "action": "DELETE", + "model": "LIBRARY", + "refId": "lib.01J8GNBB7YK8RCVFGXSBV4ZKST", + "data": { + "orcabusId": "lib.01J8GNBB7YK8RCVFGXSBV4ZKST", + "libraryId": "L10001", + "phenotype": "normal", + "workflow": "research", + "quality": "poor", + "type": "WTS", + "assay": "ctTSO", + "coverage": 120.0, + "sample": "smp.01J8GNBB67HGG3G17QA5QP98TE", + "subject": "sbj.01J8GNBB59GR0KMTNR0BT3F31V" + } + } +} diff --git 
a/lib/workload/stateless/stacks/metadata-manager/Makefile b/lib/workload/stateless/stacks/metadata-manager/Makefile index e74d81833..10cb5fa4b 100644 --- a/lib/workload/stateless/stacks/metadata-manager/Makefile +++ b/lib/workload/stateless/stacks/metadata-manager/Makefile @@ -76,7 +76,7 @@ insert-data: @python manage.py insert_mock_data suite: - @python manage.py test + @python manage.py test --parallel # full mock suite test pipeline - install deps, bring up compose stack, run suite, bring down compose stack test: install up suite down diff --git a/lib/workload/stateless/stacks/metadata-manager/README.md b/lib/workload/stateless/stacks/metadata-manager/README.md index e1dbf3ff8..fc84b820f 100644 --- a/lib/workload/stateless/stacks/metadata-manager/README.md +++ b/lib/workload/stateless/stacks/metadata-manager/README.md @@ -61,6 +61,43 @@ on the model of the record. ## How things work +The metadata loader currently supports two Lambda functions: one for syncing from a tracking sheet and another for syncing from a custom CSV presigned URL. Upon a CREATE or UPDATE operation in the metadata library, the Lambda function will publish events to the `MainOrcabusEventBus` using the schema defined in [MetadataStateChange.schema.json](/docs/schemas/events/metadatamanager/MetadataStateChange.schema.json). + +The event data will adhere to the same schema as the OpenAPI schema, without nested objects. + +Example of the emitted event:
+ +```json +{ + "version": "0", + "id": "e7b8a2d4-3b6e-4f9b-9c1e-1a2b3c4d5e6f", + "detail-type": "MetadataStateChange", + "source": "orcabus.metadatamanager", + "account": "12345678", + "time": "2000-09-01T00:00:00Z", + "region": "ap-southeast-2", + "resources": [], + "detail": { + "action": "CREATE", + "model": "LIBRARY", + "refId": "lib.01J8GMF3XCHW9CV8ZFS8F1P1RF", + "data": { + "orcabusId": "lib.01J8GMF3XCHW9CV8ZFS8F1P1RF", + "libraryId": "L10001", + "phenotype": "normal", + "workflow": "research", + "quality": "good", + "type": "WTS", + "assay": "ctTSO", + "coverage": 120.0, + "sample": "smp.01J8GMF3WD6TD5Y491EEBARYBE", + "subject": "sbj.01J8GMF3VZRGYQG1GYDJC6E9MV" + } + } +} + +``` + ### How Syncing The Data Works In the near future, we might introduce different ways to load data into the application. For the time being, we are diff --git a/lib/workload/stateless/stacks/metadata-manager/app/serializers/library.py b/lib/workload/stateless/stacks/metadata-manager/app/serializers/library.py index 6a6e65ac8..3617dc26d 100644 --- a/lib/workload/stateless/stacks/metadata-manager/app/serializers/library.py +++ b/lib/workload/stateless/stacks/metadata-manager/app/serializers/library.py @@ -1,4 +1,4 @@ -from app.models import Library +from app.models import Library, Sample, Subject from .base import SerializersBase from .project import ProjectSerializer from .sample import SampleSerializer @@ -14,6 +14,13 @@ class Meta: model = Library exclude = ["project_set"] + def to_representation(self, instance): + representation = super().to_representation(instance) + representation['sample'] = Sample.orcabus_id_prefix + representation['sample'] + representation['subject'] = Subject.orcabus_id_prefix + representation['subject'] + return representation + + class LibraryDetailSerializer(LibraryBaseSerializer): project_set = ProjectSerializer(many=True, read_only=True) diff --git a/lib/workload/stateless/stacks/metadata-manager/app/serializers/utils.py 
b/lib/workload/stateless/stacks/metadata-manager/app/serializers/utils.py new file mode 100644 index 000000000..890c56e2e --- /dev/null +++ b/lib/workload/stateless/stacks/metadata-manager/app/serializers/utils.py @@ -0,0 +1,17 @@ + + +def to_camel_case_key_dict(data: dict) -> dict: + """ + Convert dictionary keys from snake_case to camelCase. + """ + def snake_to_camel(word): + components = word.split('_') + # We capitalize the first letter of each component except the first one + # with the 'title' method and join them together. + return components[0] + ''.join(x.title() for x in components[1:]) + + new_data = {} + for key, value in data.items(): + new_key = snake_to_camel(key) + new_data[new_key] = value + return new_data diff --git a/lib/workload/stateless/stacks/metadata-manager/app/tests/test_models.py b/lib/workload/stateless/stacks/metadata-manager/app/tests/test_models.py index a6146cdaa..4e06ef7dd 100644 --- a/lib/workload/stateless/stacks/metadata-manager/app/tests/test_models.py +++ b/lib/workload/stateless/stacks/metadata-manager/app/tests/test_models.py @@ -1,5 +1,6 @@ import logging +from unittest.mock import MagicMock, patch from django.test import TestCase from app.models import Subject, Sample, Library, Contact, Project, Individual @@ -69,3 +70,52 @@ def test_metadata_model_relationship(self): # find the linked contact cnt_one = prj_one.contact_set.get(contact_id=CONTACT_1['contact_id']) self.assertEqual(cnt_one.contact_id, CONTACT_1['contact_id'], "incorrect contact 'id' linked to project") + + def test_upsert_method(self): + """ + python manage.py test app.tests.test_models.MetadataTestCase.test_upsert_method + """ + + # Test function with updating existing record + updated_spc_data = { + "sample_id": SAMPLE_1['sample_id'], + "source": 'skin', + } + obj, is_created, is_updated = Sample.objects.update_or_create_if_needed( + {"sample_id": updated_spc_data["sample_id"]}, + updated_spc_data + ) + self.assertIsNotNone(obj, "object should not be 
None") + self.assertFalse(is_created, "object should NOT be created") + self.assertTrue(is_updated, "object should be updated") + + smp_one = Sample.objects.get(sample_id=updated_spc_data["sample_id"]) + self.assertEqual(smp_one.source, updated_spc_data['source'], "incorrect 'source' from updated specimen id") + + # Test function with creating new record + new_spc_data = { + "sample_id": 'SMP002', + "source": 'RNA', + } + obj, is_created, is_updated = Sample.objects.update_or_create_if_needed( + {"sample_id": new_spc_data['sample_id']}, + new_spc_data + ) + self.assertIsNotNone(obj, "object should not be None") + self.assertTrue(is_created, "new object should be created") + self.assertFalse(is_updated, "new object should not be updated") + spc_two = Sample.objects.get(sample_id=new_spc_data['sample_id']) + self.assertEqual(spc_two.sample_id, new_spc_data["sample_id"], "incorrect specimen 'id'") + self.assertEqual(spc_two.source, new_spc_data['source'], "incorrect 'source' from new specimen id") + + # Test if no update called if no data has changed + with patch.object(Sample.objects, 'update_or_create', return_value=(None, False)) as mock_update_or_create: + obj, is_created, is_updated = Sample.objects.update_or_create_if_needed( + {"sample_id": new_spc_data['sample_id']}, + new_spc_data + ) + mock_update_or_create.assert_not_called() + self.assertIsNotNone(obj, "object should not be None") + self.assertFalse(is_created, "object should not be created") + self.assertFalse(is_updated, "object should not be updated") + diff --git a/lib/workload/stateless/stacks/metadata-manager/deploy/construct/lambda-load-custom-csv/index.ts b/lib/workload/stateless/stacks/metadata-manager/deploy/construct/lambda-load-custom-csv/index.ts index 46ccf74ef..5677888e8 100644 --- a/lib/workload/stateless/stacks/metadata-manager/deploy/construct/lambda-load-custom-csv/index.ts +++ b/lib/workload/stateless/stacks/metadata-manager/deploy/construct/lambda-load-custom-csv/index.ts @@ -9,6 +9,7 
@@ import { DockerImageFunctionProps, DockerImageCode, } from 'aws-cdk-lib/aws-lambda'; +import { EventBus } from 'aws-cdk-lib/aws-events'; type LambdaProps = { /** @@ -19,6 +20,10 @@ type LambdaProps = { * The secret for the db connection where the lambda will need access to */ dbConnectionSecret: ISecret; + /** + * The eventBusName to notify metadata state change + */ + eventBusName: string; }; export class LambdaLoadCustomCSVConstruct extends Construct { @@ -30,6 +35,8 @@ export class LambdaLoadCustomCSVConstruct extends Construct { this.lambda = new DockerImageFunction(this, 'LoadCustomCSVLambda', { environment: { ...lambdaProps.basicLambdaConfig.environment, + + EVENT_BUS_NAME: lambdaProps.eventBusName, }, securityGroups: lambdaProps.basicLambdaConfig.securityGroups, vpc: lambdaProps.basicLambdaConfig.vpc, @@ -45,10 +52,14 @@ export class LambdaLoadCustomCSVConstruct extends Construct { lambdaProps.dbConnectionSecret.grantRead(this.lambda); // We need to store this lambda ARN somewhere so that we could refer when need to load this manually - const ssmParameter = new StringParameter(this, 'LoadCustomCSVLambdaArnParameterStore', { + new StringParameter(this, 'LoadCustomCSVLambdaArnParameterStore', { parameterName: '/orcabus/metadata-manager/load-custom-csv-lambda-arn', description: 'The ARN of the lambda that load metadata from a presigned URL CSV file', stringValue: this.lambda.functionArn, }); + + // The lambda will need permission to put events to the event bus when metadata state change + const orcabusEventBus = EventBus.fromEventBusName(this, 'EventBus', lambdaProps.eventBusName); + orcabusEventBus.grantPutEventsTo(this.lambda); } } diff --git a/lib/workload/stateless/stacks/metadata-manager/deploy/construct/lambda-sync-gsheet/index.ts b/lib/workload/stateless/stacks/metadata-manager/deploy/construct/lambda-sync-gsheet/index.ts index 00c343d08..1367a2369 100644 --- a/lib/workload/stateless/stacks/metadata-manager/deploy/construct/lambda-sync-gsheet/index.ts 
+++ b/lib/workload/stateless/stacks/metadata-manager/deploy/construct/lambda-sync-gsheet/index.ts @@ -4,7 +4,7 @@ import { Duration } from 'aws-cdk-lib'; import { PythonFunction } from '@aws-cdk/aws-lambda-python-alpha'; import { ISecret } from 'aws-cdk-lib/aws-secretsmanager'; import { StringParameter } from 'aws-cdk-lib/aws-ssm'; -import { Rule, Schedule } from 'aws-cdk-lib/aws-events'; +import { Rule, Schedule, EventBus } from 'aws-cdk-lib/aws-events'; import { LambdaFunction } from 'aws-cdk-lib/aws-events-targets'; import { DockerImageFunction, @@ -25,6 +25,10 @@ type LambdaProps = { * If the lambda should run daily sync */ isDailySync: boolean; + /** + * The eventBusName to notify metadata state change + */ + eventBusName: string; }; export class LambdaSyncGsheetConstruct extends Construct { @@ -41,6 +45,7 @@ export class LambdaSyncGsheetConstruct extends Construct { ...lambdaProps.basicLambdaConfig.environment, SSM_NAME_GDRIVE_ACCOUNT: this.GDRIVE_CRED_PARAM_NAME, SSM_NAME_TRACKING_SHEET_ID: this.GDRIVE_SHEET_ID_PARAM_NAME, + EVENT_BUS_NAME: lambdaProps.eventBusName, }, securityGroups: lambdaProps.basicLambdaConfig.securityGroups, vpc: lambdaProps.basicLambdaConfig.vpc, @@ -85,5 +90,9 @@ export class LambdaSyncGsheetConstruct extends Construct { targets: [gsheetSyncLambdaEventTarget], }); } + + // The lambda will need permission to put events to the event bus when metadata state change + const orcabusEventBus = EventBus.fromEventBusName(this, 'EventBus', lambdaProps.eventBusName); + orcabusEventBus.grantPutEventsTo(this.lambda); } } diff --git a/lib/workload/stateless/stacks/metadata-manager/deploy/stack.ts b/lib/workload/stateless/stacks/metadata-manager/deploy/stack.ts index c582a6aa4..1cbbb0204 100644 --- a/lib/workload/stateless/stacks/metadata-manager/deploy/stack.ts +++ b/lib/workload/stateless/stacks/metadata-manager/deploy/stack.ts @@ -30,6 +30,10 @@ export type MetadataManagerStackProps = { * API Gateway props */ apiGatewayCognitoProps: 
ApiGatewayConstructProps; + /** + * The eventBusName to notify metadata state change + */ + eventBusName: string; }; export class MetadataManagerStack extends Stack { @@ -104,12 +108,14 @@ export class MetadataManagerStack extends Stack { basicLambdaConfig: basicLambdaConfig, dbConnectionSecret: dbSecret, isDailySync: props.isDailySync, + eventBusName: props.eventBusName, }); // (4) new LambdaLoadCustomCSVConstruct(this, 'CustomCsvLoaderLambda', { basicLambdaConfig: basicLambdaConfig, dbConnectionSecret: dbSecret, + eventBusName: props.eventBusName, }); } } diff --git a/lib/workload/stateless/stacks/metadata-manager/handler/load_custom_metadata_csv.py b/lib/workload/stateless/stacks/metadata-manager/handler/load_custom_metadata_csv.py index a54a7662a..b35b969d3 100644 --- a/lib/workload/stateless/stacks/metadata-manager/handler/load_custom_metadata_csv.py +++ b/lib/workload/stateless/stacks/metadata-manager/handler/load_custom_metadata_csv.py @@ -4,11 +4,10 @@ from libumccr import libjson -from proc.service.utils import sanitize_lab_metadata_df, warn_drop_duplicated_library - os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'app.settings.base') django.setup() +from proc.service.utils import sanitize_lab_metadata_df, warn_drop_duplicated_library from proc.service.load_csv_srv import load_metadata_csv, download_csv_to_pandas logger = logging.getLogger() diff --git a/lib/workload/stateless/stacks/metadata-manager/handler/sync_tracking_sheet.py b/lib/workload/stateless/stacks/metadata-manager/handler/sync_tracking_sheet.py index 4a32eca47..5df53a0c3 100644 --- a/lib/workload/stateless/stacks/metadata-manager/handler/sync_tracking_sheet.py +++ b/lib/workload/stateless/stacks/metadata-manager/handler/sync_tracking_sheet.py @@ -7,8 +7,8 @@ os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'app.settings.base') django.setup() -from proc.service.tracking_sheet_srv import download_tracking_sheet, sanitize_lab_metadata_df, persist_lab_metadata, \ - warn_drop_duplicated_library +from proc.service.utils import
warn_drop_duplicated_library +from proc.service.tracking_sheet_srv import download_tracking_sheet, sanitize_lab_metadata_df, persist_lab_metadata logger = logging.getLogger() logger.setLevel(logging.INFO) diff --git a/lib/workload/stateless/stacks/metadata-manager/proc/aws/event/event.py b/lib/workload/stateless/stacks/metadata-manager/proc/aws/event/event.py new file mode 100644 index 000000000..b0597eb8a --- /dev/null +++ b/lib/workload/stateless/stacks/metadata-manager/proc/aws/event/event.py @@ -0,0 +1,36 @@ +import os +import json +from typing import Literal + +from app.serializers.utils import to_camel_case_key_dict +from .schema.orcabus_metadatamanager.metadatastatechange import Marshaller + + +class MetadataStateChangeEvent: + namespace = "orcabus.metadatamanager" + detail_type = "MetadataStateChange" + + def __init__(self, + action: Literal['CREATE', 'UPDATE', 'DELETE'], + model: Literal['LIBRARY', 'SPECIMEN', 'SUBJECT'], + ref_id: str, + data: dict) -> None: + self.event_bus_name = os.getenv('EVENT_BUS_NAME', '') + # Below must be in camelCase as what we agreed (and written in docs) in API level + self.detail = json.dumps({ + "action": action, + "model": model, + "refId": ref_id, + "data": Marshaller.marshall(to_camel_case_key_dict(data)) + }) + + def __str__(self): + return self.__dict__ + + def get_put_event_entry(self): + return { + "Source": self.namespace, + "DetailType": self.detail_type, + "Detail": self.detail, + "EventBusName": self.event_bus_name + } diff --git a/lib/workload/stateless/stacks/metadata-manager/proc/aws/event/schema/orcabus_metadatamanager/metadatastatechange/Event.py b/lib/workload/stateless/stacks/metadata-manager/proc/aws/event/schema/orcabus_metadatamanager/metadatastatechange/Event.py new file mode 100644 index 000000000..916926a2b --- /dev/null +++ b/lib/workload/stateless/stacks/metadata-manager/proc/aws/event/schema/orcabus_metadatamanager/metadatastatechange/Event.py @@ -0,0 +1,205 @@ +# coding: utf-8 +import pprint 
+import re # noqa: F401 + +import six +from enum import Enum +from proc.aws.event.schema.orcabus_metadatamanager.metadatastatechange.MetadataStateChange import MetadataStateChange # noqa: F401,E501 + +class Event(object): + + + _types = { + 'account': 'str', + 'detail': 'MetadataStateChange', + 'detail_type': 'Object', + 'id': 'str', + 'region': 'str', + 'resources': 'list[str]', + 'source': 'Object', + 'time': 'datetime', + 'version': 'str' + } + + _attribute_map = { + 'account': 'account', + 'detail': 'detail', + 'detail_type': 'detail-type', + 'id': 'id', + 'region': 'region', + 'resources': 'resources', + 'source': 'source', + 'time': 'time', + 'version': 'version' + } + + def __init__(self, account=None, detail=None, detail_type=None, id=None, region=None, resources=None, source=None, time=None, version=None): # noqa: E501 + self._account = None + self._detail = None + self._detail_type = None + self._id = None + self._region = None + self._resources = None + self._source = None + self._time = None + self._version = None + self.discriminator = None + self.account = account + self.detail = detail + self.detail_type = detail_type + self.id = id + self.region = region + self.resources = resources + self.source = source + self.time = time + self.version = version + + + @property + def account(self): + + return self._account + + @account.setter + def account(self, account): + + + self._account = account + + + @property + def detail(self): + + return self._detail + + @detail.setter + def detail(self, detail): + + + self._detail = detail + + + @property + def detail_type(self): + + return self._detail_type + + @detail_type.setter + def detail_type(self, detail_type): + + + self._detail_type = detail_type + + + @property + def id(self): + + return self._id + + @id.setter + def id(self, id): + + + self._id = id + + + @property + def region(self): + + return self._region + + @region.setter + def region(self, region): + + + self._region = region + + + @property + def 
resources(self): + + return self._resources + + @resources.setter + def resources(self, resources): + + + self._resources = resources + + + @property + def source(self): + + return self._source + + @source.setter + def source(self, source): + + + self._source = source + + + @property + def time(self): + + return self._time + + @time.setter + def time(self, time): + + + self._time = time + + + @property + def version(self): + + return self._version + + @version.setter + def version(self, version): + + + self._version = version + + def to_dict(self): + result = {} + + for attr, _ in six.iteritems(self._types): + value = getattr(self, attr) + if isinstance(value, list): + result[attr] = list(map( + lambda x: x.to_dict() if hasattr(x, "to_dict") else x, + value + )) + elif hasattr(value, "to_dict"): + result[attr] = value.to_dict() + elif isinstance(value, dict): + result[attr] = dict(map( + lambda item: (item[0], item[1].to_dict()) + if hasattr(item[1], "to_dict") else item, + value.items() + )) + else: + result[attr] = value + if issubclass(Event, dict): + for key, value in self.items(): + result[key] = value + + return result + + def to_str(self): + return pprint.pformat(self.to_dict()) + + def __repr__(self): + return self.to_str() + + def __eq__(self, other): + if not isinstance(other, Event): + return False + + return self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not self == other + diff --git a/lib/workload/stateless/stacks/metadata-manager/proc/aws/event/schema/orcabus_metadatamanager/metadatastatechange/MetadataStateChange.py b/lib/workload/stateless/stacks/metadata-manager/proc/aws/event/schema/orcabus_metadatamanager/metadatastatechange/MetadataStateChange.py new file mode 100644 index 000000000..9ddc56795 --- /dev/null +++ b/lib/workload/stateless/stacks/metadata-manager/proc/aws/event/schema/orcabus_metadatamanager/metadatastatechange/MetadataStateChange.py @@ -0,0 +1,124 @@ +# coding: utf-8 +import pprint +import re # noqa: F401 + 
+import six +from enum import Enum + +class MetadataStateChange(object): + + + _types = { + 'action': 'str', + 'data': 'object', + 'model': 'str', + 'refId': 'str' + } + + _attribute_map = { + 'action': 'action', + 'data': 'data', + 'model': 'model', + 'refId': 'refId' + } + + def __init__(self, action=None, data=None, model=None, refId=None): # noqa: E501 + self._action = None + self._data = None + self._model = None + self._refId = None + self.discriminator = None + self.action = action + self.data = data + self.model = model + self.refId = refId + + + @property + def action(self): + + return self._action + + @action.setter + def action(self, action): + + + self._action = action + + + @property + def data(self): + + return self._data + + @data.setter + def data(self, data): + + + self._data = data + + + @property + def model(self): + + return self._model + + @model.setter + def model(self, model): + + + self._model = model + + + @property + def refId(self): + + return self._refId + + @refId.setter + def refId(self, refId): + + + self._refId = refId + + def to_dict(self): + result = {} + + for attr, _ in six.iteritems(self._types): + value = getattr(self, attr) + if isinstance(value, list): + result[attr] = list(map( + lambda x: x.to_dict() if hasattr(x, "to_dict") else x, + value + )) + elif hasattr(value, "to_dict"): + result[attr] = value.to_dict() + elif isinstance(value, dict): + result[attr] = dict(map( + lambda item: (item[0], item[1].to_dict()) + if hasattr(item[1], "to_dict") else item, + value.items() + )) + else: + result[attr] = value + if issubclass(MetadataStateChange, dict): + for key, value in self.items(): + result[key] = value + + return result + + def to_str(self): + return pprint.pformat(self.to_dict()) + + def __repr__(self): + return self.to_str() + + def __eq__(self, other): + if not isinstance(other, MetadataStateChange): + return False + + return self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not self == other + diff 
class Marshaller:
    """
    (De)serializes MetadataStateChange schema model objects.

    ``marshall`` turns a model instance (or native container) into plain
    JSON-safe types; ``unmarshall`` rebuilds a model instance from a dict
    using the model's ``_types`` / ``_attribute_map`` metadata.

    NOTE(review): originally generated AWS EventBridge code-bindings
    boilerplate; the py2-compat ``six`` shim has been replaced with the
    native py3 equivalents (str, int, dict.items) since this codebase is
    python3-only.
    """

    # JSON-safe leaf types: returned as-is by marshall().
    PRIMITIVE_TYPES = (float, bool, bytes, str, int)

    # Schema type-name -> python type ('long' kept as a legacy alias of int).
    NATIVE_TYPES_MAPPING = {
        'int': int,
        'long': int,
        'float': float,
        'str': str,
        'bool': bool,
        'date': datetime.date,
        'datetime': datetime.datetime,
        'object': object,
    }

    @classmethod
    def marshall(cls, obj):
        """Recursively convert *obj* into JSON-serializable primitives.

        - None/primitives pass through
        - lists/tuples are marshalled element-wise (tuple stays a tuple)
        - date/datetime become ISO-8601 strings
        - dicts keep their keys, values are marshalled
        - model objects are mapped to dicts via ``_attribute_map``,
          skipping attributes whose value is None
        """
        if obj is None:
            return None
        elif isinstance(obj, cls.PRIMITIVE_TYPES):
            return obj
        elif isinstance(obj, list):
            return [cls.marshall(sub_obj) for sub_obj in obj]
        elif isinstance(obj, tuple):
            return tuple(cls.marshall(sub_obj) for sub_obj in obj)
        elif isinstance(obj, (datetime.datetime, datetime.date)):
            return obj.isoformat()

        if isinstance(obj, dict):
            obj_dict = obj
        else:
            # Model object: project attributes through the schema's wire names,
            # dropping unset (None) attributes.
            obj_dict = {obj._attribute_map[attr]: getattr(obj, attr)
                        for attr, _ in obj._types.items()
                        if getattr(obj, attr) is not None}

        return {key: cls.marshall(val) for key, val in obj_dict.items()}

    @classmethod
    def unmarshall(cls, data, typeName):
        """Recursively convert *data* into the type named by *typeName*.

        *typeName* may be a string ('int', 'list[str]', 'dict(str, int)',
        a model class name looked up on MetadataStateChange) or an actual
        type object from a recursive call.
        """
        if data is None:
            return None

        if isinstance(typeName, str):
            if typeName.startswith('list['):
                sub_kls = re.match(r'list\[(.*)\]', typeName).group(1)
                return [cls.unmarshall(sub_data, sub_kls) for sub_data in data]

            if typeName.startswith('dict('):
                # dict(keyType, valueType): only the value type needs conversion.
                sub_kls = re.match(r'dict\(([^,]*), (.*)\)', typeName).group(2)
                return {k: cls.unmarshall(v, sub_kls) for k, v in data.items()}

            if typeName in cls.NATIVE_TYPES_MAPPING:
                typeName = cls.NATIVE_TYPES_MAPPING[typeName]
            else:
                # Non-native names resolve to model classes in the schema module.
                typeName = getattr(MetadataStateChange, typeName)

        if typeName in cls.PRIMITIVE_TYPES:
            return cls.__unmarshall_primitive(data, typeName)
        elif typeName == object:
            return cls.__unmarshall_object(data)
        elif typeName == datetime.date:
            return cls.__unmarshall_date(data)
        elif typeName == datetime.datetime:
            return cls.__unmarshall_datetime(data)
        else:
            return cls.__unmarshall_model(data, typeName)

    @classmethod
    def __unmarshall_primitive(cls, data, typeName):
        """Cast *data* to the primitive *typeName*, falling back gracefully."""
        try:
            return typeName(data)
        except UnicodeEncodeError:
            return str(data)
        except TypeError:
            # e.g. container passed for a scalar type: return unchanged.
            return data

    @classmethod
    def __unmarshall_object(cls, value):
        # 'object' schema type means free-form: pass through untouched.
        return value

    @classmethod
    def __unmarshall_date(cls, string):
        """Parse an ISO date string; return it unchanged if dateutil is absent."""
        try:
            from dateutil.parser import parse
            return parse(string).date()
        except ImportError:
            return string

    @classmethod
    def __unmarshall_datetime(cls, string):
        """Parse an ISO datetime string; return it unchanged if dateutil is absent."""
        try:
            from dateutil.parser import parse
            return parse(string)
        except ImportError:
            return string

    @classmethod
    def __unmarshall_model(cls, data, typeName):
        """Instantiate model class *typeName* from *data* using its metadata."""
        # Untyped model with no polymorphism hook: nothing to map, return raw.
        if (not typeName._types and
                not cls.__hasattr(typeName, 'get_real_child_model')):
            return data

        kwargs = {}
        if typeName._types is not None:
            for attr, attr_type in typeName._types.items():
                if (data is not None and
                        typeName._attribute_map[attr] in data and
                        isinstance(data, (list, dict))):
                    value = data[typeName._attribute_map[attr]]
                    kwargs[attr] = cls.unmarshall(value, attr_type)

        instance = typeName(**kwargs)

        # Dict-like models keep any extra keys the schema didn't declare.
        if (isinstance(instance, dict) and
                typeName._types is not None and
                isinstance(data, dict)):
            for key, value in data.items():
                if key not in typeName._types:
                    instance[key] = value
        # Polymorphic models re-dispatch to the concrete child class.
        if cls.__hasattr(instance, 'get_real_child_model'):
            type_name = instance.get_real_child_model(data)
            if type_name:
                instance = cls.unmarshall(data, type_name)
        return instance

    @classmethod
    def __hasattr(cls, obj, name):
        # Class-dict membership (deliberately ignores inherited attributes).
        return name in obj.__class__.__dict__
'invalid_record_count': 0, } @@ -201,11 +205,29 @@ def load_metadata_csv(df: pd.DataFrame): 'subject_id': subject.orcabus_id, } ) + + lib_dict = LibrarySerializer(library).data if is_lib_created: stats['library']['create_count'] += 1 + event = MetadataStateChangeEvent( + action='CREATE', + model='LIBRARY', + ref_id=lib_dict.get('orcabus_id'), + data=lib_dict + ) + event_bus_entries.append(event.get_put_event_entry()) + if is_lib_updated: stats['library']['update_count'] += 1 + event = MetadataStateChangeEvent( + action='UPDATE', + model='LIBRARY', + ref_id=lib_dict.get('orcabus_id'), + data=lib_dict, + ) + event_bus_entries.append(event.get_put_event_entry()) + # link library to its project if project: try: diff --git a/lib/workload/stateless/stacks/metadata-manager/proc/service/tracking_sheet_srv.py b/lib/workload/stateless/stacks/metadata-manager/proc/service/tracking_sheet_srv.py index 8805b3a74..ff439c2ac 100644 --- a/lib/workload/stateless/stacks/metadata-manager/proc/service/tracking_sheet_srv.py +++ b/lib/workload/stateless/stacks/metadata-manager/proc/service/tracking_sheet_srv.py @@ -5,8 +5,8 @@ from django.core.exceptions import ObjectDoesNotExist from django.db import transaction -from libumccr import libgdrive -from libumccr.aws import libssm +from libumccr import libgdrive, libjson +from libumccr.aws import libssm, libeb import logging @@ -14,6 +14,8 @@ from app.models.library import Quality, LibraryType, Phenotype, WorkflowType, sanitize_library_coverage from app.models.sample import Source from app.models.utils import get_value_from_human_readable_label +from app.serializers import LibrarySerializer +from proc.aws.event.event import MetadataStateChangeEvent from proc.service.utils import clean_model_history, sanitize_lab_metadata_df logger = logging.getLogger() @@ -35,6 +37,9 @@ def persist_lab_metadata(df: pd.DataFrame, sheet_year: str): """ logger.info(f"Start processing LabMetadata") + # Event entries for the event bus + event_bus_entries = list() 
+ # Used for statistics invalid_data = [] stats = { @@ -46,28 +51,23 @@ def persist_lab_metadata(df: pd.DataFrame, sheet_year: str): "sample": { "create_count": 0, "update_count": 0, - "delete_count": 0, }, "subject": { "create_count": 0, "update_count": 0, - "delete_count": 0, }, "individual": { "create_count": 0, "update_count": 0, - "delete_count": 0, }, "project": { "create_count": 0, "update_count": 0, - "delete_count": 0, }, "contact": { "create_count": 0, "update_count": 0, - "delete_count": 0, }, 'invalid_record_count': 0, } @@ -83,6 +83,14 @@ def persist_lab_metadata(df: pd.DataFrame, sheet_year: str): for lib in Library.objects.filter(library_id__startswith=library_prefix).exclude( library_id__in=df['library_id'].tolist()).iterator(): stats['library']['delete_count'] += 1 + lib_dict = LibrarySerializer(lib).data + event = MetadataStateChangeEvent( + action='DELETE', + model='LIBRARY', + ref_id=lib_dict.get('orcabus_id'), + data=lib_dict + ) + event_bus_entries.append(event.get_put_event_entry()) lib.delete() # this the where records are updated, inserted, linked based on library_id @@ -210,11 +218,29 @@ def persist_lab_metadata(df: pd.DataFrame, sheet_year: str): 'subject_id': subject.orcabus_id, } ) + lib_dict = LibrarySerializer(library).data + if is_lib_created: stats['library']['create_count'] += 1 + event = MetadataStateChangeEvent( + action='CREATE', + model='LIBRARY', + ref_id=lib_dict.get('orcabus_id'), + data=lib_dict + ) + event_bus_entries.append(event.get_put_event_entry()) + if is_lib_updated: stats['library']['update_count'] += 1 + event = MetadataStateChangeEvent( + action='UPDATE', + model='LIBRARY', + ref_id=lib_dict.get('orcabus_id'), + data=lib_dict, + ) + event_bus_entries.append(event.get_put_event_entry()) + # link library to its project try: library.project_set.get(orcabus_id=project.orcabus_id) @@ -241,6 +267,11 @@ def persist_lab_metadata(df: pd.DataFrame, sheet_year: str): if len(invalid_data) > 0: logger.warning(f"Invalid 
record: {invalid_data}") + + if len(event_bus_entries) > 0: + logger.info(f'Dispatch event bridge entries: {libjson.dumps(event_bus_entries)}') + libeb.dispatch_events(event_bus_entries) + logger.info(f"Processed LabMetadata: {json.dumps(stats)}") return stats diff --git a/lib/workload/stateless/stacks/metadata-manager/proc/tests/test_tracking_sheet_srv.py b/lib/workload/stateless/stacks/metadata-manager/proc/tests/test_tracking_sheet_srv.py index 596731836..9eba5ffb0 100644 --- a/lib/workload/stateless/stacks/metadata-manager/proc/tests/test_tracking_sheet_srv.py +++ b/lib/workload/stateless/stacks/metadata-manager/proc/tests/test_tracking_sheet_srv.py @@ -1,9 +1,16 @@ +import os +import json import pandas as pd +from libumccr.aws import libeb +from unittest.mock import MagicMock from django.test import TestCase -from app.models import Library, Sample, Subject, Project, Contact, Individual +from app.models import Library, Sample, Subject, Project, Contact, Individual from proc.service.tracking_sheet_srv import sanitize_lab_metadata_df, persist_lab_metadata +from .utils import check_put_event_entries_format, check_put_event_value, is_expected_event_in_output + +TEST_EVENT_BUS_NAME = "TEST_BUS" SHEET_YEAR = "2010" @@ -87,10 +94,13 @@ class TrackingSheetSrvUnitTests(TestCase): def setUp(self) -> None: - super(TrackingSheetSrvUnitTests, self).setUp() + super().setUp() + self._real_dispatch_events = libeb.dispatch_events + libeb.dispatch_events = MagicMock() def tearDown(self) -> None: - super(TrackingSheetSrvUnitTests, self).tearDown() + libeb.dispatch_events = self._real_dispatch_events + super().tearDown() def test_persist_lab_metadata(self): """ @@ -300,3 +310,101 @@ def test_save_choice_from_human_readable_label(self) -> None: spc = Sample.objects.get(sample_id=mock_record.get("SampleID")) self.assertIsNotNone(spc) self.assertEqual(spc.source, 'water', "incorrect value stored") + + def test_eb_put_event(self) -> None: + """ + python manage.py test 
proc.tests.test_tracking_sheet_srv.TrackingSheetSrvUnitTests.test_eb_put_event + """ + os.environ['EVENT_BUS_NAME'] = TEST_EVENT_BUS_NAME + + mock_dispatch_events = MagicMock() + libeb.dispatch_events = mock_dispatch_events + + # #### + # Test if event entries are in the correct format when CREATE new records + # #### + metadata_pd = pd.json_normalize([RECORD_1]) + metadata_pd = sanitize_lab_metadata_df(metadata_pd) + persist_lab_metadata(metadata_pd, SHEET_YEAR) + + arg = mock_dispatch_events.call_args.args[0] + expected_created_detail = [ + { + "action": "CREATE", + "model": "LIBRARY", + "refId": "lib.ULID", + "data": { + "libraryId": "L10001", + } + } + ] + + for entry in arg: + check_put_event_entries_format(self, entry) + check_put_event_value(self, entry=entry, source="orcabus.metadatamanager", + detail_type="MetadataStateChange", + event_bus_name=TEST_EVENT_BUS_NAME + ) + for event in expected_created_detail: + self.assertTrue( + is_expected_event_in_output(self, expected=event, output=[json.loads(i.get('Detail')) for i in arg])) + + # #### + # Test if record are UPDATE and event entries are correct + # #### + updated_record_1 = RECORD_1.copy() + updated_record_1['Quality'] = 'poor' + mock_dispatch_events.reset_mock() + metadata_pd = pd.json_normalize([updated_record_1]) + metadata_pd = sanitize_lab_metadata_df(metadata_pd) + persist_lab_metadata(metadata_pd, SHEET_YEAR) + + arg = mock_dispatch_events.call_args.args[0] + expected_update_detail = [ + { + "action": "UPDATE", + "model": "LIBRARY", + "refId": "lib.ULID", + "data": { + "libraryId": "L10001", + } + }, + ] + + for entry in arg: + check_put_event_entries_format(self, entry) + check_put_event_value(self, entry=entry, source="orcabus.metadatamanager", + detail_type="MetadataStateChange", + event_bus_name=TEST_EVENT_BUS_NAME + ) + for event in expected_update_detail: + self.assertTrue( + is_expected_event_in_output(self, expected=event, output=[json.loads(i.get('Detail')) for i in arg])) + # #### + # 
from typing import List


def check_put_event_entries_format(self, entry):
    """Assert *entry* has every key a PutEvents entry requires.

    *self* is the calling TestCase (these helpers are module functions
    shared across test classes).
    """
    self.assertIn('Source', entry)
    self.assertIn('DetailType', entry)
    self.assertIn('Detail', entry)
    self.assertIn('EventBusName', entry)


def check_put_event_value(self, entry: dict, source: str, detail_type: str, event_bus_name: str):
    """Assert the envelope fields of a PutEvents *entry* match the expected values."""
    self.assertEqual(entry['Source'], source)
    self.assertEqual(entry['DetailType'], detail_type)
    self.assertEqual(entry['EventBusName'], event_bus_name)


def is_expected_event_in_output(self, expected: dict, output: List[dict]) -> bool:
    """
    Return True iff some event detail in *output* matches *expected*.

    A candidate matches when its 'action' and 'model' equal the expected
    ones, it carries a 'refId', and the expected 'data' dict is a subset
    of the candidate's 'data' (the expectation is the bare minimum data).
    """

    def is_subset_dict(subset_dict: dict, main_dict: dict) -> bool:
        # A key missing from main_dict is a mismatch, not an error
        # (plain main_dict[key] would raise KeyError and abort the scan).
        for key, value in subset_dict.items():
            if key not in main_dict or main_dict[key] != value:
                return False
        return True

    for candidate in output:
        try:
            self.assertEqual(expected['action'], candidate['action'])
            self.assertEqual(expected['model'], candidate['model'])
            self.assertIn('refId', candidate)
            self.assertTrue(is_subset_dict(main_dict=candidate['data'], subset_dict=expected['data']))
            return True
        except (AssertionError, KeyError):
            # KeyError: malformed candidate (e.g. no 'action'/'data') counts
            # as a non-match rather than crashing the search.
            continue

    return False