Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ci: add type checks #88

Merged
merged 6 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
run: pylint pro_wes/ setup.py
- name: Check code format with Black
run: black --check setup.py pro_wes/ tests/
- name: Type check with mypy
- name: Check types with mypy
run: mypy setup.py pro_wes/
test:
name: Run tests
Expand Down
11 changes: 4 additions & 7 deletions pro_wes/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from pathlib import Path

from typing import Dict
from connexion import App
from flask import current_app
from foca import Foca
Expand Down Expand Up @@ -33,14 +32,12 @@ def _setup_first_start(app: App) -> None:
work_dir = Path(current_app.config.foca.custom.post_runs.storage_path.resolve())
work_dir.mkdir(parents=True, exist_ok=True)
# set service info
service_info: Dict # pylint: disable=unused-variable
try:
service_info = ServiceInfo().get_service_info()
ServiceInfo().get_service_info()
except NotFound:
service_info_data: Dict = current_app.config.foca.custom.service_info.dict()
service_info_object = ServiceInfo()
service_info_object.set_service_info(data=service_info_data)
service_info = service_info_object.get_service_info() # noqa: F841
ServiceInfo().set_service_info(
data=current_app.config.foca.custom.service_info.dict()
)


def run_app(app: App) -> None:
Expand Down
8 changes: 8 additions & 0 deletions pro_wes/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ class StorageUnavailableProblem(OSError):
"""Storage unavailable for OS operations."""


class WesEndpointProblem(NotFound):
"""No/few reuirements provided for WES endpoint."""


