Skip to content

Commit

Permalink
19 feat create pipeline client (#20)
Browse files Browse the repository at this point in the history
* feat(pipelines.py): adding pipelines client

* feat(pipelines.py): adding pipelines client
  • Loading branch information
odarotto committed Mar 4, 2024
1 parent 7bc52bb commit 214f697
Show file tree
Hide file tree
Showing 4 changed files with 298 additions and 86 deletions.
44 changes: 44 additions & 0 deletions pipeline-example.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
version: 1
name: example
defaults:
user: test
site: local

deployments:
- name: test-deployment # Name is required
site: canfar # Same valid sites as the work object
image: test/test-image:latest #This is the container image
resources:
cores: 2 # Integer value of cores
ram: 4G #1G, -- either MB or GB of RAMS
gpu: 4 # Optional, int value for GPUs
replicas: 1 #Int value

pipeline:
# Provision top-level
steps:
- name: alpha
stage: 1
work:
function: guidelines.example.alpha
parameters:
mu0: 100.1
sigma0: 22.0
- name: beta
runs-on: test-deployment
# Provision stage-level
stage: 2
work:
function: guidelines.example.beta
parameters:
mu0: 100.1
sigma0: 22.0
# Teardown stage-level
- name: gamma
stage: 3
work:
function: guidelines.example.gamma
parameters:
mu0: 100.1
sigma0: 22.0
# Teardown of top-level
4 changes: 2 additions & 2 deletions workflow/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import click

# from workflow.cli.buckets import buckets
# from workflow.cli.pipelines import pipelines
from workflow.cli.pipelines import pipelines
from workflow.cli.run import run
from workflow.cli.workspace import workspace

Expand All @@ -16,7 +16,7 @@ def cli():

cli.add_command(run)
# cli.add_command(buckets)
# cli.add_command(pipelines)
cli.add_command(pipelines)
cli.add_command(workspace)

if __name__ == "__main__":
Expand Down
170 changes: 88 additions & 82 deletions workflow/cli/pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,33 @@
import yaml
from rich import pretty
from rich.console import Console
from rich.json import JSON
from rich.table import Table
from rich.text import Text
from yaml.loader import SafeLoader

from workflow.http.context import HTTPContext

pretty.install()
console = Console()

table = Table(
title="\nWorkflow Pipelines",
show_header=True,
header_style="magenta",
title_style="bold magenta",
)

BASE_URL = "https://frb.chimenet.ca/pipelines"
STATUS = ["created", "queued", "running", "success", "failure", "cancelled"]
status_colors = {
"running": "blue",
"created": "lightblue",
"queued": "yellow",
"success": "green",
"failure": "red",
"cancelled": "orange",
}


@click.group(name="pipelines", help="Manage Workflow Pipelines.")
Expand All @@ -27,24 +46,41 @@ def pipelines():
@pipelines.command("version", help="Backend version.")
def version():
"""Get version of the pipelines service."""
response = requests.get(f"{BASE_URL}/version")
console.print(response.json())
http = HTTPContext()
console.print(http.pipelines.info())


@pipelines.command("ls", help="List pipelines.")
def ls():
"""List all active pipelines."""
pipelines = status()
table = Table(
title="\nWorkflow Pipelines",
show_header=True,
header_style="magenta",
title_style="bold magenta",
)
table.add_column("Pipeline", max_width=50, justify="left")
@click.option("name", "--name", "-n", type=str, required=False)
def ls(name: Optional[str] = None):
"""List all pipelines."""
http = HTTPContext()
pipeline_configs = http.pipelines.list_pipeline_configs(name)
table.add_column("ID", max_width=50, justify="left", style="blue")
table.add_column("Name", max_width=50, justify="left", style="bright_green")
table.add_column("Status", max_width=50, justify="left")
table.add_column("Stage", max_width=50, justify="left")
for config in pipeline_configs:
status = Text(config["status"], style=status_colors[config["status"]])
table.add_row(
config["id"], config["name"], status, str(config["current_stage"])
)
console.print(table)


@pipelines.command("count", help="Count pipeline configurations per collection.")
def count():
"""Count pipeline configurations."""
http = HTTPContext()
counts = http.pipelines.count()
table.add_column("Name", max_width=50, justify="left", style="blue")
table.add_column("Count", max_width=50, justify="left")
for key, value in pipelines.items():
table.add_row(str(key), str(value))
total = int()
for k, v in counts.items():
table.add_row(k, str(v))
total += v
table.add_section()
table.add_row("Total", str(total))
console.print(table)


Expand All @@ -56,95 +92,65 @@ def ls():
)
def deploy(filename: click.Path):
"""Deploy a workflow pipeline."""
http = HTTPContext()
filepath: str = str(filename)
data: Dict[str, Any] = {}
with open(filepath) as reader:
data = yaml.load(reader, Loader=SafeLoader) # type: ignore
response = requests.post(f"{BASE_URL}/v1/pipelines", json=data)
response.raise_for_status()
pipeline: str = response.json().get("id")
console.print(f"{pipeline}")
deploy_result = http.pipelines.deploy(data)
table.add_column("IDs")
for _id in deploy_result:
table.add_row(_id)
console.print(table)


