Skip to content

Commit

Permalink
Merge pull request #3950 from broadinstitute/reload-saved-variant-new-samples
Browse files Browse the repository at this point in the history

Reload saved variants - shared annotations only
  • Loading branch information
hanars authored Mar 11, 2024
2 parents 33d2d4c + 464cb3b commit 7f29aca
Show file tree
Hide file tree
Showing 17 changed files with 257 additions and 88 deletions.
19 changes: 14 additions & 5 deletions hail_search/queries/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1061,12 +1061,23 @@ def gene_counts(self):
ht.gene_ids, hl.struct(total=hl.agg.count(), families=hl.agg.counter(ht.families))
))

def lookup_variant(self, variant_id, sample_data=None):
self._parse_intervals(intervals=None, variant_ids=[variant_id], variant_keys=[variant_id])
def lookup_variants(self, variant_ids, annotation_fields=None):
    """Look up multiple variants by ID and return their shared annotations.

    :param variant_ids: list of variant IDs/keys to fetch from the annotation table
    :param annotation_fields: optional mapping of output field name to hail formatter
        expression; when omitted, defaults to all annotation fields EXCEPT the
        per-family genotype fields (familyGuids, genotypes, genotypeFilters), i.e.
        shared annotations only
    :return: list of formatted hail row structs, one per variant found
    """
    self._parse_intervals(intervals=None, variant_ids=variant_ids, variant_keys=variant_ids)
    ht = self._read_table('annotations.ht', drop_globals=['paths', 'versions'])
    # NOTE(review): set after the read above — presumably applies to subsequent
    # table loads triggered downstream; confirm the ordering is intentional
    self._load_table_kwargs['_n_partitions'] = 1
    # Keep only rows actually present in the annotation table (requested IDs
    # that were not found have no xpos)
    ht = ht.filter(hl.is_defined(ht[XPOS]))

    if not annotation_fields:
        # Shared annotations only: strip family/genotype-specific fields
        annotation_fields = {
            k: v for k, v in self.annotation_fields(include_genotype_overrides=False).items()
            if k not in {FAMILY_GUID_FIELD, GENOTYPES_FIELD, 'genotypeFilters'}
        }

    formatted = self._format_results(ht.key_by(), annotation_fields=annotation_fields, include_genotype_overrides=False)

    # At most one row per requested ID, so taking len(variant_ids) rows collects all results
    return formatted.aggregate(hl.agg.take(formatted.row, len(variant_ids)))

def lookup_variant(self, variant_id, sample_data=None):
annotation_fields = self.annotation_fields(include_genotype_overrides=False)
entry_annotations = {k: annotation_fields[k] for k in [FAMILY_GUID_FIELD, GENOTYPES_FIELD]}
annotation_fields.update({
Expand All @@ -1075,9 +1086,7 @@ def lookup_variant(self, variant_id, sample_data=None):
'genotypeFilters': lambda ht: hl.str(''),
})

formatted = self._format_results(ht.key_by(), annotation_fields=annotation_fields, include_genotype_overrides=False)

variants = formatted.aggregate(hl.agg.take(formatted.row, 1))
variants = self.lookup_variants([variant_id], annotation_fields=annotation_fields)
if not variants:
raise HTTPNotFound()
variant = dict(variants[0])
Expand Down
5 changes: 5 additions & 0 deletions hail_search/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ def lookup_variant(request):
return query.lookup_variant(request['variant_id'], sample_data=request.get('sample_data'))


def lookup_variants(request):
    """Fetch shared annotations for the requested variant IDs.

    Selects the query class for the request's data type and genome version and
    runs a multi-variant lookup with no sample data attached.
    """
    query_cls = QUERY_CLASS_MAP[(request['data_type'], request['genome_version'])]
    return query_cls(sample_data=None).lookup_variants(request['variant_ids'])


def load_globals():
    """Pre-load hail table globals for every registered query class."""
    for query_cls in QUERY_CLASS_MAP.values():
        query_cls.load_globals()
30 changes: 28 additions & 2 deletions hail_search/test_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@
'numAlt': 1, 'dp': 28, 'gq': 99, 'ab': 0.5,
}