exceptions = {
Exception: {
"message": "An unexpected error occurred.",
Expand Down Expand Up @@ -96,4 +100,8 @@ class StorageUnavailableProblem(OSError):
"message": "Storage is not accessible.",
"code": "500",
},
WesEndpointProblem: {
"message": "No/few reuirements provided for WES endpoint.",
"code": "500",
},
}
60 changes: 30 additions & 30 deletions pro_wes/ga4gh/wes/workflow_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
RunNotFound,
StorageUnavailableProblem,
Unauthorized,
WesEndpointProblem,
)
from pro_wes.ga4gh.wes.models import (
Attachment,
Expand Down Expand Up @@ -91,16 +92,19 @@ def run_workflow(
document.user_id = kwargs.get("user_id", None)

# create run environment & insert run document into run collection
document_stored = self._create_run_environment(document=document)
document_stored: DbDocument = self._create_run_environment(document=document)

# write workflow attachments
self._save_attachments(attachments=document_stored.attachments)

if document_stored.wes_endpoint is None:
raise ValueError("No WES endpoint available.")
# ensure WES endpoint is available
assert document_stored.wes_endpoint is not None, "No WES endpoint available."
assert (
document_stored.wes_endpoint.base_path is not None
), "WES endpoint does not have base_path."

if document_stored.wes_endpoint.base_path is None:
raise ValueError("No WES endpoint base path provided.")
if document_stored.task_id is None:
raise IdsUnavailableProblem

# instantiate WES client
wes_client: WesClient = WesClient(
Expand All @@ -109,9 +113,6 @@ def run_workflow(
token=kwargs.get("jwt", None),
)

if document_stored.task_id is None:
raise ValueError("No task ID available")

# instantiate database connector
db_connector = DbDocumentConnector(
collection=self.db_client,
Expand Down Expand Up @@ -150,8 +151,16 @@ def run_workflow(
run_id=response.run_id,
)

if updated_document_stored.wes_endpoint is None:
raise ValueError("No WES endpoint available.")
# ensure WES endpoint is available
assert (
updated_document_stored.wes_endpoint is not None
), "No WES endpoint available."
assert (
updated_document_stored.wes_endpoint.base_path is not None
), "WES endpoint does not have base_path."

if updated_document_stored.wes_endpoint.run_id is None:
raise WesEndpointProblem

# track workflow progress in background
task__track_run_progress.apply_async(
Expand All @@ -166,12 +175,6 @@ def run_workflow(
soft_time_limit=controller_config.timeout_job,
)

if updated_document_stored.wes_endpoint is None:
raise ValueError("No WES endpoint available.")

if updated_document_stored.wes_endpoint.run_id is None:
raise ValueError("WES endpoint does not have run_id.")

return {"run_id": updated_document_stored.wes_endpoint.run_id}

def list_runs(
Expand Down Expand Up @@ -424,7 +427,7 @@ def _create_run_environment(
# populate document
document.run_log.run_id = run_id
document.task_id = uuid()
document.work_dir = work_dir.as_posix()
document.work_dir = str(work_dir) # type: ignore
JaeAeich marked this conversation as resolved.
Show resolved Hide resolved
document.attachments = self._process_attachments(
work_dir=work_dir,
)
Expand Down Expand Up @@ -455,20 +458,17 @@ def _process_attachments(self, work_dir: Path) -> List[Attachment]:
attachments = []
files = request.files.getlist("workflow_attachment")
for file in files:
if file is not None:
if file.filename is None:
raise ValueError("File does not have a filename.")

attachments.append(
Attachment(
filename=file.filename,
object=file.stream.read(),
path=work_dir
/ self._secure_filename(
name=Path(file.filename),
),
)
assert file is not None, "File object cannot be None."
assert file.filename is not None, "File does not have a filename."

attachments.append(
Attachment(
filename=file.filename,
object=file.stream.read(),
path=work_dir / self._secure_filename(name=Path(file.filename)),
)
)

return attachments

@staticmethod
Expand Down
19 changes: 9 additions & 10 deletions pro_wes/tasks/track_run_progress.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from pro_wes.utils.db import DbDocumentConnector
from pro_wes.ga4gh.wes.client_wes import WesClient
from pro_wes.celery_worker import celery
from pro_wes.exceptions import WesEndpointProblem

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -87,7 +88,7 @@ def task__track_run_progress( # pylint: disable=too-many-statements
try:
# workaround for cwl-WES; add .dict() when cwl-WES response conforms
# to model
response = wes_client.get_run(run_id=remote_run_id)
response: RunLog = wes_client.get_run(run_id=remote_run_id)
except EngineUnavailable:
db_client.update_run_state(state=State.SYSTEM_ERROR.value)
raise
Expand All @@ -105,13 +106,12 @@ def task__track_run_progress( # pylint: disable=too-many-statements
attempt: int = 1
while not run_state.is_finished:
sleep(controller_config.polling_wait)
try:
if document.wes_endpoint is None:
raise ValueError("WES run ID not available.")

if document.wes_endpoint.run_id is None:
raise ValueError("WES run ID not available.")

# ensure WES endpoint is available
assert document.wes_endpoint is not None, "No WES endpoint available."
if document.wes_endpoint.run_id is None:
raise WesEndpointProblem
try:
wes_client.get_run_status(
run_id=document.wes_endpoint.run_id,
timeout=foca_config.custom.defaults.timeout,
Expand All @@ -135,13 +135,12 @@ def task__track_run_progress( # pylint: disable=too-many-statements
run_state = response.state
db_client.update_run_state(state=run_state.value)

assert response.run_id is not None, "WES run ID not available."

# fetch run log and upsert database document
try:
# workaround for cwl-WES; add .dict() when cwl-WES response conforms
# to model
if response.run_id is None:
raise ValueError("WES run ID not available.")

response = wes_client.get_run(run_id=response.run_id)
except EngineUnavailable:
db_client.update_run_state(state=State.SYSTEM_ERROR.value)
Expand Down
2 changes: 1 addition & 1 deletion requirements_dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ mongomock>=4.1.2,<5
pylint>=2.15.5,<3
pytest>=7.2.0,<8
python-semantic-release>=7.32.2,<8
mypy
mypy~=1.8
types-setuptools
types-requests