
Merge pull request #4341 from broadinstitute/dev
Dev
hanars authored Aug 30, 2024
2 parents adcdf12 + 26f2af0 commit 6bc7ac1
Showing 33 changed files with 838 additions and 589 deletions.
20 changes: 16 additions & 4 deletions hail_search/queries/base.py
@@ -637,10 +637,7 @@ def _parse_intervals(self, intervals, gene_ids=None, **kwargs):
             intervals = [[f'chr{interval[0]}', *interval[1:]] for interval in (intervals or [])]
 
         if len(intervals) > MAX_GENE_INTERVALS and len(intervals) == len(gene_ids or []):
-            super_intervals = defaultdict(lambda: (1e9, 0))
-            for chrom, start, end in intervals:
-                super_intervals[chrom] = (min(super_intervals[chrom][0], start), max(super_intervals[chrom][1], end))
-            intervals = [(chrom, start, end) for chrom, (start, end) in super_intervals.items()]
+            intervals = self.cluster_intervals(sorted(intervals))
 
         parsed_intervals = [
             hl.eval(hl.locus_interval(*interval, reference_genome=self.GENOME_VERSION, invalid_missing=True))
@@ -659,6 +656,21 @@ def _parse_intervals(self, intervals, gene_ids=None, **kwargs):
 
         return parsed_intervals
 
+    @classmethod
+    def cluster_intervals(cls, intervals, distance=100000, max_intervals=MAX_GENE_INTERVALS):
+        if len(intervals) <= max_intervals:
+            return intervals
+
+        merged_intervals = [intervals[0]]
+        for chrom, start, end in intervals[1:]:
+            prev_chrom, prev_start, prev_end = merged_intervals[-1]
+            if chrom == prev_chrom and start - prev_end < distance:
+                merged_intervals[-1] = [chrom, prev_start, max(prev_end, end)]
+            else:
+                merged_intervals.append([chrom, start, end])
+
+        return cls.cluster_intervals(merged_intervals, distance=distance+100000, max_intervals=max_intervals)
+
     def _should_add_chr_prefix(self):
         return self.GENOME_VERSION == GENOME_VERSION_GRCh38
 
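The new cluster_intervals helper merges neighboring same-chromosome intervals whose gap is below distance, then recurses with the tolerance widened by 100kb until at most max_intervals intervals remain. A minimal standalone sketch of that behavior, mirroring the method above (input values are illustrative, not seqr fixtures, and max_intervals is lowered to 2 just to trigger the recursion on a tiny example):

# Standalone sketch of the merge logic in cluster_intervals
def cluster_intervals(intervals, distance=100000, max_intervals=2):
    if len(intervals) <= max_intervals:
        return intervals
    merged = [intervals[0]]
    for chrom, start, end in intervals[1:]:
        prev_chrom, prev_start, prev_end = merged[-1]
        if chrom == prev_chrom and start - prev_end < distance:
            # Close enough on the same chromosome: extend the previous interval
            merged[-1] = [chrom, prev_start, max(prev_end, end)]
        else:
            merged.append([chrom, start, end])
    # Tolerate a 100kb-larger gap on the next pass
    return cluster_intervals(merged, distance=distance + 100000, max_intervals=max_intervals)

print(cluster_intervals(sorted([['1', 100, 200], ['1', 150000, 160000], ['2', 5, 10]])))
# Pass 1 (distance=100000): the chr1 gap of 149800 is too wide, nothing merges
# Pass 2 (distance=200000): the chr1 intervals merge -> [['1', 100, 160000], ['2', 5, 10]]

Because the call site sorts the intervals first, only genomically adjacent intervals are ever compared, so the result is a set of disjoint per-chromosome spans.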
21 changes: 21 additions & 0 deletions hail_search/test_search.py
@@ -12,6 +12,7 @@
     FAMILY_2_MITO_SAMPLE_DATA, FAMILY_2_ALL_SAMPLE_DATA, MITO_VARIANT1, MITO_VARIANT2, MITO_VARIANT3, \
     EXPECTED_SAMPLE_DATA_WITH_SEX, SV_WGS_SAMPLE_DATA_WITH_SEX, VARIANT_LOOKUP_VARIANT
 from hail_search.web_app import init_web_app, sync_to_async_hail_query
+from hail_search.queries.base import BaseHailTableQuery
 
 PROJECT_2_VARIANT = {
     'variantId': '1-10146-ACC-A',
@@ -581,6 +582,26 @@ async def test_location_search(self):
             gene_ids=['ENSG00000171621'],
         )
 
+    async def test_cluster_intervals(self):
+        intervals = [
+            ['1', 11785723, 11806455], ['1', 91500851, 91525764], ['2', 1234, 5678], ['2', 12345, 67890],
+            ['7', 1, 11100], ['7', 202020, 20202020],
+        ]
+
+        self.assertListEqual(BaseHailTableQuery.cluster_intervals(intervals, max_intervals=5), [
+            ['1', 11785723, 11806455], ['1', 91500851, 91525764], ['2', 1234, 67890],
+            ['7', 1, 11100], ['7', 202020, 20202020],
+        ])
+
+        self.assertListEqual(BaseHailTableQuery.cluster_intervals(intervals, max_intervals=4), [
+            ['1', 11785723, 11806455], ['1', 91500851, 91525764], ['2', 1234, 67890], ['7', 1, 20202020],
+        ])
+
+        self.assertListEqual(BaseHailTableQuery.cluster_intervals(intervals, max_intervals=3), [
+            ['1', 11785723, 91525764], ['2', 1234, 67890], ['7', 1, 20202020],
+        ])
+
+
     async def test_variant_id_search(self):
         await self._assert_expected_search([VARIANT2], omit_data_type='SV_WES', **RSID_SEARCH)
 
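One subtlety in the max_intervals=3 case: the two chr1 intervals are roughly 80Mb apart, so they only merge because each recursive pass widens the tolerated gap by another 100kb. It takes on the order of 800 passes before the tolerance exceeds the 79,694,396bp gap, which still sits comfortably under Python's default recursion limit of 1000.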
1 change: 1 addition & 0 deletions requirements.txt
@@ -182,3 +182,4 @@ urllib3==1.26.19
     # requests
 whitenoise==6.3.0
     # via -r requirements.in
+zipp>=3.19.1 # not directly required, pinned by Snyk to avoid a vulnerability
68 changes: 65 additions & 3 deletions seqr/management/commands/check_for_new_samples_from_pipeline.py
@@ -13,10 +13,12 @@
 from seqr.utils.search.add_data_utils import notify_search_data_loaded
 from seqr.utils.search.utils import parse_valid_variant_id
 from seqr.utils.search.hail_search_utils import hail_variant_multi_lookup, search_data_type
+from seqr.views.utils.airtable_utils import AirtableSession, LOADABLE_PDO_STATUSES, AVAILABLE_PDO_STATUS
 from seqr.views.utils.dataset_utils import match_and_update_search_samples
+from seqr.views.utils.permissions_utils import is_internal_anvil_project, project_has_anvil
 from seqr.views.utils.variant_utils import reset_cached_search_results, update_projects_saved_variant_json, \
     get_saved_variants
-from settings import SEQR_SLACK_LOADING_NOTIFICATION_CHANNEL
+from settings import SEQR_SLACK_LOADING_NOTIFICATION_CHANNEL, BASE_URL
 
 logger = logging.getLogger(__name__)
 
@@ -25,6 +27,12 @@
 USER_EMAIL = 'manage_command'
 MAX_LOOKUP_VARIANTS = 5000
 
+PDO_COPY_FIELDS = [
+    'PDO', 'PDOStatus', 'SeqrLoadingDate', 'GATKShortReadCallsetPath', 'SeqrProjectURL', 'TerraProjectURL',
+    'SequencingProduct', 'PDOName', 'SequencingSubmissionDate', 'SequencingCompletionDate', 'CallsetRequestedDate',
+    'CallsetCompletionDate', 'Project', 'Metrics Checked', 'gCNV_SV_CallsetPath', 'DRAGENShortReadCallsetPath',
+]
+
 
 class Command(BaseCommand):
     help = 'Check for newly loaded seqr samples'
@@ -91,7 +99,7 @@ def handle(self, *args, **options):
         # Reset cached results for all projects, as seqr AFs will have changed for all projects when new data is added
         reset_cached_search_results(project=None)
 
-        # Send loading notifications
+        # Send loading notifications and update Airtable PDOs
         update_sample_data_by_project = {
             s['individual__family__project']: s for s in updated_samples.values('individual__family__project').annotate(
                 samples=ArrayAgg(JSONObject(sample_id='sample_id', individual_id='individual_id')),
@@ -100,15 +108,20 @@ def handle(self, *args, **options):
         }
         updated_project_families = []
         updated_families = set()
+        split_project_pdos = {}
+        session = AirtableSession(user=None, no_auth=True)
         for project, sample_ids in samples_by_project.items():
             project_sample_data = update_sample_data_by_project[project.id]
+            is_internal = not project_has_anvil(project) or is_internal_anvil_project(project)
             notify_search_data_loaded(
-                project, dataset_type, sample_type, inactivated_sample_guids,
+                project, is_internal, dataset_type, sample_type, inactivated_sample_guids,
                 updated_samples=project_sample_data['samples'], num_samples=len(sample_ids),
             )
             project_families = project_sample_data['family_guids']
             updated_families.update(project_families)
             updated_project_families.append((project.id, project.name, project.genome_version, project_families))
+            if is_internal and dataset_type == Sample.DATASET_TYPE_VARIANT_CALLS:
+                split_project_pdos[project.name] = self._update_pdos(session, project.guid, sample_ids)
 
         # Send failure notifications
         failed_family_samples = metadata.get('failed_family_samples', {})
@@ -124,6 +137,9 @@ def handle(self, *args, **options):
                 )
         for project, failures in failures_by_project.items():
             summary = '\n'.join(sorted(failures))
+            split_pdos = split_project_pdos.get(project)
+            if split_pdos:
+                summary += f'\n\nSkipped samples in this project have been moved to {", ".join(split_pdos)}'
             safe_post_to_slack(
                 SEQR_SLACK_LOADING_NOTIFICATION_CHANNEL,
                 f'The following {len(failures)} families failed {check.replace("_", " ")} in {project}:\n{summary}'
@@ -138,6 +154,52 @@ def handle(self, *args, **options):
 
         logger.info('DONE')
 
+    @staticmethod
+    def _update_pdos(session, project_guid, sample_ids):
+        airtable_samples = session.fetch_records(
+            'Samples', fields=['CollaboratorSampleID', 'SeqrCollaboratorSampleID', 'PDOID'],
+            or_filters={'PDOStatus': LOADABLE_PDO_STATUSES},
+            and_filters={'SeqrProject': f'{BASE_URL}project/{project_guid}/project_page'}
+        )
+
+        pdo_ids = set()
+        skipped_pdo_samples = defaultdict(list)
+        for record_id, sample in airtable_samples.items():
+            pdo_id = sample['PDOID'][0]
+            sample_id = sample.get('SeqrCollaboratorSampleID') or sample['CollaboratorSampleID']
+            if sample_id in sample_ids:
+                pdo_ids.add(pdo_id)
+            else:
+                skipped_pdo_samples[pdo_id].append(record_id)
+
+        if pdo_ids:
+            session.safe_patch_records_by_id('PDO', pdo_ids, {'PDOStatus': AVAILABLE_PDO_STATUS})
+
+        skipped_pdo_samples = {
+            pdo_id: sample_record_ids for pdo_id, sample_record_ids in skipped_pdo_samples.items() if pdo_id in pdo_ids
+        }
+        if not skipped_pdo_samples:
+            return []
+
+        pdos_to_create = {
+            f"{pdo.pop('PDO')}_sr": (record_id, pdo) for record_id, pdo in session.fetch_records(
+                'PDO', fields=PDO_COPY_FIELDS, or_filters={'RECORD_ID()': list(skipped_pdo_samples.keys())}
+            ).items()
+        }
+
+        # Create PDOs and then update Samples with new PDOs
+        # Does not create PDOs with Samples directly as that would not remove Samples from old PDOs
+        new_pdos = session.safe_create_records('PDO', [
+            {'PDO': pdo_name, **pdo} for pdo_name, (_, pdo) in pdos_to_create.items()
+        ])
+        pdo_id_map = {pdos_to_create[record['fields']['PDO']][0]: record['id'] for record in new_pdos}
+        for pdo_id, sample_record_ids in skipped_pdo_samples.items():
+            new_pdo_id = pdo_id_map.get(pdo_id)
+            if new_pdo_id:
+                session.safe_patch_records_by_id('Samples', sample_record_ids, {'PDOID': [new_pdo_id]})
+
+        return sorted(pdos_to_create.keys())
+
     @staticmethod
     def _reload_shared_variant_annotations(data_type, genome_version, updated_variants_by_id=None, exclude_families=None):
         dataset_type = data_type.split('_')[0]
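To make the partitioning step of _update_pdos concrete, here is a hypothetical walk-through with plain dicts standing in for the Airtable responses (record IDs and sample names are made up):

from collections import defaultdict

# Hypothetical data in the shape returned by session.fetch_records
loaded_sample_ids = {'NA19675_1'}
airtable_samples = {
    'recS1': {'CollaboratorSampleID': 'NA19675_1', 'PDOID': ['recPDO1']},
    'recS2': {'CollaboratorSampleID': 'NA19678', 'PDOID': ['recPDO1']},
    'recS3': {'CollaboratorSampleID': 'NA19679', 'PDOID': ['recPDO2']},
}

pdo_ids = set()
skipped_pdo_samples = defaultdict(list)
for record_id, sample in airtable_samples.items():
    pdo_id = sample['PDOID'][0]
    sample_id = sample.get('SeqrCollaboratorSampleID') or sample['CollaboratorSampleID']
    if sample_id in loaded_sample_ids:
        pdo_ids.add(pdo_id)  # this PDO now has loaded data
    else:
        skipped_pdo_samples[pdo_id].append(record_id)

# Only PDOs that had at least one loaded sample are split; recPDO2 had
# no loaded samples, so it is filtered out and left untouched
skipped_pdo_samples = {
    pdo_id: records for pdo_id, records in skipped_pdo_samples.items() if pdo_id in pdo_ids
}
print(pdo_ids)              # {'recPDO1'} -> patched to the available status
print(skipped_pdo_samples)  # {'recPDO1': ['recS2']} -> samples moved to a new '<PDO>_sr' record

The two-phase create-then-patch at the end of the method follows from the comment in the diff: creating the new PDOs with their Samples attached would link the sample records without unlinking them from the original PDOs, so the Samples are re-pointed in a separate PATCH.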
128 changes: 123 additions & 5 deletions seqr/management/tests/check_for_new_samples_from_pipeline_tests.py
@@ -47,6 +47,72 @@
     f'<a href=https://seqr.broadinstitute.org/project/{PROJECT_GUID}/project_page>Test Reprocessed Project</a>' \
     f'<br /><br />All the best,<br />The seqr team'
 
+PDO_QUERY_FIELDS = '&'.join([f'fields[]={field}' for field in [
+    'PDO', 'PDOStatus', 'SeqrLoadingDate', 'GATKShortReadCallsetPath', 'SeqrProjectURL', 'TerraProjectURL',
+    'SequencingProduct', 'PDOName', 'SequencingSubmissionDate', 'SequencingCompletionDate', 'CallsetRequestedDate',
+    'CallsetCompletionDate', 'Project', 'Metrics Checked', 'gCNV_SV_CallsetPath', 'DRAGENShortReadCallsetPath',
+]])
+AIRTABLE_SAMPLE_RECORDS = {
+    'records': [
+        {
+            'id': 'rec2B6OGmQpAkQW3s',
+            'fields': {
+                'CollaboratorSampleID': 'NA19675_1',
+                'PDOID': ['recW24C2CJW5lT64K'],
+            },
+        },
+        {
+            'id': 'recfMYDEZpPtzAIeV',
+            'fields': {
+                'CollaboratorSampleID': 'NA19678',
+                'PDOID': ['recW24C2CJW5lT64K'],
+            },
+        },
+        {
+            'id': 'rec2B67GmXpAkQW8z',
+            'fields': {
+                'CollaboratorSampleID': 'NA19679',
+                'PDOID': ['rec2Nkg10N1KssPc3'],
+            },
+        },
+        {
+            'id': 'rec2Nkg10N1KssPc3',
+            'fields': {
+                'SeqrCollaboratorSampleID': 'HG00731',
+                'CollaboratorSampleID': 'VCGS_FAM203_621_D2',
+                'PDOID': ['recW24C2CJW5lT64K'],
+            },
+        },
+        {
+            'id': 'recrbZh9Hn1UFtMi2',
+            'fields': {
+                'SeqrCollaboratorSampleID': 'NA20888',
+                'CollaboratorSampleID': 'NA20888_D1',
+                'PDOID': ['recW24C2CJW5lT64K'],
+            },
+        },
+        {
+            'id': 'rec2Nkg1fKssJc7',
+            'fields': {
+                'CollaboratorSampleID': 'NA20889',
+                'PDOID': ['rec0RWBVfDVbtlBSL'],
+            },
+        },
+    ]}
+AIRTABLE_PDO_RECORDS = {
+    'records': [
+        {
+            'id': 'recW24C2CJW5lT64K',
+            'fields': {
+                'PDO': 'PDO-1234',
+                'SeqrProjectURL': 'https://test-seqr.org/project/R0003_test/project_page',
+                'PDOStatus': 'Methods (Loading)',
+                'PDOName': 'RGP_WGS_12',
+            }
+        },
+    ]
+}
+
 
 @mock.patch('seqr.utils.search.hail_search_utils.HAIL_BACKEND_SERVICE_HOSTNAME', MOCK_HAIL_HOST)
 @mock.patch('seqr.models.random.randint', lambda *args: GUID_ID)
@@ -102,9 +168,9 @@ def _test_success(self, path, metadata, dataset_type, sample_guids, reload_calls
         ])
 
         # Test reload saved variants
-        self.assertEqual(len(responses.calls), len(reload_calls) + (3 if has_additional_requests else 0))
+        self.assertEqual(len(responses.calls), len(reload_calls) + (9 if has_additional_requests else 0))
         for i, call in enumerate(reload_calls):
-            resp = responses.calls[i+(1 if has_additional_requests else 0)]
+            resp = responses.calls[i+(7 if has_additional_requests else 0)]
             self.assertEqual(resp.request.url, f'{MOCK_HAIL_HOST}:5000/search')
             self.assertEqual(resp.request.headers.get('From'), 'manage_command')
             self.assertDictEqual(json.loads(resp.request.body), call)
@@ -123,6 +189,8 @@ def _test_success(self, path, metadata, dataset_type, sample_guids, reload_calls
         )
 
     @mock.patch('seqr.management.commands.check_for_new_samples_from_pipeline.MAX_LOOKUP_VARIANTS', 1)
+    @mock.patch('seqr.management.commands.check_for_new_samples_from_pipeline.BASE_URL', 'https://test-seqr.org/')
+    @mock.patch('seqr.views.utils.airtable_utils.MAX_UPDATE_RECORDS', 2)
     @mock.patch('seqr.views.utils.airtable_utils.logger')
     @mock.patch('seqr.utils.communication_utils.EmailMultiAlternatives')
     @responses.activate
@@ -131,6 +199,21 @@ def test_command(self, mock_email, mock_airtable_utils):
             responses.GET,
             "http://testairtable/appUelDNM3BnWaR7M/AnVIL%20Seqr%20Loading%20Requests%20Tracking?fields[]=Status&pageSize=2&filterByFormula=AND({AnVIL Project URL}='https://seqr.broadinstitute.org/project/R0004_non_analyst_project/project_page',OR(Status='Loading',Status='Loading Requested'))",
             json={'records': [{'id': 'rec12345', 'fields': {}}, {'id': 'rec67890', 'fields': {}}]})
+        airtable_samples_url = 'http://testairtable/app3Y97xtbbaOopVR/Samples'
+        airtable_pdo_url = 'http://testairtable/app3Y97xtbbaOopVR/PDO'
+        responses.add(
+            responses.GET,
+            f"{airtable_samples_url}?fields[]=CollaboratorSampleID&fields[]=SeqrCollaboratorSampleID&fields[]=PDOID&pageSize=100&filterByFormula=AND({{SeqrProject}}='https://test-seqr.org/project/R0003_test/project_page',OR(PDOStatus='Methods (Loading)',PDOStatus='On hold for phenotips, but ready to load'))",
+            json=AIRTABLE_SAMPLE_RECORDS)
+        responses.add(
+            responses.GET,
+            f"{airtable_pdo_url}?{PDO_QUERY_FIELDS}&pageSize=100&filterByFormula=OR(RECORD_ID()='recW24C2CJW5lT64K')",
+            json=AIRTABLE_PDO_RECORDS)
+        responses.add(responses.PATCH, airtable_samples_url, json=AIRTABLE_SAMPLE_RECORDS)
+        responses.add(responses.PATCH, airtable_pdo_url, status=400)
+        responses.add_callback(responses.POST, airtable_pdo_url, callback=lambda request: (200, {}, json.dumps({
+            'records': [{'id': f'rec{i}ABC123', **r} for i, r in enumerate(json.loads(request.body)['records'])]
+        })))
         responses.add(responses.POST, f'{MOCK_HAIL_HOST}:5000/search', status=200, json={
             'results': [{'variantId': '1-248367227-TC-T', 'familyGuids': ['F000014_14'], 'updated_field': 'updated_value'}],
             'total': 1,
@@ -263,9 +346,40 @@ def test_command(self, mock_email, mock_airtable_utils):
         ])
         self.assertEqual(Family.objects.get(guid='F000014_14').analysis_status, 'Rncc')
 
+        # Test airtable PDO updates
+        update_pdos_request = responses.calls[1].request
+        self.assertEqual(update_pdos_request.url, airtable_pdo_url)
+        self.assertEqual(update_pdos_request.method, 'PATCH')
+        self.assertDictEqual(json.loads(update_pdos_request.body), {'records': [
+            {'id': 'rec0RWBVfDVbtlBSL', 'fields': {'PDOStatus': 'Available in seqr'}},
+            {'id': 'recW24C2CJW5lT64K', 'fields': {'PDOStatus': 'Available in seqr'}},
+        ]})
+        create_pdos_request = responses.calls[3].request
+        self.assertEqual(create_pdos_request.url, airtable_pdo_url)
+        self.assertEqual(create_pdos_request.method, 'POST')
+        self.assertDictEqual(json.loads(create_pdos_request.body), {'records': [{'fields': {
+            'PDO': 'PDO-1234_sr',
+            'SeqrProjectURL': 'https://test-seqr.org/project/R0003_test/project_page',
+            'PDOStatus': 'Methods (Loading)',
+            'PDOName': 'RGP_WGS_12',
+        }}]})
+        update_samples_request = responses.calls[4].request
+        self.assertEqual(update_samples_request.url, airtable_samples_url)
+        self.assertEqual(update_samples_request.method, 'PATCH')
+        self.assertDictEqual(json.loads(update_samples_request.body), {'records': [
+            {'id': 'rec2B6OGmQpAkQW3s', 'fields': {'PDOID': ['rec0ABC123']}},
+            {'id': 'rec2Nkg10N1KssPc3', 'fields': {'PDOID': ['rec0ABC123']}},
+        ]})
+        update_samples_request_2 = responses.calls[5].request
+        self.assertEqual(update_samples_request_2.url, airtable_samples_url)
+        self.assertEqual(update_samples_request_2.method, 'PATCH')
+        self.assertDictEqual(json.loads(update_samples_request_2.body), {'records': [
+            {'id': 'recfMYDEZpPtzAIeV', 'fields': {'PDOID': ['rec0ABC123']}},
+        ]})
+
         # Test SavedVariant model updated
         for i, variant_id in enumerate([['1', 1562437, 'G', 'CA'], ['1', 46859832, 'G', 'A']]):
-            multi_lookup_request = responses.calls[3+i].request
+            multi_lookup_request = responses.calls[9+i].request
             self.assertEqual(multi_lookup_request.url, f'{MOCK_HAIL_HOST}:5000/multi_lookup')
             self.assertEqual(multi_lookup_request.headers.get('From'), 'manage_command')
             self.assertDictEqual(json.loads(multi_lookup_request.body), {
@@ -348,11 +462,15 @@ def test_command(self, mock_email, mock_airtable_utils):
         self.assertDictEqual(mock_email.return_value.esp_extra, {'MessageStream': 'seqr-notifications'})
         self.assertDictEqual(mock_email.return_value.merge_data, {})
 
-        mock_airtable_utils.error.assert_called_with(
+        self.assertEqual(mock_airtable_utils.error.call_count, 2)
+        mock_airtable_utils.error.assert_has_calls([mock.call(
+            f'Airtable patch "PDO" error: 400 Client Error: Bad Request for url: {airtable_pdo_url}', None, detail={
+                'record_ids': {'rec0RWBVfDVbtlBSL', 'recW24C2CJW5lT64K'}, 'update': {'PDOStatus': 'Available in seqr'}}
+        ), mock.call(
             'Airtable patch "AnVIL Seqr Loading Requests Tracking" error: Unable to identify record to update', None, detail={
                 'or_filters': {'Status': ['Loading', 'Loading Requested']},
                 'and_filters': {'AnVIL Project URL': 'https://seqr.broadinstitute.org/project/R0004_non_analyst_project/project_page'},
-                'update': {'Status': 'Available in Seqr'}})
+                'update': {'Status': 'Available in Seqr'}})])
 
         self.assertEqual(self.manager_user.notifications.count(), 3)
         self.assertEqual(
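The calls-index changes above follow from the new Airtable stubs: six Airtable interactions (the sample fetch, the failing PDO status patch, the PDO fetch, the PDO create, and the two batched sample patches produced by MAX_UPDATE_RECORDS=2) now precede the hail requests in responses.calls, which is why the _test_success offsets move from 3 to 9 and from 1 to 7, and the multi-lookup assertions from calls[3+i] to calls[9+i].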
3 changes: 1 addition & 2 deletions seqr/urls.py
@@ -121,7 +121,7 @@
     forgot_password
 
 from seqr.views.apis.data_manager_api import elasticsearch_status, upload_qc_pipeline_output, delete_index, \
-    update_rna_seq, load_rna_seq_sample_data, proxy_to_kibana, load_phenotype_prioritization_data, write_pedigree, \
+    update_rna_seq, load_rna_seq_sample_data, proxy_to_kibana, load_phenotype_prioritization_data, \
     validate_callset, get_loaded_projects, load_data
 from seqr.views.apis.report_api import \
     anvil_export, \
@@ -333,7 +333,6 @@
     'data_management/update_rna_seq': update_rna_seq,
     'data_management/load_rna_seq_sample/(?P<sample_guid>[^/]+)': load_rna_seq_sample_data,
     'data_management/load_phenotype_prioritization_data': load_phenotype_prioritization_data,
-    'data_management/write_pedigree/(?P<project_guid>[^/]+)': write_pedigree,
     'data_management/validate_callset': validate_callset,
     'data_management/loaded_projects/(?P<sample_type>[^/]+)/(?P<dataset_type>[^/]+)': get_loaded_projects,
     'data_management/load_data': load_data,