Skip to content

Commit

Permalink
fix: wes_endpoint validation and minor
Browse files Browse the repository at this point in the history
  • Loading branch information
JaeAeich committed Jan 18, 2024
1 parent 62e8b0e commit cab1605
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 49 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
run: pylint pro_wes/ setup.py
- name: Check code format with Black
run: black --check setup.py pro_wes/ tests/
- name: Type check with mypy
- name: Check types with mypy
run: mypy setup.py pro_wes/
test:
name: Run tests
Expand Down
11 changes: 4 additions & 7 deletions pro_wes/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from pathlib import Path

from typing import Dict
from connexion import App
from flask import current_app
from foca import Foca
Expand Down Expand Up @@ -33,14 +32,12 @@ def _setup_first_start(app: App) -> None:
work_dir = Path(current_app.config.foca.custom.post_runs.storage_path.resolve())
work_dir.mkdir(parents=True, exist_ok=True)
# set service info
service_info: Dict # pylint: disable=unused-variable
try:
service_info = ServiceInfo().get_service_info()
ServiceInfo().get_service_info()
except NotFound:
service_info_data: Dict = current_app.config.foca.custom.service_info.dict()
service_info_object = ServiceInfo()
service_info_object.set_service_info(data=service_info_data)
service_info = service_info_object.get_service_info() # noqa: F841
ServiceInfo().set_service_info(
data=current_app.config.foca.custom.service_info.dict()
)


def run_app(app: App) -> None:
Expand Down
8 changes: 8 additions & 0 deletions pro_wes/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ class StorageUnavailableProblem(OSError):
"""Storage unavailable for OS operations."""


class WesEndpointProblem(NotFound):
"""No/few reuirements provided for WES endpoint."""


