Dev #5061 (merged)

5 changes: 0 additions & 5 deletions clickhouse_search/search.py
@@ -667,11 +667,6 @@ def delete_clickhouse_project(project, dataset_type, sample_type=None):
return f'Deleted all {dataset_type} search data for project {project.name}'


def delete_clickhouse_family(project, family_guid, dataset_type, sample_type=None):
dataset_type = _clickhouse_dataset_type(dataset_type, sample_type)
return f'Clickhouse does not support deleting individual families from project. Manually delete {dataset_type} data for {family_guid} in project {project.guid}'


SV_DATASET_TYPES = {
Sample.SAMPLE_TYPE_WGS: Sample.DATASET_TYPE_SV_CALLS,
Sample.SAMPLE_TYPE_WES: 'GCNV',
20 changes: 3 additions & 17 deletions seqr/management/commands/transfer_families_to_different_project.py
@@ -1,27 +1,13 @@
from django.core.management.base import BaseCommand

from clickhouse_search.search import delete_clickhouse_family
from seqr.models import Project, Family, VariantTag, VariantTagType, Sample
from seqr.models import Project, Family, VariantTag, VariantTagType
from seqr.utils.search.utils import backend_specific_call
from seqr.utils.search.add_data_utils import trigger_delete_families_search

import logging
logger = logging.getLogger(__name__)


def _disable_search(families, from_project):
search_samples = Sample.objects.filter(is_active=True, individual__family__in=families)
if search_samples:
updated_families = search_samples.values_list("individual__family__family_id", flat=True).distinct()
updated_family_dataset_types = list(search_samples.values_list('individual__family__guid', 'dataset_type', 'sample_type').distinct())
family_summary = ", ".join(sorted(updated_families))
num_updated = search_samples.update(is_active=False)
logger.info(
f'Disabled search for {num_updated} samples in the following {len(updated_families)} families: {family_summary}'
)
for update in updated_family_dataset_types:
logger.info(delete_clickhouse_family(from_project, *update))


class Command(BaseCommand):
def add_arguments(self, parser):
parser.add_argument('--from-project', required=True)
@@ -49,7 +35,7 @@ def handle(self, *args, **options):
]
logger.info(f'Skipping {num_found - len(families)} families with analysis groups in the project: {", ".join(group_families)}')

backend_specific_call(lambda *args: None, _disable_search)(families, from_project)
backend_specific_call(lambda *args: None, trigger_delete_families_search)(from_project, list(families.values_list('guid', flat=True)))

for variant_tag_type in VariantTagType.objects.filter(project=from_project):
variant_tags = VariantTag.objects.filter(saved_variants__family__in=families, variant_tag_type=variant_tag_type)
19 changes: 15 additions & 4 deletions seqr/management/tests/check_for_new_samples_from_pipeline_tests.py
@@ -800,10 +800,12 @@ class AirtableCheckNewSamplesTest(AnvilAuthenticationTestCase, CheckNewSamplesTe
('Fetched 2 AnVIL Seqr Loading Requests Tracking records from airtable', None),
]
VALIDATION_LOGS = [
'==> gsutil ls gs://seqr-hail-search-data/v3.1/GRCh38/SNV_INDEL/runs/manual__2025-01-14/validation_errors.json',
'==> gsutil cat gs://seqr-hail-search-data/v3.1/GRCh38/SNV_INDEL/runs/manual__2025-01-14/validation_errors.json',
'Fetching AnVIL Seqr Loading Requests Tracking records 0-2 from airtable',
'Fetched 1 AnVIL Seqr Loading Requests Tracking records from airtable',
'==> gsutil mv /mock/tmp/* gs://seqr-hail-search-data/v3.1/GRCh38/SNV_INDEL/runs/manual__2025-01-14/',
'==> gsutil ls gs://seqr-hail-search-data/v3.1/GRCh38/SNV_INDEL/runs/manual__2025-01-24/validation_errors.json',
'==> gsutil cat gs://seqr-hail-search-data/v3.1/GRCh38/SNV_INDEL/runs/manual__2025-01-24/validation_errors.json',
'==> gsutil mv /mock/tmp/* gs://seqr-hail-search-data/v3.1/GRCh38/SNV_INDEL/runs/manual__2025-01-24/',
]
@@ -892,22 +894,30 @@ def _set_reloading_loading_files(self):
def _set_loading_files(self):
responses.calls.reset()
self.mock_subprocess.reset_mock()
self.mock_subprocess.side_effect = [self.mock_ls_process] + [
mock_opened_file(i) for i in range(len(OPENED_RUN_JSON_FILES) - 1)
] + [self.mock_mv_process, mock_opened_file(-1), self.mock_mv_process]
subprocesses = [self.mock_ls_process]
for i in range(len(OPENED_RUN_JSON_FILES) - 1):
subprocesses += [self.mock_mv_process, mock_opened_file(i)]
subprocesses += [self.mock_mv_process, self.mock_mv_process, mock_opened_file(-1), self.mock_mv_process]
self.mock_subprocess.side_effect = subprocesses

