diff --git a/hail_search/queries/base.py b/hail_search/queries/base.py
index bef76459b4..434ee49241 100644
--- a/hail_search/queries/base.py
+++ b/hail_search/queries/base.py
@@ -637,10 +637,7 @@ def _parse_intervals(self, intervals, gene_ids=None, **kwargs):
         intervals = [[f'chr{interval[0]}', *interval[1:]] for interval in (intervals or [])]
 
         if len(intervals) > MAX_GENE_INTERVALS and len(intervals) == len(gene_ids or []):
-            super_intervals = defaultdict(lambda: (1e9, 0))
-            for chrom, start, end in intervals:
-                super_intervals[chrom] = (min(super_intervals[chrom][0], start), max(super_intervals[chrom][1], end))
-            intervals = [(chrom, start, end) for chrom, (start, end) in super_intervals.items()]
+            intervals = self.cluster_intervals(sorted(intervals))
 
         parsed_intervals = [
             hl.eval(hl.locus_interval(*interval, reference_genome=self.GENOME_VERSION, invalid_missing=True))
@@ -659,6 +656,21 @@ def _parse_intervals(self, intervals, gene_ids=None, **kwargs):
 
         return parsed_intervals
 
+    @classmethod
+    def cluster_intervals(cls, intervals, distance=100000, max_intervals=MAX_GENE_INTERVALS):
+        if len(intervals) <= max_intervals:
+            return intervals
+
+        merged_intervals = [intervals[0]]
+        for chrom, start, end in intervals[1:]:
+            prev_chrom, prev_start, prev_end = merged_intervals[-1]
+            if chrom == prev_chrom and start - prev_end < distance:
+                merged_intervals[-1] = [chrom, prev_start, max(prev_end, end)]
+            else:
+                merged_intervals.append([chrom, start, end])
+
+        return cls.cluster_intervals(merged_intervals, distance=distance+100000, max_intervals=max_intervals)
+
     def _should_add_chr_prefix(self):
         return self.GENOME_VERSION == GENOME_VERSION_GRCh38
 
diff --git a/hail_search/test_search.py b/hail_search/test_search.py
index cabff967a9..8d8c720605 100644
--- a/hail_search/test_search.py
+++ b/hail_search/test_search.py
@@ -12,6 +12,7 @@
     FAMILY_2_MITO_SAMPLE_DATA, FAMILY_2_ALL_SAMPLE_DATA, MITO_VARIANT1, MITO_VARIANT2, MITO_VARIANT3, \
     EXPECTED_SAMPLE_DATA_WITH_SEX, SV_WGS_SAMPLE_DATA_WITH_SEX, VARIANT_LOOKUP_VARIANT
 from hail_search.web_app import init_web_app, sync_to_async_hail_query
+from hail_search.queries.base import BaseHailTableQuery
 
 PROJECT_2_VARIANT = {
     'variantId': '1-10146-ACC-A',
@@ -581,6 +582,26 @@ async def test_location_search(self):
             gene_ids=['ENSG00000171621'],
         )
 
+    async def test_cluster_intervals(self):
+        intervals = [
+            ['1', 11785723, 11806455], ['1', 91500851, 91525764], ['2', 1234, 5678], ['2', 12345, 67890],
+            ['7', 1, 11100], ['7', 202020, 20202020],
+        ]
+
+        self.assertListEqual(BaseHailTableQuery.cluster_intervals(intervals, max_intervals=5), [
+            ['1', 11785723, 11806455], ['1', 91500851, 91525764], ['2', 1234, 67890],
+            ['7', 1, 11100], ['7', 202020, 20202020],
+        ])
+
+        self.assertListEqual(BaseHailTableQuery.cluster_intervals(intervals, max_intervals=4), [
+            ['1', 11785723, 11806455], ['1', 91500851, 91525764], ['2', 1234, 67890], ['7', 1, 20202020],
+        ])
+
+        self.assertListEqual(BaseHailTableQuery.cluster_intervals(intervals, max_intervals=3), [
+            ['1', 11785723, 91525764], ['2', 1234, 67890], ['7', 1, 20202020],
+        ])
+
+
     async def test_variant_id_search(self):
         await self._assert_expected_search([VARIANT2], omit_data_type='SV_WES', **RSID_SEARCH)
 
diff --git a/requirements.txt b/requirements.txt
index a3175ce89a..191441bf4e 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -182,3 +182,4 @@ urllib3==1.26.19
     # requests
 whitenoise==6.3.0
     # via -r requirements.in
+zipp>=3.19.1 # not directly required, pinned by Snyk to avoid a vulnerability
diff --git
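Note on the interval clustering change above: `cluster_intervals` expects the intervals to be pre-sorted (which `_parse_intervals` does via `sorted(intervals)`) and, if the merged list is still too long, recurses with the merge distance widened by another 100kb per pass until the list fits under `max_intervals`. A minimal usage sketch, reusing values from the `test_cluster_intervals` case added in this diff (assumes the `hail_search` package is importable):

```python
from hail_search.queries.base import BaseHailTableQuery

# Intervals must be sorted by (chrom, start) before clustering, as _parse_intervals does.
intervals = sorted([
    ['1', 11785723, 11806455], ['1', 91500851, 91525764],
    ['2', 1234, 5678], ['2', 12345, 67890],
    ['7', 1, 11100], ['7', 202020, 20202020],
])

# Per the accompanying unit test, lowering max_intervals forces progressively wider merges:
print(BaseHailTableQuery.cluster_intervals(intervals, max_intervals=5))
# [['1', 11785723, 11806455], ['1', 91500851, 91525764], ['2', 1234, 67890],
#  ['7', 1, 11100], ['7', 202020, 20202020]]
print(BaseHailTableQuery.cluster_intervals(intervals, max_intervals=3))
# [['1', 11785723, 91525764], ['2', 1234, 67890], ['7', 1, 20202020]]
```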
a/seqr/management/commands/check_for_new_samples_from_pipeline.py b/seqr/management/commands/check_for_new_samples_from_pipeline.py index 5666046029..ffa517cba3 100644 --- a/seqr/management/commands/check_for_new_samples_from_pipeline.py +++ b/seqr/management/commands/check_for_new_samples_from_pipeline.py @@ -13,10 +13,12 @@ from seqr.utils.search.add_data_utils import notify_search_data_loaded from seqr.utils.search.utils import parse_valid_variant_id from seqr.utils.search.hail_search_utils import hail_variant_multi_lookup, search_data_type +from seqr.views.utils.airtable_utils import AirtableSession, LOADABLE_PDO_STATUSES, AVAILABLE_PDO_STATUS from seqr.views.utils.dataset_utils import match_and_update_search_samples +from seqr.views.utils.permissions_utils import is_internal_anvil_project, project_has_anvil from seqr.views.utils.variant_utils import reset_cached_search_results, update_projects_saved_variant_json, \ get_saved_variants -from settings import SEQR_SLACK_LOADING_NOTIFICATION_CHANNEL +from settings import SEQR_SLACK_LOADING_NOTIFICATION_CHANNEL, BASE_URL logger = logging.getLogger(__name__) @@ -25,6 +27,12 @@ USER_EMAIL = 'manage_command' MAX_LOOKUP_VARIANTS = 5000 +PDO_COPY_FIELDS = [ + 'PDO', 'PDOStatus', 'SeqrLoadingDate', 'GATKShortReadCallsetPath', 'SeqrProjectURL', 'TerraProjectURL', + 'SequencingProduct', 'PDOName', 'SequencingSubmissionDate', 'SequencingCompletionDate', 'CallsetRequestedDate', + 'CallsetCompletionDate', 'Project', 'Metrics Checked', 'gCNV_SV_CallsetPath', 'DRAGENShortReadCallsetPath', +] + class Command(BaseCommand): help = 'Check for newly loaded seqr samples' @@ -91,7 +99,7 @@ def handle(self, *args, **options): # Reset cached results for all projects, as seqr AFs will have changed for all projects when new data is added reset_cached_search_results(project=None) - # Send loading notifications + # Send loading notifications and update Airtable PDOs update_sample_data_by_project = { s['individual__family__project']: s for s in updated_samples.values('individual__family__project').annotate( samples=ArrayAgg(JSONObject(sample_id='sample_id', individual_id='individual_id')), @@ -100,15 +108,20 @@ def handle(self, *args, **options): } updated_project_families = [] updated_families = set() + split_project_pdos = {} + session = AirtableSession(user=None, no_auth=True) for project, sample_ids in samples_by_project.items(): project_sample_data = update_sample_data_by_project[project.id] + is_internal = not project_has_anvil(project) or is_internal_anvil_project(project) notify_search_data_loaded( - project, dataset_type, sample_type, inactivated_sample_guids, + project, is_internal, dataset_type, sample_type, inactivated_sample_guids, updated_samples=project_sample_data['samples'], num_samples=len(sample_ids), ) project_families = project_sample_data['family_guids'] updated_families.update(project_families) updated_project_families.append((project.id, project.name, project.genome_version, project_families)) + if is_internal and dataset_type == Sample.DATASET_TYPE_VARIANT_CALLS: + split_project_pdos[project.name] = self._update_pdos(session, project.guid, sample_ids) # Send failure notifications failed_family_samples = metadata.get('failed_family_samples', {}) @@ -124,6 +137,9 @@ def handle(self, *args, **options): ) for project, failures in failures_by_project.items(): summary = '\n'.join(sorted(failures)) + split_pdos = split_project_pdos.get(project) + if split_pdos: + summary += f'\n\nSkipped samples in this project have been moved to {", 
".join(split_pdos)}' safe_post_to_slack( SEQR_SLACK_LOADING_NOTIFICATION_CHANNEL, f'The following {len(failures)} families failed {check.replace("_", " ")} in {project}:\n{summary}' @@ -138,6 +154,52 @@ def handle(self, *args, **options): logger.info('DONE') + @staticmethod + def _update_pdos(session, project_guid, sample_ids): + airtable_samples = session.fetch_records( + 'Samples', fields=['CollaboratorSampleID', 'SeqrCollaboratorSampleID', 'PDOID'], + or_filters={'PDOStatus': LOADABLE_PDO_STATUSES}, + and_filters={'SeqrProject': f'{BASE_URL}project/{project_guid}/project_page'} + ) + + pdo_ids = set() + skipped_pdo_samples = defaultdict(list) + for record_id, sample in airtable_samples.items(): + pdo_id = sample['PDOID'][0] + sample_id = sample.get('SeqrCollaboratorSampleID') or sample['CollaboratorSampleID'] + if sample_id in sample_ids: + pdo_ids.add(pdo_id) + else: + skipped_pdo_samples[pdo_id].append(record_id) + + if pdo_ids: + session.safe_patch_records_by_id('PDO', pdo_ids, {'PDOStatus': AVAILABLE_PDO_STATUS}) + + skipped_pdo_samples = { + pdo_id: sample_record_ids for pdo_id, sample_record_ids in skipped_pdo_samples.items() if pdo_id in pdo_ids + } + if not skipped_pdo_samples: + return [] + + pdos_to_create = { + f"{pdo.pop('PDO')}_sr": (record_id, pdo) for record_id, pdo in session.fetch_records( + 'PDO', fields=PDO_COPY_FIELDS, or_filters={'RECORD_ID()': list(skipped_pdo_samples.keys())} + ).items() + } + + # Create PDOs and then update Samples with new PDOs + # Does not create PDOs with Samples directly as that would not remove Samples from old PDOs + new_pdos = session.safe_create_records('PDO', [ + {'PDO': pdo_name, **pdo} for pdo_name, (_, pdo) in pdos_to_create.items() + ]) + pdo_id_map = {pdos_to_create[record['fields']['PDO']][0]: record['id'] for record in new_pdos} + for pdo_id, sample_record_ids in skipped_pdo_samples.items(): + new_pdo_id = pdo_id_map.get(pdo_id) + if new_pdo_id: + session.safe_patch_records_by_id('Samples', sample_record_ids, {'PDOID': [new_pdo_id]}) + + return sorted(pdos_to_create.keys()) + @staticmethod def _reload_shared_variant_annotations(data_type, genome_version, updated_variants_by_id=None, exclude_families=None): dataset_type = data_type.split('_')[0] diff --git a/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py b/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py index af2423c818..7a59b27460 100644 --- a/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py +++ b/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py @@ -47,6 +47,72 @@ f'Test Reprocessed Project' \ f'

All the best,
The seqr team' +PDO_QUERY_FIELDS = '&'.join([f'fields[]={field}' for field in [ + 'PDO', 'PDOStatus', 'SeqrLoadingDate', 'GATKShortReadCallsetPath', 'SeqrProjectURL', 'TerraProjectURL', + 'SequencingProduct', 'PDOName', 'SequencingSubmissionDate', 'SequencingCompletionDate', 'CallsetRequestedDate', + 'CallsetCompletionDate', 'Project', 'Metrics Checked', 'gCNV_SV_CallsetPath', 'DRAGENShortReadCallsetPath', +]]) +AIRTABLE_SAMPLE_RECORDS = { + 'records': [ + { + 'id': 'rec2B6OGmQpAkQW3s', + 'fields': { + 'CollaboratorSampleID': 'NA19675_1', + 'PDOID': ['recW24C2CJW5lT64K'], + }, + }, + { + 'id': 'recfMYDEZpPtzAIeV', + 'fields': { + 'CollaboratorSampleID': 'NA19678', + 'PDOID': ['recW24C2CJW5lT64K'], + }, + }, + { + 'id': 'rec2B67GmXpAkQW8z', + 'fields': { + 'CollaboratorSampleID': 'NA19679', + 'PDOID': ['rec2Nkg10N1KssPc3'], + }, + }, + { + 'id': 'rec2Nkg10N1KssPc3', + 'fields': { + 'SeqrCollaboratorSampleID': 'HG00731', + 'CollaboratorSampleID': 'VCGS_FAM203_621_D2', + 'PDOID': ['recW24C2CJW5lT64K'], + }, + }, + { + 'id': 'recrbZh9Hn1UFtMi2', + 'fields': { + 'SeqrCollaboratorSampleID': 'NA20888', + 'CollaboratorSampleID': 'NA20888_D1', + 'PDOID': ['recW24C2CJW5lT64K'], + }, + }, + { + 'id': 'rec2Nkg1fKssJc7', + 'fields': { + 'CollaboratorSampleID': 'NA20889', + 'PDOID': ['rec0RWBVfDVbtlBSL'], + }, + }, +]} +AIRTABLE_PDO_RECORDS = { + 'records': [ + { + 'id': 'recW24C2CJW5lT64K', + 'fields': { + 'PDO': 'PDO-1234', + 'SeqrProjectURL': 'https://test-seqr.org/project/R0003_test/project_page', + 'PDOStatus': 'Methods (Loading)', + 'PDOName': 'RGP_WGS_12', + } + }, + ] +} + @mock.patch('seqr.utils.search.hail_search_utils.HAIL_BACKEND_SERVICE_HOSTNAME', MOCK_HAIL_HOST) @mock.patch('seqr.models.random.randint', lambda *args: GUID_ID) @@ -102,9 +168,9 @@ def _test_success(self, path, metadata, dataset_type, sample_guids, reload_calls ]) # Test reload saved variants - self.assertEqual(len(responses.calls), len(reload_calls) + (3 if has_additional_requests else 0)) + self.assertEqual(len(responses.calls), len(reload_calls) + (9 if has_additional_requests else 0)) for i, call in enumerate(reload_calls): - resp = responses.calls[i+(1 if has_additional_requests else 0)] + resp = responses.calls[i+(7 if has_additional_requests else 0)] self.assertEqual(resp.request.url, f'{MOCK_HAIL_HOST}:5000/search') self.assertEqual(resp.request.headers.get('From'), 'manage_command') self.assertDictEqual(json.loads(resp.request.body), call) @@ -123,6 +189,8 @@ def _test_success(self, path, metadata, dataset_type, sample_guids, reload_calls ) @mock.patch('seqr.management.commands.check_for_new_samples_from_pipeline.MAX_LOOKUP_VARIANTS', 1) + @mock.patch('seqr.management.commands.check_for_new_samples_from_pipeline.BASE_URL', 'https://test-seqr.org/') + @mock.patch('seqr.views.utils.airtable_utils.MAX_UPDATE_RECORDS', 2) @mock.patch('seqr.views.utils.airtable_utils.logger') @mock.patch('seqr.utils.communication_utils.EmailMultiAlternatives') @responses.activate @@ -131,6 +199,21 @@ def test_command(self, mock_email, mock_airtable_utils): responses.GET, "http://testairtable/appUelDNM3BnWaR7M/AnVIL%20Seqr%20Loading%20Requests%20Tracking?fields[]=Status&pageSize=2&filterByFormula=AND({AnVIL Project URL}='https://seqr.broadinstitute.org/project/R0004_non_analyst_project/project_page',OR(Status='Loading',Status='Loading Requested'))", json={'records': [{'id': 'rec12345', 'fields': {}}, {'id': 'rec67890', 'fields': {}}]}) + airtable_samples_url = 'http://testairtable/app3Y97xtbbaOopVR/Samples' + airtable_pdo_url = 
'http://testairtable/app3Y97xtbbaOopVR/PDO' + responses.add( + responses.GET, + f"{airtable_samples_url}?fields[]=CollaboratorSampleID&fields[]=SeqrCollaboratorSampleID&fields[]=PDOID&pageSize=100&filterByFormula=AND({{SeqrProject}}='https://test-seqr.org/project/R0003_test/project_page',OR(PDOStatus='Methods (Loading)',PDOStatus='On hold for phenotips, but ready to load'))", + json=AIRTABLE_SAMPLE_RECORDS) + responses.add( + responses.GET, + f"{airtable_pdo_url}?{PDO_QUERY_FIELDS}&pageSize=100&filterByFormula=OR(RECORD_ID()='recW24C2CJW5lT64K')", + json=AIRTABLE_PDO_RECORDS) + responses.add(responses.PATCH, airtable_samples_url, json=AIRTABLE_SAMPLE_RECORDS) + responses.add(responses.PATCH, airtable_pdo_url, status=400) + responses.add_callback(responses.POST, airtable_pdo_url, callback=lambda request: (200, {}, json.dumps({ + 'records': [{'id': f'rec{i}ABC123', **r} for i, r in enumerate(json.loads(request.body)['records'])] + }))) responses.add(responses.POST, f'{MOCK_HAIL_HOST}:5000/search', status=200, json={ 'results': [{'variantId': '1-248367227-TC-T', 'familyGuids': ['F000014_14'], 'updated_field': 'updated_value'}], 'total': 1, @@ -263,9 +346,40 @@ def test_command(self, mock_email, mock_airtable_utils): ]) self.assertEqual(Family.objects.get(guid='F000014_14').analysis_status, 'Rncc') + # Test airtable PDO updates + update_pdos_request = responses.calls[1].request + self.assertEqual(update_pdos_request.url, airtable_pdo_url) + self.assertEqual(update_pdos_request.method, 'PATCH') + self.assertDictEqual(json.loads(update_pdos_request.body), {'records': [ + {'id': 'rec0RWBVfDVbtlBSL', 'fields': {'PDOStatus': 'Available in seqr'}}, + {'id': 'recW24C2CJW5lT64K', 'fields': {'PDOStatus': 'Available in seqr'}}, + ]}) + create_pdos_request = responses.calls[3].request + self.assertEqual(create_pdos_request.url, airtable_pdo_url) + self.assertEqual(create_pdos_request.method, 'POST') + self.assertDictEqual(json.loads(create_pdos_request.body), {'records': [{'fields': { + 'PDO': 'PDO-1234_sr', + 'SeqrProjectURL': 'https://test-seqr.org/project/R0003_test/project_page', + 'PDOStatus': 'Methods (Loading)', + 'PDOName': 'RGP_WGS_12', + }}]}) + update_samples_request = responses.calls[4].request + self.assertEqual(update_samples_request.url, airtable_samples_url) + self.assertEqual(update_samples_request.method, 'PATCH') + self.assertDictEqual(json.loads(update_samples_request.body), {'records': [ + {'id': 'rec2B6OGmQpAkQW3s', 'fields': {'PDOID': ['rec0ABC123']}}, + {'id': 'rec2Nkg10N1KssPc3', 'fields': {'PDOID': ['rec0ABC123']}}, + ]}) + update_samples_request_2 = responses.calls[5].request + self.assertEqual(update_samples_request_2.url, airtable_samples_url) + self.assertEqual(update_samples_request_2.method, 'PATCH') + self.assertDictEqual(json.loads(update_samples_request_2.body), {'records': [ + {'id': 'recfMYDEZpPtzAIeV', 'fields': {'PDOID': ['rec0ABC123']}}, + ]}) + # Test SavedVariant model updated for i, variant_id in enumerate([['1', 1562437, 'G', 'CA'], ['1', 46859832, 'G', 'A']]): - multi_lookup_request = responses.calls[3+i].request + multi_lookup_request = responses.calls[9+i].request self.assertEqual(multi_lookup_request.url, f'{MOCK_HAIL_HOST}:5000/multi_lookup') self.assertEqual(multi_lookup_request.headers.get('From'), 'manage_command') self.assertDictEqual(json.loads(multi_lookup_request.body), { @@ -348,11 +462,15 @@ def test_command(self, mock_email, mock_airtable_utils): self.assertDictEqual(mock_email.return_value.esp_extra, {'MessageStream': 'seqr-notifications'}) 
self.assertDictEqual(mock_email.return_value.merge_data, {}) - mock_airtable_utils.error.assert_called_with( + self.assertEqual(mock_airtable_utils.error.call_count, 2) + mock_airtable_utils.error.assert_has_calls([mock.call( + f'Airtable patch "PDO" error: 400 Client Error: Bad Request for url: {airtable_pdo_url}', None, detail={ + 'record_ids': {'rec0RWBVfDVbtlBSL', 'recW24C2CJW5lT64K'}, 'update': {'PDOStatus': 'Available in seqr'}} + ), mock.call( 'Airtable patch "AnVIL Seqr Loading Requests Tracking" error: Unable to identify record to update', None, detail={ 'or_filters': {'Status': ['Loading', 'Loading Requested']}, 'and_filters': {'AnVIL Project URL': 'https://seqr.broadinstitute.org/project/R0004_non_analyst_project/project_page'}, - 'update': {'Status': 'Available in Seqr'}}) + 'update': {'Status': 'Available in Seqr'}})]) self.assertEqual(self.manager_user.notifications.count(), 3) self.assertEqual( diff --git a/seqr/urls.py b/seqr/urls.py index a56da19889..52e180c1ae 100644 --- a/seqr/urls.py +++ b/seqr/urls.py @@ -121,7 +121,7 @@ forgot_password from seqr.views.apis.data_manager_api import elasticsearch_status, upload_qc_pipeline_output, delete_index, \ - update_rna_seq, load_rna_seq_sample_data, proxy_to_kibana, load_phenotype_prioritization_data, write_pedigree, \ + update_rna_seq, load_rna_seq_sample_data, proxy_to_kibana, load_phenotype_prioritization_data, \ validate_callset, get_loaded_projects, load_data from seqr.views.apis.report_api import \ anvil_export, \ @@ -333,7 +333,6 @@ 'data_management/update_rna_seq': update_rna_seq, 'data_management/load_rna_seq_sample/(?P[^/]+)': load_rna_seq_sample_data, 'data_management/load_phenotype_prioritization_data': load_phenotype_prioritization_data, - 'data_management/write_pedigree/(?P[^/]+)': write_pedigree, 'data_management/validate_callset': validate_callset, 'data_management/loaded_projects/(?P[^/]+)/(?P[^/]+)': get_loaded_projects, 'data_management/load_data': load_data, diff --git a/seqr/utils/communication_utils.py b/seqr/utils/communication_utils.py index ef4291065d..25291ebf84 100644 --- a/seqr/utils/communication_utils.py +++ b/seqr/utils/communication_utils.py @@ -58,12 +58,16 @@ def send_html_email(email_body, process_message=None, **kwargs): def send_project_notification(project, notification, email, subject): users = project.subscribers.user_set.all() notify.send(project, recipient=users, verb=notification) - send_html_email( + email_kwargs = dict( email_body=BASE_EMAIL_TEMPLATE.format(email), to=list(users.values_list('email', flat=True)), subject=subject, process_message=_set_bulk_notification_stream, ) + try: + send_html_email(**email_kwargs) + except Exception as e: + logger.error(f'Error sending project email for {project.guid}: {e}', extra={'detail': email_kwargs}) def _set_bulk_notification_stream(message): diff --git a/seqr/utils/file_utils.py b/seqr/utils/file_utils.py index ee5b0835bd..76e1258da0 100644 --- a/seqr/utils/file_utils.py +++ b/seqr/utils/file_utils.py @@ -1,3 +1,4 @@ +import glob import gzip import os import subprocess # nosec @@ -47,6 +48,12 @@ def does_file_exist(file_path, user=None): return os.path.isfile(file_path) +def list_files(wildcard_path, user): + if is_google_bucket_file_path(wildcard_path): + return get_gs_file_list(wildcard_path, user, check_subfolders=False, allow_missing=True) + return [file_path for file_path in glob.glob(wildcard_path) if os.path.isfile(file_path)] + + def file_iter(file_path, byte_range=None, raw_content=False, user=None, **kwargs): if 
is_google_bucket_file_path(file_path): for line in _google_bucket_file_iter(file_path, byte_range=byte_range, raw_content=raw_content, user=user, **kwargs): diff --git a/seqr/utils/search/add_data_utils.py b/seqr/utils/search/add_data_utils.py index ece4f55d48..5a3e0c221c 100644 --- a/seqr/utils/search/add_data_utils.py +++ b/seqr/utils/search/add_data_utils.py @@ -1,13 +1,21 @@ -from seqr.models import Sample +from collections import defaultdict, OrderedDict +from django.contrib.auth.models import User +from django.db.models import F + +from reference_data.models import GENOME_VERSION_LOOKUP +from seqr.models import Sample, Individual, Project from seqr.utils.communication_utils import send_project_notification, safe_post_to_slack +from seqr.utils.logging_utils import SeqrLogger from seqr.utils.search.utils import backend_specific_call from seqr.utils.search.elasticsearch.es_utils import validate_es_index_metadata_and_get_samples from seqr.views.utils.airtable_utils import AirtableSession, ANVIL_REQUEST_TRACKING_TABLE from seqr.views.utils.dataset_utils import match_and_update_search_samples, load_mapping_file -from seqr.views.utils.permissions_utils import is_internal_anvil_project, project_has_anvil +from seqr.views.utils.export_utils import write_multiple_files from settings import SEQR_SLACK_DATA_ALERTS_NOTIFICATION_CHANNEL, BASE_URL, ANVIL_UI_URL, \ SEQR_SLACK_ANVIL_DATA_LOADING_CHANNEL +logger = SeqrLogger(__name__) + def _hail_backend_error(*args, **kwargs): raise ValueError('Adding samples is disabled for the hail backend') @@ -42,51 +50,119 @@ def add_new_es_search_samples(request_json, project, user, notify=False, expecte ) if notify: - num_samples = len(sample_ids) - num_skipped updated_sample_data = updated_samples.values('sample_id', 'individual_id') - notify_search_data_loaded(project, dataset_type, sample_type, inactivated_sample_guids, updated_sample_data, num_samples) + _basic_notify_search_data_loaded(project, dataset_type, sample_type, inactivated_sample_guids, updated_sample_data) return inactivated_sample_guids, updated_family_guids, updated_samples -def notify_search_data_loaded(project, dataset_type, sample_type, inactivated_sample_guids, updated_samples, num_samples): - is_internal = not project_has_anvil(project) or is_internal_anvil_project(project) +def _format_email(sample_summary, project_link, *args): + return f'This is to notify you that {sample_summary} have been loaded in seqr project {project_link}' + +def _basic_notify_search_data_loaded(project, dataset_type, sample_type, inactivated_sample_guids, updated_samples, format_email=_format_email): previous_loaded_individuals = set(Sample.objects.filter(guid__in=inactivated_sample_guids).values_list('individual_id', flat=True)) new_sample_ids = [sample['sample_id'] for sample in updated_samples if sample['individual_id'] not in previous_loaded_individuals] url = f'{BASE_URL}project/{project.guid}/project_page' msg_dataset_type = '' if dataset_type == Sample.DATASET_TYPE_VARIANT_CALLS else f' {dataset_type}' - sample_id_list = f'\n```{", ".join(sorted(new_sample_ids))}```' if is_internal else '' num_new_samples = len(new_sample_ids) sample_summary = f'{num_new_samples} new {sample_type}{msg_dataset_type} samples' - summary_message = f'{sample_summary} are loaded in {url}{sample_id_list}' + project_link = f'{project.name}' + email = format_email(sample_summary, project_link, num_new_samples) + + send_project_notification( + project, + notification=f'Loaded {sample_summary}', + email=email, + subject='New data 
available in seqr', + ) + + return sample_summary, new_sample_ids, url + + +def notify_search_data_loaded(project, is_internal, dataset_type, sample_type, inactivated_sample_guids, updated_samples, num_samples): + if is_internal: + format_email = _format_email + else: + workspace_name = f'{project.workspace_namespace}/{project.workspace_name}' + def format_email(sample_summary, project_link, num_new_samples): + reload_summary = f' and {num_samples - num_new_samples} re-loaded samples' if num_samples > num_new_samples else '' + return '\n'.join([ + f'We are following up on the request to load data from AnVIL on {project.created_date.date().strftime("%B %d, %Y")}.', + f'We have loaded {sample_summary}{reload_summary} from the AnVIL workspace {workspace_name} to the corresponding seqr project {project_link}.', + 'Let us know if you have any questions.', + ]) + + sample_summary, new_sample_ids, url = _basic_notify_search_data_loaded( + project, dataset_type, sample_type, inactivated_sample_guids, updated_samples, format_email=format_email, + ) + + sample_id_list = f'\n```{", ".join(sorted(new_sample_ids))}```' if is_internal else '' + summary_message = f'{sample_summary} are loaded in {url}{sample_id_list}' safe_post_to_slack( SEQR_SLACK_DATA_ALERTS_NOTIFICATION_CHANNEL if is_internal else SEQR_SLACK_ANVIL_DATA_LOADING_CHANNEL, summary_message) - project_link = f'{project.name}' - if is_internal: - email = f'This is to notify you that {sample_summary} have been loaded in seqr project {project_link}' - else: + if not is_internal: AirtableSession(user=None, base=AirtableSession.ANVIL_BASE, no_auth=True).safe_patch_records( ANVIL_REQUEST_TRACKING_TABLE, max_records=1, record_or_filters={'Status': ['Loading', 'Loading Requested']}, record_and_filters={'AnVIL Project URL': url}, update={'Status': 'Available in Seqr'}, ) - workspace_name = f'{project.workspace_namespace}/{project.workspace_name}' - reload_summary = f' and {num_samples - num_new_samples} re-loaded samples' if num_samples > num_new_samples else '' - email = '\n'.join([ - f'We are following up on the request to load data from AnVIL on {project.created_date.date().strftime("%B %d, %Y")}.', - f'We have loaded {sample_summary}{reload_summary} from the AnVIL workspace {workspace_name} to the corresponding seqr project {project_link}.', - 'Let us know if you have any questions.', - ]) - send_project_notification( - project, - notification=f'Loaded {sample_summary}', - email=email, - subject='New data available in seqr', - ) + +def prepare_data_loading_request(projects: list[Project], sample_type: str, dataset_type: str, genome_version: str, + data_path: str, user: User, pedigree_dir: str, raise_pedigree_error: bool = False, + individual_ids: list[str] = None): + project_guids = sorted([p.guid for p in projects]) + variables = { + 'projects_to_run': project_guids, + 'callset_path': data_path, + 'sample_type': sample_type, + 'dataset_type': _dag_dataset_type(sample_type, dataset_type), + 'reference_genome': GENOME_VERSION_LOOKUP[genome_version], + } + file_path = _get_pedigree_path(pedigree_dir, genome_version, sample_type, dataset_type) + _upload_data_loading_files(projects, user, file_path, individual_ids, raise_pedigree_error) + return variables, file_path + + +def _dag_dataset_type(sample_type: str, dataset_type: str): + return 'GCNV' if dataset_type == Sample.DATASET_TYPE_SV_CALLS and sample_type == Sample.SAMPLE_TYPE_WES \ + else dataset_type + + +def _upload_data_loading_files(projects: list[Project], user: User, file_path: str, 
individual_ids: list[str], raise_error: bool): + file_annotations = OrderedDict({ + 'Project_GUID': F('family__project__guid'), 'Family_GUID': F('family__guid'), + 'Family_ID': F('family__family_id'), + 'Individual_ID': F('individual_id'), + 'Paternal_ID': F('father__individual_id'), 'Maternal_ID': F('mother__individual_id'), 'Sex': F('sex'), + }) + annotations = {'project': F('family__project__guid'), **file_annotations} + individual_filter = {'id__in': individual_ids} if individual_ids else {'family__project__in': projects} + data = Individual.objects.filter(**individual_filter).order_by('family_id', 'individual_id').values( + **dict(annotations)) + + data_by_project = defaultdict(list) + for row in data: + data_by_project[row.pop('project')].append(row) + + header = list(file_annotations.keys()) + files = [(f'{project_guid}_pedigree', header, rows) for project_guid, rows in data_by_project.items()] + + try: + write_multiple_files(files, file_path, user, file_format='tsv') + except Exception as e: + logger.error(f'Uploading Pedigrees failed. Errors: {e}', user, detail={ + project: rows for project, _, rows in files + }) + if raise_error: + raise e + + +def _get_pedigree_path(pedigree_dir: str, genome_version: str, sample_type: str, dataset_type: str): + return f'{pedigree_dir}/{GENOME_VERSION_LOOKUP[genome_version]}/{dataset_type}/pedigrees/{sample_type}' diff --git a/seqr/utils/vcf_utils.py b/seqr/utils/vcf_utils.py index 92f9bdd750..7a421db930 100644 --- a/seqr/utils/vcf_utils.py +++ b/seqr/utils/vcf_utils.py @@ -3,7 +3,7 @@ from collections import defaultdict from seqr.utils.middleware import ErrorsWarningsException -from seqr.utils.file_utils import file_iter, does_file_exist, get_gs_file_list +from seqr.utils.file_utils import file_iter, does_file_exist, list_files from seqr.utils.search.constants import VCF_FILE_EXTENSIONS BLOCK_SIZE = 65536 @@ -97,7 +97,7 @@ def validate_vcf_exists(data_path, user, path_name=None, allowed_exts=None): file_to_check = None if '*' in data_path: - files = get_gs_file_list(data_path, user, check_subfolders=False, allow_missing=True) + files = list_files(data_path, user) if files: file_to_check = files[0] elif does_file_exist(data_path, user=user): diff --git a/seqr/views/apis/anvil_workspace_api.py b/seqr/views/apis/anvil_workspace_api.py index 49fb942c2a..df809ff465 100644 --- a/seqr/views/apis/anvil_workspace_api.py +++ b/seqr/views/apis/anvil_workspace_api.py @@ -18,7 +18,7 @@ from seqr.views.utils.airtable_utils import AirtableSession, ANVIL_REQUEST_TRACKING_TABLE from seqr.utils.search.constants import VCF_FILE_EXTENSIONS from seqr.utils.search.utils import get_search_samples -from seqr.views.utils.airflow_utils import trigger_data_loading +from seqr.views.utils.airflow_utils import trigger_airflow_data_loading from seqr.views.utils.json_to_orm_utils import create_model_from_json from seqr.views.utils.json_utils import create_json_response from seqr.views.utils.file_utils import load_uploaded_file @@ -302,20 +302,19 @@ def _trigger_add_workspace_data(project, pedigree_records, user, data_path, samp success_message = f""" *{user.email}* requested to load {num_updated_individuals} new{reload_summary} {sample_type} samples ({GENOME_VERSION_LOOKUP.get(project.genome_version)}) from AnVIL workspace *{project.workspace_namespace}/{project.workspace_name}* at {data_path} to seqr project <{_get_seqr_project_url(project)}|*{project.name}*> (guid: {project.guid})""" - trigger_success = trigger_data_loading( - [project], sample_type, 
Sample.DATASET_TYPE_VARIANT_CALLS, data_path, user, success_message, - SEQR_SLACK_ANVIL_DATA_LOADING_CHANNEL, f'ERROR triggering AnVIL loading for project {project.guid}', - genome_version=project.genome_version, + trigger_success = trigger_airflow_data_loading( + [project], sample_type, Sample.DATASET_TYPE_VARIANT_CALLS, project.genome_version, data_path, user=user, success_message=success_message, + success_slack_channel=SEQR_SLACK_ANVIL_DATA_LOADING_CHANNEL, error_message=f'ERROR triggering AnVIL loading for project {project.guid}', ) - AirtableSession(user, base=AirtableSession.ANVIL_BASE).safe_create_record( - ANVIL_REQUEST_TRACKING_TABLE, { + AirtableSession(user, base=AirtableSession.ANVIL_BASE).safe_create_records( + ANVIL_REQUEST_TRACKING_TABLE, [{ 'Requester Name': user.get_full_name(), 'Requester Email': user.email, 'AnVIL Project URL': _get_seqr_project_url(project), 'Initial Request Date': datetime.now().strftime('%Y-%m-%d'), 'Number of Samples': len(sample_ids), 'Status': 'Loading' if trigger_success else 'Loading Requested' - }) + }]) loading_warning_date = ANVIL_LOADING_DELAY_EMAIL_START_DATE and datetime.strptime(ANVIL_LOADING_DELAY_EMAIL_START_DATE, '%Y-%m-%d') if loading_warning_date and loading_warning_date <= datetime.now(): diff --git a/seqr/views/apis/anvil_workspace_api_tests.py b/seqr/views/apis/anvil_workspace_api_tests.py index e45e29f093..36490f516a 100644 --- a/seqr/views/apis/anvil_workspace_api_tests.py +++ b/seqr/views/apis/anvil_workspace_api_tests.py @@ -8,7 +8,7 @@ from seqr.models import Project, Family, Individual from seqr.views.apis.anvil_workspace_api import anvil_workspace_page, create_project_from_workspace, \ validate_anvil_vcf, grant_workspace_access, add_workspace_data, get_anvil_vcf_list, get_anvil_igv_options -from seqr.views.utils.test_utils import AnvilAuthenticationTestCase, AuthenticationTestCase, AirflowTestCase, \ +from seqr.views.utils.test_utils import AnvilAuthenticationTestCase, AuthenticationTestCase, AirflowTestCase, AirtableTest, \ TEST_WORKSPACE_NAMESPACE, TEST_WORKSPACE_NAME, TEST_WORKSPACE_NAME1, TEST_NO_PROJECT_WORKSPACE_NAME, TEST_NO_PROJECT_WORKSPACE_NAME2 from seqr.views.utils.terra_api_utils import remove_token, TerraAPIException, TerraRefreshTokenFailedException from settings import SEQR_SLACK_ANVIL_DATA_LOADING_CHANNEL, SEQR_SLACK_LOADING_NOTIFICATION_CHANNEL @@ -67,7 +67,6 @@ TEMP_PATH = '/temp_path/temp_filename' MOCK_AIRTABLE_URL = 'http://testairtable' -MOCK_AIRTABLE_KEY = 'mock_key' # nosec PROJECT1_SAMPLES = ['HG00735', 'NA19678', 'NA20870', 'HG00732', 'NA19675_1', 'NA20874', 'HG00733', 'HG00731'] PROJECT2_SAMPLES = ['NA20885', 'NA19675_1', 'NA19678', 'HG00735'] @@ -484,7 +483,7 @@ def _test_get_workspace_files(self, url, response_key, expected_files, mock_subp ]) -class LoadAnvilDataAPITest(AirflowTestCase): +class LoadAnvilDataAPITest(AirflowTestCase, AirtableTest): fixtures = ['users', 'social_auth', 'reference_data', '1kg_project'] LOADING_PROJECT_GUID = f'P_{TEST_NO_PROJECT_WORKSPACE_NAME}' @@ -509,9 +508,6 @@ def _get_dag_variable_overrides(additional_tasks_check): def setUp(self): # Set up api responses responses.add(responses.POST, f'{MOCK_AIRTABLE_URL}/appUelDNM3BnWaR7M/AnVIL%20Seqr%20Loading%20Requests%20Tracking', status=400) - patcher = mock.patch('seqr.views.utils.airtable_utils.AIRTABLE_API_KEY', MOCK_AIRTABLE_KEY) - patcher.start() - self.addCleanup(patcher.stop) patcher = mock.patch('seqr.views.utils.airtable_utils.AIRTABLE_URL', MOCK_AIRTABLE_URL) patcher.start() self.addCleanup(patcher.stop) @@ 
-525,6 +521,9 @@ def setUp(self): patcher = mock.patch('seqr.views.utils.airtable_utils.logger') self.mock_airtable_logger = patcher.start() self.addCleanup(patcher.stop) + patcher = mock.patch('seqr.utils.search.add_data_utils.logger') + self.mock_add_data_utils_logger = patcher.start() + self.addCleanup(patcher.stop) patcher = mock.patch('seqr.views.apis.anvil_workspace_api.load_uploaded_file') self.mock_load_file = patcher.start() self.mock_load_file.return_value = LOAD_SAMPLE_DATA @@ -533,9 +532,6 @@ def setUp(self): self.mock_mv_file = patcher.start() self.mock_mv_file.return_value = True self.addCleanup(patcher.stop) - patcher = mock.patch('seqr.views.utils.airflow_utils.run_gsutil_with_wait') - self.mock_gsutil = patcher.start() - self.addCleanup(patcher.stop) patcher = mock.patch('seqr.views.utils.export_utils.TemporaryDirectory') mock_tempdir = patcher.start() mock_tempdir.return_value.__enter__.return_value = TEMP_PATH @@ -757,15 +753,11 @@ def _assert_valid_operation(self, project, test_add_data=True): '\n'.join(['\t'.join(row) for row in [header] + rows]) ) - gs_path = f'gs://seqr-datasets/v02/{genome_version}/AnVIL_WES/{project.guid}/base/' + gs_path = f'gs://seqr-loading-temp/v3.1/{genome_version}/SNV_INDEL/pedigrees/WES/' self.mock_mv_file.assert_called_with( f'{TEMP_PATH}/*', gs_path, self.manager_user ) - self.mock_gsutil.assert_called_with( - f'rsync -r {gs_path}', f'gs://seqr-loading-temp/v3.1/{genome_version}/SNV_INDEL/pedigrees/WES/', self.manager_user, - ) - self.assert_airflow_calls(additional_tasks_check=test_add_data) # create airtable record @@ -777,15 +769,15 @@ def _assert_valid_operation(self, project, test_add_data=True): 'Number of Samples': 8 if test_add_data else 3, 'Status': 'Loading', }}]}) - self.assertEqual(responses.calls[-1].request.headers['Authorization'], 'Bearer {}'.format(MOCK_AIRTABLE_KEY)) + self.assert_expected_airtable_headers(-1) dag_json = { 'projects_to_run': [project.guid], 'callset_path': 'gs://test_bucket/test_path.vcf', - 'sample_source': 'AnVIL', 'sample_type': 'WES', 'dataset_type': 'SNV_INDEL', 'reference_genome': genome_version, + 'sample_source': 'AnVIL', } sample_summary = '3 new' if test_add_data: @@ -794,7 +786,7 @@ def _assert_valid_operation(self, project, test_add_data=True): *test_user_manager@test.com* requested to load {sample_summary} WES samples ({version}) from AnVIL workspace *my-seqr-billing/{workspace_name}* at gs://test_bucket/test_path.vcf to seqr project (guid: {guid}) - Pedigree file has been uploaded to gs://seqr-datasets/v02/{version}/AnVIL_WES/{guid}/base/ + Pedigree files have been uploaded to gs://seqr-loading-temp/v3.1/{version}/SNV_INDEL/pedigrees/WES DAG LOADING_PIPELINE is triggered with following: ```{dag}``` @@ -847,15 +839,15 @@ def _test_mv_file_and_triggering_dag_exception(self, url, workspace, sample_data self.assertEqual(response.status_code, 200) project = Project.objects.get(**workspace) - self.mock_airflow_logger.error.assert_called_with( - 'Uploading Pedigree to Google Storage failed. Errors: Something wrong while moving the file.', - self.manager_user, detail=sample_data) + self.mock_add_data_utils_logger.error.assert_called_with( + 'Uploading Pedigrees failed. 
Errors: Something wrong while moving the file.', + self.manager_user, detail={f'{project.guid}_pedigree': sample_data}) self.mock_api_logger.error.assert_not_called() self.mock_airflow_logger.warning.assert_called_with( 'LOADING_PIPELINE DAG is running and cannot be triggered again.', self.manager_user) self.mock_airtable_logger.error.assert_called_with( - f'Airtable create "AnVIL Seqr Loading Requests Tracking" error: 400 Client Error: Bad Request for url: ' - f'{MOCK_AIRTABLE_URL}/appUelDNM3BnWaR7M/AnVIL%20Seqr%20Loading%20Requests%20Tracking', self.manager_user) + f'Airtable post "AnVIL Seqr Loading Requests Tracking" error: 400 Client Error: Bad Request for url: ' + f'{MOCK_AIRTABLE_URL}/appUelDNM3BnWaR7M/AnVIL%20Seqr%20Loading%20Requests%20Tracking', self.manager_user, detail=mock.ANY) slack_message_on_failure = """ERROR triggering AnVIL loading for project {guid}: LOADING_PIPELINE DAG is running and cannot be triggered again. @@ -866,10 +858,10 @@ def _test_mv_file_and_triggering_dag_exception(self, url, workspace, sample_data dag=json.dumps({ 'projects_to_run': [project.guid], 'callset_path': 'gs://test_bucket/test_path.vcf', - 'sample_source': 'AnVIL', 'sample_type': 'WES', 'dataset_type': 'SNV_INDEL', 'reference_genome': genome_version, + 'sample_source': 'AnVIL', }, indent=4), ) diff --git a/seqr/views/apis/data_manager_api.py b/seqr/views/apis/data_manager_api.py index 1ad54f5d99..f12550e231 100644 --- a/seqr/views/apis/data_manager_api.py +++ b/seqr/views/apis/data_manager_api.py @@ -15,16 +15,17 @@ from requests.exceptions import ConnectionError as RequestConnectionError from seqr.utils.communication_utils import send_project_notification +from seqr.utils.search.add_data_utils import prepare_data_loading_request from seqr.utils.search.utils import get_search_backend_status, delete_search_backend_data from seqr.utils.file_utils import file_iter, does_file_exist from seqr.utils.logging_utils import SeqrLogger from seqr.utils.middleware import ErrorsWarningsException from seqr.utils.vcf_utils import validate_vcf_exists -from seqr.views.utils.airflow_utils import trigger_data_loading, write_data_loading_pedigree -from seqr.views.utils.airtable_utils import AirtableSession +from seqr.views.utils.airflow_utils import trigger_airflow_data_loading +from seqr.views.utils.airtable_utils import AirtableSession, LOADABLE_PDO_STATUSES, AVAILABLE_PDO_STATUS from seqr.views.utils.dataset_utils import load_rna_seq, load_phenotype_prioritization_data_file, RNA_DATA_TYPE_CONFIGS, \ - post_process_rna_data + post_process_rna_data, convert_django_meta_to_http_headers from seqr.views.utils.file_utils import parse_file, get_temp_file_path, load_uploaded_file, persist_temp_file from seqr.views.utils.json_utils import create_json_response from seqr.views.utils.json_to_orm_utils import update_model_from_json @@ -32,7 +33,8 @@ from seqr.models import Sample, RnaSample, Individual, Project, PhenotypePrioritization -from settings import KIBANA_SERVER, KIBANA_ELASTICSEARCH_PASSWORD, SEQR_SLACK_LOADING_NOTIFICATION_CHANNEL, BASE_URL +from settings import KIBANA_SERVER, KIBANA_ELASTICSEARCH_PASSWORD, SEQR_SLACK_LOADING_NOTIFICATION_CHANNEL, BASE_URL, \ + LOADING_DATASETS_DIR, PIPELINE_RUNNER_SERVER logger = SeqrLogger(__name__) @@ -431,28 +433,13 @@ def load_phenotype_prioritization_data(request): }) -@data_manager_required -def write_pedigree(request, project_guid): - project = Project.objects.get(guid=project_guid) - try: - write_data_loading_pedigree(project, request.user) - except ValueError as e: - 
return create_json_response({'error': str(e)}, status=400) - - return create_json_response({'success': True}) - - DATA_TYPE_FILE_EXTS = { Sample.DATASET_TYPE_MITO_CALLS: ('.mt',), Sample.DATASET_TYPE_SV_CALLS: ('.bed', '.bed.gz'), } -LOADABLE_PDO_STATUSES = [ - 'On hold for phenotips, but ready to load', - 'Methods (Loading)', -] AVAILABLE_PDO_STATUSES = { - 'Available in seqr', + AVAILABLE_PDO_STATUS, 'Historic', } @@ -471,16 +458,19 @@ def get_loaded_projects(request, sample_type, dataset_type): projects = get_internal_projects().filter(is_demo=False) project_samples = None if dataset_type == Sample.DATASET_TYPE_VARIANT_CALLS: - project_samples = _fetch_airtable_loadable_project_samples(request.user) - projects = projects.filter(guid__in=project_samples.keys()) + if AirtableSession.is_airtable_enabled(): + project_samples = _fetch_airtable_loadable_project_samples(request.user) + projects = projects.filter(guid__in=project_samples.keys()) exclude_sample_type = Sample.SAMPLE_TYPE_WES if sample_type == Sample.SAMPLE_TYPE_WGS else Sample.SAMPLE_TYPE_WGS # Include projects with either the matched sample type OR with no loaded data projects = projects.exclude(family__individual__sample__sample_type=exclude_sample_type) else: + # All other data types can only be loaded to projects which already have loaded data projects = projects.filter(family__individual__sample__sample_type=sample_type) projects = projects.distinct().order_by('name').values('name', projectGuid=F('guid'), dataTypeLastLoaded=Max( - 'family__individual__sample__loaded_date', filter=Q(family__individual__sample__dataset_type=dataset_type), + 'family__individual__sample__loaded_date', + filter=Q(family__individual__sample__dataset_type=dataset_type) & Q(family__individual__sample__sample_type=sample_type), )) if project_samples: @@ -519,21 +509,28 @@ def load_data(request): missing = sorted(set(project_samples.keys()) - {p.guid for p in project_models}) return create_json_response({'error': f'The following projects are invalid: {", ".join(missing)}'}, status=400) - additional_project_files = None + has_airtable = AirtableSession.is_airtable_enabled() individual_ids = None - if dataset_type == Sample.DATASET_TYPE_VARIANT_CALLS: + if dataset_type == Sample.DATASET_TYPE_VARIANT_CALLS and has_airtable: individual_ids = _get_valid_project_samples(project_samples, sample_type, request.user) - additional_project_files = { - project_guid: (f'{project_guid}_ids', ['s'], [{'s': sample_id} for sample_id in sample_ids]) - for project_guid, sample_ids in project_samples.items() - } - - success_message = f'*{request.user.email}* triggered loading internal {sample_type} {dataset_type} data for {len(projects)} projects' - trigger_data_loading( - project_models, sample_type, dataset_type, request_json['filePath'], request.user, success_message, - SEQR_SLACK_LOADING_NOTIFICATION_CHANNEL, f'ERROR triggering internal {sample_type} {dataset_type} loading', - is_internal=True, individual_ids=individual_ids, additional_project_files=additional_project_files, + + loading_args = ( + project_models, sample_type, dataset_type, request_json['genomeVersion'], request_json['filePath'], ) + if has_airtable: + success_message = f'*{request.user.email}* triggered loading internal {sample_type} {dataset_type} data for {len(projects)} projects' + error_message = f'ERROR triggering internal {sample_type} {dataset_type} loading' + trigger_airflow_data_loading( + *loading_args, user=request.user, success_message=success_message, error_message=error_message, + 
success_slack_channel=SEQR_SLACK_LOADING_NOTIFICATION_CHANNEL, is_internal=True, individual_ids=individual_ids, + ) + else: + request_json, _ = prepare_data_loading_request( + *loading_args, user=request.user, pedigree_dir=LOADING_DATASETS_DIR, raise_pedigree_error=True, + ) + response = requests.post(f'{PIPELINE_RUNNER_SERVER}/loading_pipeline_enqueue', json=request_json, timeout=60) + response.raise_for_status() + logger.info('Triggered loading pipeline', request.user, detail=request_json) return create_json_response({'success': True}) @@ -618,7 +615,7 @@ def _is_loaded_airtable_sample(sample, project_guid): @data_manager_required @csrf_exempt def proxy_to_kibana(request): - headers = _convert_django_meta_to_http_headers(request.META) + headers = convert_django_meta_to_http_headers(request) headers['Host'] = KIBANA_SERVER if KIBANA_ELASTICSEARCH_PASSWORD: token = base64.b64encode('kibana:{}'.format(KIBANA_ELASTICSEARCH_PASSWORD).encode('utf-8')) @@ -652,19 +649,3 @@ def proxy_to_kibana(request): except (ConnectionError, RequestConnectionError) as e: logger.error(str(e), request.user) return HttpResponse("Error: Unable to connect to Kibana {}".format(e), status=400) - - -def _convert_django_meta_to_http_headers(request_meta_dict): - """Converts django request.META dictionary into a dictionary of HTTP headers.""" - - def convert_key(key): - # converting Django's all-caps keys (eg. 'HTTP_RANGE') to regular HTTP header keys (eg. 'Range') - return key.replace("HTTP_", "").replace('_', '-').title() - - http_headers = { - convert_key(key): str(value).lstrip() - for key, value in request_meta_dict.items() - if key.startswith("HTTP_") or (key in ('CONTENT_LENGTH', 'CONTENT_TYPE') and value) - } - - return http_headers diff --git a/seqr/views/apis/data_manager_api_tests.py b/seqr/views/apis/data_manager_api_tests.py index 68fbcf59d2..dd83f9fc3f 100644 --- a/seqr/views/apis/data_manager_api_tests.py +++ b/seqr/views/apis/data_manager_api_tests.py @@ -8,7 +8,7 @@ from seqr.utils.communication_utils import _set_bulk_notification_stream from seqr.views.apis.data_manager_api import elasticsearch_status, upload_qc_pipeline_output, delete_index, \ - update_rna_seq, load_rna_seq_sample_data, load_phenotype_prioritization_data, write_pedigree, validate_callset, \ + update_rna_seq, load_rna_seq_sample_data, load_phenotype_prioritization_data, validate_callset, \ get_loaded_projects, load_data from seqr.views.utils.orm_to_json_utils import _get_json_for_models from seqr.views.utils.test_utils import AuthenticationTestCase, AirflowTestCase, AirtableTest @@ -17,6 +17,7 @@ from settings import SEQR_SLACK_LOADING_NOTIFICATION_CHANNEL PROJECT_GUID = 'R0001_1kg' +NON_ANALYST_PROJECT_GUID = 'R0004_non_analyst_project' ES_CAT_ALLOCATION=[{ 'node': 'node-1', @@ -405,8 +406,8 @@ 'dataTypeLastLoaded': None, 'name': 'Empty Project', 'projectGuid': 'R0002_empty', - 'sampleIds': ['HG00738', 'HG00739'], } +EMPTY_PROJECT_SAMPLES_OPTION = {**EMPTY_PROJECT_OPTION, 'sampleIds': ['HG00738', 'HG00739']} AIRTABLE_PDO_RECORDS = { 'records': [ @@ -461,10 +462,14 @@ ], } +PIPELINE_RUNNER_URL = 'http://pipeline-runner:6000/loading_pipeline_enqueue' + @mock.patch('seqr.views.utils.permissions_utils.PM_USER_GROUP', 'project-managers') class DataManagerAPITest(AirtableTest): + PROJECTS = [PROJECT_GUID, NON_ANALYST_PROJECT_GUID] + @urllib3_responses.activate def test_elasticsearch_status(self): url = reverse(elasticsearch_status) @@ -1361,84 +1366,18 @@ def _assert_expected_notifications(mock_send_email, expected_notifs: list[dict], ) 
mock_send_email.assert_has_calls(calls) - @staticmethod - def _ls_subprocess_calls(file, is_error=True): - calls = [ - mock.call(f'gsutil ls {file}',stdout=-1, stderr=-2, shell=True), # nosec - mock.call().wait(), - ] - if is_error: - calls.append(mock.call().stdout.__iter__()) - return calls - - @mock.patch('seqr.views.utils.export_utils.open') - @mock.patch('seqr.views.utils.export_utils.TemporaryDirectory') - @mock.patch('seqr.utils.file_utils.subprocess.Popen') - def test_write_pedigree(self, mock_subprocess, mock_temp_dir, mock_open): - mock_temp_dir.return_value.__enter__.return_value = '/mock/tmp' - mock_subprocess.return_value.wait.return_value = 1 - - url = reverse(write_pedigree, args=[PROJECT_GUID]) - self.check_data_manager_login(url) - - response = self.client.get(url) - self.assertEqual(response.status_code, 400) - self.assertEqual(response.json()['error'], f'No gs://seqr-datasets/v02 project directory found for {PROJECT_GUID}') - - project_directory_paths = [ - 'gs://seqr-datasets/v02/GRCh37/RDG_WGS_Broad_Internal/base/projects/R0001_1kg/', - 'gs://seqr-datasets/v02/GRCh37/RDG_WES_Broad_Internal/base/projects/R0001_1kg/', - 'gs://seqr-datasets/v02/GRCh37/RDG_WGS_Broad_External/base/projects/R0001_1kg/', - 'gs://seqr-datasets/v02/GRCh37/RDG_WES_Broad_External/base/projects/R0001_1kg/', - 'gs://seqr-datasets/v02/GRCh37/AnVIL_WGS/R0001_1kg/base/', - 'gs://seqr-datasets/v02/GRCh37/AnVIL_WES/R0001_1kg/base/', - ] - expected_calls = [] - for path in project_directory_paths: - expected_calls += self._ls_subprocess_calls(path) - mock_subprocess.assert_has_calls(expected_calls) - - # Test success - self._test_write_success( - 'gs://seqr-datasets/v02/GRCh37/RDG_WES_Broad_Internal/base/projects/R0001_1kg/', - url, mock_subprocess, mock_open, project_directory_paths, - ) - self._test_write_success( - 'gs://seqr-datasets/v02/GRCh37/AnVIL_WES/R0001_1kg/base/', - url, mock_subprocess, mock_open, project_directory_paths, - ) - - def _test_write_success(self, success_path, url, mock_subprocess, mock_open, project_directory_paths): - success_index = project_directory_paths.index(success_path) - mock_subprocess.reset_mock() - mock_subprocess.return_value.wait.side_effect = [1 for _ in range(success_index)] + [0, 0] - response = self.client.get(url) - self.assertEqual(response.status_code, 200) - self.assertDictEqual(response.json(), {'success': True}) - - mock_open.assert_called_with(f'/mock/tmp/{PROJECT_GUID}_pedigree.tsv', 'w') - write_call = mock_open.return_value.__enter__.return_value.write.call_args.args[0] - file = [row.split('\t') for row in write_call.split('\n')] - self.assertEqual(len(file), 15) - self.assertListEqual(file[:5], [PEDIGREE_HEADER] + EXPECTED_PEDIGREE_ROWS) - - expected_calls = [] - for path in project_directory_paths[:success_index]: - expected_calls += self._ls_subprocess_calls(path) - expected_calls += self._ls_subprocess_calls(success_path, is_error=False) + [ - mock.call('gsutil mv /mock/tmp/* ' + success_path, stdout=-1, stderr=-2, shell=True), # nosec - mock.call().wait(), - ] - mock_subprocess.assert_has_calls(expected_calls) - + @mock.patch('seqr.utils.file_utils.os.path.isfile') + @mock.patch('seqr.utils.file_utils.glob.glob') @mock.patch('seqr.utils.file_utils.subprocess.Popen') - def test_validate_callset(self, mock_subprocess): + def test_validate_callset(self, mock_subprocess, mock_glob, mock_os_isfile): url = reverse(validate_callset) self.check_pm_login(url) + mock_os_isfile.return_value = False + mock_glob.return_value = [] 
mock_subprocess.return_value.wait.return_value = -1 mock_subprocess.return_value.stdout = [b'File not found'] - body = {'filePath': 'gs://test_bucket/mito_callset.mt', 'datasetType': 'SV'} + body = {'filePath': f'{self.CALLSET_DIR}/mito_callset.mt', 'datasetType': 'SV'} response = self.client.post(url, content_type='application/json', data=json.dumps(body)) self.assertEqual(response.status_code, 400) self.assertListEqual(response.json()['errors'], [ @@ -1448,20 +1387,39 @@ def test_validate_callset(self, mock_subprocess): body['datasetType'] = 'MITO' response = self.client.post(url, content_type='application/json', data=json.dumps(body)) self.assertEqual(response.status_code, 400) - self.assertListEqual(response.json()['errors'], ['Data file or path gs://test_bucket/mito_callset.mt is not found.']) + self.assertListEqual(response.json()['errors'], [f'Data file or path {self.CALLSET_DIR}/mito_callset.mt is not found.']) + mock_os_isfile.return_value = True mock_subprocess.return_value.wait.return_value = 0 response = self.client.post(url, content_type='application/json', data=json.dumps(body)) self.assertEqual(response.status_code, 200) self.assertDictEqual(response.json(), {'success': True}) + mock_subprocess.return_value.communicate.return_value = ( + b'', b'CommandException: One or more URLs matched no objects.', + ) + body = {'filePath': f'{self.CALLSET_DIR}/sharded_vcf/part0*.vcf', 'datasetType': 'SNV_INDEL'} + response = self.client.post(url, content_type='application/json', data=json.dumps(body)) + self.assertEqual(response.status_code, 400) + self.assertListEqual( + response.json()['errors'], [f'Data file or path {self.CALLSET_DIR}/sharded_vcf/part0*.vcf is not found.'], + ) + + mock_subprocess.return_value.communicate.return_value = ( + b'gs://test_bucket/sharded_vcf/part001.vcf\ngs://test_bucket/sharded_vcf/part002.vcf\n', b'', + ) + mock_glob.return_value = ['/local_dir/sharded_vcf/part001.vcf', '/local_dir/sharded_vcf/part002.vcf'] + response = self.client.post(url, content_type='application/json', data=json.dumps(body)) + self.assertEqual(response.status_code, 200) + self.assertDictEqual(response.json(), {'success': True}) + # test data manager access self.login_data_manager_user() response = self.client.post(url, content_type='application/json', data=json.dumps(body)) self.assertEqual(response.status_code, 200) + @mock.patch('seqr.views.utils.permissions_utils.INTERNAL_NAMESPACES', ['my-seqr-billing', 'ext-data']) @mock.patch('seqr.views.apis.data_manager_api.BASE_URL', 'https://seqr.broadinstitute.org/') - @mock.patch('seqr.views.utils.airtable_utils.is_google_authenticated', lambda x: True) @responses.activate def test_get_loaded_projects(self): url = reverse(get_loaded_projects, args=['WGS', 'SV']) @@ -1487,21 +1445,127 @@ def test_get_loaded_projects(self): snv_indel_url = url.replace('SV', 'SNV_INDEL') response = self.client.get(snv_indel_url) self.assertEqual(response.status_code, 200) - self.assertDictEqual(response.json(), {'projects': [EMPTY_PROJECT_OPTION, PROJECT_SAMPLES_OPTION]}) - self.assert_expected_airtable_call( - call_index=0, filter_formula="OR(PDOStatus='Methods (Loading)',PDOStatus='On hold for phenotips, but ready to load')", - fields=['PassingCollaboratorSampleIDs', 'SeqrIDs', 'SeqrProjectURL'], - ) + self.assertDictEqual(response.json(), {'projects': self.WGS_PROJECT_OPTIONS}) + self._assert_expected_get_projects_requests() # test projects with no data loaded are returned for any sample type response = self.client.get(snv_indel_url.replace('WGS', 
'WES')) self.assertEqual(response.status_code, 200) - self.assertDictEqual(response.json(), {'projects': [EMPTY_PROJECT_OPTION]}) + self.assertDictEqual(response.json(), {'projects': self.WES_PROJECT_OPTIONS}) + + @responses.activate + @mock.patch('seqr.views.apis.data_manager_api.LOADING_DATASETS_DIR', '/local_datasets') + @mock.patch('seqr.views.apis.data_manager_api.BASE_URL', 'https://seqr.broadinstitute.org/') + @mock.patch('seqr.views.utils.export_utils.os.makedirs') + @mock.patch('seqr.views.utils.export_utils.open') + @mock.patch('seqr.views.utils.export_utils.TemporaryDirectory') + def test_load_data(self, mock_temp_dir, mock_open, mock_mkdir): + url = reverse(load_data) + self.check_pm_login(url) + + responses.add(responses.POST, PIPELINE_RUNNER_URL) + mock_temp_dir.return_value.__enter__.return_value = '/mock/tmp' + body = {'filePath': f'{self.CALLSET_DIR}/mito_callset.mt', 'datasetType': 'MITO', 'sampleType': 'WGS', 'genomeVersion': '38', 'projects': [ + json.dumps({'projectGuid': 'R0001_1kg'}), json.dumps(PROJECT_OPTION), json.dumps({'projectGuid': 'R0005_not_project'}), + ]} + response = self.client.post(url, content_type='application/json', data=json.dumps(body)) + self.assertEqual(response.status_code, 400) + self.assertDictEqual(response.json(), {'error': 'The following projects are invalid: R0005_not_project'}) + + self.reset_logs() + body['projects'] = body['projects'][:-1] + response = self.client.post(url, content_type='application/json', data=json.dumps(body)) + self.assertEqual(response.status_code, 200) + self.assertDictEqual(response.json(), {'success': True}) + + self._assert_expected_load_data_requests() + self._has_expected_ped_files(mock_open, mock_mkdir, 'MITO') + + dag_json = { + 'projects_to_run': [ + 'R0001_1kg', + 'R0004_non_analyst_project' + ], + 'callset_path': f'{self.CALLSET_DIR}/mito_callset.mt', + 'sample_type': 'WGS', + 'dataset_type': 'MITO', + 'reference_genome': 'GRCh38', + } + self._assert_success_notification(dag_json) + + # Test loading trigger error + self._set_loading_trigger_error() + mock_open.reset_mock() + mock_mkdir.reset_mock() + responses.calls.reset() + self.reset_logs() + + body.update({'datasetType': 'SV', 'filePath': f'{self.CALLSET_DIR}/sv_callset.vcf', 'sampleType': 'WES'}) + response = self.client.post(url, content_type='application/json', data=json.dumps(body)) + self._assert_trigger_error(response, body, dag_json) + self._assert_expected_load_data_requests(trigger_error=True, dataset_type='GCNV', sample_type='WES') + self._has_expected_ped_files(mock_open, mock_mkdir, 'SV', sample_type='WES') + + # Test loading with sample subset + responses.add(responses.POST, PIPELINE_RUNNER_URL) + responses.calls.reset() + mock_open.reset_mock() + mock_mkdir.reset_mock() + body.update({'datasetType': 'SNV_INDEL', 'sampleType': 'WGS', 'projects': [json.dumps(PROJECT_SAMPLES_OPTION)]}) + response = self.client.post(url, content_type='application/json', data=json.dumps(body)) + self._test_load_sample_subset(mock_open, mock_mkdir, response, url, body) + + # Test write pedigree error + self.reset_logs() + responses.calls.reset() + mock_mkdir.reset_mock() + mock_open.reset_mock() + mock_open.side_effect = OSError('Restricted filesystem') + self.login_data_manager_user() + response = self.client.post(url, content_type='application/json', data=json.dumps(body)) + self._assert_write_pedigree_error(response) + self.assert_json_logs(self.data_manager_user, [ + ('Uploading Pedigrees failed. 
Errors: Restricted filesystem', { + 'severity': 'ERROR', + '@type': 'type.googleapis.com/google.devtools.clouderrorreporting.v1beta1.ReportedErrorEvent', + 'detail': {'R0004_non_analyst_project_pedigree': mock.ANY}, + }), + ]) + + def _has_expected_ped_files(self, mock_open, mock_mkdir, dataset_type, sample_type='WGS', has_project_subset=False, single_project=False): + mock_open.assert_has_calls([ + mock.call(f'{self._local_pedigree_path(dataset_type, sample_type)}/{project}_pedigree.tsv', 'w') + for project in self.PROJECTS[(1 if single_project else 0):] + ], any_order=True) + files = [ + [row.split('\t') for row in write_call.args[0].split('\n')] + for write_call in mock_open.return_value.__enter__.return_value.write.call_args_list + ] + self.assertEqual(len(files), 1 if single_project else 2) + + num_rows = 4 if has_project_subset else 15 + if not single_project: + self.assertEqual(len(files[0]), num_rows) + self.assertListEqual(files[0][:5], [PEDIGREE_HEADER] + EXPECTED_PEDIGREE_ROWS[:num_rows-1]) + file = files[0 if single_project else 1] + self.assertEqual(len(file), 3) + self.assertListEqual(file, [ + PEDIGREE_HEADER, + ['R0004_non_analyst_project', 'F000014_14', '14', 'NA21234', '', '', 'F'], + ['R0004_non_analyst_project', 'F000014_14', '14', 'NA21987', '', '', 'M'], + ]) class LocalDataManagerAPITest(AuthenticationTestCase, DataManagerAPITest): fixtures = ['users', '1kg_project', 'reference_data'] + CALLSET_DIR = '/local_datasets' + WGS_PROJECT_OPTIONS = [EMPTY_PROJECT_OPTION, PROJECT_OPTION] + WES_PROJECT_OPTIONS = [ + {'name': '1kg project nåme with uniçøde', 'projectGuid': 'R0001_1kg', 'dataTypeLastLoaded': '2017-02-05T06:25:55.397Z'}, + EMPTY_PROJECT_OPTION, + ] + def setUp(self): patcher = mock.patch('seqr.utils.file_utils.os.path.isfile') self.mock_does_file_exist = patcher.start() @@ -1522,13 +1586,67 @@ def _add_file_iter(self, stdout): self.mock_does_file_exist.return_value = True self.mock_file_iter.return_value += stdout + def _assert_expected_get_projects_requests(self): + self.assertEqual(len(responses.calls), 0) + + def _assert_expected_load_data_requests(self, dataset_type='MITO', sample_type='WGS', trigger_error=False, skip_project=False): + self.assertEqual(len(responses.calls), 1) + projects = [PROJECT_GUID, NON_ANALYST_PROJECT_GUID] + if skip_project: + projects = projects[1:] + self.assertDictEqual(json.loads(responses.calls[0].request.body), { + 'projects_to_run': projects, + 'callset_path': '/local_datasets/sv_callset.vcf' if trigger_error else '/local_datasets/mito_callset.mt', + 'sample_type': sample_type, + 'dataset_type': dataset_type, + 'reference_genome': 'GRCh38', + }) + + @staticmethod + def _local_pedigree_path(dataset_type, sample_type): + return f'/local_datasets/GRCh38/{dataset_type}/pedigrees/{sample_type}' + + def _has_expected_ped_files(self, mock_open, mock_mkdir, dataset_type, *args, sample_type='WGS', **kwargs): + super()._has_expected_ped_files(mock_open, mock_mkdir, dataset_type, *args, sample_type, **kwargs) + mock_mkdir.assert_called_once_with(self._local_pedigree_path(dataset_type, sample_type), exist_ok=True) + + def _assert_success_notification(self, dag_json): + self.maxDiff = None + self.assert_json_logs(self.pm_user, [('Triggered loading pipeline', {'detail': dag_json})]) + + def _set_loading_trigger_error(self): + responses.add(responses.POST, PIPELINE_RUNNER_URL, status=400) + + def _assert_trigger_error(self, response, body, *args): + self.assertEqual(response.status_code, 400) + error = f'400 Client Error: Bad Request for url: 
{PIPELINE_RUNNER_URL}' + self.assertDictEqual(response.json(), {'error': error}) + self.maxDiff = None + self.assert_json_logs(self.pm_user, [ + (error, {'severity': 'WARNING', 'requestBody': body, 'httpRequest': mock.ANY, 'traceback': mock.ANY}), + ]) + + def _test_load_sample_subset(self, mock_open, mock_mkdir, response, *args): + # Loading with sample subset does not change behavior when airtable is disabled + self.assertEqual(response.status_code, 200) + self._assert_expected_load_data_requests(dataset_type='SNV_INDEL', skip_project=True, trigger_error=True) + self._has_expected_ped_files(mock_open, mock_mkdir, 'SNV_INDEL', single_project=True) + + def _assert_write_pedigree_error(self, response): + self.assertEqual(response.status_code, 500) + self.assertDictEqual(response.json(), {'error': 'Restricted filesystem'}) + self.assertEqual(len(responses.calls), 0) + @mock.patch('seqr.views.utils.permissions_utils.PM_USER_GROUP', 'project-managers') class AnvilDataManagerAPITest(AirflowTestCase, DataManagerAPITest): fixtures = ['users', 'social_auth', '1kg_project', 'reference_data'] - LOADING_PROJECT_GUID = 'R0004_non_analyst_project' - PROJECTS = [PROJECT_GUID, LOADING_PROJECT_GUID] + LOADING_PROJECT_GUID = NON_ANALYST_PROJECT_GUID + CALLSET_DIR = 'gs://test_bucket' + LOCAL_WRITE_DIR = '/mock/tmp' + WGS_PROJECT_OPTIONS = [EMPTY_PROJECT_SAMPLES_OPTION, PROJECT_SAMPLES_OPTION] + WES_PROJECT_OPTIONS = [EMPTY_PROJECT_SAMPLES_OPTION] def setUp(self): patcher = mock.patch('seqr.utils.file_utils.subprocess.Popen') @@ -1575,9 +1693,13 @@ def _assert_expected_delete_index_response(self, response): self.assertEqual(response.status_code, 400) self.assertEqual(response.json()['error'], 'Deleting indices is disabled for the hail backend') - def test_get_loaded_projects(self, *args, **kwargs): - # Test relies on the local-only project data, and has no real difference for local/ non-local behavior - pass + def _assert_expected_get_projects_requests(self): + self.assertEqual(len(responses.calls), 1) + self.assert_expected_airtable_call( + call_index=0, + filter_formula="OR(PDOStatus='Methods (Loading)',PDOStatus='On hold for phenotips, but ready to load')", + fields=['PassingCollaboratorSampleIDs', 'SeqrIDs', 'SeqrProjectURL'], + ) @staticmethod def _get_dag_variable_overrides(*args, **kwargs): @@ -1588,80 +1710,37 @@ def _get_dag_variable_overrides(*args, **kwargs): 'dataset_type': 'MITO', } - @responses.activate - @mock.patch('seqr.views.apis.data_manager_api.BASE_URL', 'https://seqr.broadinstitute.org/') - @mock.patch('seqr.views.utils.export_utils.open') - @mock.patch('seqr.views.utils.export_utils.TemporaryDirectory') - def test_load_data(self, mock_temp_dir, mock_open): - url = reverse(load_data) - self.check_pm_login(url) + def _assert_expected_load_data_requests(self, **kwargs): + self.assert_airflow_calls(**kwargs) - mock_temp_dir.return_value.__enter__.return_value = '/mock/tmp' - mock_subprocess = mock.MagicMock() - self.mock_subprocess.side_effect = None - self.mock_subprocess.return_value = mock_subprocess - mock_subprocess.wait.return_value = 0 - mock_subprocess.communicate.return_value = b'', b'File not found' - body = {'filePath': 'gs://test_bucket/mito_callset.mt', 'datasetType': 'MITO', 'sampleType': 'WGS', 'projects': [ - json.dumps({'projectGuid': 'R0001_1kg'}), json.dumps(PROJECT_OPTION), json.dumps({'projectGuid': 'R0005_not_project'}), - ]} - response = self.client.post(url, content_type='application/json', data=json.dumps(body)) - self.assertEqual(response.status_code, 400) - 
self.assertDictEqual(response.json(), {'error': 'The following projects are invalid: R0005_not_project'}) - - body['projects'] = body['projects'][:-1] - response = self.client.post(url, content_type='application/json', data=json.dumps(body)) - self.assertEqual(response.status_code, 200) - self.assertDictEqual(response.json(), {'success': True}) + def _set_loading_trigger_error(self): + self.set_dag_trigger_error_response(status=400) + self.mock_authorized_session.reset_mock() - self.assert_airflow_calls() - self._has_expected_gs_calls(mock_open, 'MITO') + def _assert_success_notification(self, dag_json): + dag_json['sample_source'] = 'Broad_Internal' - dag_json = """{ - "projects_to_run": [ - "R0001_1kg", - "R0004_non_analyst_project" - ], - "callset_path": "gs://test_bucket/mito_callset.mt", - "sample_source": "Broad_Internal", - "sample_type": "WGS", - "dataset_type": "MITO", - "reference_genome": "GRCh38" -}""" message = f"""*test_pm_user@test.com* triggered loading internal WGS MITO data for 2 projects - Pedigree file has been uploaded to gs://seqr-datasets/v02/GRCh38/RDG_WGS_Broad_Internal/base/projects/R0001_1kg/ - - Pedigree file has been uploaded to gs://seqr-datasets/v02/GRCh38/RDG_WGS_Broad_Internal/base/projects/R0004_non_analyst_project/ + Pedigree files have been uploaded to gs://seqr-loading-temp/v3.1/GRCh38/MITO/pedigrees/WGS DAG LOADING_PIPELINE is triggered with following: - ```{dag_json}``` + ```{json.dumps(dag_json, indent=4)}``` """ self.mock_slack.assert_called_once_with(SEQR_SLACK_LOADING_NOTIFICATION_CHANNEL, message) - - # Test loading trigger error - self.set_dag_trigger_error_response(status=400) - self.mock_authorized_session.reset_mock() self.mock_slack.reset_mock() - mock_open.reset_mock() - responses.calls.reset() - mock_subprocess.reset_mock() - mock_subprocess.communicate.return_value = b'gs://seqr-datasets/v02/GRCh38/RDG_WES_Broad_Internal_SV/\ngs://seqr-datasets/v02/GRCh38/RDG_WGS_Broad_Internal_SV/v01/\ngs://seqr-datasets/v02/GRCh38/RDG_WES_Broad_Internal_GCNV/v02/', b'' - body.update({'datasetType': 'SV', 'filePath': 'gs://test_bucket/sv_callset.vcf', 'sampleType': 'WES'}) - response = self.client.post(url, content_type='application/json', data=json.dumps(body)) + def _assert_trigger_error(self, response, body, dag_json): self.assertEqual(response.status_code, 200) self.assertDictEqual(response.json(), {'success': True}) - self.assert_airflow_calls(trigger_error=True, dataset_type='GCNV') - self._has_expected_gs_calls(mock_open, 'SV', is_second_dag=True, sample_type='WES') self.mock_airflow_logger.warning.assert_not_called() self.mock_airflow_logger.error.assert_called_with(mock.ANY, self.pm_user) errors = [call.args[0] for call in self.mock_airflow_logger.error.call_args_list] for error in errors: self.assertRegex(error, '400 Client Error: Bad Request') - dag_json = dag_json.replace('mito_callset.mt', 'sv_callset.vcf').replace( + dag_json = json.dumps(dag_json, indent=4).replace('mito_callset.mt', 'sv_callset.vcf').replace( 'WGS', 'WES').replace('MITO', 'GCNV').replace('v01', 'v3.1') error_message = f"""ERROR triggering internal WES SV loading: {errors[0]} @@ -1670,11 +1749,7 @@ def test_load_data(self, mock_temp_dir, mock_open): """ self.mock_slack.assert_called_once_with(SEQR_SLACK_LOADING_NOTIFICATION_CHANNEL, error_message) - # Test loading with sample subset - mock_open.reset_mock() - mock_subprocess.reset_mock() - body.update({'datasetType': 'SNV_INDEL', 'sampleType': 'WGS', 'projects': [json.dumps(PROJECT_SAMPLES_OPTION)]}) - response = 
self.client.post(url, content_type='application/json', data=json.dumps(body)) + def _test_load_sample_subset(self, mock_open, mock_mkdir, response, url, body): self.assertEqual(response.status_code, 400) self.assertDictEqual(response.json(), { 'warnings': None, @@ -1710,7 +1785,6 @@ def test_load_data(self, mock_temp_dir, mock_open): fields=['SeqrCollaboratorSampleID', 'PDOStatus', 'SeqrProject'], ) - mock_subprocess.reset_mock() responses.calls.reset() responses.add(responses.GET, airtable_samples_url, json=AIRTABLE_SAMPLE_RECORDS, status=200) body['projects'] = [ @@ -1721,47 +1795,28 @@ def test_load_data(self, mock_temp_dir, mock_open): response = self.client.post(url, content_type='application/json', data=json.dumps(body)) self.assertEqual(response.status_code, 200) self.assertDictEqual(response.json(), {'success': True}) - self._has_expected_gs_calls(mock_open, 'SNV_INDEL', sample_type='WES', has_project_subset=True) + self._has_expected_ped_files(mock_open, mock_mkdir, 'SNV_INDEL', sample_type='WES', has_project_subset=True) self.assert_expected_airtable_call( call_index=0, filter_formula="OR({CollaboratorSampleID}='NA19678')", fields=['CollaboratorSampleID', 'PDOStatus', 'SeqrProject'], ) + body['projects'] = body['projects'][1:] - def _has_expected_gs_calls(self, mock_open, dataset_type, sample_type='WGS', has_project_subset=False, **kwargs): - mock_open.assert_has_calls([ - mock.call(f'/mock/tmp/{project}_pedigree.tsv', 'w') for project in self.PROJECTS - ], any_order=True) - files = [ - [row.split('\t') for row in write_call.args[0].split('\n')] - for write_call in mock_open.return_value.__enter__.return_value.write.call_args_list - ] - self.assertEqual(len(files), 4 if has_project_subset else 2) - if has_project_subset: - self.assertEqual(len(files[1]), 4) - self.assertListEqual(files[1], [['s'], ['NA19675_1'], ['NA19679'], ['NA19678']]) - self.assertEqual(len(files[3]), 3) - self.assertListEqual(files[3], [['s'], ['NA21234'], ['NA21987']]) + @staticmethod + def _local_pedigree_path(*args): + return '/mock/tmp' - num_rows = 4 if has_project_subset else 15 - self.assertEqual(len(files[0]), num_rows) - self.assertListEqual(files[0][:5], [PEDIGREE_HEADER] + EXPECTED_PEDIGREE_ROWS[:num_rows-1]) - ped_file = files[2 if has_project_subset else 1] - self.assertEqual(len(ped_file), 3) - self.assertListEqual(ped_file, [ - PEDIGREE_HEADER, - ['R0004_non_analyst_project', 'F000014_14', '14', 'NA21234', '', '', 'F'], - ['R0004_non_analyst_project', 'F000014_14', '14', 'NA21987', '', '', 'M'], - ]) + def _has_expected_ped_files(self, mock_open, mock_mkdir, dataset_type, *args, sample_type='WGS', **kwargs): + super()._has_expected_ped_files(mock_open, mock_mkdir, dataset_type, sample_type, **kwargs) - self.mock_subprocess.assert_has_calls([ - mock.call( - f'gsutil mv /mock/tmp/* gs://seqr-datasets/v02/GRCh38/RDG_{sample_type}_Broad_Internal/base/projects/{project}/', - stdout=-1, stderr=-2, shell=True, # nosec - ) for project in self.PROJECTS - ] + [ - mock.call( - f'gsutil rsync -r gs://seqr-datasets/v02/GRCh38/RDG_{sample_type}_Broad_Internal/base/projects/{project}/ gs://seqr-loading-temp/v3.1/GRCh38/{dataset_type}/pedigrees/{sample_type}/', - stdout=-1, stderr=-2, shell=True, # nosec - ) for project in self.PROJECTS - ], any_order=True) + mock_mkdir.assert_not_called() + self.mock_subprocess.assert_called_once_with( + f'gsutil mv /mock/tmp/* gs://seqr-loading-temp/v3.1/GRCh38/{dataset_type}/pedigrees/{sample_type}/', + stdout=-1, stderr=-2, shell=True, # nosec + ) + 
self.mock_subprocess.reset_mock() + + def _assert_write_pedigree_error(self, response): + self.assertEqual(response.status_code, 200) + self.assertEqual(len(responses.calls), 1) diff --git a/seqr/views/apis/dataset_api_tests.py b/seqr/views/apis/dataset_api_tests.py index 9c3d029212..721018a6f6 100644 --- a/seqr/views/apis/dataset_api_tests.py +++ b/seqr/views/apis/dataset_api_tests.py @@ -4,7 +4,6 @@ from datetime import datetime from django.urls.base import reverse from io import StringIO -import responses from seqr.models import Sample, Family from seqr.views.apis.dataset_api import add_variants_dataset_handler @@ -41,25 +40,17 @@ MOCK_OPEN = mock.MagicMock() MOCK_FILE_ITER = MOCK_OPEN.return_value.__enter__.return_value.__iter__ -MOCK_AIRTABLE_URL = 'http://testairtable' -MOCK_RECORD_ID = 'recH4SEO1CeoIlOiE' -MOCK_RECORDS = {'records': [{'id': MOCK_RECORD_ID, 'fields': {'Status': 'Loading'}}]} - @mock.patch('seqr.utils.redis_utils.redis.StrictRedis', lambda **kwargs: MOCK_REDIS) @mock.patch('seqr.utils.file_utils.open', MOCK_OPEN) class DatasetAPITest(object): @mock.patch('seqr.models.random.randint') - @mock.patch('seqr.utils.search.add_data_utils.safe_post_to_slack') + @mock.patch('seqr.utils.communication_utils.logger') @mock.patch('seqr.utils.communication_utils.send_html_email') - @mock.patch('seqr.views.utils.airtable_utils.AIRTABLE_URL', MOCK_AIRTABLE_URL) @mock.patch('seqr.utils.search.add_data_utils.BASE_URL', 'https://seqr.broadinstitute.org/') - @mock.patch('seqr.utils.search.add_data_utils.SEQR_SLACK_ANVIL_DATA_LOADING_CHANNEL', 'anvil-data-loading') - @mock.patch('seqr.utils.search.add_data_utils.SEQR_SLACK_DATA_ALERTS_NOTIFICATION_CHANNEL', 'seqr-data-loading') @urllib3_responses.activate - @responses.activate - def test_add_variants_dataset(self, mock_send_email, mock_send_slack, mock_random): + def test_add_variants_dataset(self, mock_send_email, mock_logger, mock_random): url = reverse(add_variants_dataset_handler, args=[PROJECT_GUID]) self.check_data_manager_login(url) @@ -81,12 +72,6 @@ def test_add_variants_dataset(self, mock_send_email, mock_send_slack, mock_rando mock_random.return_value = 98765432101234567890 - airtable_tracking_url = f'{MOCK_AIRTABLE_URL}/appUelDNM3BnWaR7M/AnVIL%20Seqr%20Loading%20Requests%20Tracking' - responses.add( - responses.GET, - airtable_tracking_url + "?fields[]=Status&pageSize=2&filterByFormula=AND({AnVIL Project URL}='https://seqr.broadinstitute.org/project/R0004_non_analyst_project/project_page',OR(Status='Loading',Status='Loading Requested'))", - json=MOCK_RECORDS) - urllib3_responses.add_json('/{}/_mapping'.format(INDEX_NAME), MAPPING_JSON) urllib3_responses.add_json('/{}/_search?size=0'.format(INDEX_NAME), {'aggregations': { 'sample_ids': {'buckets': [{'key': 'NA19675'}, {'key': 'NA19679'}, {'key': 'NA19678_1'}, {'key': 'NA20878'}]} @@ -159,12 +144,11 @@ def test_add_variants_dataset(self, mock_send_email, mock_send_slack, mock_rando self.assertTrue(existing_index_sample_model.is_active) self.assertTrue(str(existing_index_sample_model.loaded_date).startswith('2017-02-05')) - self._assert_expected_notification(mock_send_email, mock_send_slack, sample_type='WES', count=2, samples='NA19679, NA20878') + self._assert_expected_notification(mock_send_email, sample_type='WES', count=2) # Adding an SV index works additively with the regular variants index mock_random.return_value = 1234567 mock_send_email.reset_mock() - mock_send_slack.reset_mock() urllib3_responses.add_json('/{}/_mapping'.format(SV_INDEX_NAME), { SV_INDEX_NAME: 
{'mappings': {'_meta': { 'sampleType': 'WES', @@ -206,13 +190,11 @@ def test_add_variants_dataset(self, mock_send_email, mock_send_slack, mock_rando {sample.guid for sample in sample_models}) self.assertSetEqual({True}, {sample.is_active for sample in sample_models}) - self._assert_expected_notification(mock_send_email, mock_send_slack, sample_type='WES SV', count=1, samples='NA19675_1') - self.assertEqual(len(responses.calls), 0) + self._assert_expected_notification(mock_send_email, sample_type='WES SV', count=1) # Adding an index for a different sample type works additively mock_random.return_value = 987654 mock_send_email.reset_mock() - mock_send_slack.reset_mock() urllib3_responses.add_json('/{}/_mapping'.format(NEW_SAMPLE_TYPE_INDEX_NAME), { 'sub_index_1': {'mappings': {'_meta': { 'sampleType': 'WGS', @@ -247,7 +229,7 @@ def test_add_variants_dataset(self, mock_send_email, mock_send_slack, mock_rando self.assertSetEqual(set(response_json['individualsByGuid']['I000001_na19675']['sampleGuids']), {sv_sample_guid, existing_index_sample_guid, new_sample_type_sample_guid}) - self._assert_expected_notification(mock_send_email, mock_send_slack, sample_type='WGS', count=1, samples='NA19675_1') + self._assert_expected_notification(mock_send_email, sample_type='WGS', count=1) # Previous variant samples should still be active sample_models = Sample.objects.filter(individual__guid='I000001_na19675') @@ -266,46 +248,32 @@ def test_add_variants_dataset(self, mock_send_email, mock_send_slack, mock_rando }}, method=urllib3_responses.POST) mock_send_email.reset_mock() - mock_send_slack.reset_mock() + mock_send_email.side_effect = Exception('Email server is not configured') response = self.client.post(url, content_type='application/json', data=json.dumps({ 'elasticsearchIndex': INDEX_NAME, 'datasetType': 'SNV_INDEL', })) self.assertEqual(response.status_code, 200) - additional_kwargs = {'samples': 'NA21234'} - if not self.ANVIL_DISABLED: - namespace_path = 'ext-data/anvil-non-analyst-project 1000 Genomes Demo' - additional_kwargs['email_content'] = """We are following up on the request to load data from AnVIL on March 12, 2017. -We have loaded 1 new WES samples from the AnVIL workspace {anvil_link} to the corresponding seqr project {seqr_link}. 
-Let us know if you have any questions.""".format( - anvil_link=f'{namespace_path}', - seqr_link=f'Non-Analyst Project', - ) - additional_kwargs.update({'slack_channel': 'anvil-data-loading','samples': None}) - - self.assertEqual(responses.calls[1].request.url, f'{airtable_tracking_url}/{MOCK_RECORD_ID}') - self.assertEqual(responses.calls[1].request.method, 'PATCH') - self.assertDictEqual(json.loads(responses.calls[1].request.body), {'fields': {'Status': 'Available in Seqr'}}) - self._assert_expected_notification( - mock_send_email, mock_send_slack, sample_type='WES', count=1, project_guid=NON_ANALYST_PROJECT_GUID, - project_name='Non-Analyst Project', recipient='test_user_collaborator@test.com', **additional_kwargs, + mock_send_email, sample_type='WES', count=1, project_guid=NON_ANALYST_PROJECT_GUID, + project_name='Non-Analyst Project', recipient='test_user_collaborator@test.com', ) + mock_logger.error.assert_called_with( + 'Error sending project email for R0004_non_analyst_project: Email server is not configured', extra={'detail': { + 'email_body': mock.ANY, 'process_message': mock.ANY, + 'subject': 'New data available in seqr', 'to': ['test_user_collaborator@test.com'], + }}) - def _assert_expected_notification(self, mock_send_email, mock_send_slack, sample_type, count, samples, email_content=None, + def _assert_expected_notification(self, mock_send_email, sample_type, count, email_content=None, project_guid=PROJECT_GUID, project_name='1kg project nåme with uniçøde', - recipient='test_user_manager@test.com', slack_channel='seqr-data-loading'): + recipient='test_user_manager@test.com'): if not email_content: email_content = f'This is to notify you that {count} new {sample_type} samples have been loaded in seqr project {project_name}' mock_send_email.assert_called_once_with( email_body=f'Dear seqr user,\n\n{email_content}\n\nAll the best,\nThe seqr team', subject='New data available in seqr', to=[recipient], process_message=mock.ANY, ) - slack_message = f'{count} new {sample_type} samples are loaded in {SEQR_URL}/project/{project_guid}/project_page' - if samples: - slack_message = f'{slack_message}\n```{samples}```' - mock_send_slack.assert_called_with(slack_channel, slack_message) @urllib3_responses.activate def test_add_variants_dataset_errors(self): @@ -477,7 +445,6 @@ def _assert_expected_add_dataset_errors(self, url): # Tests for AnVIL access disabled class LocalDatasetAPITest(AuthenticationTestCase, DatasetAPITest): fixtures = ['users', '1kg_project'] - ANVIL_DISABLED = True def assert_no_anvil_calls(self): @@ -489,7 +456,6 @@ def assert_no_anvil_calls(self): # Test for permissions from AnVIL only class AnvilDatasetAPITest(AnvilAuthenticationTestCase, DatasetAPITest): fixtures = ['users', 'social_auth', '1kg_project'] - ANVIL_DISABLED = False def _assert_expected_add_dataset_errors(self, url): response = self.client.post(url, content_type='application/json', data=ADD_DATASET_PAYLOAD) diff --git a/seqr/views/apis/igv_api.py b/seqr/views/apis/igv_api.py index 0ef01440e4..72751d0cff 100644 --- a/seqr/views/apis/igv_api.py +++ b/seqr/views/apis/igv_api.py @@ -9,6 +9,7 @@ from seqr.models import Individual, IgvSample from seqr.utils.file_utils import file_iter, does_file_exist, is_google_bucket_file_path, run_command, get_google_project from seqr.utils.redis_utils import safe_redis_get_json, safe_redis_set_json +from seqr.views.utils.dataset_utils import convert_django_meta_to_http_headers from seqr.views.utils.file_utils import save_uploaded_file, load_uploaded_file from 
seqr.views.utils.json_to_orm_utils import get_or_create_model_from_json from seqr.views.utils.json_utils import create_json_response @@ -19,8 +20,9 @@ GS_STORAGE_ACCESS_CACHE_KEY = 'gs_storage_access_cache_entry' GS_STORAGE_URL = 'https://storage.googleapis.com' +S3_KEY = 's3' CLOUD_STORAGE_URLS = { - 's3': 'https://s3.amazonaws.com', + S3_KEY: 'https://s3.amazonaws.com', 'gs': GS_STORAGE_URL, } TIMEOUT = 300 @@ -272,6 +274,8 @@ def igv_genomes_proxy(request, cloud_host, file_path): range_header = request.META.get('HTTP_RANGE') if range_header: headers['Range'] = range_header + if cloud_host == S3_KEY: + headers.update(convert_django_meta_to_http_headers(request)) genome_response = requests.get(f'{CLOUD_STORAGE_URLS[cloud_host]}/{file_path}', headers=headers, timeout=TIMEOUT) proxy_response = HttpResponse( @@ -279,3 +283,4 @@ def igv_genomes_proxy(request, cloud_host, file_path): status=genome_response.status_code, ) return proxy_response + diff --git a/seqr/views/apis/igv_api_tests.py b/seqr/views/apis/igv_api_tests.py index ae4d3f90de..1688b0f199 100644 --- a/seqr/views/apis/igv_api_tests.py +++ b/seqr/views/apis/igv_api_tests.py @@ -317,10 +317,11 @@ def test_igv_genomes_proxy(self, mock_subprocess): responses.GET, 'https://s3.amazonaws.com/igv.org.genomes/foo?query=true', match_querystring=True, content_type='application/json', body=json.dumps(expected_body)) - response = self.client.get(s3_url) + response = self.client.get(s3_url, HTTP_TEST_HEADER='test/value') self.assertEqual(response.status_code, 200) self.assertDictEqual(json.loads(response.content), expected_body) self.assertIsNone(responses.calls[0].request.headers.get('Range')) + self.assertEqual(responses.calls[0].request.headers.get('Test-Header'), 'test/value') # test with range header proxy gs_url = reverse(igv_genomes_proxy, args=['gs', 'test-bucket/foo.fasta']) @@ -329,7 +330,8 @@ def test_igv_genomes_proxy(self, mock_subprocess): responses.GET, 'https://storage.googleapis.com/test-bucket/foo.fasta', match_querystring=True, body=expected_content) - response = self.client.get(gs_url, HTTP_RANGE='bytes=100-200') + response = self.client.get(gs_url, HTTP_RANGE='bytes=100-200', HTTP_TEST_HEADER='test/value') self.assertEqual(response.status_code, 200) self.assertEqual(response.content.decode(), expected_content) self.assertEqual(responses.calls[1].request.headers.get('Range'), 'bytes=100-200') + self.assertIsNone(responses.calls[1].request.headers.get('Test-Header')) diff --git a/seqr/views/apis/project_api_tests.py b/seqr/views/apis/project_api_tests.py index 9dc4a4053f..48e2251e7b 100644 --- a/seqr/views/apis/project_api_tests.py +++ b/seqr/views/apis/project_api_tests.py @@ -108,8 +108,7 @@ def test_create_and_delete_project(self, mock_airtable_logger): responses.GET, f"{self.AIRTABLE_TRACKING_URL}?fields[]=Status&pageSize=100&filterByFormula=AND({{AnVIL Project URL}}='/project/{project_guid}/project_page',OR(Status='Available in Seqr',Status='Loading',Status='Loading Requested'))", json=MOCK_RECORDS) - responses.add(responses.PATCH, f'{self.AIRTABLE_TRACKING_URL}/recH4SEO1CeoIlOiE', status=400) - responses.add(responses.PATCH, f'{self.AIRTABLE_TRACKING_URL}/recSgwrXNkmlIB5eM') + responses.add(responses.PATCH, self.AIRTABLE_TRACKING_URL, status=400) delete_project_url = reverse(delete_project_handler, args=[project_guid]) response = self.client.post(delete_project_url, content_type='application/json') self.assertEqual(response.status_code, 200) @@ -720,16 +719,15 @@ def test_create_and_delete_project(self, *args, 
**kwargs): ]) def _assert_expected_airtable_requests(self, mock_airtable_logger): - self.assertEqual(responses.calls[1].request.url, f'{self.AIRTABLE_TRACKING_URL}/recH4SEO1CeoIlOiE') + self.assertEqual(responses.calls[1].request.url, self.AIRTABLE_TRACKING_URL) self.assertEqual(responses.calls[1].request.method, 'PATCH') - self.assertDictEqual(json.loads(responses.calls[1].request.body), {'fields': {'Status': 'Project Deleted'}}) - - self.assertEqual(responses.calls[2].request.url, f'{self.AIRTABLE_TRACKING_URL}/recSgwrXNkmlIB5eM') - self.assertEqual(responses.calls[2].request.method, 'PATCH') - self.assertDictEqual(json.loads(responses.calls[2].request.body), {'fields': {'Status': 'Project Deleted'}}) + self.assertDictEqual(json.loads(responses.calls[1].request.body), {'records': [ + {'id': 'recH4SEO1CeoIlOiE', 'fields': {'Status': 'Project Deleted'}}, + {'id': 'recSgwrXNkmlIB5eM', 'fields': {'Status': 'Project Deleted'}}, + ]}) mock_airtable_logger.error.assert_called_with( - 'Airtable patch "AnVIL Seqr Loading Requests Tracking" error: 400 Client Error: Bad Request for url: http://testairtable/appUelDNM3BnWaR7M/AnVIL%20Seqr%20Loading%20Requests%20Tracking/recH4SEO1CeoIlOiE', + 'Airtable patch "AnVIL Seqr Loading Requests Tracking" error: 400 Client Error: Bad Request for url: http://testairtable/appUelDNM3BnWaR7M/AnVIL%20Seqr%20Loading%20Requests%20Tracking', self.pm_user, detail={ 'or_filters': {'Status': ['Loading', 'Loading Requested', 'Available in Seqr']}, 'and_filters': {'AnVIL Project URL': '/project/R0005_new_project/project_page'}, diff --git a/seqr/views/apis/report_api.py b/seqr/views/apis/report_api.py index 8839c48f59..ed42ad2909 100644 --- a/seqr/views/apis/report_api.py +++ b/seqr/views/apis/report_api.py @@ -15,10 +15,10 @@ from seqr.views.utils.anvil_metadata_utils import parse_anvil_metadata, anvil_export_airtable_fields, \ FAMILY_ROW_TYPE, SUBJECT_ROW_TYPE, SAMPLE_ROW_TYPE, DISCOVERY_ROW_TYPE, PARTICIPANT_TABLE, PHENOTYPE_TABLE, \ EXPERIMENT_TABLE, EXPERIMENT_LOOKUP_TABLE, FINDINGS_TABLE, GENE_COLUMN, FAMILY_INDIVIDUAL_FIELDS -from seqr.views.utils.export_utils import export_multiple_files, write_multiple_files_to_gs +from seqr.views.utils.export_utils import export_multiple_files, write_multiple_files from seqr.views.utils.json_utils import create_json_response -from seqr.views.utils.permissions_utils import analyst_required, get_project_and_check_permissions, \ - get_project_guids_user_can_view, get_internal_projects, pm_or_analyst_required +from seqr.views.utils.permissions_utils import user_is_analyst, get_project_and_check_permissions, \ + get_project_guids_user_can_view, get_internal_projects, pm_or_analyst_required, active_user_has_policies_and_passes_test from seqr.views.utils.terra_api_utils import anvil_enabled from seqr.views.utils.variant_utils import DISCOVERY_CATEGORY @@ -31,6 +31,10 @@ MONDO_BASE_URL = 'https://monarchinitiative.org/v3/api/entity' +airtable_enabled_analyst_required = active_user_has_policies_and_passes_test( + lambda user: user_is_analyst(user) and AirtableSession.is_airtable_enabled()) + + @pm_or_analyst_required def seqr_stats(request): non_demo_projects = Project.objects.filter(is_demo=False) @@ -111,7 +115,7 @@ def _get_sample_counts(sample_q, data_type_key='dataset_type'): ] -@analyst_required +@airtable_enabled_analyst_required def anvil_export(request, project_guid): project = get_project_and_check_permissions(project_guid, request.user) @@ -349,7 +353,7 @@ def _add_row(row, family_id, row_type): } -@analyst_required 
+@airtable_enabled_analyst_required def gregor_export(request): request_json = json.loads(request.body) missing_required_fields = [field for field in ['consentCode', 'deliveryPath'] if not request_json.get(field)] @@ -448,7 +452,7 @@ def _add_row(row, family_id, row_type): else: warnings = errors + warnings - write_multiple_files_to_gs(files, file_path, request.user, file_format='tsv') + write_multiple_files(files, file_path, request.user, file_format='tsv') return create_json_response({ 'info': [f'Successfully validated and uploaded Gregor Report for {len(family_map)} families'], diff --git a/seqr/views/apis/report_api_tests.py b/seqr/views/apis/report_api_tests.py index 9fa87646cd..e469f3cd54 100644 --- a/seqr/views/apis/report_api_tests.py +++ b/seqr/views/apis/report_api_tests.py @@ -677,10 +677,8 @@ def test_seqr_stats(self): self.check_no_analyst_no_access(url, has_override=self.HAS_PM_OVERRIDE) @mock.patch('seqr.views.utils.export_utils.zipfile.ZipFile') - @mock.patch('seqr.views.utils.airtable_utils.is_google_authenticated') @responses.activate - def test_anvil_export(self, mock_google_authenticated, mock_zip): - mock_google_authenticated.return_value = False + def test_anvil_export(self, mock_zip): url = reverse(anvil_export, args=[PROJECT_GUID]) self.check_analyst_login(url) @@ -689,13 +687,19 @@ def test_anvil_export(self, mock_google_authenticated, mock_zip): self.assertEqual(response.status_code, 403) self.assertEqual(response.json()['error'], 'Permission Denied') + responses.add(responses.GET, '{}/app3Y97xtbbaOopVR/Samples'.format(AIRTABLE_URL), json=AIRTABLE_SAMPLE_RECORDS, status=200) + response = self.client.get(url) + self._check_anvil_export_response(response, mock_zip, no_analyst_project_url) + + # Test non-broad analysts do not have access + self.login_pm_user() response = self.client.get(url) self.assertEqual(response.status_code, 403) self.assertEqual(response.json()['error'], 'Permission Denied') - mock_google_authenticated.return_value = True - responses.add(responses.GET, '{}/app3Y97xtbbaOopVR/Samples'.format(AIRTABLE_URL), json=AIRTABLE_SAMPLE_RECORDS, status=200) - response = self.client.get(url) + self.check_no_analyst_no_access(url) + + def _check_anvil_export_response(self, response, mock_zip, no_analyst_project_url): self.assertEqual(response.status_code, 200) self.assertEqual( response.get('content-disposition'), @@ -764,30 +768,27 @@ def test_anvil_export(self, mock_google_authenticated, mock_zip): 'p.Ala196Leu): 19-1912633-G-T, 19-1912634-C-T'], discovery_file) - added_perm = self.add_analyst_project(4) - if added_perm: - response = self.client.get(no_analyst_project_url) - self.assertEqual(response.status_code, 400) - self.assertEqual(response.json()['errors'], ['Discovery variant(s) 1-248367227-TC-T in family 14 have no associated gene']) - - self.check_no_analyst_no_access(url) - - # Test non-broad analysts do not have access - self.login_pm_user() - response = self.client.get(url) - self.assertEqual(response.status_code, 403) - self.assertEqual(response.json()['error'], 'Permission Denied') + self.login_data_manager_user() + self.mock_get_groups.side_effect = lambda user: ['Analysts'] + response = self.client.get(no_analyst_project_url) + self.assertEqual(response.status_code, 400) + self.assertEqual(response.json()['errors'], + ['Discovery variant(s) 1-248367227-TC-T in family 14 have no associated gene']) @mock.patch('seqr.views.apis.report_api.GREGOR_DATA_MODEL_URL', MOCK_DATA_MODEL_URL) - 
@mock.patch('seqr.views.utils.airtable_utils.is_google_authenticated') @mock.patch('seqr.views.apis.report_api.datetime') @mock.patch('seqr.views.utils.export_utils.open') @mock.patch('seqr.views.utils.export_utils.TemporaryDirectory') @mock.patch('seqr.utils.file_utils.subprocess.Popen') @responses.activate - def test_gregor_export(self, mock_subprocess, mock_temp_dir, mock_open, mock_datetime, mock_google_authenticated): + def test_gregor_export(self, *args): + url = reverse(gregor_export) + self.check_analyst_login(url) + + self._test_gregor_export(url, *args) + + def _test_gregor_export(self, url, mock_subprocess, mock_temp_dir, mock_open, mock_datetime): mock_datetime.now.return_value.year = 2020 - mock_google_authenticated.return_value = False mock_temp_dir.return_value.__enter__.return_value = '/mock/tmp' mock_subprocess.return_value.wait.return_value = 1 @@ -799,9 +800,6 @@ def test_gregor_export(self, mock_subprocess, mock_temp_dir, mock_open, mock_dat status=200) responses.add(responses.GET, MOCK_DATA_MODEL_URL, status=404) - url = reverse(gregor_export) - self.check_analyst_login(url) - response = self.client.post(url, content_type='application/json', data=json.dumps({})) self.assertEqual(response.status_code, 400) self.assertListEqual(response.json()['errors'], ['Missing required field(s): consentCode, deliveryPath']) @@ -818,11 +816,6 @@ def test_gregor_export(self, mock_subprocess, mock_temp_dir, mock_open, mock_dat mock_subprocess.return_value.wait.return_value = 0 response = self.client.post(url, content_type='application/json', data=json.dumps(body)) - self.assertEqual(response.status_code, 403) - self.assertEqual(response.json()['error'], 'Permission Denied') - - mock_google_authenticated.return_value = True - response = self.client.post(url, content_type='application/json', data=json.dumps(body)) self.assertEqual(response.status_code, 400) self.assertListEqual(response.json()['errors'], [ 'Unable to load data model: 404 Client Error: Not Found for url: http://raw.githubusercontent.com/gregor_data_model.json', @@ -976,7 +969,7 @@ def _get_expected_gregor_files(self, mock_open, mock_subprocess, expected_files) mock_subprocess.assert_has_calls([ mock.call('gsutil ls gs://anvil-upload', stdout=-1, stderr=-2, shell=True), # nosec mock.call().wait(), - mock.call('gsutil mv /mock/tmp/* gs://anvil-upload', stdout=-1, stderr=-2, shell=True), # nosec + mock.call('gsutil mv /mock/tmp/* gs://anvil-upload/', stdout=-1, stderr=-2, shell=True), # nosec mock.call().wait(), ]) @@ -1443,6 +1436,7 @@ def test_variant_metadata(self): class LocalReportAPITest(AuthenticationTestCase, ReportAPITest): + fixtures = ['users', '1kg_project', 'reference_data', 'report_variants'] ADDITIONAL_FAMILIES = ['F000014_14'] ADDITIONAL_FINDINGS = ['NA21234_1_248367227'] @@ -1462,6 +1456,13 @@ class LocalReportAPITest(AuthenticationTestCase, ReportAPITest): }, } + def _check_anvil_export_response(self, response, *args): + self.assertEqual(response.status_code, 403) + + def _test_gregor_export(self, url, *args): + response = self.client.post(url, content_type='application/json', data=json.dumps({})) + self.assertEqual(response.status_code, 403) + class AnvilReportAPITest(AnvilAuthenticationTestCase, ReportAPITest): fixtures = ['users', 'social_auth', '1kg_project', 'reference_data', 'report_variants'] diff --git a/seqr/views/apis/summary_data_api.py b/seqr/views/apis/summary_data_api.py index 775562313d..96cb2b6547 100644 --- a/seqr/views/apis/summary_data_api.py +++ b/seqr/views/apis/summary_data_api.py @@ 
-274,7 +274,7 @@ def _search_new_saved_variants(family_variant_ids: list[FamilyVariantKey], user: def _get_metadata_projects(request, project_guid): is_analyst = user_is_analyst(request.user) is_all_projects = project_guid == ALL_PROJECTS - include_airtable = 'true' in request.GET.get('includeAirtable', '') and is_analyst and not is_all_projects + include_airtable = 'true' in request.GET.get('includeAirtable', '') and AirtableSession.is_airtable_enabled() and is_analyst and not is_all_projects if is_all_projects: projects = get_internal_projects() if is_analyst else Project.objects.filter( guid__in=get_project_guids_user_can_view(request.user)) diff --git a/seqr/views/apis/summary_data_api_tests.py b/seqr/views/apis/summary_data_api_tests.py index 200c25eab6..dde9e17a00 100644 --- a/seqr/views/apis/summary_data_api_tests.py +++ b/seqr/views/apis/summary_data_api_tests.py @@ -581,11 +581,8 @@ def _has_expected_metadata_response(self, response, expected_individuals, has_ai self.assertEqual(len([r['participant_id'] for r in response_json['rows'] if r['participant_id'] == 'NA20888']), 2) @mock.patch('seqr.views.utils.airtable_utils.MAX_OR_FILTERS', 2) - @mock.patch('seqr.views.utils.airtable_utils.AIRTABLE_API_KEY', 'mock_key') - @mock.patch('seqr.views.utils.airtable_utils.is_google_authenticated') @responses.activate - def test_sample_metadata_export(self, mock_google_authenticated): - mock_google_authenticated.return_value = False + def test_sample_metadata_export(self): url = reverse(individual_metadata, args=['R0003_test']) self.check_require_login(url) @@ -654,11 +651,16 @@ def test_sample_metadata_export(self, mock_google_authenticated): self._has_expected_metadata_response(response, all_project_individuals, has_duplicate=True) # Test invalid airtable responses - response = self.client.get(include_airtable_url) - self.assertEqual(response.status_code, 403) - self.assertEqual(response.json()['error'], 'Permission Denied') - mock_google_authenticated.return_value = True + self._test_metadata_airtable_responses(include_airtable_url, expected_individuals) + + # Test gregor projects + response = self.client.get(gregor_projects_url) + self._has_expected_metadata_response(response, multi_project_individuals, has_duplicate=True) + response = self.client.get(f'{gregor_projects_url}?includeAirtable=true') + self._has_expected_metadata_response(response, multi_project_individuals, has_airtable=self.HAS_AIRTABLE, has_duplicate=True) + + def _test_metadata_airtable_responses(self, include_airtable_url, expected_individuals): responses.add(responses.GET, '{}/app3Y97xtbbaOopVR/Samples'.format(AIRTABLE_URL), status=402) response = self.client.get(include_airtable_url) self.assertEqual(response.status_code, 402) @@ -683,7 +685,6 @@ def test_sample_metadata_export(self, mock_google_authenticated): }) ]) - responses.reset() responses.add(responses.GET, '{}/app3Y97xtbbaOopVR/Samples'.format(AIRTABLE_URL), json=PAGINATED_AIRTABLE_SAMPLE_RECORDS, status=200) @@ -715,14 +716,6 @@ def test_sample_metadata_export(self, mock_google_authenticated): self.assertEqual(len(responses.calls), 8) self.assert_expected_airtable_call( -1, "OR(RECORD_ID()='reca4hcBnbA2cnZf9')", ['CollaboratorID']) - self.assertSetEqual({call.request.headers['Authorization'] for call in responses.calls}, {'Bearer mock_key'}) - - # Test gregor projects - response = self.client.get(gregor_projects_url) - self._has_expected_metadata_response(response, multi_project_individuals, has_duplicate=True) - - response = 
self.client.get(f'{gregor_projects_url}?includeAirtable=true') - self._has_expected_metadata_response(response, multi_project_individuals, has_airtable=True, has_duplicate=True) @mock.patch('seqr.views.apis.summary_data_api.EmailMessage') def test_send_vlm_email(self, mock_email): @@ -774,6 +767,13 @@ class LocalSummaryDataAPITest(AuthenticationTestCase, SummaryDataAPITest): fixtures = ['users', '1kg_project', 'reference_data', 'report_variants'] NUM_MANAGER_SUBMISSIONS = 4 ADDITIONAL_SAMPLES = ['NA21234', 'NA21987'] + HAS_AIRTABLE = False + + def _test_metadata_airtable_responses(self, include_airtable_url, expected_individuals): + # Returns successfully without airtable data when disabled + response = self.client.get(include_airtable_url) + self.assertEqual(response.status_code, 200) + self._has_expected_metadata_response(response, expected_individuals) def assert_has_expected_calls(self, users, skip_group_call_idxs=None): @@ -790,6 +790,7 @@ class AnvilSummaryDataAPITest(AnvilAuthenticationTestCase, SummaryDataAPITest): fixtures = ['users', 'social_auth', '1kg_project', 'reference_data', 'report_variants'] NUM_MANAGER_SUBMISSIONS = 4 ADDITIONAL_SAMPLES = [] + HAS_AIRTABLE = True def test_mme_details(self, *args): super(AnvilSummaryDataAPITest, self).test_mme_details(*args) diff --git a/seqr/views/utils/airflow_utils.py b/seqr/views/utils/airflow_utils.py index c8a84de65a..63f8b94ec9 100644 --- a/seqr/views/utils/airflow_utils.py +++ b/seqr/views/utils/airflow_utils.py @@ -1,24 +1,17 @@ -from collections import defaultdict, OrderedDict from django.contrib.auth.models import User -from django.db.models import F import google.auth from google.auth.transport.requests import AuthorizedSession -import itertools import json -from reference_data.models import GENOME_VERSION_GRCh38, GENOME_VERSION_LOOKUP -from seqr.models import Individual, Sample, Project from seqr.utils.communication_utils import safe_post_to_slack -from seqr.utils.file_utils import does_file_exist, run_gsutil_with_wait +from seqr.utils.search.add_data_utils import prepare_data_loading_request from seqr.utils.logging_utils import SeqrLogger -from seqr.views.utils.export_utils import write_multiple_files_to_gs from settings import AIRFLOW_WEBSERVER_URL, SEQR_SLACK_LOADING_NOTIFICATION_CHANNEL logger = SeqrLogger(__name__) DAG_NAME = 'LOADING_PIPELINE' AIRFLOW_AUTH_SCOPE = "https://www.googleapis.com/auth/cloud-platform" -SEQR_V2_DATASETS_GS_PATH = 'gs://seqr-datasets/v02' SEQR_V3_PEDIGREE_GS_PATH = 'gs://seqr-loading-temp/v3.1' @@ -26,31 +19,20 @@ class DagRunningException(Exception): pass -def trigger_data_loading(projects: list[Project], sample_type: str, dataset_type: str, data_path: str, user: User, - success_message: str, success_slack_channel: str, error_message: str, - genome_version: str = GENOME_VERSION_GRCh38, is_internal: bool = False, - individual_ids: list[str] = None, additional_project_files: dict = None): +def trigger_airflow_data_loading(*args, user: User, success_message: str, success_slack_channel: str, + error_message: str, is_internal: bool = False, **kwargs): success = True - project_guids = sorted([p.guid for p in projects]) - updated_variables = { - 'projects_to_run': project_guids, - 'callset_path': data_path, - 'sample_source': 'Broad_Internal' if is_internal else 'AnVIL', - 'sample_type': sample_type, - 'dataset_type': _dag_dataset_type(sample_type, dataset_type), - 'reference_genome': GENOME_VERSION_LOOKUP[genome_version], - } - - upload_info = _upload_data_loading_files( - projects, is_internal, 
user, genome_version, sample_type, dataset_type=dataset_type, - individual_ids=individual_ids, additional_project_files=additional_project_files, + updated_variables, gs_path = prepare_data_loading_request( + *args, user, pedigree_dir=SEQR_V3_PEDIGREE_GS_PATH, **kwargs, ) + updated_variables['sample_source'] = 'Broad_Internal' if is_internal else 'AnVIL' + upload_info = [f'Pedigree files have been uploaded to {gs_path}'] try: _check_dag_running_state() _update_variables(updated_variables) - _wait_for_dag_variable_update(project_guids) + _wait_for_dag_variable_update(updated_variables['projects_to_run']) _trigger_dag() except Exception as e: logger_call = logger.warning if isinstance(e, DagRunningException) else logger.error @@ -63,21 +45,6 @@ def trigger_data_loading(projects: list[Project], sample_type: str, dataset_type return success -def write_data_loading_pedigree(project: Project, user: User): - match = next(( - (callset, sample_type) for callset, sample_type in itertools.product(['Internal', 'External', 'AnVIL'], ['WGS', 'WES']) - if does_file_exist(_get_dag_project_gs_path( - project.guid, project.genome_version, sample_type, is_internal=callset != 'AnVIL', callset=callset, - ))), None) - if not match: - raise ValueError(f'No {SEQR_V2_DATASETS_GS_PATH} project directory found for {project.guid}') - callset, sample_type = match - _upload_data_loading_files( - [project], is_internal=callset != 'AnVIL', user=user, genome_version=project.genome_version, - sample_type=sample_type, callset=callset, - ) - - def _send_load_data_slack_msg(messages: list[str], channel: str, dag: dict): message = '\n\n '.join(messages) message_content = f"""{message} @@ -105,65 +72,6 @@ def _check_dag_running_state(): raise DagRunningException(f'{DAG_NAME} DAG is running and cannot be triggered again.') -def _dag_dataset_type(sample_type: str, dataset_type: str): - return 'GCNV' if dataset_type == Sample.DATASET_TYPE_SV_CALLS and sample_type == Sample.SAMPLE_TYPE_WES \ - else dataset_type - - -def _upload_data_loading_files(projects: list[Project], is_internal: bool, - user: User, genome_version: str, sample_type: str, dataset_type: str = None, callset: str = 'Internal', - individual_ids: list[str] = None, additional_project_files: dict = None): - file_annotations = OrderedDict({ - 'Project_GUID': F('family__project__guid'), 'Family_GUID': F('family__guid'), - 'Family_ID': F('family__family_id'), - 'Individual_ID': F('individual_id'), - 'Paternal_ID': F('father__individual_id'), 'Maternal_ID': F('mother__individual_id'), 'Sex': F('sex'), - }) - annotations = {'project': F('family__project__guid'), **file_annotations} - individual_filter = {'id__in': individual_ids} if individual_ids else {'family__project__in': projects} - data = Individual.objects.filter(**individual_filter).order_by('family_id', 'individual_id').values( - **dict(annotations)) - - data_by_project = defaultdict(list) - for row in data: - data_by_project[row.pop('project')].append(row) - - info = [] - for project_guid, rows in data_by_project.items(): - gs_path = _get_dag_project_gs_path(project_guid, genome_version, sample_type, is_internal, callset) - try: - files, file_suffixes = _parse_project_upload_files(project_guid, rows, file_annotations.keys(), additional_project_files) - write_multiple_files_to_gs(files, gs_path, user, file_format='tsv', file_suffixes=file_suffixes) - if dataset_type: - additional_gs_path = _get_gs_pedigree_path(genome_version, sample_type, dataset_type) - run_gsutil_with_wait(f'rsync -r {gs_path}', 
additional_gs_path, user) - except Exception as e: - logger.error(f'Uploading Pedigree to Google Storage failed. Errors: {e}', user, detail=rows) - info.append(f'Pedigree file has been uploaded to {gs_path}') - - return info - - -def _parse_project_upload_files(project_guid, rows, header, additional_project_files): - files = [(f'{project_guid}_pedigree', header, rows)] - file_suffixes = None - additional_file = additional_project_files and additional_project_files.get(project_guid) - if additional_file: - files.append(additional_file) - file_suffixes = {additional_file[0]: 'txt'} - return files, file_suffixes - - -def _get_dag_project_gs_path(project: str, genome_version: str, sample_type: str, is_internal: bool, callset: str): - dag_name = f'RDG_{sample_type}_Broad_{callset}' if is_internal else f'AnVIL_{sample_type}' - dag_path = f'{SEQR_V2_DATASETS_GS_PATH}/{GENOME_VERSION_LOOKUP[genome_version]}/{dag_name}' - return f'{dag_path}/base/projects/{project}/' if is_internal else f'{dag_path}/{project}/base/' - - -def _get_gs_pedigree_path(genome_version: str, sample_type: str, dataset_type: str): - return f'{SEQR_V3_PEDIGREE_GS_PATH}/{GENOME_VERSION_LOOKUP[genome_version]}/{dataset_type}/pedigrees/{sample_type}/' - - def _wait_for_dag_variable_update(projects): dag_projects = _get_task_ids() while all(p not in ''.join(dag_projects) for p in projects): diff --git a/seqr/views/utils/airtable_utils.py b/seqr/views/utils/airtable_utils.py index eb1a4f8d1b..f1eb2a3781 100644 --- a/seqr/views/utils/airtable_utils.py +++ b/seqr/views/utils/airtable_utils.py @@ -11,9 +11,16 @@ PAGE_SIZE = 100 MAX_OR_FILTERS = PAGE_SIZE - 5 +MAX_UPDATE_RECORDS = 10 ANVIL_REQUEST_TRACKING_TABLE = 'AnVIL Seqr Loading Requests Tracking' +LOADABLE_PDO_STATUSES = [ + 'On hold for phenotips, but ready to load', + 'Methods (Loading)', +] +AVAILABLE_PDO_STATUS = 'Available in seqr' + class AirtableSession(object): @@ -24,7 +31,14 @@ class AirtableSession(object): ANVIL_BASE: 'appUelDNM3BnWaR7M', } + @staticmethod + def is_airtable_enabled(): + return bool(AIRTABLE_API_KEY) + def __init__(self, user, base=RDG_BASE, no_auth=False): + if not self.is_airtable_enabled(): + raise ValueError('Airtable is not configured') + self._user = user if not no_auth: self._check_user_access(base) @@ -40,40 +54,53 @@ def _check_user_access(self, base): if not has_access: raise PermissionDenied('Error: To access airtable user must login with Google authentication.') - def safe_create_record(self, record_type, record): - try: - response = self._session.post(f'{self._url}/{record_type}', json={'records': [{'fields': record}]}) - response.raise_for_status() - except Exception as e: - logger.error(f'Airtable create "{record_type}" error: {e}', self._user) + def safe_create_records(self, record_type, records): + return self._safe_bulk_update_records( + 'post', record_type, [{'fields': record} for record in records], error_detail=records, + ) def safe_patch_records(self, record_type, record_or_filters, record_and_filters, update, max_records=PAGE_SIZE - 1): + error_detail = { + 'or_filters': record_or_filters, 'and_filters': record_and_filters, 'update': update, + } try: - self._patch_record(record_type, record_or_filters, record_and_filters, update, max_records) + records = self.fetch_records( + record_type, fields=record_or_filters.keys(), or_filters=record_or_filters, + and_filters=record_and_filters, + page_size=max_records + 1, + ) + if not records or len(records) > max_records: + raise ValueError('Unable to identify record to update') + + 
self.safe_patch_records_by_id(record_type, list(records.keys()), update, error_detail=error_detail) except Exception as e: - logger.error(f'Airtable patch "{record_type}" error: {e}', self._user, detail={ - 'or_filters': record_or_filters, 'and_filters': record_and_filters, 'update': update, - }) - - def _patch_record(self, record_type, record_or_filters, record_and_filters, update, max_records): - records = self.fetch_records( - record_type, fields=record_or_filters.keys(), or_filters=record_or_filters, and_filters=record_and_filters, - page_size=max_records+1, + logger.error(f'Airtable patch "{record_type}" error: {e}', self._user, detail=error_detail) + + def safe_patch_records_by_id(self, record_type, record_ids, update, error_detail=None): + self._safe_bulk_update_records( + 'patch', record_type, [{'id': record_id, 'fields': update} for record_id in sorted(record_ids)], + error_detail=error_detail or {'record_ids': record_ids, 'update': update}, ) - if not records or len(records) > max_records: - raise ValueError('Unable to identify record to update') + def _safe_bulk_update_records(self, update_type, record_type, records, error_detail=None): self._session.params = {} + update = getattr(self._session, update_type) errors = [] - for record_id in records.keys(): + updated_records = [] + for i in range(0, len(records), MAX_UPDATE_RECORDS): try: - response = self._session.patch(f'{self._url}/{record_type}/{record_id}', json={'fields': update}) + response = update(f'{self._url}/{record_type}', json={'records': records[i:i + MAX_UPDATE_RECORDS]}) response.raise_for_status() + updated_records += response.json()['records'] except Exception as e: errors.append(str(e)) if errors: - raise Exception(';'.join(errors)) + logger.error( + f'Airtable {update_type} "{record_type}" error: {";".join(errors)}', self._user, detail=error_detail, + ) + + return updated_records def fetch_records(self, record_type, fields, or_filters, and_filters=None, page_size=PAGE_SIZE): self._session.params.update({'fields[]': fields, 'pageSize': page_size}) diff --git a/seqr/views/utils/dataset_utils.py b/seqr/views/utils/dataset_utils.py index dd1574be78..c113e9985f 100644 --- a/seqr/views/utils/dataset_utils.py +++ b/seqr/views/utils/dataset_utils.py @@ -620,3 +620,18 @@ def load_phenotype_prioritization_data_file(file_path, user): raise ValueError(f'Multiple tools found {tool} and {row_dict["tool"]}. Only one in a file is supported.') return tool, data_by_project_sample_id + + +def convert_django_meta_to_http_headers(request): + + def convert_key(key): + # converting Django's all-caps keys (eg. 'HTTP_RANGE') to regular HTTP header keys (eg. 
'Range') + return key.replace("HTTP_", "").replace('_', '-').title() + + http_headers = { + convert_key(key): str(value).lstrip() + for key, value in request.META.items() + if key.startswith("HTTP_") or (key in ('CONTENT_LENGTH', 'CONTENT_TYPE') and value) + } + + return http_headers diff --git a/seqr/views/utils/export_utils.py b/seqr/views/utils/export_utils.py index 1367477af1..59644436be 100644 --- a/seqr/views/utils/export_utils.py +++ b/seqr/views/utils/export_utils.py @@ -1,12 +1,13 @@ from collections import OrderedDict import json import openpyxl as xl +import os from tempfile import NamedTemporaryFile, TemporaryDirectory import zipfile from django.http.response import HttpResponse -from seqr.utils.file_utils import mv_file_to_gs +from seqr.utils.file_utils import mv_file_to_gs, is_google_bucket_file_path from seqr.views.utils.json_utils import _to_title_case DELIMITERS = { @@ -97,9 +98,14 @@ def export_multiple_files(files, zip_filename, **kwargs): return response -def write_multiple_files_to_gs(files, gs_path, user, **kwargs): +def write_multiple_files(files, file_path, user, **kwargs): + is_gs_path = is_google_bucket_file_path(file_path) + if not is_gs_path: + os.makedirs(file_path, exist_ok=True) with TemporaryDirectory() as temp_dir_name: + dir_name = temp_dir_name if is_gs_path else file_path for filename, content in _format_files_content(files, **kwargs): - with open(f'{temp_dir_name}/{filename}', 'w') as f: + with open(f'{dir_name}/{filename}', 'w') as f: f.write(content) - mv_file_to_gs(f'{temp_dir_name}/*', gs_path, user) + if is_gs_path: + mv_file_to_gs(f'{temp_dir_name}/*', f'{file_path}/', user) diff --git a/seqr/views/utils/test_utils.py b/seqr/views/utils/test_utils.py index 6e79acc07f..19f6c56c57 100644 --- a/seqr/views/utils/test_utils.py +++ b/seqr/views/utils/test_utils.py @@ -30,6 +30,7 @@ class AuthenticationTestCase(TestCase): NO_POLICY_USER = 'no_policy' ES_HOSTNAME = 'testhost' + MOCK_AIRTABLE_KEY = '' super_user = None analyst_user = None @@ -45,6 +46,9 @@ def setUp(self): patcher = mock.patch('seqr.utils.search.elasticsearch.es_utils.ELASTICSEARCH_SERVICE_HOSTNAME', self.ES_HOSTNAME) patcher.start() self.addCleanup(patcher.stop) + patcher = mock.patch('seqr.views.utils.airtable_utils.AIRTABLE_API_KEY', self.MOCK_AIRTABLE_KEY) + patcher.start() + self.addCleanup(patcher.stop) patcher = mock.patch('seqr.views.utils.permissions_utils.SEQR_PRIVACY_VERSION', 2.1) patcher.start() self.addCleanup(patcher.stop) @@ -95,12 +99,6 @@ def add_additional_user_groups(cls): pm_group = Group.objects.get(pk=5) pm_group.user_set.add(cls.pm_user) - @classmethod - def add_analyst_project(cls, project_id): - analyst_group = Group.objects.get(pk=4) - assign_perm(user_or_group=analyst_group, perm=CAN_VIEW, obj=Project.objects.filter(id=project_id)) - return True - def check_require_login(self, url, **request_kwargs): self._check_login(url, self.AUTHENTICATED_USER, **request_kwargs) @@ -509,6 +507,7 @@ def get_group_members_side_effect(user, group, use_sa_credentials=False): class AnvilAuthenticationTestCase(AuthenticationTestCase): ES_HOSTNAME = '' + MOCK_AIRTABLE_KEY = 'airflow_access' # mock the terra apis def setUp(self): @@ -554,10 +553,6 @@ def add_additional_user_groups(cls): analyst_group = Group.objects.get(pk=4) analyst_group.user_set.add(cls.analyst_user, cls.pm_user) - @classmethod - def add_analyst_project(cls, project_id): - return False - def assert_no_extra_anvil_calls(self): self.mock_get_ws_acl.assert_not_called() self.mock_get_groups.assert_not_called() @@ 
diff --git a/seqr/views/utils/test_utils.py b/seqr/views/utils/test_utils.py
index 6e79acc07f..19f6c56c57 100644
--- a/seqr/views/utils/test_utils.py
+++ b/seqr/views/utils/test_utils.py
@@ -30,6 +30,7 @@ class AuthenticationTestCase(TestCase):
     NO_POLICY_USER = 'no_policy'
 
     ES_HOSTNAME = 'testhost'
+    MOCK_AIRTABLE_KEY = ''
 
     super_user = None
     analyst_user = None
@@ -45,6 +46,9 @@ def setUp(self):
         patcher = mock.patch('seqr.utils.search.elasticsearch.es_utils.ELASTICSEARCH_SERVICE_HOSTNAME', self.ES_HOSTNAME)
         patcher.start()
         self.addCleanup(patcher.stop)
+        patcher = mock.patch('seqr.views.utils.airtable_utils.AIRTABLE_API_KEY', self.MOCK_AIRTABLE_KEY)
+        patcher.start()
+        self.addCleanup(patcher.stop)
         patcher = mock.patch('seqr.views.utils.permissions_utils.SEQR_PRIVACY_VERSION', 2.1)
         patcher.start()
         self.addCleanup(patcher.stop)
@@ -95,12 +99,6 @@ def add_additional_user_groups(cls):
         pm_group = Group.objects.get(pk=5)
         pm_group.user_set.add(cls.pm_user)
 
-    @classmethod
-    def add_analyst_project(cls, project_id):
-        analyst_group = Group.objects.get(pk=4)
-        assign_perm(user_or_group=analyst_group, perm=CAN_VIEW, obj=Project.objects.filter(id=project_id))
-        return True
-
     def check_require_login(self, url, **request_kwargs):
         self._check_login(url, self.AUTHENTICATED_USER, **request_kwargs)
 
@@ -509,6 +507,7 @@ def get_group_members_side_effect(user, group, use_sa_credentials=False):
 
 class AnvilAuthenticationTestCase(AuthenticationTestCase):
     ES_HOSTNAME = ''
+    MOCK_AIRTABLE_KEY = 'airflow_access'
 
     # mock the terra apis
     def setUp(self):
@@ -554,10 +553,6 @@ def add_additional_user_groups(cls):
         analyst_group = Group.objects.get(pk=4)
         analyst_group.user_set.add(cls.analyst_user, cls.pm_user)
 
-    @classmethod
-    def add_analyst_project(cls, project_id):
-        return False
-
     def assert_no_extra_anvil_calls(self):
         self.mock_get_ws_acl.assert_not_called()
         self.mock_get_groups.assert_not_called()
@@ -642,7 +637,7 @@ def set_dag_trigger_error_response(self, status=200):
             'state': 'running'}
         ]})
 
-    def assert_airflow_calls(self, trigger_error=False, additional_tasks_check=False, dataset_type=None):
+    def assert_airflow_calls(self, trigger_error=False, additional_tasks_check=False, dataset_type=None, **kwargs):
         self.mock_airflow_logger.info.assert_not_called()
 
         # Test triggering anvil dags
@@ -658,10 +653,10 @@ def assert_airflow_calls(self, trigger_error=False, additional_tasks_check=False
         dag_variables = {
             'projects_to_run': [dag_variable_overrides['project']] if 'project' in dag_variable_overrides else self.PROJECTS,
             'callset_path': f'gs://test_bucket/{dag_variable_overrides["callset_path"]}',
-            'sample_source': dag_variable_overrides['sample_source'],
             'sample_type': dag_variable_overrides['sample_type'],
             'dataset_type': dataset_type or dag_variable_overrides['dataset_type'],
             'reference_genome': dag_variable_overrides.get('reference_genome', 'GRCh38'),
+            'sample_source': dag_variable_overrides['sample_source'],
         }
         self._assert_airflow_calls(dag_variables, call_count)
 
@@ -720,6 +715,10 @@ def assert_expected_airtable_call(self, call_index, filter_formula, fields, additional_params=None):
         expected_params.update(additional_params)
         self.assertDictEqual(responses.calls[call_index].request.params, expected_params)
         self.assertListEqual(self._get_list_param(responses.calls[call_index].request, 'fields%5B%5D'), fields)
+        self.assert_expected_airtable_headers(call_index)
+
+    def assert_expected_airtable_headers(self, call_index):
+        self.assertEqual(responses.calls[call_index].request.headers['Authorization'], f'Bearer {self.MOCK_AIRTABLE_KEY}')
 
     @staticmethod
     def _get_list_param(call, param):
diff --git a/settings.py b/settings.py
index abfe408e7b..49271566a4 100644
--- a/settings.py
+++ b/settings.py
@@ -82,8 +82,8 @@
                    'https://reg.genome.network')
 CSP_SCRIPT_SRC = ("'self'", "'unsafe-eval'", 'https://www.googletagmanager.com')
 CSP_IMG_SRC = ("'self'", 'https://www.google-analytics.com', 'https://storage.googleapis.com',
-               'https://user-images.githubusercontent.com',  # for images in GitHub discussions on Feature Updates page
-               'data:')
+               'https://user-images.githubusercontent.com', 'https://private-user-images.githubusercontent.com',  # for images in GitHub discussions on Feature Updates page
+               'data:')
 CSP_OBJECT_SRC = ("'none'")
 CSP_BASE_URI = ("'none'")
 # IGV js injects CSS into the page head so there is no way to set nonce. Therefore, support hashed value of the CSS
@@ -155,6 +155,8 @@
 MEDIA_ROOT = os.path.join(GENERATED_FILES_DIR, 'media/')
 MEDIA_URL = '/media/'
 
+LOADING_DATASETS_DIR = os.environ.get('LOADING_DATASETS_DIR')
+
 LOGGING = {
     'version': 1,
     'disable_existing_loggers': False,
@@ -348,6 +350,10 @@
 REDIS_SERVICE_HOSTNAME = os.environ.get('REDIS_SERVICE_HOSTNAME', 'localhost')
 REDIS_SERVICE_PORT = int(os.environ.get('REDIS_SERVICE_PORT', '6379'))
 
+PIPELINE_RUNNER_HOSTNAME = os.environ.get('PIPELINE_RUNNER_HOSTNAME', 'pipeline-runner')
+PIPELINE_RUNNER_PORT = os.environ.get('PIPELINE_RUNNER_PORT', '6000')
+PIPELINE_RUNNER_SERVER = f'http://{PIPELINE_RUNNER_HOSTNAME}:{PIPELINE_RUNNER_PORT}'
+
 # Matchmaker
 MME_DEFAULT_CONTACT_NAME = 'Samantha Baxter'
 MME_DEFAULT_CONTACT_INSTITUTION = 'Broad Center for Mendelian Genomics'
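For reference, a sketch of how the new pipeline runner settings resolve; the localhost override is an example, not part of this change:

import os

# With no environment overrides: PIPELINE_RUNNER_SERVER == 'http://pipeline-runner:6000'
hostname = os.environ.get('PIPELINE_RUNNER_HOSTNAME', 'pipeline-runner')
port = os.environ.get('PIPELINE_RUNNER_PORT', '6000')
pipeline_runner_server = f'http://{hostname}:{port}'
# e.g. PIPELINE_RUNNER_HOSTNAME=localhost PIPELINE_RUNNER_PORT=8080 -> 'http://localhost:8080'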
diff --git a/ui/pages/DataManagement/DataManagement.jsx b/ui/pages/DataManagement/DataManagement.jsx
index 51a31bd36b..39dec92df8 100644
--- a/ui/pages/DataManagement/DataManagement.jsx
+++ b/ui/pages/DataManagement/DataManagement.jsx
@@ -14,7 +14,6 @@ import RnaSeq from './components/RnaSeq'
 import SampleQc from './components/SampleQc'
 import Users from './components/Users'
 import PhenotypePrioritization from './components/PhenotypePrioritization'
-import WritePedigree from './components/WritePedigree'
 
 const IFRAME_STYLE = { position: 'fixed', left: '0', top: '95px' }
 
@@ -28,7 +27,6 @@ const DATA_MANAGEMENT_PAGES = [
   ...PM_DATA_MANAGEMENT_PAGES,
   { path: 'sample_qc', component: SampleQc },
   { path: 'users', component: Users },
-  { path: 'write_pedigree', component: WritePedigree },
   { path: 'phenotype_prioritization', component: PhenotypePrioritization },
 ]
diff --git a/ui/pages/DataManagement/components/LoadData.jsx b/ui/pages/DataManagement/components/LoadData.jsx
index d4a32cf448..c9c24f24e8 100644
--- a/ui/pages/DataManagement/components/LoadData.jsx
+++ b/ui/pages/DataManagement/components/LoadData.jsx
@@ -11,6 +11,7 @@ import {
   DATASET_TYPE_SV_CALLS,
   DATASET_TYPE_MITO_CALLS,
   DATASET_TYPE_SNV_INDEL_CALLS,
+  GENOME_VERSION_FIELD,
 } from 'shared/utils/constants'
 
 const formatProjectOption = opt => ({
@@ -70,6 +71,11 @@ const LOAD_DATA_PAGES = [
     ].map(value => ({ value, text: value.replace('_', '/') })),
     validate: validators.required,
   },
+  {
+    ...GENOME_VERSION_FIELD,
+    component: ButtonRadioGroup,
+    validate: validators.required,
+  },
 ],
 submitUrl: '/api/data_management/validate_callset',
 },
@@ -91,7 +97,7 @@ const LoadData = () => (
 )
diff --git a/ui/pages/DataManagement/components/WritePedigree.jsx b/ui/pages/DataManagement/components/WritePedigree.jsx
deleted file mode 100644
index 5d33dfdabb..0000000000
--- a/ui/pages/DataManagement/components/WritePedigree.jsx
+++ /dev/null
@@ -1,21 +0,0 @@
-import React from 'react'
-import PropTypes from 'prop-types'
-import { Button, Segment } from 'semantic-ui-react'
-
-import DispatchRequestButton from 'shared/components/buttons/DispatchRequestButton'
-import ProjectSelector from 'shared/components/page/ProjectSelector'
-import { HttpRequestHelper } from 'shared/utils/httpRequestHelper'
-
-const onSubmit = projectGuid => () => new HttpRequestHelper(`/api/data_management/write_pedigree/${projectGuid}`).get()
-
-const WritePedigree = ({ project }) => (project ? (
-  }>
-