Skip to content

Commit

Permalink
Merge pull request #3986 from broadinstitute/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
hanars authored Mar 20, 2024
2 parents e2690b4 + 97b25c6 commit 3c4c954
Show file tree
Hide file tree
Showing 16 changed files with 207 additions and 186 deletions.
8 changes: 7 additions & 1 deletion hail_search/web_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,12 @@ def hl_json_dumps(obj):
return json.dumps(obj, default=_hl_json_default)

async def sync_to_async_hail_query(request: web.Request, query: Callable, *args, timeout_s=QUERY_TIMEOUT_S, **kwargs):
request_body = None
if request.body_exists:
request_body = await request.json()

loop = asyncio.get_running_loop()
future = loop.run_in_executor(request.app.pool, functools.partial(query, await request.json(), *args, **kwargs))
future = loop.run_in_executor(request.app.pool, functools.partial(query, request_body, *args, **kwargs))
try:
return await asyncio.wait_for(future, timeout_s)
except asyncio.TimeoutError:
Expand Down Expand Up @@ -94,6 +98,8 @@ async def multi_lookup(request: web.Request) -> web.Response:


async def status(request: web.Request) -> web.Response:
    """Liveness probe: confirm the hail backend worker process still answers.

    Pushes a trivial hail expression through the executor pool; if the backend
    has died or hangs, this raises (or times out) instead of returning.
    """
    await sync_to_async_hail_query(request, lambda _request_body: hl.eval(1 + 1))
    response_body = {'success': True}
    return web.json_response(response_body)


Expand Down
2 changes: 1 addition & 1 deletion requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ click==8.1.3
# via pip-tools
coverage==5.1
# via -r requirements-dev.in
django==3.2.24
django==3.2.25
# via
# -c requirements.txt
# django-appconf
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ defusedxml==0.7.1
# via
# python3-openid
# social-auth-core
django==3.2.24
django==3.2.25
# via
# -r requirements.in
# django-anymail
Expand Down
40 changes: 30 additions & 10 deletions seqr/management/commands/load_rna_seq.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import logging
from collections import defaultdict
from django.core.management.base import BaseCommand

from seqr.models import Sample
from seqr.views.utils.file_utils import parse_file
from seqr.views.utils.dataset_utils import load_rna_seq, RNA_DATA_TYPE_CONFIGS
from seqr.views.utils.dataset_utils import load_rna_seq, post_process_rna_data, RNA_DATA_TYPE_CONFIGS

logger = logging.getLogger(__name__)

Expand All @@ -24,18 +25,37 @@ def handle(self, *args, **options):
mapping_file = parse_file(options['mapping_file'], f)

data_type = options['data_type']
self.model_cls = RNA_DATA_TYPE_CONFIGS[data_type]['model_class']
model_cls = RNA_DATA_TYPE_CONFIGS[data_type]['model_class']

sample_guids, _, _ = load_rna_seq(
data_type, options['input_file'], self._save_sample_data, lambda *args: {}, create_models_before_save=True,
sample_data_by_guid = defaultdict(list)

def _save_sample_data(sample_guid, row):
    # Closure over sample_data_by_guid (defaultdict(list) in the enclosing
    # scope): buffer each parsed row under its sample GUID so rows can be
    # post-processed and bulk-created per sample after the load completes.
    sample_data_by_guid[sample_guid].append(row)

possible_sample_guids, _, _ = load_rna_seq(
data_type, options['input_file'], _save_sample_data,
mapping_file=mapping_file, ignore_extra_samples=options['ignore_extra_samples'])

sample_models_by_guid = {
s['guid']: s for s in Sample.objects.filter(guid__in=sample_data_by_guid).values('guid', 'id', 'sample_id')
}
errors = []
sample_guids = []
for sample_guid in possible_sample_guids:
data_rows, error = post_process_rna_data(sample_guid, sample_data_by_guid[sample_guid])
if error:
errors.append(error)
continue

sample_guids.append(sample_guid)
sample_model = sample_models_by_guid[sample_guid]
models = model_cls.objects.bulk_create(
[model_cls(sample_id=sample_model['id'], **data) for data in data_rows], batch_size=1000)
logger.info(f'create {len(models)} {model_cls.__name__} for {sample_model["sample_id"]}')

Sample.bulk_update(user=None, update_json={'is_active': True}, guid__in=sample_guids)

logger.info('DONE')
for error in errors:
logger.info(error)

def _save_sample_data(self, sample_guid, data_by_gene):
sample = Sample.objects.get(guid=sample_guid)
models = self.model_cls.objects.bulk_create(
[self.model_cls(sample=sample, **data) for data in data_by_gene.values()], batch_size=1000)
logger.info(f'create {len(models)} {self.model_cls.__name__} for {sample.sample_id}')
logger.info('DONE')
10 changes: 4 additions & 6 deletions seqr/management/tests/load_rna_seq_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,10 @@ def test_tpm(self, mock_utils_logger):
'NA19675_D2\t1kg project nåme with uniçøde\t\tENSG00000240361\t12.6\t\n',
'NA19675_D2\t1kg project nåme with uniçøde\t\tENSG00000233750\t1.26\t\n',
'NA19678_D1\t1kg project nåme with uniçøde\t\tENSG00000233750\t 6.04\twhole_blood\n',
'GTEX-001\t1kg project nåme with uniçøde\t\tENSG00000240361\t3.1\tinvalid\n',
'NA19677\t1kg project nåme with uniçøde\t\tENSG00000233750\t5.31\tmuscle\n',
'GTEX-001\t1kg project nåme with uniçøde\t\tENSG00000233750\t7.8\tmuscle\n',
'NA19678\tTest Reprocessed Project\t\tENSG00000240361\t0.2\twhole_blood\n',
],
unmatched_samples='NA19677, NA19678, NA19678_D1',
unmatched_samples='NA19677 (1kg project nåme with uniçøde), NA19678 (Test Reprocessed Project), NA19678_D1 (1kg project nåme with uniçøde)',
additional_errors=['Samples missing required "tissue": NA19675_D2'],
)