NO_GENOTYPE_GCNV_VARIANT = {**GCNV_VARIANT4, 'numExon': 8, 'end': 38736268}

# Ensures no variants are filtered out by annotation/path filters for compound hets
COMP_HET_ALL_PASS_FILTERS = {
'annotations': {'splice_ai': '0.0', 'structural': ['DEL', 'CPX', 'INS', 'gCNV_DEL', 'gCNV_DUP']},
Expand Down Expand Up @@ -694,19 +696,43 @@ async def test_variant_lookup(self):
self.assertEqual(resp.status, 200)
resp_json = await resp.json()
self.assertDictEqual(resp_json, {
**GCNV_VARIANT4, 'numExon': 8, 'end': 38736268, 'familyGuids': [], 'genotypes': {}, 'genotypeFilters': '',
**NO_GENOTYPE_GCNV_VARIANT, 'familyGuids': [], 'genotypes': {}, 'genotypeFilters': '',
})

async with self.client.request('POST', '/lookup', json={**body, 'sample_data': EXPECTED_SAMPLE_DATA['SV_WES']}) as resp:
self.assertEqual(resp.status, 200)
resp_json = await resp.json()
self.assertDictEqual(resp_json, {
**GCNV_VARIANT4, 'numExon': 8, 'end': 38736268, 'genotypes': {
**NO_GENOTYPE_GCNV_VARIANT, 'genotypes': {
individual: {k: v for k, v in genotype.items() if k not in {'start', 'end', 'numExon', 'geneIds'}}
for individual, genotype in GCNV_VARIANT4['genotypes'].items()
}
})

async def test_multi_variant_lookup(self):
    """Exercise /multi_lookup across every data type and both genome versions."""
    lookup_cases = [
        (VARIANT_ID_SEARCH['variant_ids'], 'SNV_INDEL', [VARIANT1], 'GRCh38'),
        ([['7', 143270172, 'A', 'G']], 'SNV_INDEL', [GRCH37_VARIANT], 'GRCh37'),
        ([['M', 4429, 'G', 'A'], ['M', 14783, 'T', 'C']], 'MITO', [MITO_VARIANT1, MITO_VARIANT3], 'GRCh38'),
        (
            ['cohort_2911.chr1.final_cleanup_INS_chr1_160', 'phase2_DEL_chr14_4640'],
            'SV_WGS', [SV_VARIANT2, SV_VARIANT4], 'GRCh38',
        ),
        (['suffix_140608_DUP'], 'SV_WES', [NO_GENOTYPE_GCNV_VARIANT], 'GRCh38'),
    ]
    for variant_ids, data_type, results, genome_version in lookup_cases:
        await self._test_multi_lookup(variant_ids, data_type, results, genome_version=genome_version)

async def _test_multi_lookup(self, variant_ids, data_type, results, genome_version='GRCh38'):
    """POST the given variant IDs to /multi_lookup and assert the response
    contains the expected variants with family-specific fields stripped."""
    request_json = {'genome_version': genome_version, 'data_type': data_type, 'variant_ids': variant_ids}
    expected_results = [
        {k: v for k, v in variant.items() if k not in {'familyGuids', 'genotypes', 'genotypeFilters'}}
        for variant in results
    ]
    async with self.client.request('POST', '/multi_lookup', json=request_json) as resp:
        self.assertEqual(resp.status, 200)
        resp_json = await resp.json()
        self.assertDictEqual(resp_json, {'results': expected_results})

async def test_frequency_filter(self):
sv_callset_filter = {'sv_callset': {'af': 0.05}}
await self._assert_expected_search(
Expand Down
8 changes: 7 additions & 1 deletion hail_search/web_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import traceback
from typing import Callable

from hail_search.search import search_hail_backend, load_globals, lookup_variant
from hail_search.search import search_hail_backend, load_globals, lookup_variant, lookup_variants

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -64,6 +64,11 @@ async def lookup(request: web.Request) -> web.Response:
return web.json_response(result, dumps=hl_json_dumps)


async def multi_lookup(request: web.Request) -> web.Response:
    """Handle /multi_lookup: fetch shared annotations for multiple variant IDs
    off the main thread and return them under a 'results' key."""
    variants = await sync_to_async_hail_query(request, lookup_variants)
    return web.json_response({'results': variants}, dumps=hl_json_dumps)


async def status(request: web.Request) -> web.Response:
return web.json_response({'success': True})

Expand All @@ -84,6 +89,7 @@ async def init_web_app():
web.post('/search', search),
web.post('/gene_counts', gene_counts),
web.post('/lookup', lookup),
web.post('/multi_lookup', multi_lookup),
])
# The idea here is to run the hail queries off the main thread so that the
# event loop stays live and the /status check is responsive. We only
Expand Down
4 changes: 2 additions & 2 deletions matchmaker/views/matchmaker_api_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ def test_search_individual_mme_matches(self, mock_email, mock_logger, mock_slack
'start': 248367227,
'referenceBases': 'TC',
'alternateBases': 'T',
'assembly': 'GRCh37',
'assembly': 'GRCh38',
},
'zygosity': 1,
}],
Expand All @@ -332,7 +332,7 @@ def test_search_individual_mme_matches(self, mock_email, mock_logger, mock_slack
'ref': 'TC',
'alt': 'T',
'end': None,
'genomeVersion': 'GRCh37',
'genomeVersion': 'GRCh38',
}
}
],
Expand Down
5 changes: 2 additions & 3 deletions seqr/fixtures/1kg_project.json
Original file line number Diff line number Diff line change
Expand Up @@ -1884,7 +1884,6 @@
"genotypeFilters": "pass",
"mainTranscriptId": "ENST00000505820",
"populations": {"callset": {"ac": null, "an": null, "af": null}, "g1k": {"ac": null, "an": null, "af": 0.0}, "gnomad_genomes": {"hemi": null, "ac": null, "an": null, "hom": null, "af": 0.0}, "gnomad_exomes": {"hemi": null, "ac": null, "an": null, "hom": null, "af": 8.142526788913136e-06}, "exac": {"hemi": null, "ac": null, "an": null, "hom": null, "af": 0.0}, "topmed": {"ac": null, "an": null, "af": null}},
"genomeVersion": "37",
"pos": 1560662,
"predictions": {"eigen": null, "revel": null, "sift": "damaging", "cadd": "31", "metasvm": "D", "mpc": null, "splice_ai": null, "phastcons_100_vert": null, "mut_taster": "disease_causing", "fathmm": "tolerated", "polyphen": "probably_damaging", "dann": null, "primate_ai": null, "gerp_rs": null},
"hgmd": {"accession": null, "class": null},
Expand Down Expand Up @@ -1970,7 +1969,7 @@
"hom": 10332
}
},
"genomeVersion": "37",
"genomeVersion": "38",
"pos": 1562437,
"hgmd": {"accession": null, "class": null},
"rsid": null,
Expand Down Expand Up @@ -2085,7 +2084,7 @@
"alt": "T",
"variant_id": "12-48367227-TC-T",
"saved_variant_json": {
"liftedOverGenomeVersion": "38", "liftedOverPos": "", "genomeVersion": "37", "pos": 248367227,
"liftedOverGenomeVersion": "37", "liftedOverPos": "", "genomeVersion": "38", "pos": 248367227,
"transcripts": {}, "chrom": "1", "genotypes": {
"I000018_na21234": {"sampleId": "NA20885", "ab": 0.0, "gq": 99.0, "numAlt": 1}}},
"family": 14
Expand Down
85 changes: 74 additions & 11 deletions seqr/management/commands/check_for_new_samples_from_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,26 @@
from collections import defaultdict
from django.contrib.postgres.aggregates import ArrayAgg
from django.core.management.base import BaseCommand, CommandError
from django.db.models import Q
from django.db.models.functions import JSONObject
import json
import logging

