From cab16057b39e3b6288cc0d6c4e1c2511722239c6 Mon Sep 17 00:00:00 2001 From: Jae Aeich Date: Thu, 18 Jan 2024 10:33:48 +0530 Subject: [PATCH] fix: wes_endpoint validation and minor --- .github/workflows/main.yml | 2 +- pro_wes/app.py | 11 ++---- pro_wes/exceptions.py | 8 ++++ pro_wes/ga4gh/wes/workflow_runs.py | 60 ++++++++++++++--------------- pro_wes/tasks/track_run_progress.py | 19 +++++---- requirements_dev.txt | 2 +- 6 files changed, 53 insertions(+), 49 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 93d4823..2b5d2c7 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -28,7 +28,7 @@ jobs: run: pylint pro_wes/ setup.py - name: Check code format with Black run: black --check setup.py pro_wes/ tests/ - - name: Type check with mypy + - name: Check types with mypy run: mypy setup.py pro_wes/ test: name: Run tests diff --git a/pro_wes/app.py b/pro_wes/app.py index 9b8b027..21bc254 100644 --- a/pro_wes/app.py +++ b/pro_wes/app.py @@ -2,7 +2,6 @@ from pathlib import Path -from typing import Dict from connexion import App from flask import current_app from foca import Foca @@ -33,14 +32,12 @@ def _setup_first_start(app: App) -> None: work_dir = Path(current_app.config.foca.custom.post_runs.storage_path.resolve()) work_dir.mkdir(parents=True, exist_ok=True) # set service info - service_info: Dict # pylint: disable=unused-variable try: - service_info = ServiceInfo().get_service_info() + ServiceInfo().get_service_info() except NotFound: - service_info_data: Dict = current_app.config.foca.custom.service_info.dict() - service_info_object = ServiceInfo() - service_info_object.set_service_info(data=service_info_data) - service_info = service_info_object.get_service_info() # noqa: F841 + ServiceInfo().set_service_info( + data=current_app.config.foca.custom.service_info.dict() + ) def run_app(app: App) -> None: diff --git a/pro_wes/exceptions.py b/pro_wes/exceptions.py index b7965c7..b4ac405 100644 --- 
a/pro_wes/exceptions.py +++ b/pro_wes/exceptions.py @@ -39,6 +39,10 @@ class StorageUnavailableProblem(OSError): """Storage unavailable for OS operations.""" +class WesEndpointProblem(NotFound): + """No/few requirements provided for WES endpoint.""" + + exceptions = { Exception: { "message": "An unexpected error occurred.", @@ -96,4 +100,8 @@ class StorageUnavailableProblem(OSError): "message": "Storage is not accessible.", "code": "500", }, + WesEndpointProblem: { + "message": "No/few requirements provided for WES endpoint.", + "code": "500", + }, } diff --git a/pro_wes/ga4gh/wes/workflow_runs.py b/pro_wes/ga4gh/wes/workflow_runs.py index cd7aa4c..7356299 100644 --- a/pro_wes/ga4gh/wes/workflow_runs.py +++ b/pro_wes/ga4gh/wes/workflow_runs.py @@ -26,6 +26,7 @@ RunNotFound, StorageUnavailableProblem, Unauthorized, + WesEndpointProblem, ) from pro_wes.ga4gh.wes.models import ( Attachment, @@ -91,16 +92,19 @@ def run_workflow( document.user_id = kwargs.get("user_id", None) # create run environment & insert run document into run collection - document_stored = self._create_run_environment(document=document) + document_stored: DbDocument = self._create_run_environment(document=document) # write workflow attachments self._save_attachments(attachments=document_stored.attachments) - if document_stored.wes_endpoint is None: - raise ValueError("No WES endpoint available.") + # ensure WES endpoint is available + assert document_stored.wes_endpoint is not None, "No WES endpoint available." + assert ( + document_stored.wes_endpoint.base_path is not None + ), "WES endpoint does not have base_path."
- if document_stored.wes_endpoint.base_path is None: - raise ValueError("No WES endpoint base path provided.") + if document_stored.task_id is None: + raise IdsUnavailableProblem # instantiate WES client wes_client: WesClient = WesClient( @@ -109,9 +113,6 @@ def run_workflow( token=kwargs.get("jwt", None), ) - if document_stored.task_id is None: - raise ValueError("No task ID available") - # instantiate database connector db_connector = DbDocumentConnector( collection=self.db_client, @@ -150,8 +151,16 @@ def run_workflow( run_id=response.run_id, ) - if updated_document_stored.wes_endpoint is None: - raise ValueError("No WES endpoint available.") + # ensure WES endpoint is available + assert ( + updated_document_stored.wes_endpoint is not None + ), "No WES endpoint available." + assert ( + updated_document_stored.wes_endpoint.base_path is not None + ), "WES endpoint does not have base_path." + + if updated_document_stored.wes_endpoint.run_id is None: + raise WesEndpointProblem # track workflow progress in background task__track_run_progress.apply_async( @@ -166,12 +175,6 @@ def run_workflow( soft_time_limit=controller_config.timeout_job, ) - if updated_document_stored.wes_endpoint is None: - raise ValueError("No WES endpoint available.") - - if updated_document_stored.wes_endpoint.run_id is None: - raise ValueError("WES endpoint does not have run_id.") - return {"run_id": updated_document_stored.wes_endpoint.run_id} def list_runs( @@ -424,7 +427,7 @@ def _create_run_environment( # populate document document.run_log.run_id = run_id document.task_id = uuid() - document.work_dir = work_dir.as_posix() + document.work_dir = str(work_dir) # type: ignore document.attachments = self._process_attachments( work_dir=work_dir, ) @@ -455,20 +458,17 @@ def _process_attachments(self, work_dir: Path) -> List[Attachment]: attachments = [] files = request.files.getlist("workflow_attachment") for file in files: - if file is not None: - if file.filename is None: - raise 
ValueError("File does not have a filename.") - - attachments.append( - Attachment( - filename=file.filename, - object=file.stream.read(), - path=work_dir - / self._secure_filename( - name=Path(file.filename), - ), - ) + assert file is not None, "File object cannot be None." + assert file.filename is not None, "File does not have a filename." + + attachments.append( + Attachment( + filename=file.filename, + object=file.stream.read(), + path=work_dir / self._secure_filename(name=Path(file.filename)), ) + ) + return attachments @staticmethod diff --git a/pro_wes/tasks/track_run_progress.py b/pro_wes/tasks/track_run_progress.py index a03a7e4..a5be859 100644 --- a/pro_wes/tasks/track_run_progress.py +++ b/pro_wes/tasks/track_run_progress.py @@ -18,6 +18,7 @@ from pro_wes.utils.db import DbDocumentConnector from pro_wes.ga4gh.wes.client_wes import WesClient from pro_wes.celery_worker import celery +from pro_wes.exceptions import WesEndpointProblem logger = logging.getLogger(__name__) @@ -87,7 +88,7 @@ def task__track_run_progress( # pylint: disable=too-many-statements try: # workaround for cwl-WES; add .dict() when cwl-WES response conforms # to model - response = wes_client.get_run(run_id=remote_run_id) + response: RunLog = wes_client.get_run(run_id=remote_run_id) except EngineUnavailable: db_client.update_run_state(state=State.SYSTEM_ERROR.value) raise @@ -105,13 +106,12 @@ def task__track_run_progress( # pylint: disable=too-many-statements attempt: int = 1 while not run_state.is_finished: sleep(controller_config.polling_wait) - try: - if document.wes_endpoint is None: - raise ValueError("WES run ID not available.") - - if document.wes_endpoint.run_id is None: - raise ValueError("WES run ID not available.") + # ensure WES endpoint is available + assert document.wes_endpoint is not None, "No WES endpoint available." 
+ if document.wes_endpoint.run_id is None: + raise WesEndpointProblem + try: wes_client.get_run_status( run_id=document.wes_endpoint.run_id, timeout=foca_config.custom.defaults.timeout, @@ -135,13 +135,12 @@ def task__track_run_progress( # pylint: disable=too-many-statements run_state = response.state db_client.update_run_state(state=run_state.value) + assert response.run_id is not None, "WES run ID not available." + # fetch run log and upsert database document try: # workaround for cwl-WES; add .dict() when cwl-WES response conforms # to model - if response.run_id is None: - raise ValueError("WES run ID not available.") - response = wes_client.get_run(run_id=response.run_id) except EngineUnavailable: db_client.update_run_state(state=State.SYSTEM_ERROR.value) diff --git a/requirements_dev.txt b/requirements_dev.txt index 0281c49..dc0a280 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -5,6 +5,6 @@ mongomock>=4.1.2,<5 pylint>=2.15.5,<3 pytest>=7.2.0,<8 python-semantic-release>=7.32.2,<8 -mypy +mypy~=1.8 types-setuptools types-requests \ No newline at end of file