Merge pull request #3967 from broadinstitute/dev
Dev
hanars authored Mar 13, 2024
2 parents 8b4a47e + a9c80e6 commit 059bce6
Showing 35 changed files with 372 additions and 166 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,9 @@
 
 ## dev
 
+## 3/13/24
+* Add "Probably Solved" analysis status (REQUIRES DB MIGRATION)
+
 ## 3/1/24
 * Add subscribable project notifications (REQUIRES DB MIGRATION)
45 changes: 32 additions & 13 deletions hail_search/queries/base.py
@@ -281,7 +281,6 @@ def _parse_sample_data(self, sample_data):
     def _load_filtered_project_hts(self, project_samples, skip_all_missing=False, **kwargs):
         filtered_project_hts = []
         exception_messages = set()
-        num_projects = len(project_samples)
         for i, (project_guid, project_sample_data) in enumerate(project_samples.items()):
             project_ht = self._read_table(
                 f'projects/{project_guid}.ht',
@@ -292,7 +291,7 @@ def _load_filtered_project_hts(self, project_samples, skip_all_missing=False, **kwargs):
                 continue
             try:
                 filtered_project_hts.append(
-                    (*self._filter_entries_table(project_ht, project_sample_data, num_projects=num_projects, **kwargs), len(project_sample_data))
+                    (*self._filter_entries_table(project_ht, project_sample_data, **kwargs), len(project_sample_data))
                 )
             except HTTPBadRequest as e:
                 exception_messages.add(e.reason)
@@ -305,6 +304,13 @@ def _load_filtered_project_hts(self, project_samples, skip_all_missing=False, **kwargs):
         return filtered_project_hts
 
     def import_filtered_table(self, project_samples, num_families, intervals=None, **kwargs):
+        use_annotations_ht_first = len(project_samples) > 1 and (kwargs.get('parsed_intervals') or kwargs.get('padded_interval'))
+        if use_annotations_ht_first:
+            # For multi-project interval search, faster to first read and filter the annotation table and then add entries
+            ht = self._read_table('annotations.ht')
+            ht = self._filter_annotated_table(ht, **kwargs, is_comp_het=self._has_comp_het_search)
+            self._load_table_kwargs['variant_ht'] = ht.select()
+
         if num_families == 1:
             family_sample_data = list(project_samples.values())[0]
             family_guid = list(family_sample_data.keys())[0]
@@ -338,13 +344,17 @@ def import_filtered_table(self, project_samples, num_families, intervals=None, **kwargs):
         logger.info(f'Found {num_projects_added} {self.DATA_TYPE} projects with matched entries')
 
         if comp_het_families_ht is not None:
-            comp_het_ht = self._query_table_annotations(comp_het_families_ht, self._get_table_path('annotations.ht'))
-            self._comp_het_ht = self._filter_annotated_table(comp_het_ht, is_comp_het=True, **kwargs)
+            self._comp_het_ht = self._query_table_annotations(comp_het_families_ht, self._get_table_path('annotations.ht'))
+            if not use_annotations_ht_first:
+                self._comp_het_ht = self._filter_annotated_table(self._comp_het_ht, is_comp_het=True, **kwargs)
             self._comp_het_ht = self._filter_compound_hets()
 
         if families_ht is not None:
-            ht = self._query_table_annotations(families_ht, self._get_table_path('annotations.ht'))
-            self._ht = self._filter_annotated_table(ht, **kwargs)
+            self._ht = self._query_table_annotations(families_ht, self._get_table_path('annotations.ht'))
+            if not use_annotations_ht_first:
+                self._ht = self._filter_annotated_table(self._ht, **kwargs)
+            elif self._has_comp_het_search:
+                self._ht = self._filter_by_annotations(self._ht, **(kwargs.get('parsed_annotations') or {}))
 
     def _add_project_ht(self, families_ht, project_ht, default, default_1=None):
         if default_1 is None:
@@ -569,7 +579,7 @@ def _filter_annotated_table(self, ht, gene_ids=None, rs_ids=None, frequencies=None,
 
         ht = self._filter_by_in_silico(ht, in_silico)
 
-        return self._filter_by_annotations(ht, is_comp_het, **(parsed_annotations or {}))
+        return self._filter_by_annotations(ht, is_comp_het=is_comp_het, **(parsed_annotations or {}))
 
     def _filter_by_gene_ids(self, ht, gene_ids):
         gene_ids = hl.set(gene_ids)
@@ -730,7 +740,7 @@ def _parse_annotations(self, annotations, annotations_secondary, **kwargs):
         })
         return parsed_annotations
 
-    def _filter_by_annotations(self, ht, is_comp_het, consequence_ids=None, annotation_overrides=None,
+    def _filter_by_annotations(self, ht, is_comp_het=False, consequence_ids=None, annotation_overrides=None,
                                secondary_consequence_ids=None, secondary_annotation_overrides=None, **kwargs):
 
         annotation_exprs = {}
@@ -1051,12 +1061,23 @@ def gene_counts(self):
             ht.gene_ids, hl.struct(total=hl.agg.count(), families=hl.agg.counter(ht.families))
         ))
 
-    def lookup_variant(self, variant_id, sample_data=None):
-        self._parse_intervals(intervals=None, variant_ids=[variant_id], variant_keys=[variant_id])
+    def lookup_variants(self, variant_ids, annotation_fields=None):
+        self._parse_intervals(intervals=None, variant_ids=variant_ids, variant_keys=variant_ids)
         ht = self._read_table('annotations.ht', drop_globals=['paths', 'versions'])
         self._load_table_kwargs['_n_partitions'] = 1
         ht = ht.filter(hl.is_defined(ht[XPOS]))
 
+        if not annotation_fields:
+            annotation_fields = {
+                k: v for k, v in self.annotation_fields(include_genotype_overrides=False).items()
+                if k not in {FAMILY_GUID_FIELD, GENOTYPES_FIELD, 'genotypeFilters'}
+            }
+
+        formatted = self._format_results(ht.key_by(), annotation_fields=annotation_fields, include_genotype_overrides=False)
+
+        return formatted.aggregate(hl.agg.take(formatted.row, len(variant_ids)))
+
+    def lookup_variant(self, variant_id, sample_data=None):
         annotation_fields = self.annotation_fields(include_genotype_overrides=False)
         entry_annotations = {k: annotation_fields[k] for k in [FAMILY_GUID_FIELD, GENOTYPES_FIELD]}
         annotation_fields.update({
@@ -1065,9 +1086,7 @@ def lookup_variant(self, variant_id, sample_data=None):
             'genotypeFilters': lambda ht: hl.str(''),
        })
 
-        formatted = self._format_results(ht.key_by(), annotation_fields=annotation_fields, include_genotype_overrides=False)
-
-        variants = formatted.aggregate(hl.agg.take(formatted.row, 1))
+        variants = self.lookup_variants([variant_id], annotation_fields=annotation_fields)
         if not variants:
             raise HTTPNotFound()
         variant = dict(variants[0])
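The `use_annotations_ht_first` branch above generalizes an optimization that previously lived only in the SV query class (see the `hail_search/queries/sv.py` diff below). A minimal sketch of the idea in plain Hail, with hypothetical table paths and a hardcoded interval standing in for the request-derived filters — the real code stashes the key table in `_load_table_kwargs['variant_ht']` for the table reader rather than joining directly:

import hail as hl

# Filter the shared annotation table down to the searched interval first
# (hypothetical path; assumes the table is keyed the same way as the entries tables).
annotations_ht = hl.read_table('annotations.ht')
interval = hl.parse_locus_interval('chr1:100000-200000', reference_genome='GRCh38')
annotations_ht = hl.filter_intervals(annotations_ht, [interval])

# Keep only the key fields; this plays the role of the `variant_ht` kwarg.
variant_ht = annotations_ht.select()

# Each project entries table is then restricted to the pre-filtered keys,
# instead of re-applying the interval filter once per project.
entries_ht = hl.read_table('projects/PROJECT_GUID.ht')  # hypothetical project table
entries_ht = entries_ht.semi_join(variant_ht)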
4 changes: 2 additions & 2 deletions hail_search/queries/snv_indel.py
@@ -81,9 +81,9 @@ class SnvIndelHailTableQuery(MitoHailTableQuery):
         ('is_gt_10_percent', 0.1),
     ])
 
-    def _prefilter_entries_table(self, ht, *args, num_projects=1, **kwargs):
+    def _prefilter_entries_table(self, ht, *args, **kwargs):
         ht = super()._prefilter_entries_table(ht, *args, **kwargs)
-        if num_projects > 1 or not self._load_table_kwargs.get('_filter_intervals'):
+        if 'variant_ht' not in self._load_table_kwargs and not self._load_table_kwargs.get('_filter_intervals'):
             af_ht = self._get_loaded_filter_ht(
                 GNOMAD_GENOMES_FIELD, 'high_af_variants.ht', self._get_gnomad_af_prefilter, **kwargs)
             if af_ht:
11 changes: 0 additions & 11 deletions hail_search/queries/sv.py
@@ -56,17 +56,6 @@ class SvHailTableQuery(BaseHailTableQuery):
     def _get_sample_type(cls, *args):
         return cls.DATA_TYPE.split('_')[-1]
 
-    def import_filtered_table(self, project_samples, *args, parsed_intervals=None, padded_interval=None, **kwargs):
-        if len(project_samples) > 1 and (parsed_intervals or padded_interval):
-            # For multi-project interval search, faster to first read and filter the annotation table and then add entries
-            ht = self._read_table('annotations.ht')
-            ht = self._filter_annotated_table(ht, parsed_intervals=parsed_intervals, padded_interval=padded_interval)
-            self._load_table_kwargs['variant_ht'] = ht.select()
-            parsed_intervals = None
-            padded_interval = None
-
-        return super().import_filtered_table(project_samples, *args, parsed_intervals=parsed_intervals, padded_interval=padded_interval, **kwargs)
-
     def _filter_annotated_table(self, ht, *args, parsed_intervals=None, exclude_intervals=False, padded_interval=None, **kwargs):
         if parsed_intervals:
             interval_filter = hl.array(parsed_intervals).any(lambda interval: hl.if_else(
5 changes: 5 additions & 0 deletions hail_search/search.py
@@ -27,6 +27,11 @@ def lookup_variant(request):
     return query.lookup_variant(request['variant_id'], sample_data=request.get('sample_data'))
 
 
+def lookup_variants(request):
+    query = QUERY_CLASS_MAP[(request['data_type'], request['genome_version'])](sample_data=None)
+    return query.lookup_variants(request['variant_ids'])
+
+
 def load_globals():
     for cls in QUERY_CLASS_MAP.values():
         cls.load_globals()
30 changes: 28 additions & 2 deletions hail_search/test_search.py
@@ -155,6 +155,8 @@
     'numAlt': 1, 'dp': 28, 'gq': 99, 'ab': 0.5,
 }
 
+NO_GENOTYPE_GCNV_VARIANT = {**GCNV_VARIANT4, 'numExon': 8, 'end': 38736268}
+
 # Ensures no variants are filtered out by annotation/path filters for compound hets
 COMP_HET_ALL_PASS_FILTERS = {
     'annotations': {'splice_ai': '0.0', 'structural': ['DEL', 'CPX', 'INS', 'gCNV_DEL', 'gCNV_DUP']},
@@ -694,19 +696,43 @@ async def test_variant_lookup(self):
             self.assertEqual(resp.status, 200)
             resp_json = await resp.json()
             self.assertDictEqual(resp_json, {
-                **GCNV_VARIANT4, 'numExon': 8, 'end': 38736268, 'familyGuids': [], 'genotypes': {}, 'genotypeFilters': '',
+                **NO_GENOTYPE_GCNV_VARIANT, 'familyGuids': [], 'genotypes': {}, 'genotypeFilters': '',
             })
 
         async with self.client.request('POST', '/lookup', json={**body, 'sample_data': EXPECTED_SAMPLE_DATA['SV_WES']}) as resp:
             self.assertEqual(resp.status, 200)
             resp_json = await resp.json()
             self.assertDictEqual(resp_json, {
-                **GCNV_VARIANT4, 'numExon': 8, 'end': 38736268, 'genotypes': {
+                **NO_GENOTYPE_GCNV_VARIANT, 'genotypes': {
                     individual: {k: v for k, v in genotype.items() if k not in {'start', 'end', 'numExon', 'geneIds'}}
                     for individual, genotype in GCNV_VARIANT4['genotypes'].items()
                 }
             })
 
+    async def test_multi_variant_lookup(self):
+        await self._test_multi_lookup(VARIANT_ID_SEARCH['variant_ids'], 'SNV_INDEL', [VARIANT1])
+
+        await self._test_multi_lookup([['7', 143270172, 'A', 'G']], 'SNV_INDEL', [GRCH37_VARIANT], genome_version='GRCh37')
+
+        await self._test_multi_lookup([['M', 4429, 'G', 'A'], ['M', 14783, 'T', 'C']], 'MITO', [MITO_VARIANT1, MITO_VARIANT3])
+
+        await self._test_multi_lookup(
+            ['cohort_2911.chr1.final_cleanup_INS_chr1_160', 'phase2_DEL_chr14_4640'],
+            'SV_WGS', [SV_VARIANT2, SV_VARIANT4],
+        )
+
+        await self._test_multi_lookup(['suffix_140608_DUP'], 'SV_WES', [NO_GENOTYPE_GCNV_VARIANT])
+
+    async def _test_multi_lookup(self, variant_ids, data_type, results, genome_version='GRCh38'):
+        body = {'genome_version': genome_version, 'data_type': data_type, 'variant_ids': variant_ids}
+        async with self.client.request('POST', '/multi_lookup', json=body) as resp:
+            self.assertEqual(resp.status, 200)
+            resp_json = await resp.json()
+        self.assertDictEqual(resp_json, {'results': [
+            {k: v for k, v in variant.items() if k not in {'familyGuids', 'genotypes', 'genotypeFilters'}}
+            for variant in results
+        ]})
+
     async def test_frequency_filter(self):
         sv_callset_filter = {'sv_callset': {'af': 0.05}}
         await self._assert_expected_search(
8 changes: 7 additions & 1 deletion hail_search/web_app.py
@@ -9,7 +9,7 @@
 import traceback
 from typing import Callable
 
-from hail_search.search import search_hail_backend, load_globals, lookup_variant
+from hail_search.search import search_hail_backend, load_globals, lookup_variant, lookup_variants
 
 logger = logging.getLogger(__name__)
 
@@ -64,6 +64,11 @@ async def lookup(request: web.Request) -> web.Response:
     return web.json_response(result, dumps=hl_json_dumps)
 
 
+async def multi_lookup(request: web.Request) -> web.Response:
+    result = await sync_to_async_hail_query(request, lookup_variants)
+    return web.json_response({'results': result}, dumps=hl_json_dumps)
+
+
 async def status(request: web.Request) -> web.Response:
     return web.json_response({'success': True})
 
@@ -84,6 +89,7 @@ async def init_web_app():
         web.post('/search', search),
         web.post('/gene_counts', gene_counts),
         web.post('/lookup', lookup),
+        web.post('/multi_lookup', multi_lookup),
     ])
     # The idea here is to run the hail queries off the main thread so that the
     # event loop stays live and the /status check is responsive. We only
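Together with the new `lookup_variants` handler in `hail_search/search.py`, this registers a batch lookup endpoint. A minimal client sketch, with an assumed host/port and reusing the GRCh37 variant id from `_test_multi_lookup` above (SNV_INDEL/MITO ids are `[contig, position, ref, alt]` arrays; SV ids are strings):

import requests

# Hypothetical service address; the request shape mirrors _test_multi_lookup.
resp = requests.post('http://localhost:5000/multi_lookup', json={
    'genome_version': 'GRCh37',
    'data_type': 'SNV_INDEL',
    'variant_ids': [['7', 143270172, 'A', 'G']],
})
resp.raise_for_status()

# The handler wraps results as {'results': [...]}; family/genotype fields are omitted.
for variant in resp.json()['results']:
    print(variant)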
22 changes: 6 additions & 16 deletions matchmaker/views/external_api.py
@@ -79,22 +79,15 @@ def mme_match_proxy(request, originating_node_name):
         patient_data=query_patient_data, origin_request_host=originating_node_name,
     )
 
-    try:
-        _generate_notification_for_incoming_match(results, incoming_query, originating_node_name, query_patient_data)
-    except Exception as e:
-        logger.error('Unable to create notification for incoming MME match request for {} matches{}: {}'.format(
-            len(results),
-            ' ({})'.format(', '.join(sorted([result.get('patient', {}).get('id') for result in results]))) if results else '',
-            str(e)),
-        )
+    _safe_generate_notification_for_incoming_match(results, incoming_query, originating_node_name, query_patient_data)
 
     return create_json_response({
         'results': sorted(results, key=lambda result: result['score']['patient'], reverse=True),
         '_disclaimer': MME_DISCLAIMER,
     })
 
 
-def _generate_notification_for_incoming_match(results, incoming_query, incoming_request_node, incoming_patient):
+def _safe_generate_notification_for_incoming_match(results, incoming_query, incoming_request_node, incoming_patient):
     """
     Generate a SLACK notifcation to say that a VALID match request came in and the following
     results were sent back. If Slack is not supported, a message is not sent, but details persisted.
@@ -151,7 +144,7 @@ def _generate_notification_for_incoming_match(results, incoming_query, incoming_request_node, incoming_patient):
         emails = [email.strip() for email in submission.contact_href.replace('mailto:', '').split(',')]
         send_emails = emails if len(emails) < 2 else [email for email in emails if email != MME_DEFAULT_CONTACT_EMAIL]
         all_emails.update(send_emails)
-        match_results.append((result_text, send_emails))
+        match_results.append((result_text, send_emails, submission.submission_id))
     match_results = sorted(match_results, key=lambda result_tuple: result_tuple[0])
 
     base_message = """Dear collaborators,
@@ -180,12 +173,11 @@ def _generate_notification_for_incoming_match(results, incoming_query, incoming_request_node, incoming_patient):
     We sent this email alert to: {email_addresses_alert_sent_to}\n{footer}."""
 
     safe_post_to_slack(MME_SLACK_MATCH_NOTIFICATION_CHANNEL, message_template.format(
-        base_message=base_message, match_results='\n'.join([text for text, _ in match_results]),
+        base_message=base_message, match_results='\n'.join([result[0] for result in match_results]),
         email_addresses_alert_sent_to=', '.join(sorted(all_emails)), footer=MME_EMAIL_FOOTER
     ))
 
-    email_errors = []
-    for result_text, emails in match_results:
+    for result_text, emails, submission_id in match_results:
         try:
             email_message = EmailMessage(
                 subject='Received new MME match',
@@ -198,9 +190,7 @@ def _generate_notification_for_incoming_match(results, incoming_query, incoming_request_node, incoming_patient):
             )
             email_message.send()
         except Exception as e:
-            email_errors.append(str(e))
-    if email_errors:
-        raise ValueError('\n'.join(email_errors))
+            logger.error(f'Unable to send notification email for incoming MME match with {submission_id}: {e}')
 
 
 MME_EMAIL_FOOTER = """
17 changes: 9 additions & 8 deletions matchmaker/views/external_api_tests.py
@@ -284,8 +284,8 @@ def test_mme_match_proxy(self, mock_post_to_slack, mock_email, mock_logger):
             mock.call().send(),
         ])
 
-        mock_logger.error.assert_called_with(
-            'Unable to create notification for incoming MME match request for 3 matches (NA19675_1_01, P0004515, P0004517): Email error')
+        mock_logger.error.assert_called_once_with(
+            'Unable to send notification email for incoming MME match with NA19675_1_01: Email error')
 
         # Test receive same request again
         mock_post_to_slack.reset_mock()
@@ -399,9 +399,9 @@ def test_mme_match_proxy_phenotype_only(self, mock_post_to_slack, mock_email):
             mock.call().send(),
         ])
 
-    @mock.patch('matchmaker.views.external_api.logger')
+    @mock.patch('seqr.utils.communication_utils.logger')
     @mock.patch('matchmaker.views.external_api.EmailMessage')
-    @mock.patch('matchmaker.views.external_api.safe_post_to_slack')
+    @mock.patch('seqr.utils.communication_utils._post_to_slack')
     def test_mme_match_proxy_no_results(self, mock_post_to_slack, mock_email, mock_logger):
         url = '/api/matchmaker/v1/match'
         request_body = {
@@ -424,11 +424,12 @@
         self.assertEqual(incoming_query_q.count(), 1)
         self.assertIsNone(incoming_query_q.first().patient_id)
 
-        mock_post_to_slack.assert_called_with(
-            'matchmaker_matches',
-            """A match request for 12345 came in from Test Institute today.
+        slack_message = """A match request for 12345 came in from Test Institute today.
 The contact information given was: [email protected].
 We didn't find any individuals in matchbox that matched that query well, *so no results were sent back*."""
+        mock_post_to_slack.assert_called_with(
+            'matchmaker_matches',
+            slack_message
         )
         mock_email.assert_not_called()
 
@@ -440,6 +441,6 @@
         self.assertListEqual(response.json()['results'], [])
         self.assertEqual(MatchmakerIncomingQuery.objects.filter(institution='Test Institute').count(), 2)
         mock_logger.error.assert_called_with(
-            'Unable to create notification for incoming MME match request for 0 matches: Slack connection error')
+            f'Slack error: Slack connection error: Original message in channel (matchmaker_matches) - {slack_message}')
4 changes: 2 additions & 2 deletions matchmaker/views/matchmaker_api_tests.py
@@ -306,7 +306,7 @@ def test_search_individual_mme_matches(self, mock_email, mock_logger, mock_slack
                     'start': 248367227,
                     'referenceBases': 'TC',
                     'alternateBases': 'T',
-                    'assembly': 'GRCh37',
+                    'assembly': 'GRCh38',
                 },
                 'zygosity': 1,
             }],
@@ -332,7 +332,7 @@
                 'ref': 'TC',
                 'alt': 'T',
                 'end': None,
-                'genomeVersion': 'GRCh37',
+                'genomeVersion': 'GRCh38',
             }
         }
     ],