def _assert_expected_loading_file_calls(self, single_call):
calls = [
('gsutil ls gs://seqr-hail-search-data/v3.1/*/*/runs/*/*', -1),
('gsutil ls gs://seqr-hail-search-data/v3.1/GRCh38/SNV_INDEL/runs/auto__2023-08-09/metadata.json', -2),
('gsutil cat gs://seqr-hail-search-data/v3.1/GRCh38/SNV_INDEL/runs/auto__2023-08-09/metadata.json', -2),
]
if not single_call:
calls += [
('gsutil ls gs://seqr-hail-search-data/v3.1/GRCh37/SNV_INDEL/runs/manual__2023-11-02/metadata.json', -2),
('gsutil cat gs://seqr-hail-search-data/v3.1/GRCh37/SNV_INDEL/runs/manual__2023-11-02/metadata.json', -2),
('gsutil ls gs://seqr-hail-search-data/v3.1/GRCh38/MITO/runs/auto__2024-08-12/metadata.json', -2),
('gsutil cat gs://seqr-hail-search-data/v3.1/GRCh38/MITO/runs/auto__2024-08-12/metadata.json', -2),
('gsutil ls gs://seqr-hail-search-data/v3.1/GRCh38/GCNV/runs/auto__2024-09-14/metadata.json', -2),
('gsutil cat gs://seqr-hail-search-data/v3.1/GRCh38/GCNV/runs/auto__2024-09-14/metadata.json', -2),
('gsutil ls gs://seqr-hail-search-data/v3.1/GRCh38/SNV_INDEL/runs/manual__2025-01-14/validation_errors.json', -2),
('gsutil cat gs://seqr-hail-search-data/v3.1/GRCh38/SNV_INDEL/runs/manual__2025-01-14/validation_errors.json', -2),
('gsutil mv /mock/tmp/* gs://seqr-hail-search-data/v3.1/GRCh38/SNV_INDEL/runs/manual__2025-01-14/', -2),
('gsutil ls gs://seqr-hail-search-data/v3.1/GRCh38/SNV_INDEL/runs/manual__2025-01-24/validation_errors.json', -2),
('gsutil cat gs://seqr-hail-search-data/v3.1/GRCh38/SNV_INDEL/runs/manual__2025-01-24/validation_errors.json', -2),
('gsutil mv /mock/tmp/* gs://seqr-hail-search-data/v3.1/GRCh38/SNV_INDEL/runs/manual__2025-01-24/', -2),
]
@@ -916,7 +926,8 @@ def _assert_expected_loading_file_calls(self, single_call):
)

def _additional_loading_logs(self, data_type, version):
return [(f'==> gsutil cat gs://seqr-hail-search-data/v3.1/{data_type.replace("SV", "GCNV")}/runs/{version}/metadata.json', None)]
return [(f'==> gsutil ls gs://seqr-hail-search-data/v3.1/{data_type.replace("SV", "GCNV")}/runs/{version}/metadata.json', None),
(f'==> gsutil cat gs://seqr-hail-search-data/v3.1/{data_type.replace("SV", "GCNV")}/runs/{version}/metadata.json', None)]