from reference_data.models import GENOME_VERSION_LOOKUP
from seqr.models import Family, Sample, Project
from seqr.models import Family, Sample, SavedVariant
from seqr.utils.file_utils import file_iter, does_file_exist
from seqr.utils.search.add_data_utils import notify_search_data_loaded
from seqr.utils.search.utils import parse_valid_variant_id
from seqr.utils.search.hail_search_utils import hail_variant_multi_lookup
from seqr.views.utils.dataset_utils import match_and_update_search_samples
from seqr.views.utils.variant_utils import reset_cached_search_results, update_projects_saved_variant_json
from seqr.views.utils.variant_utils import reset_cached_search_results, update_projects_saved_variant_json, \
saved_variants_dataset_type_filter

logger = logging.getLogger(__name__)

GS_PATH_TEMPLATE = 'gs://seqr-hail-search-data/v03/{path}/runs/{version}/'
DATASET_TYPE_MAP = {'GCNV': Sample.DATASET_TYPE_SV_CALLS}
USER_EMAIL = 'manage_command'


class Command(BaseCommand):
Expand Down Expand Up @@ -83,20 +89,77 @@ def handle(self, *args, **options):
reset_cached_search_results(project=None)

# Send loading notifications
update_sample_data_by_project = {
s['individual__family__project']: s for s in updated_samples.values('individual__family__project').annotate(
samples=ArrayAgg(JSONObject(sample_id='sample_id', individual_id='individual_id')),
family_guids=ArrayAgg('individual__family__guid', distinct=True),
)
}
updated_project_families = []
updated_families = set()
for project, sample_ids in samples_by_project.items():
project_updated_samples = updated_samples.filter(individual__family__project=project)
project_sample_data = update_sample_data_by_project[project.id]
notify_search_data_loaded(
project, dataset_type, sample_type, inactivated_sample_guids,
updated_samples=project_updated_samples, num_samples=len(sample_ids),
updated_samples=project_sample_data['samples'], num_samples=len(sample_ids),
)
project_families = project_sample_data['family_guids']
updated_families.update(project_families)
updated_project_families.append((project.id, project.name, project_families))