Expand Down Expand Up @@ -108,7 +106,7 @@ def test_tpm(self, mock_utils_logger):
mock.call('DONE'),
])
mock_utils_logger.warning.assert_has_calls([
mock.call('Skipped loading for the following 2 unmatched samples: NA19677, NA19678', None),
mock.call('Skipped loading for the following 2 unmatched samples: NA19677 (1kg project nåme with uniçøde), NA19678 (Test Reprocessed Project)', None),
])

# Test a new sample created for a mismatched tissue and a row with 0.0 tpm
Expand Down Expand Up @@ -136,13 +134,13 @@ def test_outlier(self):
'NA19675_D3\t1kg project nåme with uniçøde\tENSG00000233750\tdetail1\t0.064\t0.0000057\t7.8\tmuscle\n',
'NA19675_D4\t1kg project nåme with uniçøde\tENSG00000233750\tdetail1\t0.064\t0.0000057\t7.8\tmuscle\n',
],
unmatched_samples='NA19675_D3, NA19675_D4',
unmatched_samples='NA19675_D3 (1kg project nåme with uniçøde), NA19675_D4 (1kg project nåme with uniçøde)',
)

self.mock_open.return_value.__enter__.return_value.__iter__.return_value = ['NA19675_D4\tNA19678']
with self.assertRaises(ErrorsWarningsException) as e:
call_command('load_rna_seq', 'outlier', RNA_FILE_ID, '--mapping-file', 'map.tsv')
self.assertEqual(e.exception.errors, ['Unable to find matches for the following samples: NA19675_D3'])
self.assertEqual(e.exception.errors, ['Unable to find matches for the following samples: NA19675_D3 (1kg project nåme with uniçøde)'])

call_command('load_rna_seq', 'outlier', RNA_FILE_ID, '--ignore-extra-samples')

Expand Down
27 changes: 18 additions & 9 deletions seqr/views/apis/data_manager_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
from seqr.utils.vcf_utils import validate_vcf_exists

from seqr.views.utils.airflow_utils import trigger_data_loading, write_data_loading_pedigree
from seqr.views.utils.dataset_utils import load_rna_seq, load_phenotype_prioritization_data_file, RNA_DATA_TYPE_CONFIGS
from seqr.views.utils.dataset_utils import load_rna_seq, load_phenotype_prioritization_data_file, RNA_DATA_TYPE_CONFIGS, \
post_process_rna_data
from seqr.views.utils.file_utils import parse_file, get_temp_upload_directory, load_uploaded_file
from seqr.views.utils.json_utils import create_json_response
from seqr.views.utils.json_to_orm_utils import update_model_from_json
Expand Down Expand Up @@ -272,14 +273,17 @@ def update_rna_seq(request):

file_name_prefix = f'rna_sample_data__{data_type}__{datetime.now().isoformat()}'

sample_files = {}

def _save_sample_data(sample_guid, sample_data):
file_name = os.path.join(get_temp_upload_directory(), _get_sample_file_name(file_name_prefix, sample_guid))
with gzip.open(file_name, 'wt') as f:
json.dump(sample_data, f)
if sample_guid not in sample_files:
file_name = os.path.join(get_temp_upload_directory(), _get_sample_file_name(file_name_prefix, sample_guid))
sample_files[sample_guid] = gzip.open(file_name, 'at')
sample_files[sample_guid].write(f'{json.dumps(sample_data)}\n')

try:
sample_guids, info, warnings = load_rna_seq(
data_type, file_path, _save_sample_data, lambda sample_guid: _load_saved_sample_data(file_name_prefix, sample_guid),
data_type, file_path, _save_sample_data,
user=request.user, mapping_file=mapping_file, ignore_extra_samples=request_json.get('ignoreExtraSamples'))
except ValueError as e:
return create_json_response({'error': str(e)}, status=400)
Expand All @@ -300,7 +304,7 @@ def _load_saved_sample_data(file_name_prefix, sample_guid):
file_name = os.path.join(get_temp_upload_directory(), _get_sample_file_name(file_name_prefix, sample_guid))
if os.path.exists(file_name):
with gzip.open(file_name, 'rt') as f:
return json.load(f)
return [json.loads(line) for line in f.readlines()]
return None


Expand All @@ -312,10 +316,15 @@ def load_rna_seq_sample_data(request, sample_guid):
request_json = json.loads(request.body)
file_name = request_json['fileName']
data_type = request_json['dataType']
data_by_gene = _load_saved_sample_data(file_name, sample_guid)
config = RNA_DATA_TYPE_CONFIGS[data_type]

data_rows = _load_saved_sample_data(file_name, sample_guid)
data_rows, error = post_process_rna_data(sample_guid, data_rows, **config.get('post_process_kwargs', {}))
if error:
return create_json_response({'error': error}, status=400)

model_cls = RNA_DATA_TYPE_CONFIGS[data_type]['model_class']
model_cls.bulk_create(request.user, [model_cls(sample=sample, **data) for data in data_by_gene.values()])
model_cls = config['model_class']
model_cls.bulk_create(request.user, [model_cls(sample=sample, **data) for data in data_rows])
update_model_from_json(sample, {'is_active': True}, user=request.user)

return create_json_response({'success': True})
Expand Down
Loading

0 comments on commit 3c4c954

Please sign in to comment.