Skip to content
This repository has been archived by the owner on Jun 11, 2023. It is now read-only.

Commit

Permalink
Merge pull request #10 from Sage-Bionetworks-Workflows/bgrande/ORCA-7…
Browse files Browse the repository at this point in the history
…3/sagetasks-cli

[ORCA-73] Add a sagetasks CLI for launching Tower workflows
  • Loading branch information
Bruno Grande authored Nov 10, 2022
2 parents f081f52 + 2534fb0 commit a2d22da
Show file tree
Hide file tree
Showing 11 changed files with 780 additions and 603 deletions.
1,223 changes: 632 additions & 591 deletions Pipfile.lock

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ install_requires =
synapseclient
sevenbridges-python
pandas
typer
rich

[options.packages.find]
where = src
Expand All @@ -73,6 +75,8 @@ dev =
black
isort
jupyterlab
mypy
types-requests

[options.entry_points]
# Add here console scripts like:
Expand All @@ -84,6 +88,8 @@ dev =
# And any other entry points, for example:
# pyscaffold.cli =
# awesome = pyscaffoldext.awesome.extension:AwesomeExtension
console_scripts =
sagetasks = sagetasks.main:main_app

[tool:pytest]
# Specify command line options as you would do when invoking pytest directly.
Expand Down
3 changes: 3 additions & 0 deletions src/sagetasks/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from sagetasks.main import main_app

main_app(prog_name="sagetasks")
6 changes: 6 additions & 0 deletions src/sagetasks/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from typer import Typer

import sagetasks.nextflowtower.typer