exceptions = {
Exception: {
"message": "An unexpected error occurred.",
Expand Down Expand Up @@ -96,4 +100,8 @@ class StorageUnavailableProblem(OSError):
"message": "Storage is not accessible.",
"code": "500",
},
WesEndpointProblem: {
"message": "No/few reuirements provided for WES endpoint.",
"code": "500",
},
}
60 changes: 30 additions & 30 deletions pro_wes/ga4gh/wes/workflow_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
RunNotFound,
StorageUnavailableProblem,
Unauthorized,
WesEndpointProblem,
)
from pro_wes.ga4gh.wes.models import (
Attachment,
Expand Down Expand Up @@ -91,16 +92,19 @@ def run_workflow(
document.user_id = kwargs.get("user_id", None)

# create run environment & insert run document into run collection
document_stored = self._create_run_environment(document=document)
document_stored: DbDocument = self._create_run_environment(document=document)

# write workflow attachments
self._save_attachments(attachments=document_stored.attachments)

if document_stored.wes_endpoint is None:
raise ValueError("No WES endpoint available.")
# ensure WES endpoint is available
assert document_stored.wes_endpoint is not None, "No WES endpoint available."
assert (
document_stored.wes_endpoint.base_path is not None
), "WES endpoint does not have base_path."

if document_stored.wes_endpoint.base_path is None:
raise ValueError("No WES endpoint base path provided.")
if document_stored.task_id is None:
raise IdsUnavailableProblem

# instantiate WES client
wes_client: WesClient = WesClient(
Expand All @@ -109,9 +113,6 @@ def run_workflow(
token=kwargs.get("jwt", None),
)

if document_stored.task_id is None:
raise ValueError("No task ID available")

# instantiate database connector
db_connector = DbDocumentConnector(
collection=self.db_client,
Expand Down Expand Up @@ -150,8 +151,16 @@ def run_workflow(
run_id=response.run_id,
)

if updated_document_stored.wes_endpoint is None:
raise ValueError("No WES endpoint available.")
# ensure WES endpoint is available
assert (
updated_document_stored.wes_endpoint is not None
), "No WES endpoint available."
assert (
updated_document_stored.wes_endpoint.base_path is not None
), "WES endpoint does not have base_path."

if updated_document_stored.wes_endpoint.run_id is None:
raise WesEndpointProblem

# track workflow progress in background
task__track_run_progress.apply_async(
Expand All @@ -166,12 +175,6 @@ def run_workflow(
soft_time_limit=controller_config.timeout_job,
)

if updated_document_stored.wes_endpoint is None:
raise ValueError("No WES endpoint available.")

if updated_document_stored.wes_endpoint.run_id is None:
raise ValueError("WES endpoint does not have run_id.")

return {"run_id": updated_document_stored.wes_endpoint.run_id}

def list_runs(
Expand Down Expand Up @@ -424,7 +427,7 @@ def _create_run_environment(
# populate document
document.run_log.run_id = run_id
document.task_id = uuid()
document.work_dir = work_dir.as_posix()
document.work_dir = str(work_dir) # type: ignore
document.attachments = self._process_attachments(
work_dir=work_dir,
)
Expand Down Expand Up @@ -455,20 +458,17 @@ def _process_attachments(self, work_dir: Path) -> List[Attachment]:
attachments = []
files = request.files.getlist("workflow_attachment")
for file in files:
if file is not None:
if file.filename is None:
raise ValueError("File does not have a filename.")

attachments.append(
Attachment(
filename=file.filename,
object=file.stream.read(),
path=work_dir
/ self._secure_filename(
name=Path(file.filename),
),
)
assert file is not None, "File object cannot be None."
assert file.filename is not None, "File does not have a filename."

attachments.append(
Attachment(
filename=file.filename,
object=file.stream.read(),
path=work_dir / self._secure_filename(name=Path(file.filename)),
)
)

return attachments

@staticmethod
Expand Down
19 changes: 9 additions & 10 deletions pro_wes/tasks/track_run_progress.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from pro_wes.utils.db import DbDocumentConnector
from pro_wes.ga4gh.wes.client_wes import WesClient
from pro_wes.celery_worker import celery
from pro_wes.exceptions import WesEndpointProblem

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -87,7 +88,7 @@ def task__track_run_progress( # pylint: disable=too-many-statements
try:
# workaround for cwl-WES; add .dict() when cwl-WES response conforms
# to model
response = wes_client.get_run(run_id=remote_run_id)
response: RunLog = wes_client.get_run(run_id=remote_run_id)
except EngineUnavailable:
db_client.update_run_state(state=State.SYSTEM_ERROR.value)
raise
Expand All @@ -105,13 +106,12 @@ def task__track_run_progress( # pylint: disable=too-many-statements
attempt: int = 1
while not run_state.is_finished:
sleep(controller_config.polling_wait)
try:
if document.wes_endpoint is None:
raise ValueError("WES run ID not available.")

if document.wes_endpoint.run_id is None:
raise ValueError("WES run ID not available.")

# ensure WES endpoint is available
assert document.wes_endpoint is not None, "No WES endpoint available."
if document.wes_endpoint.run_id is None:
raise WesEndpointProblem
try:
wes_client.get_run_status(
run_id=document.wes_endpoint.run_id,
timeout=foca_config.custom.defaults.timeout,
Expand All @@ -135,13 +135,12 @@ def task__track_run_progress( # pylint: disable=too-many-statements
run_state = response.state
db_client.update_run_state(state=run_state.value)

assert response.run_id is not None, "WES run ID not available."

# fetch run log and upsert database document
try:
# workaround for cwl-WES; add .dict() when cwl-WES response conforms
# to model
if response.run_id is None:
raise ValueError("WES run ID not available.")

response = wes_client.get_run(run_id=response.run_id)
except EngineUnavailable:
db_client.update_run_state(state=State.SYSTEM_ERROR.value)
Expand Down
2 changes: 1 addition & 1 deletion requirements_dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ mongomock>=4.1.2,<5
pylint>=2.15.5,<3
pytest>=7.2.0,<8
python-semantic-release>=7.32.2,<8
mypy
mypy~=1.8
types-setuptools
types-requests

0 comments on commit cab1605

Please sign in to comment.