diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 3774d64..2b5d2c7 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -28,6 +28,8 @@ jobs: run: pylint pro_wes/ setup.py - name: Check code format with Black run: black --check setup.py pro_wes/ tests/ + - name: Check types with mypy + run: mypy setup.py pro_wes/ test: name: Run tests runs-on: ubuntu-latest diff --git a/mypy.ini b/mypy.ini new file mode 100644 index 0000000..2ee57fc --- /dev/null +++ b/mypy.ini @@ -0,0 +1,17 @@ +[mypy] + +[mypy-foca.*] +ignore_missing_imports = True +disable_error_code = attr-defined + +[mypy-celery.*] +ignore_missing_imports = True + +[mypy-connexion.*] +ignore_missing_imports = True + +[mypy-pymongo.*] +ignore_missing_imports = True + +[mypy-bson.objectid] +ignore_missing_imports = True \ No newline at end of file diff --git a/pro_wes/app.py b/pro_wes/app.py index 3d9a07f..66aa05a 100644 --- a/pro_wes/app.py +++ b/pro_wes/app.py @@ -29,15 +29,15 @@ def _setup_first_start(app: App) -> None: """Set up application for first start.""" with app.app.app_context(): # create storage directory - work_dir = Path(current_app.config.foca.custom.post_runs.storage_path.resolve()) + _config_foca = current_app.config.foca # type: ignore + work_dir = Path(_config_foca.custom.post_runs.storage_path.resolve()) work_dir.mkdir(parents=True, exist_ok=True) # set service info - service_info = ServiceInfo() try: - service_info = service_info.get_service_info() + ServiceInfo().get_service_info() except NotFound: - service_info.set_service_info( - data=current_app.config.foca.custom.service_info.dict() + ServiceInfo().set_service_info( + data=current_app.config.foca.custom.service_info.dict() # type: ignore ) diff --git a/pro_wes/exceptions.py b/pro_wes/exceptions.py index b7965c7..b4ac405 100644 --- a/pro_wes/exceptions.py +++ b/pro_wes/exceptions.py @@ -39,6 +39,10 @@ class StorageUnavailableProblem(OSError): """Storage unavailable for OS operations.""" 
+class WesEndpointProblem(NotFound): + """No/few requirements provided for WES endpoint.""" + + exceptions = { Exception: { "message": "An unexpected error occurred.", @@ -96,4 +100,8 @@ class StorageUnavailableProblem(OSError): "message": "Storage is not accessible.", "code": "500", }, + WesEndpointProblem: { + "message": "No/few requirements provided for WES endpoint.", + "code": "500", + }, } diff --git a/pro_wes/ga4gh/wes/models.py b/pro_wes/ga4gh/wes/models.py index 8737785..0e0a32f 100644 --- a/pro_wes/ga4gh/wes/models.py +++ b/pro_wes/ga4gh/wes/models.py @@ -38,10 +38,12 @@ class Attachment(BaseModel): Args: filename: Name of the file as indicated in the run request. + object: File object. path: Path to the file on the app's storage system. """ filename: str + object: bytes path: Path @@ -156,7 +158,7 @@ def json_serialized_object_field_valid( # pylint: disable=no-self-argument value does not represent an object/dictionary. """ if value == "" or value == "null" or value is None: - if field.name == "workflow_params": + if field == "workflow_params": raise ValueError("field required") return "{}" try: @@ -171,7 +173,7 @@ def json_serialized_object_field_valid( # pylint: disable=no-self-argument def workflow_type_and_version_supported( # pylint: disable=no-self-argument cls, values: Dict, - ) -> str: + ) -> Dict: """Ensure that workflow type and version are supported by this service instance. 
@@ -338,7 +340,7 @@ class WesEndpoint(BaseModel): host: str base_path: Optional[str] = "/ga4gh/wes/v1" - run_id: Optional[str] + run_id: Optional[str] = None class DbDocument(BaseModel): diff --git a/pro_wes/ga4gh/wes/service_info.py b/pro_wes/ga4gh/wes/service_info.py index f333811..3b30db2 100644 --- a/pro_wes/ga4gh/wes/service_info.py +++ b/pro_wes/ga4gh/wes/service_info.py @@ -30,7 +30,7 @@ class ServiceInfo: def __init__(self) -> None: """Class constructor.""" self.config: Dict = current_app.config - self.foca_config: Config = self.config.foca + self.foca_config: Config = self.config.foca # type: ignore self.db_client_service_info: Collection = ( self.foca_config.db.dbs["runStore"].collections["service_info"].client ) diff --git a/pro_wes/ga4gh/wes/workflow_runs.py b/pro_wes/ga4gh/wes/workflow_runs.py index 60bd5bf..d3fd27e 100644 --- a/pro_wes/ga4gh/wes/workflow_runs.py +++ b/pro_wes/ga4gh/wes/workflow_runs.py @@ -26,6 +26,7 @@ RunNotFound, StorageUnavailableProblem, Unauthorized, + WesEndpointProblem, ) from pro_wes.ga4gh.wes.models import ( Attachment, @@ -55,7 +56,7 @@ class WorkflowRuns: def __init__(self) -> None: """Class constructor.""" self.config: Dict = current_app.config - self.foca_config: Config = current_app.config.foca + self.foca_config: Config = current_app.config.foca # type: ignore self.db_client: Collection = ( self.foca_config.db.dbs["runStore"].collections["runs"].client ) @@ -91,11 +92,20 @@ def run_workflow( document.user_id = kwargs.get("user_id", None) # create run environment & insert run document into run collection - document_stored = self._create_run_environment(document=document) + document_stored: DbDocument = self._create_run_environment(document=document) # write workflow attachments self._save_attachments(attachments=document_stored.attachments) + # ensure WES endpoint is available + assert document_stored.wes_endpoint is not None, "No WES endpoint available." 
+ assert ( + document_stored.wes_endpoint.base_path is not None + ), "WES endpoint does not have base_path." + + if document_stored.task_id is None: + raise IdsUnavailableProblem + # instantiate WES client wes_client: WesClient = WesClient( host=document_stored.wes_endpoint.host, @@ -136,25 +146,36 @@ def run_workflow( if response.status_code == 403: raise Forbidden raise InternalServerError - document_stored: DbDocument = db_connector.upsert_fields_in_root_object( + updated_document_stored: DbDocument = db_connector.upsert_fields_in_root_object( root="wes_endpoint", run_id=response.run_id, ) + # ensure WES endpoint is available + assert ( + updated_document_stored.wes_endpoint is not None + ), "No WES endpoint available." + assert ( + updated_document_stored.wes_endpoint.base_path is not None + ), "WES endpoint does not have base_path." + + if updated_document_stored.wes_endpoint.run_id is None: + raise WesEndpointProblem + # track workflow progress in background task__track_run_progress.apply_async( None, { "jwt": kwargs.get("jwt", None), - "remote_host": document_stored.wes_endpoint.host, - "remote_base_path": document_stored.wes_endpoint.base_path, - "remote_run_id": document_stored.wes_endpoint.run_id, + "remote_host": updated_document_stored.wes_endpoint.host, + "remote_base_path": updated_document_stored.wes_endpoint.base_path, + "remote_run_id": updated_document_stored.wes_endpoint.run_id, }, - task_id=document_stored.task_id, + task_id=updated_document_stored.task_id, soft_time_limit=controller_config.timeout_job, ) - return {"run_id": document_stored.run_log.run_id} + return {"run_id": updated_document_stored.wes_endpoint.run_id} def list_runs( self, @@ -393,7 +414,7 @@ def _create_run_environment( charset=controller_config.id_charset, length=controller_config.id_length, ) - work_dir = Path(controller_config.storage_path).resolve() / run_id + work_dir: Path = Path(controller_config.storage_path).resolve() / run_id # try to create working directory try: @@ 
-406,7 +427,7 @@ def _create_run_environment( # populate document document.run_log.run_id = run_id document.task_id = uuid() - document.work_dir = str(work_dir) + document.work_dir = work_dir document.attachments = self._process_attachments( work_dir=work_dir, ) @@ -437,18 +458,17 @@ def _process_attachments(self, work_dir: Path) -> List[Attachment]: attachments = [] files = request.files.getlist("workflow_attachment") for file in files: + assert file is not None, "File object cannot be None." + assert file.filename is not None, "File does not have a filename." + attachments.append( Attachment( filename=file.filename, - object=file.stream, - path=str( - work_dir - / self._secure_filename( - name=Path(file.filename), - ) - ), + object=file.stream.read(), + path=work_dir / self._secure_filename(name=Path(file.filename)), ) ) + return attachments @staticmethod diff --git a/pro_wes/tasks/track_run_progress.py b/pro_wes/tasks/track_run_progress.py index 625750e..2cf9f7b 100644 --- a/pro_wes/tasks/track_run_progress.py +++ b/pro_wes/tasks/track_run_progress.py @@ -18,6 +18,7 @@ from pro_wes.utils.db import DbDocumentConnector from pro_wes.ga4gh.wes.client_wes import WesClient from pro_wes.celery_worker import celery +from pro_wes.exceptions import WesEndpointProblem logger = logging.getLogger(__name__) @@ -28,7 +29,7 @@ ignore_result=True, track_started=True, ) -def task__track_run_progress( +def task__track_run_progress( # pylint: disable=too-many-statements self, remote_host: str, remote_base_path: str, @@ -56,7 +57,7 @@ def task__track_run_progress( pro_wes.exceptions.EngineUnavailable: The remote service is unavailable or is not a valid WES service. 
""" - foca_config: Config = current_app.config.foca + foca_config: Config = current_app.config.foca # type: ignore controller_config: Dict = foca_config.custom.post_runs logger.info(f"[{self.request.id}] Start processing...") @@ -87,38 +88,43 @@ def task__track_run_progress( try: # workaround for cwl-WES; add .dict() when cwl-WES response conforms # to model - response = wes_client.get_run(run_id=remote_run_id) + response: RunLog = wes_client.get_run(run_id=remote_run_id) except EngineUnavailable: db_client.update_run_state(state=State.SYSTEM_ERROR.value) raise # if not isinstance(response, RunLog): # db_client.update_run_state(state=State.SYSTEM_ERROR.value) # raise EngineProblem("Did not receive expected response.") - response.pop("request", None) + response.pop("request", None) # type: ignore document: DbDocument = db_client.upsert_fields_in_root_object( root="run_log", - **response, + **response.dict(), ) # track workflow run progress run_state: State = State.UNKNOWN attempt: int = 1 while not run_state.is_finished: - sleep(controller_config.polling_wait) + sleep(controller_config.polling_wait) # type: ignore + + # ensure WES endpoint is available + assert document.wes_endpoint is not None, "No WES endpoint available." 
+ if document.wes_endpoint.run_id is None: + raise WesEndpointProblem try: - response = wes_client.get_run_status( + wes_client.get_run_status( run_id=document.wes_endpoint.run_id, timeout=foca_config.custom.defaults.timeout, ) except EngineUnavailable as exc: - if attempt <= controller_config.polling_attempts: + if attempt <= controller_config.polling_attempts: # type: ignore attempt += 1 logger.warning(exc, exc_info=True) continue db_client.update_run_state(state=State.SYSTEM_ERROR.value) raise if not isinstance(response, RunStatus): - if attempt <= controller_config.polling_attempts: + if attempt <= controller_config.polling_attempts: # type: ignore attempt += 1 logger.warning(f"Received error response: {response}") continue @@ -129,6 +135,8 @@ def task__track_run_progress( run_state = response.state db_client.update_run_state(state=run_state.value) + assert response.run_id is not None, "WES run ID not available." + # fetch run log and upsert database document try: # workaround for cwl-WES; add .dict() when cwl-WES response conforms @@ -140,10 +148,11 @@ def task__track_run_progress( # if not isinstance(response, RunLog): # db_client.update_run_state(state=State.SYSTEM_ERROR.value) # raise EngineProblem("Did not receive expected response.") - response.pop("request", None) + response.pop("request", None) # type: ignore document = db_client.upsert_fields_in_root_object( root="run_log", - **response, + **dict(response), ) logger.info(f"[{self.request.id}] Processing completed.") + return self.request.id diff --git a/pro_wes/utils/db.py b/pro_wes/utils/db.py index bbadcf5..8482909 100644 --- a/pro_wes/utils/db.py +++ b/pro_wes/utils/db.py @@ -88,7 +88,7 @@ def upsert_fields_in_root_object( self, root: str, **kwargs, - ) -> Optional[Mapping]: + ) -> DbDocument: """Insert (or update) fields in(to) the same root object and return document. 
""" diff --git a/requirements_dev.txt b/requirements_dev.txt index 965b86d..fb589bd 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -5,3 +5,7 @@ mongomock>=4.1.2,<5 pylint>=2.15.5,<3 pytest>=7.2.0,<8 python-semantic-release>=7.32.2,<8 +mypy~=1.8 +types-setuptools~=69.0.0.20240106 +types-requests~=2.31.0.20240106 +types-docutils~=0.20.0.20240106 \ No newline at end of file diff --git a/setup.py b/setup.py index 071492e..33a2896 100644 --- a/setup.py +++ b/setup.py @@ -16,7 +16,7 @@ setup( name="pro-wes", - version=__version__, # noqa: F821 # pylint: disable=undefined-variable + version=__version__, # type: ignore # noqa: E501 F821 # pylint: disable=undefined-variable license="Apache License 2.0", description="Proxy/gateway GA4GH WES service", long_description=LONG_DESCRIPTION, diff --git a/tests/ga4gh/wes/endpoints/test_service_info.py b/tests/ga4gh/wes/endpoints/test_service_info.py index db68dc9..7d94ba5 100644 --- a/tests/ga4gh/wes/endpoints/test_service_info.py +++ b/tests/ga4gh/wes/endpoints/test_service_info.py @@ -11,7 +11,7 @@ from unittest.mock import MagicMock -from pro_wes.ga4gh.wes.service_info import RegisterServiceInfo +from pro_wes.ga4gh.wes.service_info import RegisterServiceInfo # type: ignore from pro_wes.exceptions import ( NotFound, ValidationError,