Skip to content

Commit 0c262d5

Browse files
add custom csv loader
1 parent 978ce55 commit 0c262d5

File tree

7 files changed

+379
-75
lines changed

7 files changed

+379
-75
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
import logging
2+
from django.core.management import BaseCommand
3+
from libumccr import libjson
4+
5+
from handler.load_custom_metadata_csv import handler
6+
7+
logger = logging.getLogger()
8+
logger.setLevel(logging.INFO)
9+
10+
11+
class Command(BaseCommand):
    """
    Management command that manually triggers the lambda handler which syncs
    metadata from a CSV url into the metadata manager.
    """
    # Fixed grammar of the user-visible help text ("for to sync" -> "to sync")
    help = "Trigger lambda handler to sync metadata from csv url"

    def handle(self, *args, **options):
        # Placeholder event — replace "SOME_URL" with the real CSV location
        # before invoking this command.
        event = {
            "url": "SOME_URL",
        }

        print(f"Trigger lambda handler for sync tracking sheet. Event {libjson.dumps(event)}")
        result = handler(event, {})

        print(f"result: {libjson.dumps(result)}")

lib/workload/stateless/stacks/metadata-manager/app/models/library.py

+14
Original file line numberDiff line numberDiff line change
@@ -99,3 +99,17 @@ class Library(BaseModel):
9999

100100
# history
101101
history = HistoricalRecords(m2m_fields=[project_set])
102+
103+
104+
def sanitize_library_coverage(value: str):
    """
    Normalise a coverage value from the tracking sheet into the string form the
    Django model expects, or None when the value is not a valid number.
    """
    try:
        # Only float-able values are acceptable; render the parsed float back
        # to its canonical string representation.
        return f'{float(value)}'
    except (ValueError, TypeError):
        # Non-numeric or missing input — drop the coverage entirely.
        return None
115+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
import django
2+
import os
3+
import logging
4+
5+
from libumccr import libjson
6+
7+
from proc.service.utils import sanitize_lab_metadata_df, warn_drop_duplicated_library
8+
9+
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'app.settings.base')
10+
django.setup()
11+
12+
from proc.service.load_csv_srv import load_metadata_csv, download_csv_to_pandas
13+
14+
logger = logging.getLogger()
15+
logger.setLevel(logging.INFO)
16+
17+
18+
def handler(event, _context):
    """
    Lambda entry point: download a lab-metadata CSV from the url given in the
    event, sanitize it, drop duplicated libraries, and persist it to the db.

    Args:
        event: dict expected to carry a 'url' key pointing at the CSV file.
        _context: lambda context (unused).

    Returns:
        The persist report produced by load_metadata_csv.

    Raises:
        ValueError: when the event carries no 'url'.
    """
    logger.info(f'event: {libjson.dumps(event)}')

    csv_url = event.get('url', None)
    if csv_url is None:
        raise ValueError("URL is required")

    # Download -> sanitize -> de-duplicate -> persist, as one pipeline.
    result = load_metadata_csv(
        warn_drop_duplicated_library(
            sanitize_lab_metadata_df(
                download_csv_to_pandas(csv_url)
            )
        )
    )

    logger.info(f'persist report: {libjson.dumps(result)}')
    return result
33+
34+
# Allow running this handler directly for local testing. Note: an empty event
# carries no 'url', so this invocation raises ValueError unless edited.
if __name__ == '__main__':
    handler({}, {})

lib/workload/stateless/stacks/metadata-manager/handler/sync_tracking_sheet.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@
77
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'app.settings.base')
88
django.setup()
99

10-
from proc.service.tracking_sheet_srv import download_tracking_sheet, sanitize_lab_metadata_df, persist_lab_metadata, \
11-
warn_drop_duplicated_library
10+
from proc.service.tracking_sheet_srv import download_tracking_sheet, persist_lab_metadata
11+
from proc.service.utils import sanitize_lab_metadata_df, warn_drop_duplicated_library
1212

