Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ci: add type checks #88

Merged
merged 6 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ jobs:
run: pylint pro_wes/ setup.py
- name: Check code format with Black
run: black --check setup.py pro_wes/ tests/
- name: Check types with mypy
run: mypy setup.py pro_wes/
test:
name: Run tests
runs-on: ubuntu-latest
Expand Down
6 changes: 6 additions & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
[mypy]
ignore_missing_imports = True
JaeAeich marked this conversation as resolved.
Show resolved Hide resolved
disable_error_code = attr-defined
disable_error_code = no-redef

# [mypy-pro_wes.*]
5 changes: 2 additions & 3 deletions pro_wes/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,10 @@ def _setup_first_start(app: App) -> None:
work_dir = Path(current_app.config.foca.custom.post_runs.storage_path.resolve())
work_dir.mkdir(parents=True, exist_ok=True)
# set service info
service_info = ServiceInfo()
try:
service_info = service_info.get_service_info()
ServiceInfo().get_service_info()
except NotFound:
service_info.set_service_info(
ServiceInfo().set_service_info(
data=current_app.config.foca.custom.service_info.dict()
)

Expand Down
8 changes: 8 additions & 0 deletions pro_wes/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ class StorageUnavailableProblem(OSError):
"""Storage unavailable for OS operations."""


class WesEndpointProblem(NotFound):
"""No/few reuirements provided for WES endpoint."""


exceptions = {
Exception: {
"message": "An unexpected error occurred.",
Expand Down Expand Up @@ -96,4 +100,8 @@ class StorageUnavailableProblem(OSError):
"message": "Storage is not accessible.",
"code": "500",
},
WesEndpointProblem: {
"message": "No/few reuirements provided for WES endpoint.",
"code": "500",
},
}
6 changes: 4 additions & 2 deletions pro_wes/ga4gh/wes/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,12 @@ class Attachment(BaseModel):

Args:
filename: Name of the file as indicated in the run request.
object: File object.
path: Path to the file on the app's storage system.
"""

filename: str
object: bytes
JaeAeich marked this conversation as resolved.
Show resolved Hide resolved
path: Path


Expand Down Expand Up @@ -171,7 +173,7 @@ def json_serialized_object_field_valid( # pylint: disable=no-self-argument
def workflow_type_and_version_supported( # pylint: disable=no-self-argument
cls,
values: Dict,
) -> str:
) -> Dict:
"""Ensure that workflow type and version are supported by this service
instance.

Expand Down Expand Up @@ -338,7 +340,7 @@ class WesEndpoint(BaseModel):

host: str
base_path: Optional[str] = "/ga4gh/wes/v1"
run_id: Optional[str]
run_id: Optional[str] = None


class DbDocument(BaseModel):
Expand Down
50 changes: 35 additions & 15 deletions pro_wes/ga4gh/wes/workflow_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
RunNotFound,
StorageUnavailableProblem,
Unauthorized,
WesEndpointProblem,
)
from pro_wes.ga4gh.wes.models import (
Attachment,
Expand Down Expand Up @@ -91,11 +92,20 @@ def run_workflow(
document.user_id = kwargs.get("user_id", None)

# create run environment & insert run document into run collection
document_stored = self._create_run_environment(document=document)
document_stored: DbDocument = self._create_run_environment(document=document)

# write workflow attachments
self._save_attachments(attachments=document_stored.attachments)

# ensure WES endpoint is available
assert document_stored.wes_endpoint is not None, "No WES endpoint available."
assert (
document_stored.wes_endpoint.base_path is not None
), "WES endpoint does not have base_path."

if document_stored.task_id is None:
raise IdsUnavailableProblem

# instantiate WES client
wes_client: WesClient = WesClient(
host=document_stored.wes_endpoint.host,
Expand Down Expand Up @@ -136,25 +146,36 @@ def run_workflow(
if response.status_code == 403:
raise Forbidden
raise InternalServerError
document_stored: DbDocument = db_connector.upsert_fields_in_root_object(
updated_document_stored: DbDocument = db_connector.upsert_fields_in_root_object(
root="wes_endpoint",
run_id=response.run_id,
)

# ensure WES endpoint is available
assert (
updated_document_stored.wes_endpoint is not None
), "No WES endpoint available."
assert (
updated_document_stored.wes_endpoint.base_path is not None
), "WES endpoint does not have base_path."

if updated_document_stored.wes_endpoint.run_id is None:
raise WesEndpointProblem

# track workflow progress in background
task__track_run_progress.apply_async(
None,
{
"jwt": kwargs.get("jwt", None),
"remote_host": document_stored.wes_endpoint.host,
"remote_base_path": document_stored.wes_endpoint.base_path,
"remote_run_id": document_stored.wes_endpoint.run_id,
"remote_host": updated_document_stored.wes_endpoint.host,
"remote_base_path": updated_document_stored.wes_endpoint.base_path,
"remote_run_id": updated_document_stored.wes_endpoint.run_id,
},
task_id=document_stored.task_id,
task_id=updated_document_stored.task_id,
soft_time_limit=controller_config.timeout_job,
)

return {"run_id": document_stored.run_log.run_id}
return {"run_id": updated_document_stored.wes_endpoint.run_id}

def list_runs(
self,
Expand Down Expand Up @@ -406,7 +427,7 @@ def _create_run_environment(
# populate document
document.run_log.run_id = run_id
document.task_id = uuid()
document.work_dir = str(work_dir)
document.work_dir = str(work_dir) # type: ignore
JaeAeich marked this conversation as resolved.
Show resolved Hide resolved
document.attachments = self._process_attachments(
work_dir=work_dir,
)
Expand Down Expand Up @@ -437,18 +458,17 @@ def _process_attachments(self, work_dir: Path) -> List[Attachment]:
attachments = []
files = request.files.getlist("workflow_attachment")
for file in files:
assert file is not None, "File object cannot be None."
assert file.filename is not None, "File does not have a filename."

attachments.append(
Attachment(
filename=file.filename,
object=file.stream,
path=str(
work_dir
/ self._secure_filename(
name=Path(file.filename),
)
),
object=file.stream.read(),
path=work_dir / self._secure_filename(name=Path(file.filename)),
)
)

return attachments

@staticmethod
Expand Down
19 changes: 14 additions & 5 deletions pro_wes/tasks/track_run_progress.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from pro_wes.utils.db import DbDocumentConnector
from pro_wes.ga4gh.wes.client_wes import WesClient
from pro_wes.celery_worker import celery
from pro_wes.exceptions import WesEndpointProblem

logger = logging.getLogger(__name__)

Expand All @@ -28,7 +29,7 @@
ignore_result=True,
track_started=True,
)
def task__track_run_progress(
def task__track_run_progress( # pylint: disable=too-many-statements
self,
remote_host: str,
remote_base_path: str,
Expand Down Expand Up @@ -87,7 +88,7 @@ def task__track_run_progress(
try:
# workaround for cwl-WES; add .dict() when cwl-WES response conforms
# to model
response = wes_client.get_run(run_id=remote_run_id)
response: RunLog = wes_client.get_run(run_id=remote_run_id)
except EngineUnavailable:
db_client.update_run_state(state=State.SYSTEM_ERROR.value)
raise
Expand All @@ -97,16 +98,21 @@ def task__track_run_progress(
response.pop("request", None)
document: DbDocument = db_client.upsert_fields_in_root_object(
root="run_log",
**response,
**dict(response),
JaeAeich marked this conversation as resolved.
Show resolved Hide resolved
)

# track workflow run progress
run_state: State = State.UNKNOWN
attempt: int = 1
while not run_state.is_finished:
sleep(controller_config.polling_wait)

# ensure WES endpoint is available
assert document.wes_endpoint is not None, "No WES endpoint available."
if document.wes_endpoint.run_id is None:
raise WesEndpointProblem
try:
response = wes_client.get_run_status(
wes_client.get_run_status(
run_id=document.wes_endpoint.run_id,
timeout=foca_config.custom.defaults.timeout,
)
Expand All @@ -129,6 +135,8 @@ def task__track_run_progress(
run_state = response.state
db_client.update_run_state(state=run_state.value)

assert response.run_id is not None, "WES run ID not available."

# fetch run log and upsert database document
try:
# workaround for cwl-WES; add .dict() when cwl-WES response conforms
Expand All @@ -143,7 +151,8 @@ def task__track_run_progress(
response.pop("request", None)
document = db_client.upsert_fields_in_root_object(
root="run_log",
**response,
**dict(response),
)

logger.info(f"[{self.request.id}] Processing completed.")
return self.request.id
2 changes: 1 addition & 1 deletion pro_wes/utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def upsert_fields_in_root_object(
self,
root: str,
**kwargs,
) -> Optional[Mapping]:
) -> DbDocument:
"""Insert (or update) fields in(to) the same root object and return
document.
"""
Expand Down
3 changes: 3 additions & 0 deletions requirements_dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,6 @@ mongomock>=4.1.2,<5
pylint>=2.15.5,<3
pytest>=7.2.0,<8
python-semantic-release>=7.32.2,<8
mypy~=1.8
types-setuptools
types-requests
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

setup(
name="pro-wes",
version=__version__, # noqa: F821 # pylint: disable=undefined-variable
version=__version__, # type: ignore # noqa: E501 F821 # pylint: disable=undefined-variable
uniqueg marked this conversation as resolved.
Show resolved Hide resolved
license="Apache License 2.0",
description="Proxy/gateway GA4GH WES service",
long_description=LONG_DESCRIPTION,
Expand Down