# Reload saved variant JSON
updated_annotation_samples = Sample.objects.filter(is_active=True, dataset_type=dataset_type)
if dataset_type == Sample.DATASET_TYPE_SV_CALLS:
updated_annotation_samples = updated_annotation_samples.filter(sample_type=sample_type)
projects = Project.objects.filter(
genome_version=genome_version.replace('GRCh', ''), family__individual__sample__in=updated_annotation_samples,
).annotate(families=ArrayAgg('family__family_id', distinct=True)).values_list('id', 'name', 'families')
update_projects_saved_variant_json(projects, user_email='manage_command', dataset_type=dataset_type)
updated_variants_by_id = update_projects_saved_variant_json(
updated_project_families, user_email=USER_EMAIL, dataset_type=dataset_type)
self._reload_shared_variant_annotations(
updated_variants_by_id, updated_families, dataset_type, sample_type, genome_version)

logger.info('DONE')

@staticmethod
def _reload_shared_variant_annotations(updated_variants_by_id, updated_families, dataset_type, sample_type, genome_version):
    """Refresh shared (non-genotype) annotations on saved variants belonging to
    families that were NOT part of the newly loaded callset.

    :param updated_variants_by_id: {variant_id: variant_json} already reloaded for
        the updated families; reused here to avoid re-fetching
    :param updated_families: family guids whose saved variants were already reloaded
    :param dataset_type: Sample dataset type of the loaded data
    :param sample_type: sample type (e.g. WES/WGS); only used for SV data
    :param genome_version: genome build string, e.g. 'GRCh38'
    """
    data_type = dataset_type
    is_sv = dataset_type == Sample.DATASET_TYPE_SV_CALLS
    db_genome_version = genome_version.replace('GRCh', '')
    # Active samples of the same dataset type and genome build, excluding the
    # families whose variants were already reloaded in this run
    updated_annotation_samples = Sample.objects.filter(
        is_active=True, dataset_type=dataset_type,
        individual__family__project__genome_version=db_genome_version,
    ).exclude(individual__family__guid__in=updated_families)
    if is_sv:
        # SV data is loaded per sample type; the hail backend keys SV data types
        # as '<dataset_type>_<sample_type>'
        updated_annotation_samples = updated_annotation_samples.filter(sample_type=sample_type)
        data_type = f'{dataset_type}_{sample_type}'

    # Saved variants of this dataset type in those families, restricted to the
    # matching genome version (or legacy records with no genomeVersion set)
    variant_models = SavedVariant.objects.filter(
        family_id__in=updated_annotation_samples.values_list('individual__family', flat=True).distinct(),
        **saved_variants_dataset_type_filter(dataset_type),
    ).filter(Q(saved_variant_json__genomeVersion__isnull=True) | Q(saved_variant_json__genomeVersion=db_genome_version))

    if not variant_models:
        logger.info('No additional saved variants to update')
        return

    # The same variant may be saved in multiple families, so group models by ID
    variants_by_id = defaultdict(list)
    for v in variant_models:
        variants_by_id[v.variant_id].append(v)

    logger.info(f'Reloading shared annotations for {len(variant_models)} saved variants ({len(variants_by_id)} unique)')

    # Strip family-specific fields so only shared annotations get copied onto
    # other families' saved variants
    updated_variants_by_id = {
        variant_id: {k: v for k, v in variant.items() if k not in {'familyGuids', 'genotypes', 'genotypeFilters'}}
        for variant_id, variant in updated_variants_by_id.items()
    }
    # Fetch annotations only for variants not already reloaded in this run
    fetch_variant_ids = set(variants_by_id.keys()) - set(updated_variants_by_id.keys())
    if fetch_variant_ids:
        if not is_sv:
            # SNV/indel lookup expects parsed variant ID tuples; SVs use string IDs
            fetch_variant_ids = [parse_valid_variant_id(variant_id) for variant_id in fetch_variant_ids]
        updated_variants = hail_variant_multi_lookup(USER_EMAIL, sorted(fetch_variant_ids), data_type, genome_version)
        logger.info(f'Fetched {len(updated_variants)} additional variants')
        updated_variants_by_id.update({variant['variantId']: variant for variant in updated_variants})

    # Merge the refreshed shared annotations into each saved variant's JSON;
    # variants_by_id is a defaultdict, so IDs with no saved models merge nothing
    updated_variant_models = []
    for variant_id, variant in updated_variants_by_id.items():
        for variant_model in variants_by_id[variant_id]:
            variant_model.saved_variant_json.update(variant)
            updated_variant_models.append(variant_model)

    SavedVariant.objects.bulk_update(updated_variant_models, ['saved_variant_json'])
    logger.info(f'Updated {len(updated_variant_models)} saved variants')