@pipelines.command("ps", help="Get pipeline details.")
@click.argument("pipeline", type=str, required=True)
@click.argument("id", type=str, required=False)
@click.option(
"--filter",
"-f",
type=click.Choice(STATUS),
required=False,
help="Filter by status.",
)
@click.argument("id", type=str, required=True)
@click.option("--quiet", "-q", is_flag=True, default=False, help="Only display IDs.")
def ps(pipeline: str, id: str, filter: str, quiet: bool):
"""List all pipeline in detail."""
query: Dict[str, Any] = {}
if not id:
if filter:
query = {"status": filter}
response = status(
pipeline=pipeline, query=query, projection={"status": True, "id": True}
)
if not quiet:
info = response.get(pipeline)
table = Table(
title=f"\nWorkflow Pipeline: {pipeline}",
show_header=True,
header_style="magenta",
title_style="bold magenta",
)
table.add_column("ID", max_width=50, justify="left")
table.add_column("Status", max_width=50, justify="left")
for item in info:
pid = str(item.get("id"))
pstatus = str(item.get("status"))
table.add_row(pid, pstatus)
console.print(table)
else:
for item in response.get(pipeline):
console.print(item.get("id"))
if id:
query = {"id": id}
response = status(
pipeline=pipeline, query=query, projection={"pipeline.work": False}
)
info = response.get(pipeline)[0]
console.print(info)
def ps(pipeline: str, id: str):
"""List a pipeline configuration in detail."""
http = HTTPContext()
query: Dict[str, Any] = {"id": id}
payload = http.pipelines.get_pipeline_config(collection=pipeline, query=query)
table.add_column(f"Pipeline: {pipeline}", max_width=120, justify="left")
text = JSON(json.dumps(payload), indent=2)
table.add_row(text)
console.print(table)


@pipelines.command("stop", help="Kill a running pipeline.")
@click.argument("pipeline", type=str, required=True)
@click.argument("id", type=str, nargs=-1, required=True)
@click.argument("id", type=str, required=True)
def stop(pipeline: str, id: Tuple[str]):
"""Kill a running pipeline."""
filter: str = json.dumps({"id": {"$in": id}})
try:
response = requests.put(
f"{BASE_URL}/v1/pipelines/cancel",
params={"name": pipeline, "query": filter},
)
response.raise_for_status()
console.print(f"{id}")
except requests.exceptions.HTTPError as err:
console.print(err)
http = HTTPContext()
stop_result = http.pipelines.stop(pipeline, id)
if not any(stop_result):
text = Text("No pipeline configurations were stopped.", style="red")
console.print(text)
return
table.add_column("Deleted IDs", max_width=50, justify="left")
for config in stop_result:
table.add_row(config["id"])
console.print(table)


@pipelines.command("rm", help="Remove a pipeline.")
@click.argument("pipeline", type=str, required=True)
@click.argument("id", type=str, nargs=-1, required=True)
@click.argument("id", type=str, required=True)
def rm(pipeline: str, id: Tuple[str]):
"""Remove a pipeline."""
filter: str = json.dumps({"id": {"$in": id}})
try:
response = requests.delete(
f"{BASE_URL}/v1/pipelines",
params={"name": pipeline, "query": filter},
)
response.raise_for_status()
console.print(f"{id}")
except requests.exceptions.HTTPError as err:
console.print(err)
http = HTTPContext()
delete_result = http.pipelines.remove(pipeline, id)
if not any(delete_result):
text = Text("No pipeline configurations were deleted.", style="red")
console.print(text)
return
table.add_column("Deleted IDs", max_width=50, justify="left")
for config in delete_result:
table.add_row(config["id"])
console.print(table)


def status(
Expand Down
Loading

0 comments on commit 214f697

Please sign in to comment.