Skip to content

Commit

Permalink
Merge pull request #50 from bento-platform/feat/dataset-and-injected-…
Browse files Browse the repository at this point in the history
…params

feat!: new workflow system from bento_lib, new authz
  • Loading branch information
davidlougheed authored Nov 14, 2023
2 parents 2117bcf + d10cd58 commit 2b14c13
Show file tree
Hide file tree
Showing 17 changed files with 595 additions and 707 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM ghcr.io/bento-platform/bento_base_image:python-debian-2023.10.20 AS base-deps
FROM ghcr.io/bento-platform/bento_base_image:python-debian-2023.11.10 AS base-deps

SHELL ["/bin/bash", "-c"]

Expand Down
45 changes: 6 additions & 39 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,8 @@ Parameter:
"tags": {
"workflow_id": "...", // must correspond to the worflow_params namespace
"workflow_metadata": {
"inputs": [{}], // must correspond to the .wdl input section
"outputs": [{}] // must correspond to the .wdl output section
},
"project_id": "...",
"dataset_id": "..."
"inputs": [{}] // Defines setup for injecting values into the .wdl input section. IDs must align.
}
}
}
```
Expand Down Expand Up @@ -260,40 +257,10 @@ validity is disabled in Bento (see the corresponding Dockerfile).
This script contains the routes definitions as [Flask's Blueprints](https://flask.palletsprojects.com/en/2.0.x/blueprints/)

### runner.py
This script contains the implementation of the workflows execution.
Of interest is the code handling the callbacks and
some service specific routines (i.e. code paths specific to Gohan or Katsu ingestions).

The expected output files are
retrieved using the workflow metadata (Bento-specific) with a possibility
to map output files names to input files names. In the following example,
the property `map_from_input` refers to an `id` in the `inputs` section. This
maps the `value` property to use the input file name for string interpolation.
```json5
{
"workflow_name": {
// ...
},
"inputs": [
{
"id": "vcf_files",
"type": "file[]",
"required": true,
"extensions": [".vcf"]
},
// ...
],
"outputs": [
{
"id": "vcf_gz_files",
"type": "file[]",
"map_from_input": "vcf_files",
"value": "{}.gz"
},
// ...
]
}
```
This script contains the implementation of workflow execution.

The expected inputs come from the workflow metadata (Bento-specific), which
also define how `bento_web` will render the workflow set-up UI.

Another extension to the workflow metadata inputs is used to get values from the WES
configuration variables. The special value `FROM_CONFIG` causes the interpolation
Expand Down
9 changes: 3 additions & 6 deletions bento_wes/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,11 @@ def service_info():

try:
if res_tag := subprocess.check_output(["git", "describe", "--tags", "--abbrev=0"]):
res_tag_str = res_tag.decode().rstrip()
info["bento"]["gitTag"] = res_tag_str
info["bento"]["gitTag"] = res_tag.decode().rstrip()
if res_branch := subprocess.check_output(["git", "branch", "--show-current"]):
res_branch_str = res_branch.decode().rstrip()
info["bento"]["gitBranch"] = res_branch_str
info["bento"]["gitBranch"] = res_branch.decode().rstrip()
if res_commit := subprocess.check_output(["git", "rev-parse", "HEAD"]):
res_commit_str = res_commit.decode().rstrip()
info["bento"]["gitCommit"] = res_commit_str
info["bento"]["gitCommit"] = res_commit.decode().rstrip()
except Exception as e:
except_name = type(e).__name__
current_app.logger.info(f"Could not retrieve git information: {str(except_name)}: {e}")
Expand Down
5 changes: 0 additions & 5 deletions bento_wes/authz.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,10 @@

__all__ = [
"authz_middleware",
"PERMISSION_INGEST_DATA",
"PERMISSION_VIEW_RUNS",
]

authz_middleware = FlaskAuthMiddleware(
config.AUTHZ_URL,
debug_mode=config.BENTO_DEBUG,
enabled=config.AUTHZ_ENABLED,
)

PERMISSION_INGEST_DATA = "ingest:data"
PERMISSION_VIEW_RUNS = "view:runs"
166 changes: 65 additions & 101 deletions bento_wes/backends/_wes_backend.py

Large diffs are not rendered by default.

22 changes: 8 additions & 14 deletions bento_wes/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,16 @@ def _get_from_environ_or_fail(var: str) -> str:
return val


def _to_bool(val: str) -> bool:
return val.strip().lower() in TRUTH_VALUES


TRUTH_VALUES = ("true", "1")

AUTHZ_ENABLED = os.environ.get("AUTHZ_ENABLED", "true").strip().lower() in TRUTH_VALUES

BENTO_DEBUG: bool = os.environ.get(
"BENTO_DEBUG",
os.environ.get("FLASK_DEBUG", "false")
).strip().lower() in TRUTH_VALUES

CELERY_DEBUG: bool = os.environ.get(
"CELERY_DEBUG", ""
).strip().lower() in TRUTH_VALUES
BENTO_DEBUG: bool = _to_bool(os.environ.get("BENTO_DEBUG", os.environ.get("FLASK_DEBUG", "false")))
CELERY_DEBUG: bool = _to_bool(os.environ.get("CELERY_DEBUG", ""))

AUTHZ_URL: str = _get_from_environ_or_fail("BENTO_AUTHZ_SERVICE_URL").strip().rstrip("/")
SERVICE_REGISTRY_URL: str = _get_from_environ_or_fail("SERVICE_REGISTRY_URL").strip().rstrip("/")
Expand All @@ -47,8 +45,7 @@ class Config:
BENTO_URL: str = os.environ.get("BENTO_URL", "http://127.0.0.1:5000/")

BENTO_DEBUG: bool = BENTO_DEBUG
BENTO_VALIDATE_SSL: bool = os.environ.get(
"BENTO_VALIDATE_SSL", str(not BENTO_DEBUG)).strip().lower() in TRUTH_VALUES
BENTO_VALIDATE_SSL: bool = _to_bool(os.environ.get("BENTO_VALIDATE_SSL", str(not BENTO_DEBUG)))

DATABASE: str = os.environ.get("DATABASE", "bento_wes.db")
SERVICE_ID = SERVICE_ID
Expand All @@ -74,10 +71,7 @@ class Config:
WES_CLIENT_ID: str = os.environ.get("WES_CLIENT_ID", "bento_wes")
WES_CLIENT_SECRET: str = os.environ.get("WES_CLIENT_SECRET", "")

# Other services, used for interpolating workflow variables and (
DRS_URL: str = os.environ.get("DRS_URL", "").strip().rstrip("/")
GOHAN_URL: str = os.environ.get("GOHAN_URL", "").strip().rstrip("/")
KATSU_URL: str = os.environ.get("KATSU_URL", "").strip().rstrip("/")
# Service registry URL, used for looking up service kinds to inject as workflow input
SERVICE_REGISTRY_URL: str = SERVICE_REGISTRY_URL

# VEP-related configuration
Expand Down
5 changes: 0 additions & 5 deletions bento_wes/constants.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
import bento_wes
import os

from typing import Literal


__all__ = [
"BENTO_SERVICE_KIND",
"SERVICE_ARTIFACT",
"SERVICE_TYPE",
"SERVICE_ID",
"SERVICE_NAME",
"RUN_PARAM_FROM_CONFIG",
]

BENTO_SERVICE_KIND = "wes"
Expand All @@ -22,5 +19,3 @@
}
SERVICE_ID = os.environ.get("SERVICE_ID", ":".join(SERVICE_TYPE.values()))
SERVICE_NAME = "Bento WES"

RUN_PARAM_FROM_CONFIG: Literal["FROM_CONFIG"] = "FROM_CONFIG"
44 changes: 2 additions & 42 deletions bento_wes/models.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
from bento_lib.workflows.models import WorkflowDefinition
from datetime import datetime
from pydantic import BaseModel, ConfigDict, AnyUrl, Json
from typing import Literal

__all__ = [
"BentoWorkflowInput",
"BentoWorkflowInputWithValue",
"BentoWorkflowOutput",
"BentoWorkflowMetadata",
"BentoRunRequestTags",
"RunRequest",
"RunLog",
Expand All @@ -16,48 +13,11 @@
]


class BentoWorkflowInput(BaseModel):
id: str
type: Literal["string", "string[]", "number", "number[]", "enum", "enum[]", "file", "file[]"]
required: bool = False,
extensions: list[str] | None = None


class BentoWorkflowInputWithFileExtensions(BentoWorkflowInput):
type: Literal["file", "file[]"]
extensions: list[str] | None = None


class BentoWorkflowInputWithValue(BentoWorkflowInput):
value: Literal["FROM_CONFIG"]
hidden: bool = True


class BentoWorkflowOutput(BaseModel):
id: str
type: Literal["string", "string[]", "number", "number[]", "enum", "enum[]", "file", "file[]"]
value: str


# TODO: Move to bento_lib
class BentoWorkflowMetadata(BaseModel):
name: str
description: str
action: Literal["ingestion", "analysis", "export"]
data_type: str | None = None
file: str
inputs: list[BentoWorkflowInputWithValue | BentoWorkflowInputWithFileExtensions | BentoWorkflowInput]
outputs: list[BentoWorkflowOutput]


class BentoRunRequestTags(BaseModel):
model_config = ConfigDict(extra="allow")

workflow_id: str
workflow_metadata: BentoWorkflowMetadata

project_id: str
dataset_id: str | None = None
workflow_metadata: WorkflowDefinition


class RunRequest(BaseModel):
Expand Down
7 changes: 4 additions & 3 deletions bento_wes/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ def run_workflow(self, run_id: uuid.UUID):
debug=current_app.config["BENTO_DEBUG"],
)

access_token: str = ""
secrets: dict[str, str] = {"access_token": ""}

# If we have credentials, obtain access token for use inside workflow to ingest data
try:
if (client_id := current_app.config["WES_CLIENT_ID"]) and \
Expand All @@ -70,7 +71,7 @@ def run_workflow(self, run_id: uuid.UUID):
"client_id": client_id,
"client_secret": client_secret,
})
access_token = token_res.json()["access_token"]
secrets["access_token"] = token_res.json()["access_token"]
else:
logger.warning(
"Missing WES credentials: WES_CLIENT_ID and/or WES_CLIENT_SECRET; setting job access token to ''")
Expand All @@ -83,7 +84,7 @@ def run_workflow(self, run_id: uuid.UUID):
# Perform the run
try:
logger.info("Starting workflow execution...")
backend.perform_run(run, self.request.id, access_token)
backend.perform_run(run, self.request.id, secrets)
except Exception as e:
# Intercept any uncaught exceptions and finish with an error state
logger.error(f"Uncaught exception while performing run: {type(e).__name__} {e}")
Expand Down
Loading

0 comments on commit 2b14c13

Please sign in to comment.