
Commit b6fdd9c

Merge pull request #5061 from broadinstitute/dev
Dev
2 parents: 97915be + 5ba6a49

18 files changed (+198 −158 lines)

clickhouse_search/search.py

Lines changed: 0 additions & 5 deletions
@@ -667,11 +667,6 @@ def delete_clickhouse_project(project, dataset_type, sample_type=None):
     return f'Deleted all {dataset_type} search data for project {project.name}'


-def delete_clickhouse_family(project, family_guid, dataset_type, sample_type=None):
-    dataset_type = _clickhouse_dataset_type(dataset_type, sample_type)
-    return f'Clickhouse does not support deleting individual families from project. Manually delete {dataset_type} data for {family_guid} in project {project.guid}'
-
-
 SV_DATASET_TYPES = {
     Sample.SAMPLE_TYPE_WGS: Sample.DATASET_TYPE_SV_CALLS,
     Sample.SAMPLE_TYPE_WES: 'GCNV',

seqr/management/commands/transfer_families_to_different_project.py

Lines changed: 3 additions & 17 deletions
@@ -1,27 +1,13 @@
 from django.core.management.base import BaseCommand

-from clickhouse_search.search import delete_clickhouse_family
-from seqr.models import Project, Family, VariantTag, VariantTagType, Sample
+from seqr.models import Project, Family, VariantTag, VariantTagType
 from seqr.utils.search.utils import backend_specific_call
+from seqr.utils.search.add_data_utils import trigger_delete_families_search

 import logging
 logger = logging.getLogger(__name__)


-def _disable_search(families, from_project):
-    search_samples = Sample.objects.filter(is_active=True, individual__family__in=families)
-    if search_samples:
-        updated_families = search_samples.values_list("individual__family__family_id", flat=True).distinct()
-        updated_family_dataset_types = list(search_samples.values_list('individual__family__guid', 'dataset_type', 'sample_type').distinct())
-        family_summary = ", ".join(sorted(updated_families))
-        num_updated = search_samples.update(is_active=False)
-        logger.info(
-            f'Disabled search for {num_updated} samples in the following {len(updated_families)} families: {family_summary}'
-        )
-        for update in updated_family_dataset_types:
-            logger.info(delete_clickhouse_family(from_project, *update))
-
-
 class Command(BaseCommand):
     def add_arguments(self, parser):
         parser.add_argument('--from-project', required=True)
@@ -49,7 +35,7 @@ def handle(self, *args, **options):
         ]
         logger.info(f'Skipping {num_found - len(families)} families with analysis groups in the project: {", ".join(group_families)}')

-        backend_specific_call(lambda *args: None, _disable_search)(families, from_project)
+        backend_specific_call(lambda *args: None, trigger_delete_families_search)(from_project, list(families.values_list('guid', flat=True)))

         for variant_tag_type in VariantTagType.objects.filter(project=from_project):
             variant_tags = VariantTag.objects.filter(saved_variants__family__in=families, variant_tag_type=variant_tag_type)
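Note: the command now delegates backend-specific cleanup through backend_specific_call instead of a local _disable_search helper. A minimal sketch of that dispatch idiom, assuming selection is keyed off an Elasticsearch hostname setting (a hypothetical simplification; the real helper in seqr.utils.search.utils may differ):

ES_HOSTNAME = ''  # assumed: empty when no Elasticsearch backend is configured

def backend_specific_call(es_func, other_func):
    # Return the Elasticsearch implementation when ES is configured,
    # otherwise the alternative-backend implementation.
    return es_func if ES_HOSTNAME else other_func

# Mirrors the new command code: a no-op for ES, the delete trigger otherwise.
handler = backend_specific_call(lambda *args: None, print)
handler('R0001_1kg', ['F000002_2'])  # prints, since ES_HOSTNAME is empty here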

seqr/management/tests/check_for_new_samples_from_pipeline_tests.py

Lines changed: 15 additions & 4 deletions
@@ -800,10 +800,12 @@ class AirtableCheckNewSamplesTest(AnvilAuthenticationTestCase, CheckNewSamplesTe
         ('Fetched 2 AnVIL Seqr Loading Requests Tracking records from airtable', None),
     ]
     VALIDATION_LOGS = [
+        '==> gsutil ls gs://seqr-hail-search-data/v3.1/GRCh38/SNV_INDEL/runs/manual__2025-01-14/validation_errors.json',
         '==> gsutil cat gs://seqr-hail-search-data/v3.1/GRCh38/SNV_INDEL/runs/manual__2025-01-14/validation_errors.json',
         'Fetching AnVIL Seqr Loading Requests Tracking records 0-2 from airtable',
         'Fetched 1 AnVIL Seqr Loading Requests Tracking records from airtable',
         '==> gsutil mv /mock/tmp/* gs://seqr-hail-search-data/v3.1/GRCh38/SNV_INDEL/runs/manual__2025-01-14/',
+        '==> gsutil ls gs://seqr-hail-search-data/v3.1/GRCh38/SNV_INDEL/runs/manual__2025-01-24/validation_errors.json',
         '==> gsutil cat gs://seqr-hail-search-data/v3.1/GRCh38/SNV_INDEL/runs/manual__2025-01-24/validation_errors.json',
         '==> gsutil mv /mock/tmp/* gs://seqr-hail-search-data/v3.1/GRCh38/SNV_INDEL/runs/manual__2025-01-24/',
     ]
@@ -892,22 +894,30 @@ def _set_reloading_loading_files(self):
     def _set_loading_files(self):
         responses.calls.reset()
         self.mock_subprocess.reset_mock()
-        self.mock_subprocess.side_effect = [self.mock_ls_process] + [
-            mock_opened_file(i) for i in range(len(OPENED_RUN_JSON_FILES) - 1)
-        ] + [self.mock_mv_process, mock_opened_file(-1), self.mock_mv_process]
+        subprocesses = [self.mock_ls_process]
+        for i in range(len(OPENED_RUN_JSON_FILES) - 1):
+            subprocesses += [self.mock_mv_process, mock_opened_file(i)]
+        subprocesses += [self.mock_mv_process, self.mock_mv_process, mock_opened_file(-1), self.mock_mv_process]
+        self.mock_subprocess.side_effect = subprocesses

     def _assert_expected_loading_file_calls(self, single_call):
         calls = [
             ('gsutil ls gs://seqr-hail-search-data/v3.1/*/*/runs/*/*', -1),
+            ('gsutil ls gs://seqr-hail-search-data/v3.1/GRCh38/SNV_INDEL/runs/auto__2023-08-09/metadata.json', -2),
             ('gsutil cat gs://seqr-hail-search-data/v3.1/GRCh38/SNV_INDEL/runs/auto__2023-08-09/metadata.json', -2),
         ]
         if not single_call:
             calls += [
+                ('gsutil ls gs://seqr-hail-search-data/v3.1/GRCh37/SNV_INDEL/runs/manual__2023-11-02/metadata.json', -2),
                 ('gsutil cat gs://seqr-hail-search-data/v3.1/GRCh37/SNV_INDEL/runs/manual__2023-11-02/metadata.json', -2),
+                ('gsutil ls gs://seqr-hail-search-data/v3.1/GRCh38/MITO/runs/auto__2024-08-12/metadata.json', -2),
                 ('gsutil cat gs://seqr-hail-search-data/v3.1/GRCh38/MITO/runs/auto__2024-08-12/metadata.json', -2),
+                ('gsutil ls gs://seqr-hail-search-data/v3.1/GRCh38/GCNV/runs/auto__2024-09-14/metadata.json', -2),
                 ('gsutil cat gs://seqr-hail-search-data/v3.1/GRCh38/GCNV/runs/auto__2024-09-14/metadata.json', -2),
+                ('gsutil ls gs://seqr-hail-search-data/v3.1/GRCh38/SNV_INDEL/runs/manual__2025-01-14/validation_errors.json', -2),
                 ('gsutil cat gs://seqr-hail-search-data/v3.1/GRCh38/SNV_INDEL/runs/manual__2025-01-14/validation_errors.json', -2),
                 ('gsutil mv /mock/tmp/* gs://seqr-hail-search-data/v3.1/GRCh38/SNV_INDEL/runs/manual__2025-01-14/', -2),
+                ('gsutil ls gs://seqr-hail-search-data/v3.1/GRCh38/SNV_INDEL/runs/manual__2025-01-24/validation_errors.json', -2),
                 ('gsutil cat gs://seqr-hail-search-data/v3.1/GRCh38/SNV_INDEL/runs/manual__2025-01-24/validation_errors.json', -2),
                 ('gsutil mv /mock/tmp/* gs://seqr-hail-search-data/v3.1/GRCh38/SNV_INDEL/runs/manual__2025-01-24/', -2),
             ]
@@ -916,7 +926,8 @@ def _assert_expected_loading_file_calls(self, single_call):
         )

     def _additional_loading_logs(self, data_type, version):
-        return [(f'==> gsutil cat gs://seqr-hail-search-data/v3.1/{data_type.replace("SV", "GCNV")}/runs/{version}/metadata.json', None)]
+        return [(f'==> gsutil ls gs://seqr-hail-search-data/v3.1/{data_type.replace("SV", "GCNV")}/runs/{version}/metadata.json', None),
+            (f'==> gsutil cat gs://seqr-hail-search-data/v3.1/{data_type.replace("SV", "GCNV")}/runs/{version}/metadata.json', None)]

     def _assert_expected_airtable_calls(self, has_success_run, single_call):
         # Test request tracking updates for validation errors
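Note: these test changes track the extra gsutil ls existence check that now precedes each gsutil cat. A self-contained illustration of why each new subprocess invocation needs one more mock in the sequence: when side_effect is a list, each call to the patched callable consumes the next element in order.

from unittest import mock

mock_subprocess = mock.MagicMock()
ls_process, cat_process = mock.MagicMock(), mock.MagicMock()
# One entry per expected subprocess invocation, in call order.
mock_subprocess.side_effect = [ls_process, cat_process]

assert mock_subprocess('gsutil ls gs://bucket/file') is ls_process
assert mock_subprocess('gsutil cat gs://bucket/file') is cat_process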

seqr/management/tests/load_rna_seq_tests.py

Lines changed: 4 additions & 0 deletions
@@ -20,6 +20,10 @@ def setUp(self):
         mock_gzip_open = patcher.start()
         self.mock_gzip_file_iter = mock_gzip_open.return_value.__enter__.return_value.__iter__
         self.addCleanup(patcher.stop)
+        patcher = mock.patch('seqr.utils.file_utils.os')
+        mock_os = patcher.start()
+        mock_os.path.isfile.return_value = True
+        self.addCleanup(patcher.stop)
         patcher = mock.patch('seqr.management.commands.load_rna_seq.open')
         self.mock_open = patcher.start()
         self.addCleanup(patcher.stop)
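Note: this patch is needed because file_iter now calls does_file_exist, which falls back to os.path.isfile for local paths; stubbing the os module as imported by seqr.utils.file_utils makes the test's fake paths look present. A minimal standalone sketch of the pattern:

from unittest import mock

with mock.patch('seqr.utils.file_utils.os') as mock_os:
    mock_os.path.isfile.return_value = True
    # Any code under test that reaches does_file_exist('/fake/local/path')
    # inside this block now sees the file as present.
    ...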

seqr/management/tests/transfer_families_to_different_project_tests.py

Lines changed: 5 additions & 3 deletions
@@ -1,4 +1,5 @@
 from django.core.management import call_command
+import responses

 from seqr.models import Family, VariantTagType, VariantTag, Sample
 from seqr.views.utils.test_utils import AuthenticationTestCase, AnvilAuthenticationTestCase
@@ -9,7 +10,10 @@ class TransferFamiliesTest(object):
     DEACTIVATE_SEARCH = True
     LOGS = []

+    @responses.activate
     def test_command(self):
+        responses.add(responses.POST, 'http://pipeline-runner:6000/delete_families_enqueue', status=200)
+
         call_command(
             'transfer_families_to_different_project', '--from-project=R0001_1kg', '--to-project=R0003_test', '2', '4', '5', '12',
         )
@@ -59,7 +63,5 @@ class TransferFamiliesClickhouseTest(TransferFamiliesTest, AnvilAuthenticationTe
     ES_HOSTNAME = ''
     LOGS = [
         ('Disabled search for 7 samples in the following 1 families: 2', None),
-        ('Clickhouse does not support deleting individual families from project. Manually delete MITO data for F000002_2 in project R0001_1kg', None),
-        ('Clickhouse does not support deleting individual families from project. Manually delete SNV_INDEL data for F000002_2 in project R0001_1kg', None),
-        ('Clickhouse does not support deleting individual families from project. Manually delete GCNV data for F000002_2 in project R0001_1kg', None),
+        ('Triggered Delete Families', {'detail': {'project_guid': 'R0001_1kg', 'family_guids': ['F000002_2', 'F000004_4']}}),
     ]
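Note: because the command now POSTs to the pipeline runner, the test registers a mock endpoint with the responses library. A standalone sketch of that interception pattern, reusing the URL registered above:

import requests
import responses

@responses.activate
def exercise_delete_families_endpoint():
    responses.add(responses.POST, 'http://pipeline-runner:6000/delete_families_enqueue', status=200)
    # Intercepted by responses; no real pipeline-runner service is contacted.
    resp = requests.post('http://pipeline-runner:6000/delete_families_enqueue', json={'project_guid': 'R0001_1kg'})
    assert resp.status_code == 200

exercise_delete_families_endpoint()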

seqr/management/tests/update_individuals_sample_qc_tests.py

Lines changed: 4 additions & 2 deletions
@@ -82,6 +82,8 @@ def setUp(self):
         self.mock_subprocess = patcher.start()
         self.addCleanup(patcher.stop)
         self.mock_ls_process = mock.MagicMock()
+        self.mock_does_exist_process = mock.MagicMock()
+        self.mock_does_exist_process.wait.return_value = 0
         self.mock_metadata_file = mock.MagicMock()
         super().setUp()

@@ -100,14 +102,14 @@ def test_command(self):
         self.mock_ls_process.communicate.return_value = b'gs://seqr-hail-search-data/v3.1/GRCh38/SNV_INDEL/runs/manual__2025-02-24/metadata.json\n', b''
         self.mock_metadata_file.stdout = [json.dumps({}).encode()]
         self.mock_metadata_file.wait.return_value = 0
-        self.mock_subprocess.side_effect = [self.mock_ls_process, self.mock_metadata_file]
+        self.mock_subprocess.side_effect = [self.mock_ls_process, self.mock_does_exist_process, self.mock_metadata_file]

         with self.assertRaises(CommandError):
             call_command('update_individuals_sample_qc', 'SNV_INDEL', 'GRCh38', 'manual__2025-02-24')

         # Test valid case
         self.mock_metadata_file.stdout = [json.dumps(METADATA_JSON).encode()]
-        self.mock_subprocess.side_effect = [self.mock_ls_process, self.mock_metadata_file]
+        self.mock_subprocess.side_effect = [self.mock_ls_process, self.mock_does_exist_process, self.mock_metadata_file]
         call_command('update_individuals_sample_qc', 'SNV_INDEL', 'GRCh38', 'manual__2025-02-24')

         # Individual model properly updated with sample qc results

seqr/utils/file_utils.py

Lines changed: 4 additions & 2 deletions
@@ -43,7 +43,7 @@ def does_file_exist(file_path, user=None):
         success = process.wait() == 0
         if not success:
             errors = [line.decode('utf-8').strip() for line in process.stdout]
-            logger.info(' '.join(errors), user)
+            logger.warning(' '.join(errors), user)
         return success
     return os.path.isfile(file_path)

@@ -57,6 +57,8 @@ def list_files(wildcard_path, user, check_subfolders=False, allow_missing=True):


 def file_iter(file_path, byte_range=None, raw_content=False, user=None, **kwargs):
+    if not does_file_exist(file_path, user=user):
+        raise FileNotFoundError(f'Could not access file {file_path}')
     if is_google_bucket_file_path(file_path):
         for line in _google_bucket_file_iter(file_path, byte_range=byte_range, raw_content=raw_content, user=user, **kwargs):
             yield line
@@ -82,7 +84,7 @@ def file_iter(file_path, byte_range=None, raw_content=False, user=None, **kwargs
 def _google_bucket_file_iter(gs_path, byte_range=None, raw_content=False, user=None, **kwargs):
     """Iterate over lines in the given file"""
     range_arg = ' -r {}-{}'.format(byte_range[0], byte_range[1]) if byte_range else ''
-    process = run_gsutil_with_wait(
+    process = _run_gsutil_command(
         'cat{}'.format(range_arg), gs_path, gunzip=gs_path.endswith("gz") and not raw_content, user=user, **kwargs)
     for line in process.stdout:
         if not raw_content:
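Note: with this change, file_iter fails fast on missing paths instead of failing inside gsutil cat or open. A hedged usage sketch of the new contract (only runnable inside the seqr repo; the read_first_line helper is hypothetical):

from seqr.utils.file_utils import file_iter

def read_first_line(path, user=None):
    # FileNotFoundError now surfaces before any gsutil/open attempt,
    # so callers can map it to a user-facing error (as vcf_utils.py does below).
    try:
        return next(file_iter(path, user=user))
    except FileNotFoundError:
        return None  # caller decides how to report the missing file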

seqr/utils/search/add_data_utils.py

Lines changed: 41 additions & 18 deletions
@@ -13,6 +13,7 @@
 from seqr.utils.middleware import ErrorsWarningsException
 from seqr.views.utils.airtable_utils import AirtableSession, ANVIL_REQUEST_TRACKING_TABLE
 from seqr.views.utils.export_utils import write_multiple_files
+from seqr.views.utils.json_utils import _to_title_case
 from seqr.views.utils.pedigree_info_utils import JsonConstants
 from settings import SEQR_SLACK_DATA_ALERTS_NOTIFICATION_CHANNEL, BASE_URL, ANVIL_UI_URL, PIPELINE_RUNNER_SERVER, \
     SEQR_SLACK_ANVIL_DATA_LOADING_CHANNEL, SEQR_SLACK_LOADING_NOTIFICATION_CHANNEL, LOADING_DATASETS_DIR
@@ -65,6 +66,22 @@ def update_airtable_loading_tracking_status(project, status, additional_update=N
         update={'Status': status, **(additional_update or {})},
     )

+def trigger_delete_families_search(project, family_guids, user=None):
+    search_samples = Sample.objects.filter(is_active=True, individual__family__guid__in=family_guids)
+    info = []
+    if search_samples:
+        updated_families = search_samples.values_list("individual__family__family_id", flat=True).distinct()
+        family_summary = ", ".join(sorted(updated_families))
+        num_updated = search_samples.update(is_active=False)
+        message = f'Disabled search for {num_updated} samples in the following {len(updated_families)} families: {family_summary}'
+        info.append(message)
+        logger.info(message, user)
+
+    variables = {'project_guid': project.guid, 'family_guids': family_guids}
+    _enqueue_pipeline_request('delete_families', variables, user)
+    info.append('Triggered delete family data')
+    return info
+
 def trigger_data_loading(projects: list[Project], individual_ids: list[int], sample_type: str, dataset_type: str,
                          genome_version: str, data_path: str, user: User, raise_error: bool = False, skip_expect_tdr_metrics: bool = True,
                          skip_check_sex_and_relatedness: bool = True, vcf_sample_id_map=None,
@@ -85,34 +102,40 @@ def trigger_data_loading(projects: list[Project], individual_ids: list[int], sam
     _upload_data_loading_files(individual_ids, vcf_sample_id_map or {}, user, file_path, raise_error)
     _write_gene_id_file(user)

-    response = requests.post(f'{PIPELINE_RUNNER_SERVER}/loading_pipeline_enqueue', json=variables, timeout=60)
-    success = True
+    error = _enqueue_pipeline_request('loading_pipeline', variables, user, raise_error)
+    if error:
+        safe_post_to_slack(
+            SEQR_SLACK_LOADING_NOTIFICATION_CHANNEL,
+            f'{error_message}: {error}\nLoading pipeline should be triggered with:\n```{json.dumps(variables, indent=4)}```',
+        )
+
+    success = not error
+    if success_message and (success or success_slack_channel != SEQR_SLACK_LOADING_NOTIFICATION_CHANNEL):
+        safe_post_to_slack(success_slack_channel, '\n\n'.join([
+            success_message,
+            f'Pedigree files have been uploaded to {file_path}',
+            f'Loading pipeline is triggered with:\n```{json.dumps(variables, indent=4)}```',
+        ]))
+
+    return success
+
+
+def _enqueue_pipeline_request(name: str, variables: dict, user: User, raise_error: bool = True):
+    response = requests.post(f'{PIPELINE_RUNNER_SERVER}/{name}_enqueue', json=variables, timeout=60)
+    error = None
     try:
         response.raise_for_status()
-        logger.info('Triggered loading pipeline', user, detail=variables)
+        logger.info(f'Triggered {_to_title_case(name)}', user, detail=variables)
     except requests.HTTPError as e:
-        success = False
         error = str(e)
         if response.status_code == 409:
             error = 'Loading pipeline is already running. Wait for it to complete and resubmit'
             e = ErrorsWarningsException([error])
         if raise_error:
             raise e
         else:
-            logger.warning(f'Error triggering loading pipeline: {error}', user, detail=variables)
-    safe_post_to_slack(
-        SEQR_SLACK_LOADING_NOTIFICATION_CHANNEL,
-        f'{error_message}: {error}\nLoading pipeline should be triggered with:\n```{json.dumps(variables, indent=4)}```',
-    )
-
-    if success_message and (success or success_slack_channel != SEQR_SLACK_LOADING_NOTIFICATION_CHANNEL):
-        safe_post_to_slack(success_slack_channel, '\n\n'.join([
-            success_message,
-            f'Pedigree files have been uploaded to {file_path}',
-            f'Loading pipeline is triggered with:\n```{json.dumps(variables, indent=4)}```',
-        ]))
-
-    return success
+            logger.warning(f'Error Triggering {_to_title_case(name)}: {error}', user, detail=variables)
+    return error


 def _loading_dataset_type(sample_type: str, dataset_type: str):
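Note: the extracted _enqueue_pipeline_request helper derives the pipeline-runner route from its name argument, so both call sites share one request/error path. A small sketch of the naming convention (server URL assumed for illustration, matching the test fixture above):

PIPELINE_RUNNER_SERVER = 'http://pipeline-runner:6000'  # assumed value

for name in ('loading_pipeline', 'delete_families'):
    print(f'{PIPELINE_RUNNER_SERVER}/{name}_enqueue')
# http://pipeline-runner:6000/loading_pipeline_enqueue
# http://pipeline-runner:6000/delete_families_enqueue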

seqr/utils/vcf_utils.py

Lines changed: 11 additions & 10 deletions
@@ -76,15 +76,17 @@ def _get_vcf_meta_info(line):
 def validate_vcf_and_get_samples(data_path, user, genome_version, path_name=None, dataset_type=None):
     allowed_exts = DATA_TYPE_FILE_EXTS.get(dataset_type)

-    vcf_filename = _validate_vcf_exists(data_path, user, path_name, allowed_exts)
+    vcf_filename = _validate_valid_vcf_name(data_path, user, allowed_exts)

-    if allowed_exts and vcf_filename.endswith(allowed_exts):
+    if vcf_filename is None:
         return None

     byte_range = None if vcf_filename.endswith('.vcf') else (0, BLOCK_SIZE)
     meta = defaultdict(dict)
     try:
-        header_line = next(_get_vcf_header_line(file_iter(vcf_filename, byte_range=byte_range), meta))
+        header_line = next(_get_vcf_header_line(file_iter(vcf_filename, byte_range=byte_range, user=user), meta))
+    except FileNotFoundError:
+        raise ErrorsWarningsException([f'Data file or path {path_name or data_path} is not found.'], [])
     except StopIteration:
         raise ErrorsWarningsException(['No header found in the VCF file.'], [])
     except UnicodeDecodeError:
@@ -117,23 +119,22 @@ def _get_vcf_header_line(vcf_file, meta):
             meta[meta_info['field']].update({meta_info['id']: meta_info['type']})


-def _validate_vcf_exists(data_path, user, path_name, allowed_exts):
+def _validate_valid_vcf_name(data_path, user, allowed_exts):
     file_extensions = (allowed_exts or ()) + VCF_FILE_EXTENSIONS
     if not data_path.endswith(file_extensions):
         raise ErrorsWarningsException([
             'Invalid VCF file format - file path must end with {}'.format(' or '.join(file_extensions))
         ])

-    file_to_check = None
+    file_to_check = data_path
     if '*' in data_path:
         files = list_files(data_path, user)
         if files:
             file_to_check = files[0]
-    elif does_file_exist(data_path, user=user):
-        file_to_check = data_path
-
-    if not file_to_check:
-        raise ErrorsWarningsException(['Data file or path {} is not found.'.format(path_name or data_path)])
+    elif allowed_exts and data_path.endswith(allowed_exts):
+        if not does_file_exist(data_path, user=user):
+            raise ErrorsWarningsException([f'Data file or path {data_path} is not found.'])
+        file_to_check = None

     return file_to_check
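Note: to make the renamed validator's outcomes explicit, here is a pure-logic sketch of its branch structure with the I/O calls stubbed out (extension list assumed for illustration; names mirror the diff above):

VCF_FILE_EXTENSIONS = ('.vcf', '.vcf.gz', '.vcf.bgz')  # assumed values

def sketch_validate_valid_vcf_name(data_path, allowed_exts, file_exists=True, wildcard_matches=None):
    if not data_path.endswith((allowed_exts or ()) + VCF_FILE_EXTENSIONS):
        raise ValueError('Invalid VCF file format')  # ErrorsWarningsException in seqr
    file_to_check = data_path
    if '*' in data_path:
        if wildcard_matches:  # stands in for list_files(data_path, user)
            file_to_check = wildcard_matches[0]
    elif allowed_exts and data_path.endswith(allowed_exts):
        if not file_exists:  # stands in for does_file_exist(data_path, user=user)
            raise ValueError('Data file or path is not found.')
        file_to_check = None  # valid non-VCF dataset: skip VCF header parsing
    return file_to_check

# A plain VCF path is returned unchanged; its existence is checked later by file_iter:
assert sketch_validate_valid_vcf_name('callset.vcf.gz', allowed_exts=None) == 'callset.vcf.gz'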