1313
logger = logging.getLogger()
1414
logger.setLevel(logging.INFO)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,243 @@
1+
import json
2+
import logging
3+
import pandas as pd
4+
from django.core.exceptions import ObjectDoesNotExist
5+
from django.db import transaction
6+
7+
from app.models import Subject, Sample, Library, Project, Contact, Individual
8+
from app.models.library import Quality, LibraryType, Phenotype, WorkflowType, sanitize_library_coverage
9+
from app.models.sample import Source
10+
from app.models.utils import get_value_from_human_readable_label
11+
from proc.service.utils import clean_model_history
12+
13+
logger = logging.getLogger()
14+
logger.setLevel(logging.INFO)
15+
16+
17+
@transaction.atomic
def load_metadata_csv(df: pd.DataFrame):
    """
    Persist metadata records from a pandas dataframe into the db. No record
    deletion is performed in this method.

    Args:
        df (pd.DataFrame): The source of truth for the metadata in this particular year

    Returns:
        dict: per-model create/update counts plus 'invalid_record_count'.
    """
    logger.info("Start processing LabMetadata")

    # Records that raised during processing, kept for the warning log below.
    invalid_data = []
    stats = {
        "library": {
            "create_count": 0,
            "update_count": 0,
        },
        "sample": {
            "create_count": 0,
            "update_count": 0,
        },
        "subject": {
            "create_count": 0,
            "update_count": 0,
        },
        "individual": {
            "create_count": 0,
            "update_count": 0,
        },
        "project": {
            "create_count": 0,
            "update_count": 0,
        },
        "contact": {
            "create_count": 0,
            "update_count": 0,
            "delete_count": 0,
        },
        'invalid_record_count': 0,
    }

    # This is where records are updated, inserted, and linked based on library_id
    for record in df.to_dict('records'):
        try:
            # 1. update or create all data in the model from the given record

            # ------------------------------
            # Individual
            # ------------------------------
            idv = None
            individual_id = record.get('individual_id')
            source = record.get('source')

            if individual_id and source:
                idv, is_idv_created, is_idv_updated = Individual.objects.update_or_create_if_needed(
                    search_key={
                        "individual_id": individual_id,
                        "source": source
                    },
                    data={
                        "individual_id": individual_id,
                        "source": source
                    }
                )
                if is_idv_created:
                    stats['individual']['create_count'] += 1
                if is_idv_updated:
                    stats['individual']['update_count'] += 1

            # ------------------------------
            # Subject
            # ------------------------------
            subject_id = record.get('subject_id')
            subject, is_sub_created, is_sub_updated = Subject.objects.update_or_create_if_needed(
                search_key={"subject_id": subject_id},
                data={
                    "subject_id": subject_id,
                }
            )

            if is_sub_created:
                stats['subject']['create_count'] += 1
            if is_sub_updated:
                stats['subject']['update_count'] += 1

            if idv:
                # link individual to external subject
                try:
                    subject.individual_set.get(orcabus_id=idv.orcabus_id)
                except ObjectDoesNotExist:
                    subject.individual_set.add(idv)

                    # We update the stats when a new idv is linked to sbj, only if this is
                    # not recorded as update/create in the previous upsert method
                    if not is_sub_created and not is_sub_updated:
                        stats['subject']['update_count'] += 1

            # ------------------------------
            # Sample
            # ------------------------------
            sample = None
            sample_id = record.get('sample_id')
            if sample_id:
                sample, is_smp_created, is_smp_updated = Sample.objects.update_or_create_if_needed(
                    search_key={"sample_id": sample_id},
                    data={
                        "sample_id": record.get('sample_id'),
                        "external_sample_id": record.get('external_sample_id'),
                        "source": get_value_from_human_readable_label(Source.choices, record.get('source')),
                    }
                )
                if is_smp_created:
                    stats['sample']['create_count'] += 1
                if is_smp_updated:
                    stats['sample']['update_count'] += 1

            # ------------------------------
            # Contact
            # ------------------------------
            contact = None
            contact_id = record.get('project_owner')

            if contact_id:
                contact, is_ctc_created, is_ctc_updated = Contact.objects.update_or_create_if_needed(
                    search_key={"contact_id": record.get('project_owner')},
                    data={
                        "contact_id": record.get('project_owner'),
                    }
                )
                if is_ctc_created:
                    stats['contact']['create_count'] += 1
                if is_ctc_updated:
                    stats['contact']['update_count'] += 1

            # ------------------------------
            # Project: Upsert project with contact as part of the project
            # ------------------------------
            project = None

            project_id = record.get('project_name')
            if project_id:
                project, is_prj_created, is_prj_updated = Project.objects.update_or_create_if_needed(
                    search_key={"project_id": record.get('project_name')},
                    data={
                        "project_id": record.get('project_name'),
                    }
                )
                if is_prj_created:
                    stats['project']['create_count'] += 1
                if is_prj_updated:
                    stats['project']['update_count'] += 1

            # link project to its contact if both exist
            if project and contact:
                try:
                    project.contact_set.get(orcabus_id=contact.orcabus_id)
                except ObjectDoesNotExist:
                    project.contact_set.add(contact)

                    # We update the stats when a new ctc is linked to prj, only if this is
                    # not recorded as update/create in the previous upsert method
                    if not is_prj_created and not is_prj_updated:
                        stats['project']['update_count'] += 1

            # ------------------------------
            # Library: Upsert library record with related sample, subject, project
            # ------------------------------
            # NOTE(review): when 'sample_id'/'subject_id' is missing, sample/subject may
            # be None here; the resulting AttributeError is caught below and the record
            # is counted as invalid — confirm this is the intended behavior.
            library, is_lib_created, is_lib_updated = Library.objects.update_or_create_if_needed(
                search_key={"library_id": record.get('library_id')},
                data={
                    'library_id': record.get('library_id'),
                    'phenotype': get_value_from_human_readable_label(Phenotype.choices, record.get('phenotype')),
                    'workflow': get_value_from_human_readable_label(WorkflowType.choices, record.get('workflow')),
                    'quality': get_value_from_human_readable_label(Quality.choices, record.get('quality')),
                    'type': get_value_from_human_readable_label(LibraryType.choices, record.get('type')),
                    'assay': record.get('assay'),
                    'coverage': sanitize_library_coverage(record.get('coverage')),

                    # relationships
                    'sample_id': sample.orcabus_id,
                    'subject_id': subject.orcabus_id,
                }
            )
            if is_lib_created:
                stats['library']['create_count'] += 1
            if is_lib_updated:
                stats['library']['update_count'] += 1

            # link library to its project
            if project:
                try:
                    library.project_set.get(orcabus_id=project.orcabus_id)
                except ObjectDoesNotExist:
                    library.project_set.add(project)

                    # We update the stats when a new project is linked to the library, only if
                    # this is not recorded as update/create in the previous upsert method
                    if not is_lib_created and not is_lib_updated:
                        stats['library']['update_count'] += 1

        except Exception as e:
            # Fully-empty rows are skipped silently; anything else is counted and
            # recorded for the warning log.
            if any(record.values()):
                stats['invalid_record_count'] += 1
                invalid_data.append({
                    # Store the message rather than the live exception object, so the
                    # report stays serializable and traceback frames are not retained.
                    "reason": str(e),
                    "data": record
                })
            continue

    # clean up history for django-simple-history model if any
    # Only clean for the past 15 minutes as this is the maximum lambda cutoff
    clean_model_history(minutes=15)

    # Only warn when something actually failed.
    if invalid_data:
        logger.warning(f"Invalid record: {invalid_data}")
    logger.info(f"Processed LabMetadata: {json.dumps(stats)}")
    return stats
237+
238+
239+
def download_csv_to_pandas(url: str) -> pd.DataFrame:
    """
    Fetch the csv file located at the given url and return it as a pandas dataframe.
    """
    # pandas performs the download itself — read_csv accepts URLs directly.
    return pd.read_csv(url)

0 commit comments

Comments
 (0)