Skip to content

Commit

Permalink
feat(cli): added buckets cli
Browse files Browse the repository at this point in the history
  • Loading branch information
shinybrar committed Jun 4, 2024
1 parent c90bc7a commit de14f28
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 128 deletions.
145 changes: 66 additions & 79 deletions workflow/cli/buckets.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
"""Common Workflow Utilities."""

from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Tuple

import click
from rich import pretty, print
from rich import pretty
from rich.console import Console
from rich.table import Table

Expand All @@ -14,7 +14,7 @@


table = Table(
title="\nWorkflow Buckets",
title="\nWorkflow",
show_header=True,
header_style="magenta",
title_style="bold magenta",
Expand All @@ -23,103 +23,90 @@

@click.group(name="buckets", help="Manage Workflow Buckets.")
def buckets():
"""Manage workflow pipelines."""
"""Manage Workflow Buckets."""
pass


@buckets.command("version", help="Show the version.")
def version():
"""Show the version."""
http = HTTPContext()
console.print(http.buckets.info())


@buckets.command("rm", help="Remove a bucket.")
@click.argument("name", type=str, required=True)
@click.option("event", "--event", type=int, required=False, help="CHIME/FRB Event ID.")
@buckets.command("rm", help="Remove work from a bucket.")
@click.argument("bucket", type=str, required=True)
@click.option("-s", "--status", type=str, required=False, help="Filter by status.")
@click.option(
"-e", "--event", type=int, required=False, multiple=True, help="Filter by event."
)
@click.option(
"status",
"--status",
type=str,
required=False,
help="Remove works with only a particular status.",
"-t", "--tag", type=str, required=False, multiple=True, help="Filter by tag."
)
def prune_work(name: str, event: Optional[int] = None, status: Optional[str] = None):
"""Remove work[s] from the buckets backend.
@click.option("-p", "--parent", type=str, required=False, help="Filter by parent.")
@click.option("-f", "--force", is_flag=True, help="Do not prompt for confirmation")
def remove(
bucket: str,
status: Optional[str] = None,
event: Optional[Tuple[int]] = None,
tag: Optional[Tuple[str]] = None,
parent: Optional[str] = None,
force: bool = False,
):
"""Remove work[s] from the buckets.
Args:
name (str): Name of the workflow pipeline.
event (Optional[int], optional): CHIME/FRB Event ID. Defaults to None.
status (Optional[str], optional): Status of work[s] to prune. Defaults to None.
bucket (str): Name of the bucket.
status (Optional[str], optional): Status of work. Defaults to None.
event (Optional[Tuple[int]], optional): Filter by event. Defaults to None.
tag (Optional[Tuple[str]], optional): Filter by tag. Defaults to None.
parent (Optional[str], optional): Filter by parent. Defaults to None.
force (bool, optional): Do not prompt for confirmation. Defaults to False.
"""
http = HTTPContext()
events: Optional[List[int]] = None
if event is not None:
events = [event]
http.buckets.delete_many(pipeline=name, status=status, events=events)


@buckets.command("ls", help="List all active buckets.")
def ls():
"""List all active buckets."""
http = HTTPContext()
pipelines = http.buckets.pipelines()
table.add_column("Active Buckets", max_width=50, justify="left")
for pipeline in pipelines:
table.add_row(pipeline)
console.print(table)


@buckets.command("ps", help="List the detail of buckets[s].")
@click.option("all", "-a", "--all", is_flag=True, help="List details of all buckets.")
@click.argument("name", type=str, required=False, default=None)
def ps(name: Optional[str] = None, all: bool = False):
"""List the details of the bucket[s].
Args:
name (Optional[str], optional): Name of the bucket. Defaults to None.
all (bool, optional): Whether to show all buckets. Defaults to False.
"""
tags: Optional[List[str]] = None
if event:
events = list(event)
if tag:
tags = list(tag)
http.buckets.delete_many(
pipeline=bucket,
status=status,
events=events,
tags=tags,
parent=parent,
force=force,
)


@buckets.command("ls")
@click.option("-d", "--details", is_flag=True, help="List details.")
def ls(details: bool = False):
"""List Buckets."""
http = HTTPContext()
details = http.buckets.status(pipeline=None)
table.add_column("name", justify="left")
for key in details.keys():
table.add_column(key, justify="right")
table.add_row("total", *create_row(details))
if all:
if details:
for key in ["name", "total", "queued", "running", "success", "failure"]:
table.add_column(key, justify="right")
pipelines = http.buckets.pipelines()
for pipeline in pipelines:
details = http.buckets.status(pipeline=pipeline)
row = create_row(details)
info = http.buckets.status(pipeline=pipeline)
row = create_row(info)
table.add_row(pipeline, *row)
elif name:
details = http.buckets.status(pipeline=name)
row = create_row(details)
table.add_row(name, *row)
else:
pipelines = http.buckets.pipelines()
table.add_column("Buckets", max_width=50, justify="left")
for pipeline in pipelines:
table.add_row(pipeline)
console.print(table)


@buckets.command("view", help="View work in a bucket.")
@click.argument("name", type=str, required=True)
@click.option(
"count",
"-c",
"--count",
type=int,
required=False,
default=3,
help="Number of work to show.",
)
def view(name: str, count: int = 3):
"""View work in a bucket.
@buckets.command("ps", help="List work in a bucket.")
@click.argument("bucket", type=str, required=True, default=None)
@click.option("-c", "--count", type=int, default=1, help="Number of works to list.")
def ps(bucket: str, count: int):
"""List work in the bucket.
Args:
name (str): Name of the bucket.
count (int, optional): Number of work to show. Defaults to 3.
bucket (str): Name of the bucket.
count (int): Number of works to list.
"""
http = HTTPContext()
work = http.buckets.view(query={"pipeline": name}, projection={}, limit=count)
print(work)
work = http.buckets.view(query={"pipeline": bucket}, projection={}, limit=count)
console.print(work)


def create_row(details: Dict[str, Any]) -> List[str]:
Expand Down
4 changes: 2 additions & 2 deletions workflow/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import click
from rich.console import Console

# from workflow.cli.buckets import buckets
from workflow.cli.buckets import buckets
from workflow.cli.configs import configs
from workflow.cli.pipelines import pipelines
from workflow.cli.results import results
Expand Down Expand Up @@ -35,7 +35,7 @@ def cli():


cli.add_command(run)
# cli.add_command(buckets)
cli.add_command(buckets)
cli.add_command(results)
cli.add_command(configs)
cli.add_command(pipelines)
Expand Down
14 changes: 7 additions & 7 deletions workflow/cli/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@
@click.option(
"-s",
"--site",
type=str,
type=click.STRING,
required=True,
show_default=True,
help="filter work by site.",
)
@click.option(
"-t",
"--tag",
type=str,
type=click.STRING,
multiple=True,
required=False,
default=None,
Expand All @@ -42,7 +42,7 @@
@click.option(
"-p",
"--parent",
type=str,
type=click.STRING,
required=False,
default=None,
show_default=True,
Expand All @@ -51,7 +51,7 @@
@click.option(
"-f",
"--function",
type=str,
type=click.STRING,
required=False,
default=None,
show_default=True,
Expand All @@ -60,7 +60,7 @@
@click.option(
"-c",
"--command",
type=str,
type=click.STRING,
required=False,
default=None,
show_default=True,
Expand All @@ -69,14 +69,14 @@
@click.option(
"--lives",
"-l",
type=int,
type=click.IntRange(min=-1),
default=-1,
show_default=True,
help="count of work to perform.",
)
@click.option(
"--sleep",
type=int,
type=click.IntRange(min=1, max=300),
default=30,
show_default=True,
help="sleep between work attempts.",
Expand Down
50 changes: 13 additions & 37 deletions workflow/http/buckets.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@
from urllib.parse import urlencode

from requests.models import Response
from rich.console import Console
from rich.prompt import Prompt
from tenacity import retry
from tenacity.stop import stop_after_delay
from tenacity.wait import wait_random

from workflow.http.client import Client
from workflow.utils.decorators import try_request
from workflow.utils.logger import get_logger
from workflow.utils.prompt import confirmation

logger = get_logger("workflow.http.buckets")

Expand Down Expand Up @@ -149,6 +148,8 @@ def delete_many(
pipeline: str,
status: Optional[str] = None,
events: Optional[List[int]] = None,
tags: Optional[List[str]] = None,
parent: Optional[str] = None,
force: bool = False,
) -> bool:
"""Delete works belonging to a pipeline from the buckets backend.
Expand All @@ -170,52 +171,27 @@ def delete_many(
query: Dict[str, Any] = {"pipeline": pipeline}
query.update({"status": status} if status else {})
query.update({"event": {"$in": events}} if events else {})
query.update({"tags": {"$in": tags}} if tags else {})
query.update({"config.parent": parent} if parent else {})
projection = {"id": True}
result = self.view(query, projection)
ids: List[str] = []
if result:
ids = [work["id"] for work in result]
# Get user confirmation before deleting
if ids and not force:
force = self.confirmation(pipeline, len(ids), status, events)
msg = f"Are you sure you want to delete {len(ids)} works?"
# Display upto 5 ids only
msg += "\n\tids: " + ", ".join(ids[:5]) + ("..." if len(ids) > 5 else "")
msg += "\n\tstatus: " + status if status else ""
msg += "\n\tevents: " + ", ".join(map(str, events)) if events else ""
msg += "\n\ttags: " + ", ".join(tags) if tags else ""
msg += "\n\tparent: " + parent if parent else ""
force = confirmation(msg)
if ids and force:
return self.delete_ids(ids)
return False

def confirmation(
self,
pipeline: str,
count: int,
status: Optional[str] = None,
events: Optional[List[int]] = None,
) -> bool:
"""Confirm that the user wants to delete works.
Args:
pipeline (str): Name of the pipeline.
count (int): Number of works to delete.
status (Optional[str], optional): Status. Defaults to None.
events (Optional[List[int]], optional): Events. Defaults to None.
Returns:
bool: Whether the user confirmed the deletion.
"""
console = Console()
# Write a warning message to the console with emojis
console.print("WARNING: This action cannot be undone.", style="bold red")
console.print("You are about to delete the following works:")
console.print(f"Bucket: {pipeline}")
console.print(f"Status: {status if status else 'any'}")
console.print(f"Events: {events if events else 'any'}")
console.print(f"Count : {count}", style="bold red")
response = Prompt.ask("Are you sure? (y/n)", choices=["y", "n"])
if response == "y":
console.print("Deleting...")
return True
else:
console.print("Aborting...")
return False

@try_request
def status(self, pipeline: Optional[str] = None) -> Dict[str, Any]:
"""View the status of the buckets backend.
Expand Down
2 changes: 0 additions & 2 deletions workflow/http/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,8 @@ def validate_baseurl(cls, baseurl: str) -> str:
response: Response = head(f"{baseurl}/version", timeout=5)
response.raise_for_status()
except RequestException as error:
logger.warning(f"Unable to connect to the {baseurl}.")
logger.warning(error)
except Exception as error:
logger.warning("Unknown error.")
raise error
return baseurl

Expand Down
2 changes: 1 addition & 1 deletion workflow/lifecycle/archive/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ def move(path: Path, payload: Optional[List[str]]) -> bool:
file_path=item,
)
# Update payload with new path
payload[index] = (
payload[index] = ( # noqa: E501
f"s3://{os.getenv('WORKFLOW_S3_ENDPOINT')}/workflow/{'/'.join([object_paths, item.split('/')[-1]])}" # noqa: E501
)
# Delete file
Expand Down
Loading

0 comments on commit de14f28

Please sign in to comment.