From 214f6972a80d41a579c328db7c7e78b34841cfab Mon Sep 17 00:00:00 2001 From: Odarsson <40160237+odarotto@users.noreply.github.com> Date: Mon, 4 Mar 2024 11:48:00 -0400 Subject: [PATCH] 19 feat create pipeline client (#20) * feat(pipelines.py): adding pipelines client * feat(pipelines.py): adding pipelines client --- pipeline-example.yaml | 44 ++++++++++ workflow/cli/main.py | 4 +- workflow/cli/pipelines.py | 170 +++++++++++++++++++------------------ workflow/http/pipelines.py | 166 +++++++++++++++++++++++++++++++++++- 4 files changed, 298 insertions(+), 86 deletions(-) create mode 100644 pipeline-example.yaml diff --git a/pipeline-example.yaml b/pipeline-example.yaml new file mode 100644 index 0000000..34660c0 --- /dev/null +++ b/pipeline-example.yaml @@ -0,0 +1,44 @@ +version: 1 +name: example +defaults: + user: test + site: local + +deployments: + - name: test-deployment # Name is required + site: canfar # Same valid sites as the work object + image: test/test-image:latest #This is the container image + resources: + cores: 2 # Integer value of cores + ram: 4G #1G, -- either MB or GB of RAMS + gpu: 4 # Optional, int value for GPUs + replicas: 1 #Int value + +pipeline: + # Provision top-level + steps: + - name: alpha + stage: 1 + work: + function: guidelines.example.alpha + parameters: + mu0: 100.1 + sigma0: 22.0 + - name: beta + runs-on: test-deployment + # Provision stage-level + stage: 2 + work: + function: guidelines.example.beta + parameters: + mu0: 100.1 + sigma0: 22.0 + # Teardown stage-level + - name: gamma + stage: 3 + work: + function: guidelines.example.gamma + parameters: + mu0: 100.1 + sigma0: 22.0 + # Teardown of top-level diff --git a/workflow/cli/main.py b/workflow/cli/main.py index 47f7701..d47a7dc 100755 --- a/workflow/cli/main.py +++ b/workflow/cli/main.py @@ -3,7 +3,7 @@ import click # from workflow.cli.buckets import buckets -# from workflow.cli.pipelines import pipelines +from workflow.cli.pipelines import pipelines from workflow.cli.run import run from workflow.cli.workspace import workspace @@ -16,7 +16,7 @@ def cli(): cli.add_command(run) # cli.add_command(buckets) -# cli.add_command(pipelines) +cli.add_command(pipelines) cli.add_command(workspace) if __name__ == "__main__": diff --git a/workflow/cli/pipelines.py b/workflow/cli/pipelines.py index 5e32ff0..ef8dfc3 100644 --- a/workflow/cli/pipelines.py +++ b/workflow/cli/pipelines.py @@ -8,14 +8,33 @@ import yaml from rich import pretty from rich.console import Console +from rich.json import JSON from rich.table import Table +from rich.text import Text from yaml.loader import SafeLoader +from workflow.http.context import HTTPContext + pretty.install() console = Console() +table = Table( + title="\nWorkflow Pipelines", + show_header=True, + header_style="magenta", + title_style="bold magenta", +) + BASE_URL = "https://frb.chimenet.ca/pipelines" STATUS = ["created", "queued", "running", "success", "failure", "cancelled"] +status_colors = { + "running": "blue", + "created": "lightblue", + "queued": "yellow", + "success": "green", + "failure": "red", + "cancelled": "orange", +} @click.group(name="pipelines", help="Manage Workflow Pipelines.") @@ -27,24 +46,41 @@ def pipelines(): @pipelines.command("version", help="Backend version.") def version(): """Get version of the pipelines service.""" - response = requests.get(f"{BASE_URL}/version") - console.print(response.json()) + http = HTTPContext() + console.print(http.pipelines.info()) @pipelines.command("ls", help="List pipelines.") -def ls(): - """List all active pipelines.""" - pipelines = status() - table = Table( - title="\nWorkflow Pipelines", - show_header=True, - header_style="magenta", - title_style="bold magenta", - ) - table.add_column("Pipeline", max_width=50, justify="left") +@click.option("name", "--name", "-n", type=str, required=False) +def ls(name: Optional[str] = None): + """List all pipelines.""" + http = HTTPContext() + pipeline_configs = http.pipelines.list_pipeline_configs(name) + table.add_column("ID", max_width=50, justify="left", style="blue") + table.add_column("Name", max_width=50, justify="left", style="bright_green") + table.add_column("Status", max_width=50, justify="left") + table.add_column("Stage", max_width=50, justify="left") + for config in pipeline_configs: + status = Text(config["status"], style=status_colors[config["status"]]) + table.add_row( + config["id"], config["name"], status, str(config["current_stage"]) + ) + console.print(table) + + +@pipelines.command("count", help="Count pipeline configurations per collection.") +def count(): + """Count pipeline configurations.""" + http = HTTPContext() + counts = http.pipelines.count() + table.add_column("Name", max_width=50, justify="left", style="blue") table.add_column("Count", max_width=50, justify="left") - for key, value in pipelines.items(): - table.add_row(str(key), str(value)) + total = int() + for k, v in counts.items(): + table.add_row(k, str(v)) + total += v + table.add_section() + table.add_row("Total", str(total)) console.print(table) @@ -56,95 +92,65 @@ def ls(): ) def deploy(filename: click.Path): """Deploy a workflow pipeline.""" + http = HTTPContext() filepath: str = str(filename) data: Dict[str, Any] = {} with open(filepath) as reader: data = yaml.load(reader, Loader=SafeLoader) # type: ignore - response = requests.post(f"{BASE_URL}/v1/pipelines", json=data) - response.raise_for_status() - pipeline: str = response.json().get("id") - console.print(f"{pipeline}") + deploy_result = http.pipelines.deploy(data) + table.add_column("IDs") + for _id in deploy_result: + table.add_row(_id) + console.print(table) @pipelines.command("ps", help="Get pipeline details.") @click.argument("pipeline", type=str, required=True) -@click.argument("id", type=str, required=False) -@click.option( - "--filter", - "-f", - type=click.Choice(STATUS), - required=False, - help="Filter by status.", -) +@click.argument("id", type=str, required=True) @click.option("--quiet", "-q", is_flag=True, default=False, help="Only display IDs.") -def ps(pipeline: str, id: str, filter: str, quiet: bool): - """List all pipeline in detail.""" - query: Dict[str, Any] = {} - if not id: - if filter: - query = {"status": filter} - response = status( - pipeline=pipeline, query=query, projection={"status": True, "id": True} - ) - if not quiet: - info = response.get(pipeline) - table = Table( - title=f"\nWorkflow Pipeline: {pipeline}", - show_header=True, - header_style="magenta", - title_style="bold magenta", - ) - table.add_column("ID", max_width=50, justify="left") - table.add_column("Status", max_width=50, justify="left") - for item in info: - pid = str(item.get("id")) - pstatus = str(item.get("status")) - table.add_row(pid, pstatus) - console.print(table) - else: - for item in response.get(pipeline): - console.print(item.get("id")) - if id: - query = {"id": id} - response = status( - pipeline=pipeline, query=query, projection={"pipeline.work": False} - ) - info = response.get(pipeline)[0] - console.print(info) +def ps(pipeline: str, id: str): + """List a pipeline configuration in detail.""" + http = HTTPContext() + query: Dict[str, Any] = {"id": id} + payload = http.pipelines.get_pipeline_config(collection=pipeline, query=query) + table.add_column(f"Pipeline: {pipeline}", max_width=120, justify="left") + text = JSON(json.dumps(payload), indent=2) + table.add_row(text) + console.print(table) @pipelines.command("stop", help="Kill a running pipeline.") @click.argument("pipeline", type=str, required=True) -@click.argument("id", type=str, nargs=-1, required=True) +@click.argument("id", type=str, required=True) def stop(pipeline: str, id: Tuple[str]): """Kill a running pipeline.""" - filter: str = json.dumps({"id": {"$in": id}}) - try: - response = requests.put( - f"{BASE_URL}/v1/pipelines/cancel", - params={"name": pipeline, "query": filter}, - ) - response.raise_for_status() - console.print(f"{id}") - except requests.exceptions.HTTPError as err: - console.print(err) + http = HTTPContext() + stop_result = http.pipelines.stop(pipeline, id) + if not any(stop_result): + text = Text("No pipeline configurations were stopped.", style="red") + console.print(text) + return + table.add_column("Deleted IDs", max_width=50, justify="left") + for config in stop_result: + table.add_row(config["id"]) + console.print(table) @pipelines.command("rm", help="Remove a pipeline.") @click.argument("pipeline", type=str, required=True) -@click.argument("id", type=str, nargs=-1, required=True) +@click.argument("id", type=str, required=True) def rm(pipeline: str, id: Tuple[str]): """Remove a pipeline.""" - filter: str = json.dumps({"id": {"$in": id}}) - try: - response = requests.delete( - f"{BASE_URL}/v1/pipelines", - params={"name": pipeline, "query": filter}, - ) - response.raise_for_status() - console.print(f"{id}") - except requests.exceptions.HTTPError as err: - console.print(err) + http = HTTPContext() + delete_result = http.pipelines.remove(pipeline, id) + if not any(delete_result): + text = Text("No pipeline configurations were deleted.", style="red") + console.print(text) + return + table.add_column("Deleted IDs", max_width=50, justify="left") + for config in delete_result: + table.add_row(config["id"]) + console.print(table) def status( diff --git a/workflow/http/pipelines.py b/workflow/http/pipelines.py index d980dbe..31e9293 100644 --- a/workflow/http/pipelines.py +++ b/workflow/http/pipelines.py @@ -1,6 +1,16 @@ """Workflow Pipelines API.""" +from json import dumps +from typing import Any, Dict, List, Optional +from urllib.parse import urlencode + +from requests.models import Response +from tenacity import retry +from tenacity.stop import stop_after_delay +from tenacity.wait import wait_random + from workflow.http.client import Client +from workflow.utils.decorators import try_request class Pipelines(Client): @@ -10,7 +20,159 @@ class Pipelines(Client): Client (workflow.http.client): The base class for interacting with the backend. Returns: - Results: A client for interacting with the Pipelines backend. + Pipelines: A client for interacting with the Pipelines backend. """ - pass + @retry(wait=wait_random(min=0.5, max=1.5), stop=(stop_after_delay(30))) + @try_request + def deploy(self, data: Dict[str, Any]): + """Deploys a PipelineConfig from payload data. + + Parameters + ---------- + data : Dict[str, Any] + YAML data. + + Returns + ------- + List[str] + IDs of PipelineConfig objects generated. + """ + with self.session as session: + url = f"{self.baseurl}/v1/pipelines" + response: Response = session.post(url, json=data) + response.raise_for_status() + return response.json() + + @try_request + def count(self) -> Dict[str, Any]: + """Count all documents in a collection. + + Returns + ------- + Dict[str, Any] + Dictionary with count. + """ + with self.session as session: + response: Response = session.get(url=f"{self.baseurl}/v1/pipelines/count") + response.raise_for_status() + return response.json() + + @try_request + def list_pipeline_configs( + self, config_name: Optional[str] = None + ) -> List[Dict[str, Any]]: + """View the current pipeline configurations in the pipelines backend. + + Parameters + ---------- + config_name : Optional[str], optional + PipelineConfig name, by default None + + Returns + ------- + List[Dict[str, Any]] + List of PipelineConfig payloads. + """ + with self.session as session: + url = ( + f"{self.baseurl}/v1/pipelines" + if config_name is None + else f'{self.baseurl}/v1/pipelines?query={{"name":"{config_name}"}}' + ) + response: Response = session.get(url=url) + response.raise_for_status() + return response.json() + + @try_request + def get_pipeline_config( + self, collection: str, query: Dict[str, Any] + ) -> Dict[str, Any]: + """Gets details for one pipeline configuration. + + Parameters + ---------- + collection : str + PipelineConfig name. + query : Dict[str, Any] + Dictionary with search parameters. + + Returns + ------- + Dict[str, Any] + Pipeline configuration payload. + """ + with self.session as session: + params = {"query": dumps(query), "name": collection} + url = f"{self.baseurl}/v1/pipelines?{urlencode(params)}" + response: Response = session.get(url=url) + response.raise_for_status() + return response.json()[0] + + @retry(wait=wait_random(min=0.5, max=1.5), stop=(stop_after_delay(30))) + @try_request + def remove(self, pipeline: str, id: str) -> List[Dict[str, Any]]: + """Removes a cancelled pipeline configuration. + + Parameters + ---------- + pipeline : str + PipelineConfig name. + id : str + PipelineConfig ID. + + Returns + ------- + List[Dict[str, Any]] + Response payload. + """ + with self.session as session: + query = {"id": id} + params = {"query": dumps(query), "name": pipeline} + url = f"{self.baseurl}/v1/pipelines?{urlencode(params)}" + response: Response = session.delete(url=url) + response.raise_for_status() + return response.json() + + @retry(wait=wait_random(min=0.5, max=1.5), stop=(stop_after_delay(30))) + @try_request + def stop(self, pipeline: str, id: str) -> List[Dict[str, Any]]: + """Stops the manager for a PipelineConfig. + + Parameters + ---------- + pipeline : str + Pipeline name. + id : str + PipelineConfig ID. + + Returns + ------- + List[Dict[str, Any]] + List of stopped PipelineConfig objects. + """ + with self.session as session: + query = {"id": id} + params = {"query": dumps(query), "name": pipeline} + url = f"{self.baseurl}/v1/pipelines/cancel?{urlencode(params)}" + response: Response = session.put(url) + response.raise_for_status() + if response.status_code == 304: + return [] + return response.json() + + @try_request + def info(self) -> Dict[str, Any]: + """Get the version of the pipelines backend. + + Returns + ------- + Dict[str, Any] + Pipelines backend info. + """ + client_info = self.model_dump() + with self.session as session: + response: Response = session.get(url=f"{self.baseurl}/version") + response.raise_for_status() + server_info = response.json() + return {"client": client_info, "server": server_info}