def _assert_expected_airtable_calls(self, has_success_run, single_call):
# Test request tracking updates for validation errors
4 changes: 4 additions & 0 deletions seqr/management/tests/load_rna_seq_tests.py
@@ -20,6 +20,10 @@ def setUp(self):
mock_gzip_open = patcher.start()
self.mock_gzip_file_iter = mock_gzip_open.return_value.__enter__.return_value.__iter__
self.addCleanup(patcher.stop)
patcher = mock.patch('seqr.utils.file_utils.os')
mock_os = patcher.start()
mock_os.path.isfile.return_value = True
self.addCleanup(patcher.stop)
patcher = mock.patch('seqr.management.commands.load_rna_seq.open')
self.mock_open = patcher.start()
self.addCleanup(patcher.stop)
@@ -1,4 +1,5 @@
from django.core.management import call_command
import responses

from seqr.models import Family, VariantTagType, VariantTag, Sample
from seqr.views.utils.test_utils import AuthenticationTestCase, AnvilAuthenticationTestCase
@@ -9,7 +10,10 @@ class TransferFamiliesTest(object):
DEACTIVATE_SEARCH = True
LOGS = []

@responses.activate
def test_command(self):
responses.add(responses.POST, 'http://pipeline-runner:6000/delete_families_enqueue', status=200)