main_app = Typer(rich_markup_mode="markdown")
main_app.add_typer(sagetasks.nextflowtower.typer.app, name="nextflowtower")
2 changes: 1 addition & 1 deletion src/sagetasks/nextflowtower/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def __init__(
or os.environ.get("NXF_TOWER_API_URL")
or os.environ.get("TOWER_API_ENDPOINT") # Backwards-compatible
)
self.debug = debug_mode
self.debug = debug_mode or bool(int(os.environ.get("NXF_TOWER_DEBUG", 0)))
# Check for empty values
if self.tower_token is None:
raise ValueError(
Expand Down
70 changes: 70 additions & 0 deletions src/sagetasks/nextflowtower/general.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
from typing import List, Optional

from sagetasks.nextflowtower.utils import TowerUtils

# TODO: Re-enable this function once we've figured out how to best handle
# `kwargs` in Typer. The following links might be useful for this:
# https://typer.tiangolo.com/tutorial/commands/context/#configuring-the-context
# https://peps.python.org/pep-0692/
# def bundle_client_args(auth_token, platform="sage", endpoint=None, **kwargs):
# """Nextflow Tower - Bundle client arguments."""
# return TowerUtils.bundle_client_args(auth_token, platform, endpoint, **kwargs)


def launch_workflow(
compute_env_id: str,
pipeline: str,
workspace_id=None,
revision: Optional[str] = None,
params_yaml: Optional[str] = None,
params_json: Optional[str] = None,
nextflow_config: Optional[str] = None,
run_name: Optional[str] = None,
work_dir: Optional[str] = None,
profiles: Optional[List[str]] = None,
user_secrets: Optional[List[str]] = None,
workspace_secrets: Optional[List[str]] = None,
pre_run_script: Optional[str] = None,
# TODO: Re-enable once we find a way to get Typer to support Mappings
# init_data: Optional[Mapping] = None,
client_args=None,
):
"""Launch a workflow run on Nextflow Tower.
You can provide your Tower credentials with the following
environment variables:
- NXF_TOWER_TOKEN='<tower-access-token>'
- NXF_TOWER_API_URL='<tower-api-url>'
You can optionally enable debug mode (HTTP request logs)
with the following environment variable:
- NXF_TOWER_DEBUG=1
"""
# More specific default values than None
client_args = client_args or dict()
profiles = profiles or ()
user_secrets = user_secrets or ()
workspace_secrets = workspace_secrets or ()
# Prepare and execute the workflow launch
utils = TowerUtils(client_args)
utils.open_workspace(workspace_id)
workflow = utils.launch_workflow(
compute_env_id,
pipeline,
revision=revision,
params_yaml=params_yaml,
params_json=params_json,
nextflow_config=nextflow_config,
run_name=run_name,
work_dir=work_dir,
profiles=profiles,
user_secrets=user_secrets,
workspace_secrets=workspace_secrets,
pre_run_script=pre_run_script,
)
return workflow
5 changes: 5 additions & 0 deletions src/sagetasks/nextflowtower/typer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import sagetasks.nextflowtower.general
from sagetasks.utils import to_typer_commands

# Auto-generate Typer commands from general functions
app = to_typer_commands(sagetasks.nextflowtower.general)
14 changes: 7 additions & 7 deletions src/sagetasks/nextflowtower/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from datetime import datetime
from typing import Mapping, Optional, Sequence
from typing import List, Mapping, Optional

from sagetasks.nextflowtower.client import TowerClient
from sagetasks.utils import dedup, update_dict
Expand Down Expand Up @@ -196,9 +196,9 @@ def launch_workflow(
nextflow_config: Optional[str] = None,
run_name: Optional[str] = None,
work_dir: Optional[str] = None,
profiles: Optional[Sequence] = (),
user_secrets: Optional[Sequence] = (),
workspace_secrets: Optional[Sequence] = (),
profiles: Optional[List[str]] = (),
user_secrets: Optional[List[str]] = (),
workspace_secrets: Optional[List[str]] = (),
pre_run_script: Optional[str] = None,
init_data: Optional[Mapping] = None,
) -> dict:
Expand All @@ -224,13 +224,13 @@ def launch_workflow(
work_dir (str, optional): The bucket path where the pipeline
scratch data is stored. Defaults to None, which uses the
default work directory for the given compute environment.
profiles (Sequence, optional): Configuration profile names
profiles (List[str], optional): Configuration profile names
to use for this execution. Defaults to an empty list.
user_secrets (Sequence, optional): Secrets required by the
user_secrets (List[str], optional): Secrets required by the
pipeline execution. Those secrets must be defined in the
launching user's account. User secrets take precedence over
workspace secrets. Defaults to an empty list.
workspace_secrets (Sequence, optional): Secrets required by the
workspace_secrets (List[str], optional): Secrets required by the
pipeline execution. Those secrets must be defined in the
opened workspace. Defaults to an empty list.
pre_run_script (str, optional): A Bash script that's executed
Expand Down
4 changes: 2 additions & 2 deletions src/sagetasks/sevenbridges/prefect.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import sagetasks.sevenbridges.general as general
import sagetasks.sevenbridges.general
from sagetasks.utils import to_prefect_tasks

# Auto-generate Prefect tasks from general functions
to_prefect_tasks(__name__, general)
to_prefect_tasks(__name__, sagetasks.sevenbridges.general)
4 changes: 2 additions & 2 deletions src/sagetasks/synapse/prefect.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import sagetasks.synapse.general as general
import sagetasks.synapse.general
from sagetasks.utils import to_prefect_tasks

# Auto-generate Prefect tasks from general functions
to_prefect_tasks(__name__, general)
to_prefect_tasks(__name__, sagetasks.synapse.general)
46 changes: 46 additions & 0 deletions src/sagetasks/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@
import sys
from collections.abc import Mapping, Sequence
from copy import copy
from functools import wraps

from prefect import task
from rich import print as rich_print
from typer import Typer


def to_prefect_tasks(module_name: str, general_module: str) -> None:
Expand All @@ -22,6 +25,49 @@ def to_prefect_tasks(module_name: str, general_module: str) -> None:
setattr(this_module, name, task_func)


def to_typer_commands(general_module: str) -> None:
"""Wrap functions inside a general module as Typer commands.
Most functions being converted into Typer commands have
a return value. In Python, that return value can be used
for other purposes. At the CLI, this return value isn't
visible by default. Hence, before being passed to Typer,
`to_typer_commands()` wraps each function such that the
return value is printed on standard output. For the time
being, the `print()` function from the `rich` package is
being used for colored and formatted output.
Args:
general_module (str): General submodule name.
"""

# This weird setup is to avoid a flake8 B023 linting
# error, which is associated with the following gotcha:
# https://docs.python-guide.org/writing/gotchas/#late-binding-closures
def add_print(func):
@wraps(func)
def printing_func(*args, **kwargs):
result = func(*args, **kwargs)
rich_print(result)
# TODO: Use this after we add a global JSON output CLI option
# try:
# output = json.dumps(result, indent=2)
# except TypeError:
# output = repr(result)
# print(output)

return printing_func

typer_app = Typer(rich_markup_mode="markdown")
general_funcs = inspect.getmembers(general_module, inspect.isfunction)

for _, func in general_funcs:
printing_func = add_print(func)
typer_app.command()(printing_func)

return typer_app


def update_dict(base_dict: Mapping, overrides: Mapping) -> Mapping:
"""Update a dictionary recursively with a set of overrides.
Expand Down

0 comments on commit a2d22da

Please sign in to comment.