Merge pull request #4336 from broadinstitute/local-install-loading-ui
Local install loading UI
hanars authored Aug 30, 2024
2 parents 10022d5 + 2355e32 commit 26f2af0
Showing 14 changed files with 299 additions and 137 deletions.
7 changes: 7 additions & 0 deletions seqr/utils/file_utils.py
@@ -1,3 +1,4 @@
+import glob
 import gzip
 import os
 import subprocess # nosec
@@ -47,6 +48,12 @@ def does_file_exist(file_path, user=None):
     return os.path.isfile(file_path)
 
 
+def list_files(wildcard_path, user):
+    if is_google_bucket_file_path(wildcard_path):
+        return get_gs_file_list(wildcard_path, user, check_subfolders=False, allow_missing=True)
+    return [file_path for file_path in glob.glob(wildcard_path) if os.path.isfile(file_path)]
+
+
 def file_iter(file_path, byte_range=None, raw_content=False, user=None, **kwargs):
     if is_google_bucket_file_path(file_path):
         for line in _google_bucket_file_iter(file_path, byte_range=byte_range, raw_content=raw_content, user=user, **kwargs):
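The new helper gives callers a single entry point for wildcard listings on either storage backend. A minimal usage sketch (the paths and user value are illustrative, not from this diff):

```python
from seqr.utils.file_utils import list_files

user = None  # stands in for the authenticated request user

# Local install: the wildcard is expanded with glob against local disk.
local_shards = list_files('/datasets/callset/part-*.vcf.gz', user)

# Cloud install: the same call is delegated to get_gs_file_list.
gcs_shards = list_files('gs://my-bucket/callset/part-*.vcf.gz', user)
```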
36 changes: 15 additions & 21 deletions seqr/utils/search/add_data_utils.py
@@ -2,24 +2,21 @@
 from django.contrib.auth.models import User
 from django.db.models import F
 
-from reference_data.models import GENOME_VERSION_GRCh38, GENOME_VERSION_LOOKUP
+from reference_data.models import GENOME_VERSION_LOOKUP
 from seqr.models import Sample, Individual, Project
 from seqr.utils.communication_utils import send_project_notification, safe_post_to_slack
 from seqr.utils.logging_utils import SeqrLogger
 from seqr.utils.search.utils import backend_specific_call
 from seqr.utils.search.elasticsearch.es_utils import validate_es_index_metadata_and_get_samples
 from seqr.views.utils.airtable_utils import AirtableSession, ANVIL_REQUEST_TRACKING_TABLE
 from seqr.views.utils.dataset_utils import match_and_update_search_samples, load_mapping_file
-from seqr.views.utils.export_utils import write_multiple_files_to_gs
+from seqr.views.utils.export_utils import write_multiple_files
 from settings import SEQR_SLACK_DATA_ALERTS_NOTIFICATION_CHANNEL, BASE_URL, ANVIL_UI_URL, \
     SEQR_SLACK_ANVIL_DATA_LOADING_CHANNEL
 
 logger = SeqrLogger(__name__)
 
 
-SEQR_V3_PEDIGREE_GS_PATH = 'gs://seqr-loading-temp/v3.1'
-
-
 def _hail_backend_error(*args, **kwargs):
     raise ValueError('Adding samples is disabled for the hail backend')
 
@@ -117,29 +114,28 @@ def format_email(sample_summary, project_link, num_new_samples):
     )
 
 
-def prepare_data_loading_request(projects: list[Project], sample_type: str, dataset_type: str, data_path: str, user: User,
-                                 genome_version: str = GENOME_VERSION_GRCh38, is_internal: bool = False,
+def prepare_data_loading_request(projects: list[Project], sample_type: str, dataset_type: str, genome_version: str,
+                                 data_path: str, user: User, pedigree_dir: str, raise_pedigree_error: bool = False,
                                  individual_ids: list[str] = None):
     project_guids = sorted([p.guid for p in projects])
     variables = {
         'projects_to_run': project_guids,
         'callset_path': data_path,
-        'sample_source': 'Broad_Internal' if is_internal else 'AnVIL',
         'sample_type': sample_type,
         'dataset_type': _dag_dataset_type(sample_type, dataset_type),
         'reference_genome': GENOME_VERSION_LOOKUP[genome_version],
     }
-    upload_info = _upload_data_loading_files(projects, user, genome_version, sample_type, dataset_type, individual_ids)
-    return variables, upload_info
+    file_path = _get_pedigree_path(pedigree_dir, genome_version, sample_type, dataset_type)
+    _upload_data_loading_files(projects, user, file_path, individual_ids, raise_pedigree_error)
+    return variables, file_path
 
 
 def _dag_dataset_type(sample_type: str, dataset_type: str):
     return 'GCNV' if dataset_type == Sample.DATASET_TYPE_SV_CALLS and sample_type == Sample.SAMPLE_TYPE_WES \
         else dataset_type
 
 
-def _upload_data_loading_files(projects: list[Project], user: User, genome_version: str, sample_type: str, dataset_type: str,
-                               individual_ids: list[str]):
+def _upload_data_loading_files(projects: list[Project], user: User, file_path: str, individual_ids: list[str], raise_error: bool):
     file_annotations = OrderedDict({
         'Project_GUID': F('family__project__guid'), 'Family_GUID': F('family__guid'),
         'Family_ID': F('family__family_id'),
@@ -155,20 +151,18 @@ def _upload_data_loading_files(projects: list[Project], user: User, genome_version: str, sample_type: str, dataset_type: str,
     for row in data:
         data_by_project[row.pop('project')].append(row)
 
-    info = []
     header = list(file_annotations.keys())
     files = [(f'{project_guid}_pedigree', header, rows) for project_guid, rows in data_by_project.items()]
-    gs_path = _get_gs_pedigree_path(genome_version, sample_type, dataset_type)
 
     try:
-        write_multiple_files_to_gs(files, gs_path, user, file_format='tsv')
+        write_multiple_files(files, file_path, user, file_format='tsv')
     except Exception as e:
-        logger.error(f'Uploading Pedigrees to Google Storage failed. Errors: {e}', user, detail={
+        logger.error(f'Uploading Pedigrees failed. Errors: {e}', user, detail={
             project: rows for project, _, rows in files
         })
-    info.append(f'Pedigree files have been uploaded to {gs_path}')
-
-    return info
+    if raise_error:
+        raise e
 
 
-def _get_gs_pedigree_path(genome_version: str, sample_type: str, dataset_type: str):
-    return f'{SEQR_V3_PEDIGREE_GS_PATH}/{GENOME_VERSION_LOOKUP[genome_version]}/{dataset_type}/pedigrees/{sample_type}/'
+def _get_pedigree_path(pedigree_dir: str, genome_version: str, sample_type: str, dataset_type: str):
+    return f'{pedigree_dir}/{GENOME_VERSION_LOOKUP[genome_version]}/{dataset_type}/pedigrees/{sample_type}'
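Replacing the hard-coded `SEQR_V3_PEDIGREE_GS_PATH` constant with a `pedigree_dir` argument lets the same code write pedigrees to a bucket or to local disk. A worked sketch of the path construction (the lookup subset and the local directory are illustrative):

```python
GENOME_VERSION_LOOKUP = {'37': 'GRCh37', '38': 'GRCh38'}  # illustrative subset of the real lookup

def _get_pedigree_path(pedigree_dir, genome_version, sample_type, dataset_type):
    return f'{pedigree_dir}/{GENOME_VERSION_LOOKUP[genome_version]}/{dataset_type}/pedigrees/{sample_type}'

# Cloud deployments keep the old layout under the temp bucket:
_get_pedigree_path('gs://seqr-loading-temp/v3.1', '38', 'WES', 'SNV_INDEL')
# -> 'gs://seqr-loading-temp/v3.1/GRCh38/SNV_INDEL/pedigrees/WES'

# Local installs pass LOADING_DATASETS_DIR instead (hypothetical value):
_get_pedigree_path('/seqr/loading-datasets', '38', 'WES', 'SNV_INDEL')
# -> '/seqr/loading-datasets/GRCh38/SNV_INDEL/pedigrees/WES'
```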
4 changes: 2 additions & 2 deletions seqr/utils/vcf_utils.py
@@ -3,7 +3,7 @@
 from collections import defaultdict
 
 from seqr.utils.middleware import ErrorsWarningsException
-from seqr.utils.file_utils import file_iter, does_file_exist, get_gs_file_list
+from seqr.utils.file_utils import file_iter, does_file_exist, list_files
 from seqr.utils.search.constants import VCF_FILE_EXTENSIONS
 
 BLOCK_SIZE = 65536
@@ -97,7 +97,7 @@ def validate_vcf_exists(data_path, user, path_name=None, allowed_exts=None):
 
     file_to_check = None
     if '*' in data_path:
-        files = get_gs_file_list(data_path, user, check_subfolders=False, allow_missing=True)
+        files = list_files(data_path, user)
         if files:
             file_to_check = files[0]
     elif does_file_exist(data_path, user=user):
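Since wildcard paths previously went through `get_gs_file_list`, which assumes a Google bucket, this is what makes sharded callsets validate on a local install. A sketch (the path is illustrative):

```python
from seqr.utils.vcf_utils import validate_vcf_exists

# A local wildcard now resolves through glob via list_files; the call
# succeeds if at least one shard matches the pattern.
validate_vcf_exists('/datasets/callset/part-*.vcf.gz', user=None)
```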
7 changes: 3 additions & 4 deletions seqr/views/apis/anvil_workspace_api.py
@@ -18,7 +18,7 @@
 from seqr.views.utils.airtable_utils import AirtableSession, ANVIL_REQUEST_TRACKING_TABLE
 from seqr.utils.search.constants import VCF_FILE_EXTENSIONS
 from seqr.utils.search.utils import get_search_samples
-from seqr.views.utils.airflow_utils import trigger_data_loading
+from seqr.views.utils.airflow_utils import trigger_airflow_data_loading
 from seqr.views.utils.json_to_orm_utils import create_model_from_json
 from seqr.views.utils.json_utils import create_json_response
 from seqr.views.utils.file_utils import load_uploaded_file
@@ -302,10 +302,9 @@ def _trigger_add_workspace_data(project, pedigree_records, user, data_path, samp
     success_message = f"""
 *{user.email}* requested to load {num_updated_individuals} new{reload_summary} {sample_type} samples ({GENOME_VERSION_LOOKUP.get(project.genome_version)}) from AnVIL workspace *{project.workspace_namespace}/{project.workspace_name}* at
 {data_path} to seqr project <{_get_seqr_project_url(project)}|*{project.name}*> (guid: {project.guid})"""
-    trigger_success = trigger_data_loading(
-        [project], sample_type, Sample.DATASET_TYPE_VARIANT_CALLS, data_path, user=user, success_message=success_message,
+    trigger_success = trigger_airflow_data_loading(
+        [project], sample_type, Sample.DATASET_TYPE_VARIANT_CALLS, project.genome_version, data_path, user=user, success_message=success_message,
         success_slack_channel=SEQR_SLACK_ANVIL_DATA_LOADING_CHANNEL, error_message=f'ERROR triggering AnVIL loading for project {project.guid}',
-        genome_version=project.genome_version,
     )
     AirtableSession(user, base=AirtableSession.ANVIL_BASE).safe_create_records(
         ANVIL_REQUEST_TRACKING_TABLE, [{
8 changes: 4 additions & 4 deletions seqr/views/apis/anvil_workspace_api_tests.py
@@ -774,10 +774,10 @@ def _assert_valid_operation(self, project, test_add_data=True):
         dag_json = {
             'projects_to_run': [project.guid],
             'callset_path': 'gs://test_bucket/test_path.vcf',
-            'sample_source': 'AnVIL',
             'sample_type': 'WES',
             'dataset_type': 'SNV_INDEL',
             'reference_genome': genome_version,
+            'sample_source': 'AnVIL',
         }
         sample_summary = '3 new'
         if test_add_data:
@@ -786,7 +786,7 @@ def _assert_valid_operation(self, project, test_add_data=True):
 *[email protected]* requested to load {sample_summary} WES samples ({version}) from AnVIL workspace *my-seqr-billing/{workspace_name}* at
 gs://test_bucket/test_path.vcf to seqr project <http://testserver/project/{guid}/project_page|*{project_name}*> (guid: {guid})
-Pedigree files have been uploaded to gs://seqr-loading-temp/v3.1/{version}/SNV_INDEL/pedigrees/WES/
+Pedigree files have been uploaded to gs://seqr-loading-temp/v3.1/{version}/SNV_INDEL/pedigrees/WES
 
 DAG LOADING_PIPELINE is triggered with following:
 ```{dag}```
@@ -840,7 +840,7 @@ def _test_mv_file_and_triggering_dag_exception(self, url, workspace, sample_data
         project = Project.objects.get(**workspace)
 
         self.mock_add_data_utils_logger.error.assert_called_with(
-            'Uploading Pedigrees to Google Storage failed. Errors: Something wrong while moving the file.',
+            'Uploading Pedigrees failed. Errors: Something wrong while moving the file.',
             self.manager_user, detail={f'{project.guid}_pedigree': sample_data})
         self.mock_api_logger.error.assert_not_called()
         self.mock_airflow_logger.warning.assert_called_with(
@@ -858,10 +858,10 @@ def _test_mv_file_and_triggering_dag_exception(self, url, workspace, sample_data
             dag=json.dumps({
                 'projects_to_run': [project.guid],
                 'callset_path': 'gs://test_bucket/test_path.vcf',
-                'sample_source': 'AnVIL',
                 'sample_type': 'WES',
                 'dataset_type': 'SNV_INDEL',
                 'reference_genome': genome_version,
+                'sample_source': 'AnVIL',
             }, indent=4),
         )
 
31 changes: 22 additions & 9 deletions seqr/views/apis/data_manager_api.py
@@ -15,13 +15,14 @@
 from requests.exceptions import ConnectionError as RequestConnectionError
 
 from seqr.utils.communication_utils import send_project_notification
+from seqr.utils.search.add_data_utils import prepare_data_loading_request
 from seqr.utils.search.utils import get_search_backend_status, delete_search_backend_data
 from seqr.utils.file_utils import file_iter, does_file_exist
 from seqr.utils.logging_utils import SeqrLogger
 from seqr.utils.middleware import ErrorsWarningsException
 from seqr.utils.vcf_utils import validate_vcf_exists
 
-from seqr.views.utils.airflow_utils import trigger_data_loading
+from seqr.views.utils.airflow_utils import trigger_airflow_data_loading
 from seqr.views.utils.airtable_utils import AirtableSession, LOADABLE_PDO_STATUSES, AVAILABLE_PDO_STATUS
 from seqr.views.utils.dataset_utils import load_rna_seq, load_phenotype_prioritization_data_file, RNA_DATA_TYPE_CONFIGS, \
     post_process_rna_data, convert_django_meta_to_http_headers
@@ -32,7 +33,8 @@
 
 from seqr.models import Sample, RnaSample, Individual, Project, PhenotypePrioritization
 
-from settings import KIBANA_SERVER, KIBANA_ELASTICSEARCH_PASSWORD, SEQR_SLACK_LOADING_NOTIFICATION_CHANNEL, BASE_URL
+from settings import KIBANA_SERVER, KIBANA_ELASTICSEARCH_PASSWORD, SEQR_SLACK_LOADING_NOTIFICATION_CHANNEL, BASE_URL, \
+    LOADING_DATASETS_DIR, PIPELINE_RUNNER_SERVER
 
 logger = SeqrLogger(__name__)
 
@@ -507,17 +509,28 @@ def load_data(request):
         missing = sorted(set(project_samples.keys()) - {p.guid for p in project_models})
         return create_json_response({'error': f'The following projects are invalid: {", ".join(missing)}'}, status=400)
 
+    has_airtable = AirtableSession.is_airtable_enabled()
     individual_ids = None
-    if dataset_type == Sample.DATASET_TYPE_VARIANT_CALLS:
+    if dataset_type == Sample.DATASET_TYPE_VARIANT_CALLS and has_airtable:
         individual_ids = _get_valid_project_samples(project_samples, sample_type, request.user)
 
-    success_message = f'*{request.user.email}* triggered loading internal {sample_type} {dataset_type} data for {len(projects)} projects'
-    error_message = f'ERROR triggering internal {sample_type} {dataset_type} loading'
-    trigger_data_loading(
-        project_models, sample_type, dataset_type, request_json['filePath'], user=request.user, success_message=success_message,
-        success_slack_channel=SEQR_SLACK_LOADING_NOTIFICATION_CHANNEL, error_message=error_message,
-        is_internal=True, individual_ids=individual_ids,
+    loading_args = (
+        project_models, sample_type, dataset_type, request_json['genomeVersion'], request_json['filePath'],
     )
+    if has_airtable:
+        success_message = f'*{request.user.email}* triggered loading internal {sample_type} {dataset_type} data for {len(projects)} projects'
+        error_message = f'ERROR triggering internal {sample_type} {dataset_type} loading'
+        trigger_airflow_data_loading(
+            *loading_args, user=request.user, success_message=success_message, error_message=error_message,
+            success_slack_channel=SEQR_SLACK_LOADING_NOTIFICATION_CHANNEL, is_internal=True, individual_ids=individual_ids,
+        )
+    else:
+        request_json, _ = prepare_data_loading_request(
+            *loading_args, user=request.user, pedigree_dir=LOADING_DATASETS_DIR, raise_pedigree_error=True,
+        )
+        response = requests.post(f'{PIPELINE_RUNNER_SERVER}/loading_pipeline_enqueue', json=request_json, timeout=60)
+        response.raise_for_status()
+        logger.info('Triggered loading pipeline', request.user, detail=request_json)
 
     return create_json_response({'success': True})
 
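For installs without Airtable (i.e. local installs), `load_data` now bypasses Airflow and posts the prepared DAG variables straight to the pipeline runner. A sketch of the resulting request with illustrative values; the payload shape follows the `variables` dict built in `prepare_data_loading_request`:

```python
import requests

PIPELINE_RUNNER_SERVER = 'http://pipeline-runner:6000'  # illustrative; comes from settings

variables = {
    'projects_to_run': ['R0001_my_project'],        # sorted project GUIDs
    'callset_path': '/datasets/callset.vcf.gz',     # request_json['filePath']
    'sample_type': 'WES',
    'dataset_type': 'SNV_INDEL',                    # mapped to 'GCNV' for WES SV callsets
    'reference_genome': 'GRCh38',
}

response = requests.post(f'{PIPELINE_RUNNER_SERVER}/loading_pipeline_enqueue', json=variables, timeout=60)
response.raise_for_status()
```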