Skip to content
This repository has been archived by the owner on Jun 11, 2023. It is now read-only.

Commit

Permalink
Merge pull request #9 from Sage-Bionetworks-Workflows/bgrande/ORCA-72…
Browse files Browse the repository at this point in the history
…/tower-wf-submission

[ORCA-72] Create function for launching workflows on Tower
  • Loading branch information
Bruno Grande authored Nov 1, 2022
2 parents 1b397b1 + 80e4d29 commit f081f52
Show file tree
Hide file tree
Showing 10 changed files with 1,229 additions and 91 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ __pycache__/*
.vscode
tags
.env
*.ipynb

# Package files
*.egg
Expand Down
769 changes: 688 additions & 81 deletions Pipfile.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ dev =
sphinx-rtd-theme
black
isort
jupyterlab

[options.entry_points]
# Add here console scripts like:
Expand Down
1 change: 1 addition & 0 deletions src/sagetasks/nextflowtower/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ def request(self, method: str, endpoint: str, **kwargs) -> dict:
url = self.tower_api_base_url + endpoint
kwargs["headers"] = {"Authorization": f"Bearer {self.tower_token}"}
response = requests.request(method, url, **kwargs)
response.raise_for_status()
try:
result = response.json()
except json.decoder.JSONDecodeError:
Expand Down
272 changes: 272 additions & 0 deletions src/sagetasks/nextflowtower/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,272 @@
from datetime import datetime, timezone
from typing import Mapping, Optional, Sequence

from sagetasks.nextflowtower.client import TowerClient
from sagetasks.utils import dedup, update_dict

# Shorthand platform identifiers mapped to their Tower API base URLs.
# Used by `TowerUtils.bundle_client_args()` to resolve the `platform`
# argument into a full API endpoint.
ENDPOINTS = {
    "tower.nf": "https://tower.nf/api",
    "sage": "https://tower.sagebionetworks.org/api",
    "sage-dev": "https://tower-dev.sagebionetworks.org/api",
}


class TowerUtils:
    """High-level helpers for common Nextflow Tower operations.

    Wraps a `TowerClient` and adds convenience methods for retrieving
    compute environments, launching workflows, and inspecting workflow
    runs. A default workspace can be "opened" so that workspace-scoped
    requests do not need to repeat the workspace identifier.
    """

    def __init__(
        self, client_args: Mapping, workspace_id: Optional[int] = None
    ) -> None:
        """Initialize TowerUtils for interacting with the Tower API.

        Args:
            client_args (Mapping): Bundled client arguments, which can
                be generated with the `TowerUtils.bundle_client_args()`
                static method.
            workspace_id (int, optional): Tower workspace identifier.
                Defaults to None with no opened workspace.
        """
        # TODO: Validate access token (by attempting a simple auth'ed request)
        self.client = TowerClient(**client_args)
        self._workspace: Optional[int] = None
        self.open_workspace(workspace_id)

    @property
    def workspace(self) -> int:
        """Retrieve default workspace for workspace-related requests.

        Raises:
            ValueError: If a workspace isn't open. You can open a workspace
                when initializing `TowerUtils()` or with `open_workspace()`.

        Returns:
            int: Tower workspace identifier.
        """
        if self._workspace is None:
            raise ValueError("Workspace not opened yet. Use `open_workspace()`.")
        return self._workspace

    def open_workspace(self, workspace_id: Optional[int]) -> None:
        """Configure default workspace for workspace-related requests.

        Args:
            workspace_id (Optional[int]): Tower workspace identifier.
                If `None`, this will close any opened workspace.
        """
        if workspace_id is None:
            self.close_workspace()
        else:
            # TODO: Validate workspace
            self._workspace = int(workspace_id)

    def close_workspace(self) -> None:
        """Clear default workspace for workspace-related requests."""
        self._workspace = None

    def init_params(self) -> dict:
        """Initialize query-string parameters with workspace ID.

        Returns:
            dict: Parameters for workspace-related requests,
                which can be passed to `requests.request`.
        """
        return {"workspaceId": self.workspace}

    @staticmethod
    def bundle_client_args(
        auth_token: Optional[str] = None,
        platform: Optional[str] = "sage",
        endpoint: Optional[str] = None,
        **kwargs,
    ) -> dict:
        """Bundle the information needed for authenticating a Tower client.

        Args:
            auth_token (str, optional): Tower access token, which gets
                included in the HTTP header as an Authorization Bearer
                token. Defaults to None, which prompts the use of the
                `NXF_TOWER_TOKEN` environment variable.
            platform (str, optional): Compact identifier for commonly
                used platforms to populate the endpoint. Options include
                "sage" and "tower.nf". Defaults to "sage".
            endpoint (str, optional): Full Tower API URL. This argument
                will override the value associated with the specified
                `platform`. Defaults to None.

        Raises:
            ValueError: If `platform` is not valid.
            ValueError: If `platform` and `endpoint` are not provided.

        Returns:
            dict: Bundle of Tower client arguments.
        """
        if platform is not None and platform not in ENDPOINTS:
            raise ValueError(f"`platform` must be among {list(ENDPOINTS)} (or None).")
        # The check above guarantees that a truthy `platform` is a valid
        # ENDPOINTS key, so no extra membership test is needed here.
        if platform and not endpoint:
            endpoint = ENDPOINTS[platform]
        if endpoint is None:
            raise ValueError("You must provide a value to `platform` or `endpoint`.")
        client_args = dict(tower_token=auth_token, tower_api_url=endpoint, **kwargs)
        return client_args

    def get_compute_env(self, compute_env_id: str) -> dict:
        """Retrieve information about a given compute environment.

        Args:
            compute_env_id (str): Compute environment alphanumerical ID.

        Returns:
            dict: Information about the compute environment.
        """
        endpoint = f"/compute-envs/{compute_env_id}"
        params = self.init_params()
        response = self.client.request("GET", endpoint, params=params)
        compute_env = response["computeEnv"]
        return compute_env

    def get_workflow(self, workflow_id: str) -> dict:
        """Retrieve information about a given workflow run.

        Args:
            workflow_id (str): Workflow run alphanumerical ID.

        Returns:
            dict: Information about the workflow run.
        """
        endpoint = f"/workflow/{workflow_id}"
        params = self.init_params()
        response = self.client.request("GET", endpoint, params=params)
        return response

    def init_launch_workflow_data(self, compute_env_id: str) -> dict:
        """Initialize request for `/workflow/launch` endpoint.

        You can use this method to modify the contents before
        passing the adjusted payload to the `init_data` argument
        on the `launch_workflow()` method.

        Args:
            compute_env_id (str): Compute environment alphanumerical ID.

        Raises:
            ValueError: If the compute environment is not available.

        Returns:
            dict: Initial request for `/workflow/launch` endpoint.
        """
        # Retrieve compute environment for default values
        compute_env = self.get_compute_env(compute_env_id)
        if compute_env["status"] != "AVAILABLE":
            ce_name = compute_env["name"]
            raise ValueError(f"The compute environment ({ce_name}) is not available.")
        ce_config = compute_env["config"]
        # Replicating date format in requests made by Tower frontend
        # (millisecond precision with a "Z" suffix). The timestamp must be
        # timezone-aware UTC so the "Z" (Zulu/UTC) suffix is truthful; a
        # naive `datetime.now()` would stamp *local* time with a UTC marker.
        now_utc = datetime.now(timezone.utc).isoformat(timespec="milliseconds")
        now_utc = now_utc.replace("+00:00", "Z")
        data = {
            "launch": {
                "computeEnvId": compute_env_id,
                "configProfiles": [],
                "configText": None,
                "dateCreated": now_utc,
                "entryName": None,
                "id": None,
                "mainScript": None,
                "paramsText": None,
                "pipeline": None,
                "postRunScript": ce_config["postRunScript"],
                "preRunScript": ce_config["preRunScript"],
                "pullLatest": None,
                "revision": None,
                "runName": None,
                "schemaName": None,
                "stubRun": None,
                "towerConfig": None,
                "userSecrets": [],
                "workDir": ce_config["workDir"],
                "workspaceSecrets": [],
            }
        }
        return data

    def launch_workflow(
        self,
        compute_env_id: str,
        pipeline: str,
        revision: Optional[str] = None,
        params_yaml: Optional[str] = None,
        params_json: Optional[str] = None,
        nextflow_config: Optional[str] = None,
        run_name: Optional[str] = None,
        work_dir: Optional[str] = None,
        profiles: Optional[Sequence] = (),
        user_secrets: Optional[Sequence] = (),
        workspace_secrets: Optional[Sequence] = (),
        pre_run_script: Optional[str] = None,
        init_data: Optional[Mapping] = None,
    ) -> dict:
        """Launch a workflow using the given compute environment.

        This method will use any opened workspace if available.

        Args:
            compute_env_id (str): Compute environment ID where the
                execution will be launched.
            pipeline (str): Nextflow pipeline URL. This can be a GitHub
                shorthand like `nf-core/rnaseq`.
            revision (str, optional): A valid repository commit ID (SHA),
                tag, or branch name. Defaults to None.
            params_yaml (str, optional): Pipeline parameters in YAML format.
                Defaults to None.
            params_json (str, optional): Pipeline parameters in JSON format.
                Defaults to None.
            nextflow_config (str, optional): Additional Nextflow configuration
                settings can be provided here. Defaults to None.
            run_name (str, optional): Custom workflow run name. Defaults to
                None, which will automatically assign a random run name.
            work_dir (str, optional): The bucket path where the pipeline
                scratch data is stored. Defaults to None, which uses the
                default work directory for the given compute environment.
            profiles (Sequence, optional): Configuration profile names
                to use for this execution. Defaults to an empty list.
            user_secrets (Sequence, optional): Secrets required by the
                pipeline execution. Those secrets must be defined in the
                launching user's account. User secrets take precedence over
                workspace secrets. Defaults to an empty list.
            workspace_secrets (Sequence, optional): Secrets required by the
                pipeline execution. Those secrets must be defined in the
                opened workspace. Defaults to an empty list.
            pre_run_script (str, optional): A Bash script that's executed
                in the same environment where Nextflow runs just before
                the pipeline is launched. Defaults to None.
            init_data (Mapping, optional): An alternate request payload for
                launching a workflow. It's recommended to generate a basic
                request using `init_launch_workflow_data()` and modifying
                it before passing it to `init_data`. Defaults to None.

        Returns:
            dict: Information about the just-launched workflow run.
        """
        endpoint = "/workflow/launch"
        params = self.init_params()
        arguments = {
            "launch": {
                "configProfiles": dedup(profiles),
                "configText": nextflow_config,
                # TODO: Validate YAML or JSON
                "paramsText": params_yaml or params_json,
                "pipeline": pipeline,
                "preRunScript": pre_run_script,
                "revision": revision,
                # TODO: Avoid duplicate run names
                "runName": run_name,
                "userSecrets": dedup(user_secrets),
                "workDir": work_dir,
                "workspaceSecrets": dedup(workspace_secrets),
            }
        }
        # Update default data with argument values and user-provided overrides.
        # `update_dict` skips None values, so omitted arguments keep the
        # defaults from `init_launch_workflow_data()`.
        data = init_data or self.init_launch_workflow_data(compute_env_id)
        data = update_dict(data, arguments)
        # Launch workflow and obtain workflow ID
        response = self.client.request("POST", endpoint, params=params, json=data)
        # Get more information about workflow run
        workflow = self.get_workflow(response["workflowId"])
        return workflow
50 changes: 49 additions & 1 deletion src/sagetasks/utils.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,62 @@
import inspect
import sys
from collections.abc import Mapping, Sequence
from copy import copy

from prefect import task


def to_prefect_tasks(module_name, general_module):
def to_prefect_tasks(module_name: str, general_module) -> None:
    """Wrap functions inside a general module as Prefect tasks.

    Every function found in `general_module` is wrapped with
    `prefect.task` and attached to the module named `module_name`,
    using the first line of the function's docstring as the task name.

    Args:
        module_name (str): Name of the target module (as registered
            in `sys.modules`) where the Prefect tasks are attached.
        general_module: Imported module object whose functions will be
            wrapped (it is passed to `inspect.getmembers`, so it must
            be the module itself, not its name).
    """
    this_module = sys.modules[module_name]
    general_funcs = inspect.getmembers(general_module, inspect.isfunction)
    for name, func in general_funcs:
        # `__doc__` exists but is None for undocumented functions, so
        # `getattr(func, "__doc__", "")` alone would still yield None
        # and crash on `.splitlines()`.
        docstring = func.__doc__ or ""
        doc_lines = docstring.splitlines()
        # Fall back to the function name when there is no docstring to
        # derive a task name from.
        first_line = doc_lines[0] if doc_lines else name
        task_func = task(func, name=first_line)
        setattr(this_module, name, task_func)


def update_dict(base_dict: Mapping, overrides: Mapping) -> Mapping:
    """Update a dictionary recursively with a set of overrides.

    Only keys already present in `base_dict` may be overridden; nested
    mappings are merged recursively, and `None` override values are
    ignored (the base value is kept). The input dictionaries are not
    mutated; a (shallow) copy is updated and returned.

    Args:
        base_dict (Mapping): Base dictionary.
        overrides (Mapping): Dictionary with overrides.

    Raises:
        ValueError: If there is an attempt to create a new key.

    Returns:
        Mapping: Updated dictionary.
    """
    merged = copy(base_dict)
    for key, new_value in overrides.items():
        if key not in merged:
            valid = set(merged)
            raise ValueError(f"Cannot update {key}. Not among {valid}.")
        current = merged.get(key, {})
        nested_merge = isinstance(current, Mapping) and isinstance(new_value, Mapping)
        if nested_merge:
            merged[key] = update_dict(current, new_value)
        elif new_value is not None:
            merged[key] = new_value
    return merged


def dedup(x: Sequence) -> list:
    """Deduplicate elements in a sequence (such as a list).

    Non-sequence inputs (including `None`) are returned unchanged.

    Args:
        x (Sequence): Sequence of hashable elements.

    Returns:
        list: Deduplicated list of elements, preserving the order of
            first appearance.
    """
    if isinstance(x, Sequence):
        # `dict.fromkeys()` keeps insertion order (unlike `set()`), so
        # the result is deterministic and order-preserving — important
        # because these values end up in API request payloads.
        x = list(dict.fromkeys(x))
    return x
12 changes: 8 additions & 4 deletions tests/nextflowtower/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@ def tower_client():
return client.TowerClient(EG_TOKEN, EG_API_URL)


def test_import():
assert client


class TestTowerClient:
def test_missing_args(self):
# Clear environment
Expand All @@ -36,6 +32,10 @@ def test_missing_args(self):
# Ensure exception when initializing
with pytest.raises(ValueError):
client.TowerClient()
with pytest.raises(ValueError):
client.TowerClient(tower_token=EG_TOKEN)
with pytest.raises(ValueError):
client.TowerClient(tower_api_url=EG_API_URL)

def test_get_valid_name(self, tower_client):
expectations = {
Expand All @@ -50,6 +50,10 @@ def test_get_valid_name(self, tower_client):
for name, output in expectations.items():
assert tower_client.get_valid_name(name) == output

def test_request_nonmethod(self, tower_client):
    # "FOO" is not a recognized HTTP method, so TowerClient.request()
    # is expected to raise ValueError for it.
    with pytest.raises(ValueError):
        tower_client.request("FOO", EG_ENDPOINT)

def test_request_nonempty(self, mocker, capfd, tower_client):
# Setup
eg_kwargs = {"foo": "bar"}
Expand Down
Loading

0 comments on commit f081f52

Please sign in to comment.