2 changes: 1 addition & 1 deletion seqr/management/commands/lift_project_to_hg38.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def handle(self, *args, **options):
len(es_variants), len(missing_variants) + len(lift_failed), missing_family_count))

def _get_variants_to_lift(saved_variants):
saved_variants_to_lift = [v for v in saved_variants if v['genomeVersion'] != GENOME_VERSION_GRCh38]
saved_variants_to_lift = [v for v in saved_variants if v.get('genomeVersion') != GENOME_VERSION_GRCh38]
num_already_lifted = len(saved_variants) - len(saved_variants_to_lift)
if num_already_lifted:
if input('Found {} saved variants already on Hg38. Continue with liftover (y/n)? '.format(
Expand Down
6 changes: 3 additions & 3 deletions seqr/management/commands/reload_saved_variant_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ class Command(BaseCommand):

def add_arguments(self, parser):
parser.add_argument('projects', nargs="*", help='Project(s) to transfer. If not specified, defaults to all projects.')
parser.add_argument('--family-id', help='optional family to reload variants for')
parser.add_argument('--family-guid', help='optional family to reload variants for')

def handle(self, *args, **options):
"""transfer project"""
projects_to_process = options['projects']
family_id = options['family_id']
family_guid = options['family_guid']

if projects_to_process:
projects = Project.objects.filter(Q(name__in=projects_to_process) | Q(guid__in=projects_to_process))
Expand All @@ -27,7 +27,7 @@ def handle(self, *args, **options):
projects = Project.objects.all()
logging.info("Processing all %s projects" % len(projects))

family_ids = [family_id] if family_id else None
family_ids = [family_guid] if family_guid else None
project_list = [(*project, family_ids) for project in projects.values_list('id', 'name')]
update_projects_saved_variant_json(project_list, user_email='manage_command')
logger.info("Done")
Loading

0 comments on commit 7f29aca

Please sign in to comment.