refact!: use pydantic models for db objects/requests/some responses
davidlougheed committed Jul 12, 2023
1 parent 2722349 commit edf753b
Showing 9 changed files with 262 additions and 274 deletions.
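This commit switches run handling from raw dicts to typed pydantic models imported from `bento_wes.models`. That module is not part of the changeset shown here, so the following is only a rough, hypothetical sketch of what those models might look like, with field names inferred from the attribute accesses in the hunks below (`run.run_id`, `run.request.workflow_url`, `run.request.tags.workflow_metadata.inputs`, and so on); the real module may differ in names, optionality, and structure.

```python
# Hypothetical sketch of bento_wes/models.py, inferred from this diff only.
from pydantic import BaseModel


class BentoWorkflowInput(BaseModel):
    id: str
    type: str
    value: str | None = None  # may carry the reserved "FROM_CONFIG" marker


class BentoWorkflowOutput(BaseModel):
    id: str
    type: str


class BentoWorkflowMetadata(BaseModel):
    inputs: list[BentoWorkflowInput]
    outputs: list[BentoWorkflowOutput]


class RunRequestTags(BaseModel):
    workflow_id: str
    workflow_metadata: BentoWorkflowMetadata
    project_id: str
    dataset_id: str | None = None


class RunRequest(BaseModel):
    workflow_url: str
    workflow_type: str
    workflow_params: dict
    tags: RunRequestTags


class Run(BaseModel):
    run_id: str


class RunWithDetails(Run):
    request: RunRequest
```

The net effect, visible throughout the hunks below, is that dictionary lookups such as `run["request"]["workflow_url"]` become validated attribute accesses such as `run.request.workflow_url`.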
102 changes: 60 additions & 42 deletions bento_wes/backends/_wes_backend.py
@@ -15,13 +15,14 @@
from typing import Optional, Tuple, Union

from bento_wes import states
from bento_wes.constants import SERVICE_ARTIFACT, RUN_PARAM_FROM_CONFIG
from bento_wes.db import get_db, finish_run, update_run_state_and_commit
from bento_wes.models import Run, RunWithDetails, BentoWorkflowMetadata
from bento_wes.states import STATE_EXECUTOR_ERROR, STATE_SYSTEM_ERROR
from bento_wes.utils import iso_now
from bento_wes.workflows import WorkflowType, WorkflowManager

from .backend_types import Command, ProcessResult
from ..constants import SERVICE_ARTIFACT

__all__ = ["WESBackend"]

@@ -109,7 +110,7 @@ def _get_supported_types(self) -> Tuple[WorkflowType]:
pass

@abstractmethod
def _get_params_file(self, run: dict) -> str:
def _get_params_file(self, run: Run) -> str:
"""
Returns the name of the params file to use for the workflow run.
:param run: The run description
@@ -126,35 +127,34 @@ def _serialize_params(self, workflow_params: ParamDict) -> str:
"""
pass

def workflow_path(self, run: dict) -> str:
def workflow_path(self, run: RunWithDetails) -> str:
"""
Gets the local filesystem path to the workflow file specified by a run's workflow URI.
"""
return self._workflow_manager.workflow_path(run["request"]["workflow_url"],
WorkflowType(run["request"]["workflow_type"]))
return self._workflow_manager.workflow_path(run.request.workflow_url, WorkflowType(run.request.workflow_type))

def run_dir(self, run: dict) -> str:
def run_dir(self, run: Run) -> str:
"""
Returns a path to the work directory for executing a run.
"""
return os.path.join(self.tmp_dir, run["run_id"])
return os.path.join(self.tmp_dir, run.run_id)

def _params_path(self, run: dict) -> str:
def _params_path(self, run: Run) -> str:
"""
Returns a path to the workflow parameters file for a run.
"""
return os.path.join(self.run_dir(run), self._get_params_file(run))

@abstractmethod
def _check_workflow(self, run: dict) -> Optional[Tuple[str, str]]:
def _check_workflow(self, run: Run) -> Optional[Tuple[str, str]]:
"""
Checks that a workflow can be executed by the backend via the workflow's URI.
:param run: The run, including a request with the workflow URI
:return: None if the workflow is valid; a tuple of an error message and an error state otherwise
"""
pass

def _check_workflow_wdl(self, run: dict) -> Optional[Tuple[str, str]]:
def _check_workflow_wdl(self, run: RunWithDetails) -> Optional[Tuple[str, str]]:
"""
Checks that a particular WDL workflow is valid.
:param run: The run whose workflow is being checked
@@ -207,14 +207,14 @@ def _check_workflow_wdl(self, run: dict) -> Optional[Tuple[str, str]]:
STATE_EXECUTOR_ERROR
)

def _check_workflow_and_type(self, run: dict) -> Optional[Tuple[str, str]]:
def _check_workflow_and_type(self, run: RunWithDetails) -> Optional[Tuple[str, str]]:
"""
Checks a workflow file's validity.
:param run: The run specifying the workflow in question
:return: None if the workflow is valid; a tuple of an error message and an error state otherwise
"""

workflow_type: WorkflowType = WorkflowType(run["request"]["workflow_type"])
workflow_type: WorkflowType = WorkflowType(run.request.workflow_type)
if workflow_type not in self._get_supported_types():
raise NotImplementedError(f"The specified WES backend cannot execute workflows of type {workflow_type}")

@@ -264,7 +264,7 @@ def _update_run_state_and_commit(self, run_id: Union[uuid.UUID, str], state: str
"""
update_run_state_and_commit(self.db, self.db.cursor(), run_id, state, event_bus=self.event_bus)

def _finish_run_and_clean_up(self, run: dict, state: str) -> None:
def _finish_run_and_clean_up(self, run: Run, state: str) -> None:
"""
Performs standard run-finishing operations (updating state, setting end time, etc.) as well as deleting the run
folder if it exists.
@@ -288,7 +288,12 @@ def _finish_run_and_clean_up(self, run: dict, state: str) -> None:
if not self.debug:
shutil.rmtree(self.run_dir(run), ignore_errors=True)

def _initialize_run_and_get_command(self, run: dict, celery_id, access_token: str) -> tuple[Command, dict] | None:
def _initialize_run_and_get_command(
self,
run: RunWithDetails,
celery_id: int,
access_token: str,
) -> tuple[Command, dict] | None:
"""
Performs "initialization" operations on the run, including setting states, downloading and validating the
workflow file, and generating and logging the workflow-running command.
@@ -298,7 +303,7 @@ def _initialize_run_and_get_command(self, run: dict, celery_id, access_token: st
:return: The command to execute, if no errors occurred; None otherwise
"""

self._update_run_state_and_commit(run["run_id"], states.STATE_INITIALIZING)
self._update_run_state_and_commit(run.run_id, states.STATE_INITIALIZING)

run_dir = self.run_dir(run)

@@ -310,10 +315,9 @@ def _initialize_run_and_get_command(self, run: dict, celery_id, access_token: st

c = self.db.cursor()

workflow_id = run["request"]["tags"].get("workflow_id", run["request"]["workflow_url"])

workflow_id = run.request.tags.workflow_id
workflow_params: ParamDict = {
**run["request"]["workflow_params"],
**run.request.workflow_params,
f"{workflow_id}.{PARAM_SECRET_PREFIX}access_token": access_token,

# In export/analysis mode, as we rely on services located in different containers
@@ -332,6 +336,15 @@ def _initialize_run_and_get_command(self, run: dict, celery_id, access_token: st
# TODO: more special parameters: service URLs, system__run_dir...
}

# Some workflow parameters depend on the WES application configuration
# and need to be added from there.
# The reserved keyword `FROM_CONFIG` is used to detect those inputs.
# All parameters in config are upper case. e.g. drs_url --> DRS_URL
for i in run.request.tags.workflow_metadata.inputs:
if i.value != RUN_PARAM_FROM_CONFIG:
continue
workflow_params[f"{workflow_id}.{i.id}"] = current_app.config.get(i.id, "")

# -- Validate the workflow --------------------------------------------
error = self._check_workflow_and_type(run)
if error is not None:
@@ -354,9 +367,7 @@ def _initialize_run_and_get_command(self, run: dict, celery_id, access_token: st
pf.write(self._serialize_params(workflow_params))

# -- Create the runner command based on inputs ------------------------
cmd = self._get_command(self.workflow_path(run),
self._params_path(run),
self.run_dir(run))
cmd = self._get_command(self.workflow_path(run), self._params_path(run), self.run_dir(run))

# -- Update run log with command and Celery ID ------------------------
c.execute(
@@ -366,34 +377,41 @@ def _initialize_run_and_get_command(self, run: dict, celery_id, access_token: st

return cmd, workflow_params

def _build_workflow_outputs(self, run_dir: str, workflow_id: str, workflow_params: dict, c_workflow_metadata: dict):
def _build_workflow_outputs(
self,
run_dir: str,
workflow_id: str,
workflow_params: dict,
workflow_metadata: BentoWorkflowMetadata,
):
self.logger.info(f"Building workflow outputs for workflow ID {workflow_id}")
output_params = w.make_output_params(workflow_id, workflow_params, c_workflow_metadata["inputs"])
output_params = w.make_output_params(workflow_id, workflow_params, [dict(i) for i in workflow_metadata.inputs])

workflow_outputs = {}
for output in c_workflow_metadata["outputs"]:
fo = w.formatted_output(output, output_params)
for output in workflow_metadata.outputs:
o_id = output.id
fo = w.formatted_output(dict(output), output_params)

# Skip optional outputs resulting from optional inputs
if fo is None:
continue

# Rewrite file outputs to include full path to temporary location
if output["type"] == w.WORKFLOW_TYPE_FILE:
workflow_outputs[output["id"]] = os.path.abspath(os.path.join(run_dir, "output", fo))
if output.type == w.WORKFLOW_TYPE_FILE:
workflow_outputs[o_id] = os.path.abspath(os.path.join(run_dir, "output", fo))

elif output["type"] == w.WORKFLOW_TYPE_FILE_ARRAY:
workflow_outputs[output["id"]] = [os.path.abspath(os.path.join(run_dir, wo)) for wo in fo]
elif output.type == w.WORKFLOW_TYPE_FILE_ARRAY:
workflow_outputs[o_id] = [os.path.abspath(os.path.join(run_dir, wo)) for wo in fo]
self.logger.info(
f"Setting workflow output {output['id']} to [{', '.join(workflow_outputs[output['id']])}]")
f"Setting workflow output {o_id} to [{', '.join(workflow_outputs[o_id])}]")

else:
workflow_outputs[output["id"]] = fo
self.logger.info(f"Setting workflow output {output['id']} to {workflow_outputs[output['id']]}")
workflow_outputs[o_id] = fo
self.logger.info(f"Setting workflow output {o_id} to {workflow_outputs[o_id]}")

return workflow_outputs

def _perform_run(self, run: dict, cmd: Command, params_with_extras: ParamDict) -> Optional[ProcessResult]:
def _perform_run(self, run: RunWithDetails, cmd: Command, params_with_extras: ParamDict) -> Optional[ProcessResult]:
"""
Performs a run based on a provided command and returns stdout, stderr, exit code, and whether the process timed
out while running.
Expand All @@ -410,8 +428,8 @@ def _perform_run(self, run: dict, cmd: Command, params_with_extras: ParamDict) -
# -- Start process running the generated command ----------------------
runner_process = subprocess.Popen(
cmd, cwd=self.tmp_dir, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8")
c.execute("UPDATE runs SET run_log__start_time = ? WHERE id = ?", (iso_now(), run["id"]))
self._update_run_state_and_commit(run["run_id"], states.STATE_RUNNING)
c.execute("UPDATE runs SET run_log__start_time = ? WHERE id = ?", (iso_now(), run.id))
self._update_run_state_and_commit(run.run_id, states.STATE_RUNNING)

# -- Wait for and capture output --------------------------------------

@@ -441,16 +459,16 @@ def _perform_run(self, run: dict, cmd: Command, params_with_extras: ParamDict) -

# -- Get various Bento-specific data from tags ------------------------

tags = run["request"]["tags"]
tags = run.request.tags

workflow_metadata = tags.get("workflow_metadata", {})
project_id: str = tags["project_id"]
dataset_id: str | None = tags.get("dataset_id")
workflow_metadata = tags.workflow_metadata
project_id: str = tags.project_id
dataset_id: str | None = tags.dataset_id

# -- Update run log with stdout/stderr, exit code ---------------------
# - Explicitly don't commit here; sync with state update
c.execute("UPDATE runs SET run_log__stdout = ?, run_log__stderr = ?, run_log__exit_code = ? WHERE id = ?",
(stdout, stderr, exit_code, run["id"]))
(stdout, stderr, exit_code, run.run_id))

if timed_out:
# TODO: Report error somehow
@@ -468,7 +486,7 @@ def _perform_run(self, run: dict, cmd: Command, params_with_extras: ParamDict) -

run_dir = self.run_dir(run)
workflow_name = self.get_workflow_name(self.workflow_path(run))
workflow_params: dict = run["request"]["workflow_params"]
workflow_params: dict = run.request.workflow_params

workflow_outputs = self._build_workflow_outputs(run_dir, workflow_name, workflow_params, workflow_metadata)

@@ -494,7 +512,7 @@ def _perform_run(self, run: dict, cmd: Command, params_with_extras: ParamDict) -

return ProcessResult((stdout, stderr, exit_code, timed_out))

def perform_run(self, run: dict, celery_id, access_token: str) -> Optional[ProcessResult]:
def perform_run(self, run: RunWithDetails, celery_id, access_token: str) -> Optional[ProcessResult]:
"""
Executes a run from start to finish (initialization, startup, and completion / cleanup.)
:param run: The run to execute
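One behavioural addition in `_initialize_run_and_get_command` above is the `FROM_CONFIG` handling: any workflow input whose `value` equals the reserved `RUN_PARAM_FROM_CONFIG` marker is filled in from the Flask application config. The snippet below is a hypothetical, self-contained illustration of that resolution; the workflow ID, input ID, and config entry are invented for the example and are not part of this commit.

```python
# Illustrative only: mirrors the FROM_CONFIG loop added above with made-up values.
from flask import Flask, current_app

from bento_wes.constants import RUN_PARAM_FROM_CONFIG

app = Flask(__name__)
app.config["DRS_URL"] = "http://drs.local"  # assumed configuration entry

workflow_id = "example_workflow"
# An input declared in the workflow metadata as "fill this from WES config":
config_input = {"id": "DRS_URL", "type": "string", "value": RUN_PARAM_FROM_CONFIG}

workflow_params: dict = {}
with app.app_context():
    if config_input["value"] == RUN_PARAM_FROM_CONFIG:
        # Keyed the same way as in the loop above: f"{workflow_id}.{input ID}"
        workflow_params[f"{workflow_id}.{config_input['id']}"] = current_app.config.get(config_input["id"], "")

print(workflow_params)  # {'example_workflow.DRS_URL': 'http://drs.local'}
```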
5 changes: 3 additions & 2 deletions bento_wes/backends/cromwell_local.py
@@ -5,6 +5,7 @@

from bento_wes.backends import WESBackend
from bento_wes.backends.backend_types import Command
from bento_wes.models import Run, RunWithDetails
from bento_wes.workflows import WorkflowType, WES_WORKFLOW_TYPE_WDL


@@ -24,7 +25,7 @@ def _get_supported_types(self) -> Tuple[WorkflowType]:
"""
return WES_WORKFLOW_TYPE_WDL,

def _get_params_file(self, run: dict) -> str:
def _get_params_file(self, run: Run) -> str:
"""
Returns the name of the params file to use for the workflow run.
:param run: The run description; unused here
@@ -40,7 +41,7 @@ def _serialize_params(self, workflow_params: dict) -> str:
"""
return json.dumps(workflow_params)

def _check_workflow(self, run: dict) -> Optional[Tuple[str, str]]:
def _check_workflow(self, run: RunWithDetails) -> Optional[Tuple[str, str]]:
return self._check_workflow_wdl(run)

def get_workflow_name(self, workflow_path: str) -> Optional[str]:
4 changes: 3 additions & 1 deletion bento_wes/constants.py
@@ -1,6 +1,8 @@
import bento_wes
import os

from typing import Literal


__all__ = [
"BENTO_SERVICE_KIND",
@@ -21,4 +23,4 @@
SERVICE_ID = os.environ.get("SERVICE_ID", ":".join(SERVICE_TYPE.values()))
SERVICE_NAME = "Bento WES"

RUN_PARAM_FROM_CONFIG = "FROM_CONFIG"
RUN_PARAM_FROM_CONFIG: Literal["FROM_CONFIG"] = "FROM_CONFIG"
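The `Literal["FROM_CONFIG"]` annotation on the constant presumably lets pydantic models and static type checkers treat the sentinel as an exact string value rather than an arbitrary `str`. A minimal sketch (not from this commit) of how such a Literal-typed field behaves under pydantic validation:

```python
# Minimal sketch: a Literal-typed field only validates against the exact sentinel.
from typing import Literal

from pydantic import BaseModel, ValidationError


class FromConfigInput(BaseModel):
    id: str
    value: Literal["FROM_CONFIG"]


FromConfigInput(id="drs_url", value="FROM_CONFIG")  # OK

try:
    FromConfigInput(id="drs_url", value="some other string")
except ValidationError as e:
    print(e)  # value must be exactly "FROM_CONFIG"
```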