call_command(
'transfer_families_to_different_project', '--from-project=R0001_1kg', '--to-project=R0003_test', '2', '4', '5', '12',
)
@@ -59,7 +63,5 @@ class TransferFamiliesClickhouseTest(TransferFamiliesTest, AnvilAuthenticationTe
ES_HOSTNAME = ''
LOGS = [
('Disabled search for 7 samples in the following 1 families: 2', None),
('Clickhouse does not support deleting individual families from project. Manually delete MITO data for F000002_2 in project R0001_1kg', None),
('Clickhouse does not support deleting individual families from project. Manually delete SNV_INDEL data for F000002_2 in project R0001_1kg', None),
('Clickhouse does not support deleting individual families from project. Manually delete GCNV data for F000002_2 in project R0001_1kg', None),
('Triggered Delete Families', {'detail': {'project_guid': 'R0001_1kg', 'family_guids': ['F000002_2', 'F000004_4']}}),
]
6 changes: 4 additions & 2 deletions seqr/management/tests/update_individuals_sample_qc_tests.py
@@ -82,6 +82,8 @@ def setUp(self):
self.mock_subprocess = patcher.start()
self.addCleanup(patcher.stop)
self.mock_ls_process = mock.MagicMock()
self.mock_does_exist_process = mock.MagicMock()
self.mock_does_exist_process.wait.return_value = 0
self.mock_metadata_file = mock.MagicMock()
super().setUp()

@@ -100,14 +102,14 @@ def test_command(self):
self.mock_ls_process.communicate.return_value = b'gs://seqr-hail-search-data/v3.1/GRCh38/SNV_INDEL/runs/manual__2025-02-24/metadata.json\n', b''
self.mock_metadata_file.stdout = [json.dumps({}).encode()]
self.mock_metadata_file.wait.return_value = 0
self.mock_subprocess.side_effect = [self.mock_ls_process, self.mock_metadata_file]
self.mock_subprocess.side_effect = [self.mock_ls_process, self.mock_does_exist_process, self.mock_metadata_file]

with self.assertRaises(CommandError):
call_command('update_individuals_sample_qc', 'SNV_INDEL', 'GRCh38', 'manual__2025-02-24')

# Test valid case
self.mock_metadata_file.stdout = [json.dumps(METADATA_JSON).encode()]
self.mock_subprocess.side_effect = [self.mock_ls_process, self.mock_metadata_file]
self.mock_subprocess.side_effect = [self.mock_ls_process, self.mock_does_exist_process, self.mock_metadata_file]
call_command('update_individuals_sample_qc', 'SNV_INDEL', 'GRCh38', 'manual__2025-02-24')

# Individual model properly updated with sample qc results
6 changes: 4 additions & 2 deletions seqr/utils/file_utils.py
@@ -43,7 +43,7 @@ def does_file_exist(file_path, user=None):
success = process.wait() == 0
if not success:
errors = [line.decode('utf-8').strip() for line in process.stdout]
logger.info(' '.join(errors), user)
logger.warning(' '.join(errors), user)
return success
return os.path.isfile(file_path)

@@ -57,6 +57,8 @@ def list_files(wildcard_path, user, check_subfolders=False, allow_missing=True):


def file_iter(file_path, byte_range=None, raw_content=False, user=None, **kwargs):
if not does_file_exist(file_path, user=user):
raise FileNotFoundError(f'Could not access file {file_path}')
if is_google_bucket_file_path(file_path):
for line in _google_bucket_file_iter(file_path, byte_range=byte_range, raw_content=raw_content, user=user, **kwargs):
yield line
@@ -82,7 +84,7 @@ def file_iter(file_path, byte_range=None, raw_content=False, user=None, **kwargs
def _google_bucket_file_iter(gs_path, byte_range=None, raw_content=False, user=None, **kwargs):
"""Iterate over lines in the given file"""
range_arg = ' -r {}-{}'.format(byte_range[0], byte_range[1]) if byte_range else ''
process = run_gsutil_with_wait(
process = _run_gsutil_command(
'cat{}'.format(range_arg), gs_path, gunzip=gs_path.endswith("gz") and not raw_content, user=user, **kwargs)
for line in process.stdout:
if not raw_content:
59 changes: 41 additions & 18 deletions seqr/utils/search/add_data_utils.py
@@ -13,6 +13,7 @@
from seqr.utils.middleware import ErrorsWarningsException
from seqr.views.utils.airtable_utils import AirtableSession, ANVIL_REQUEST_TRACKING_TABLE
from seqr.views.utils.export_utils import write_multiple_files
from seqr.views.utils.json_utils import _to_title_case
from seqr.views.utils.pedigree_info_utils import JsonConstants
from settings import SEQR_SLACK_DATA_ALERTS_NOTIFICATION_CHANNEL, BASE_URL, ANVIL_UI_URL, PIPELINE_RUNNER_SERVER, \
SEQR_SLACK_ANVIL_DATA_LOADING_CHANNEL, SEQR_SLACK_LOADING_NOTIFICATION_CHANNEL, LOADING_DATASETS_DIR
@@ -65,6 +66,22 @@ def update_airtable_loading_tracking_status(project, status, additional_update=N
update={'Status': status, **(additional_update or {})},
)

def trigger_delete_families_search(project, family_guids, user=None):
search_samples = Sample.objects.filter(is_active=True, individual__family__guid__in=family_guids)
info = []
if search_samples:
updated_families = search_samples.values_list("individual__family__family_id", flat=True).distinct()
family_summary = ", ".join(sorted(updated_families))
num_updated = search_samples.update(is_active=False)
message = f'Disabled search for {num_updated} samples in the following {len(updated_families)} families: {family_summary}'
info.append(message)
logger.info(message, user)

variables = {'project_guid': project.guid, 'family_guids': family_guids}
_enqueue_pipeline_request('delete_families', variables, user)
info.append('Triggered delete family data')
return info

def trigger_data_loading(projects: list[Project], individual_ids: list[int], sample_type: str, dataset_type: str,
genome_version: str, data_path: str, user: User, raise_error: bool = False, skip_expect_tdr_metrics: bool = True,
skip_check_sex_and_relatedness: bool = True, vcf_sample_id_map=None,
@@ -85,34 +102,40 @@ def trigger_data_loading(projects: list[Project], individual_ids: list[int], sam
_upload_data_loading_files(individual_ids, vcf_sample_id_map or {}, user, file_path, raise_error)
_write_gene_id_file(user)

response = requests.post(f'{PIPELINE_RUNNER_SERVER}/loading_pipeline_enqueue', json=variables, timeout=60)
success = True
error = _enqueue_pipeline_request('loading_pipeline', variables, user, raise_error)
if error:
safe_post_to_slack(
SEQR_SLACK_LOADING_NOTIFICATION_CHANNEL,
f'{error_message}: {error}\nLoading pipeline should be triggered with:\n```{json.dumps(variables, indent=4)}```',
)

success = not error
if success_message and (success or success_slack_channel != SEQR_SLACK_LOADING_NOTIFICATION_CHANNEL):
safe_post_to_slack(success_slack_channel, '\n\n'.join([
success_message,
f'Pedigree files have been uploaded to {file_path}',
f'Loading pipeline is triggered with:\n```{json.dumps(variables, indent=4)}```',
]))

return success


def _enqueue_pipeline_request(name: str, variables: dict, user: User, raise_error: bool = True):
response = requests.post(f'{PIPELINE_RUNNER_SERVER}/{name}_enqueue', json=variables, timeout=60)
error = None
try:
response.raise_for_status()
logger.info('Triggered loading pipeline', user, detail=variables)
logger.info(f'Triggered {_to_title_case(name)}', user, detail=variables)
except requests.HTTPError as e:
success = False
error = str(e)
if response.status_code == 409:
error = 'Loading pipeline is already running. Wait for it to complete and resubmit'
e = ErrorsWarningsException([error])
if raise_error:
raise e
else:
logger.warning(f'Error triggering loading pipeline: {error}', user, detail=variables)
safe_post_to_slack(
SEQR_SLACK_LOADING_NOTIFICATION_CHANNEL,
f'{error_message}: {error}\nLoading pipeline should be triggered with:\n```{json.dumps(variables, indent=4)}```',
)

if success_message and (success or success_slack_channel != SEQR_SLACK_LOADING_NOTIFICATION_CHANNEL):
safe_post_to_slack(success_slack_channel, '\n\n'.join([
success_message,
f'Pedigree files have been uploaded to {file_path}',
f'Loading pipeline is triggered with:\n```{json.dumps(variables, indent=4)}```',
]))

return success
logger.warning(f'Error Triggering {_to_title_case(name)}: {error}', user, detail=variables)
return error


def _loading_dataset_type(sample_type: str, dataset_type: str):
21 changes: 11 additions & 10 deletions seqr/utils/vcf_utils.py
@@ -76,15 +76,17 @@ def _get_vcf_meta_info(line):
def validate_vcf_and_get_samples(data_path, user, genome_version, path_name=None, dataset_type=None):
allowed_exts = DATA_TYPE_FILE_EXTS.get(dataset_type)

vcf_filename = _validate_vcf_exists(data_path, user, path_name, allowed_exts)
vcf_filename = _validate_valid_vcf_name(data_path, user, allowed_exts)

if allowed_exts and vcf_filename.endswith(allowed_exts):
if vcf_filename is None:
return None

byte_range = None if vcf_filename.endswith('.vcf') else (0, BLOCK_SIZE)
meta = defaultdict(dict)
try:
header_line = next(_get_vcf_header_line(file_iter(vcf_filename, byte_range=byte_range), meta))
header_line = next(_get_vcf_header_line(file_iter(vcf_filename, byte_range=byte_range, user=user), meta))
except FileNotFoundError:
raise ErrorsWarningsException([f'Data file or path {path_name or data_path} is not found.'], [])
except StopIteration:
raise ErrorsWarningsException(['No header found in the VCF file.'], [])
except UnicodeDecodeError:
@@ -117,23 +119,22 @@ def _get_vcf_header_line(vcf_file, meta):
meta[meta_info['field']].update({meta_info['id']: meta_info['type']})


def _validate_vcf_exists(data_path, user, path_name, allowed_exts):
def _validate_valid_vcf_name(data_path, user, allowed_exts):
file_extensions = (allowed_exts or ()) + VCF_FILE_EXTENSIONS
if not data_path.endswith(file_extensions):
raise ErrorsWarningsException([
'Invalid VCF file format - file path must end with {}'.format(' or '.join(file_extensions))
])

file_to_check = None
file_to_check = data_path
if '*' in data_path:
files = list_files(data_path, user)
if files:
file_to_check = files[0]
elif does_file_exist(data_path, user=user):
file_to_check = data_path

if not file_to_check:
raise ErrorsWarningsException(['Data file or path {} is not found.'.format(path_name or data_path)])
elif allowed_exts and data_path.endswith(allowed_exts):
if not does_file_exist(data_path, user=user):
raise ErrorsWarningsException([f'Data file or path {data_path} is not found.'])
file_to_check = None

return file_to_check
