Skip to content

Commit

Permalink
feat(workflow): refactored entire workflow core
Browse files Browse the repository at this point in the history
  • Loading branch information
shinybrar committed Jul 19, 2023
1 parent 176800f commit f77df8f
Show file tree
Hide file tree
Showing 12 changed files with 667 additions and 462 deletions.
495 changes: 262 additions & 233 deletions poetry.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ tenacity = "^8.2"
pydantic = "^2.0"
requests = "^2.31"
pyyaml = "^6.0"
pydantic-settings = "^2.0"

[tool.poetry.group.dev.dependencies]
mypy = "^1.4"
Expand Down
50 changes: 41 additions & 9 deletions tests/test_work.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,39 @@
"""Test the work object."""

import pytest
from chime_frb_api.workflow import Work
from pydantic import ValidationError

from workflow import Work


def test_bad_instantiation():
"""Test that the work object can't be instantiated without a pipeline."""
with pytest.raises(KeyError):
with pytest.raises(ValidationError):
Work()


def test_bad_pipeline():
"""Test that the work object can't be instantiated with empty pipeline."""
with pytest.raises(KeyError):
with pytest.raises(ValidationError):
Work(pipeline="", site="local", user="test")


def test_pipeline_reformat():
"""Test that the work object can't be instantiated with empty pipeline."""
work = Work(pipeline="sample test", site="local", user="test")
assert work.pipeline == "sample-test"
work.pipeline = "sample test"
assert work.pipeline == "sample-test"


def test_bad_pipeline_char():
"""Test that the work object can't be instantiated with a pipeline containing
invalid characters.
"""
with pytest.raises(ValidationError):
Work(pipeline="sample-test!", site="local", user="test")


@pytest.mark.parametrize("test_input", ["params", 123, [123, "123", {}], (32, "456")])
def test_bad_parameters_datatype(test_input):
"""Test parameters field not being a dict() object."""
Expand Down Expand Up @@ -47,8 +64,8 @@ def test_work_lifecycle():
def test_json_serialization():
"""Test that the work can be serialized to JSON."""
work = Work(pipeline="test", site="local", user="test")
assert work.json() is not None
assert isinstance(work.json(), str)
assert work.model_dump_json() is not None
assert isinstance(work.model_dump_json(), str)


def test_check_work_payload():
Expand All @@ -62,7 +79,7 @@ def test_make_work_from_dict():
"""Test that the work object can be instantiated from a dictionary."""
work = Work(pipeline="test", site="local", user="test", parameters={"hi": "low"})
work_from_dict = Work.from_dict(work.payload)
work_from_json = Work.from_json(work.json())
work_from_json = Work.from_json(work.model_dump_json())
assert work == work_from_dict == work_from_json


Expand Down Expand Up @@ -122,15 +139,19 @@ def test_site_bad_value(test_input):
Work(pipeline="test", user="test", site=test_input)


def check_command_and_function():
def test_command_and_function():
"""Checks if command and function fields are mutually exclusive."""
with pytest.raises(ValidationError):
Work(
pipeline="test", user="test", site="local", command="test", function="test"
pipeline="test",
user="test",
site="local",
command=["test"],
function="test",
)


def check_slack_notify():
def test_bad_slack_notify():
"""Checks if slack_notify field is of type bool."""
with pytest.raises(ValidationError):
Work(
Expand All @@ -139,3 +160,14 @@ def check_slack_notify():
site="local",
notify={"slack": {"channel": "test"}},
)


def test_good_slack_notify():
"""Checks if slack_notify field is of type bool."""
work = Work(
pipeline="test",
user="test",
site="local",
notify={"slack": {"channel_id": "test"}},
)
assert work.notify.slack.channel_id == "test"
2 changes: 2 additions & 0 deletions workflow.env
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
WORKFLOW_TOKEN="abc123"
WORKFLOW_SITES=chime,local
2 changes: 1 addition & 1 deletion workflow/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
"""Top-level imports for Tasks API."""
from .work import Work # noqa: F401
from .work.work import Work # noqa: F401s
8 changes: 8 additions & 0 deletions workflow/configs/chime-frb.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
work:
config:
archive:
enabled: true
modes: ["pass", "copy", "move", "delete", "upload"]
notify:
enabled: true
modes: ["slack"]
sites:
- "canfar"
- "cedar"
Expand All @@ -9,5 +16,6 @@ work:
- "kko"
- "local"


authentication:
enabled: true
2 changes: 1 addition & 1 deletion workflow/lifecycle/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ def virtualization() -> Optional[str]:
"""Check if the process is running in a container.
Returns:
str: Container type
Optional[str]: Container type.
"""
try:
with open("/proc/1/cgroup") as f:
Expand Down
Empty file added workflow/work/__init__.py
Empty file.
81 changes: 81 additions & 0 deletions workflow/work/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
"""Work Object Configuration."""
from typing import List, Literal, Optional

from pydantic import BaseModel, ConfigDict, Field


class Archive(BaseModel):
"""Archive Configuration.
This class is used to configure the archive strategy for the work.
Args:
BaseModel (BaseModel): Pydantic BaseModel.
Attributes:
results (bool): Archive results for the work.
products (Literal["pass", "copy", "move", "delete", "upload"]):
Archive strategy for the products.
plots (Literal["pass", "copy", "move", "delete", "upload"]):
Archive strategy for the plots.
logs (Literal["pass", "copy", "move", "delete", "upload"]):
Archive strategy for the logs.
"""

model_config = ConfigDict(validate_default=True, validate_assignment=True)

results: bool = Field(
default=True,
description="Archive results for the work.",
)
products: Literal["pass", "copy", "move", "delete", "upload"] = Field(
default="copy",
description="Archive strategy for the products.",
)
plots: Literal["pass", "copy", "move", "delete", "upload"] = Field(
default="copy",
description="Archive strategy for the plots.",
)
logs: Literal["pass", "copy", "move", "delete", "upload"] = Field(
default="move",
description="Archive strategy for the logs.",
)


class Config(BaseModel):
"""Work Object Configuration.
This class is used to configure the work object.
Args:
BaseModel (BaseModel): Pydantic BaseModel.
"""

model_config = ConfigDict(validate_default=True, validate_assignment=True)

archive: Archive = Archive()
metrics: bool = Field(
default=False,
description="Generate metrics from work lifecycle.",
)
parent: Optional[str] = Field(
default=None,
description="ID of the parent workflow pipeline.",
json_schema_extra={"examples": ["5f9b5c5d7b54b5a9c5e5b5c5"]},
)
orgs: List[str] = Field(
default=["chimefrb"],
description="""
List of organization[s] the work belongs to.
Maps to the Github organization.
""",
json_schema_extra={"examples": ["chimefrb", "chime-sps"]},
)
teams: Optional[List[str]] = Field(
default=None,
description="""
List of team[s] the work belongs to.
Maps to the Github team within the organization.
""",
json_schema_extra={"example": ["frb-tsars", "frb-ops"]},
)
81 changes: 81 additions & 0 deletions workflow/work/notify.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
"""Notification Configuration."""
from typing import Any, Dict, List, Optional

from pydantic import BaseModel, ConfigDict, Field, StrictStr


class Slack(BaseModel):
"""Slack Configuration.
This class is used to configure the slack notification strategy for the work.
Args:
BaseModel (BaseModel): Pydantic BaseModel.
Attributes:
channel_id (str): Slack channel to send notifications to.
member_ids (List[str]): Slack members to send notifications to.
message (str): Slack message to send notifications with.
results (bool): Send slack notifications with the work results.
products (bool): Send slack notifications with the work product links.
plots (bool): Send slack notifications with the work plot links.
blocks (Dict[str, Any]): Slack blocks to send notifications with.
reply (Dict[str, Any]): Status of the slack notification.
"""

model_config = ConfigDict(
title="Slack Configuration",
validate_default=True,
validate_assignment=True,
extra="forbid",
)

channel_id: Optional[StrictStr] = Field(
default=None,
description="Slack channel to send notifications to.",
examples=["C01JYQZQX0Y"],
)
member_ids: Optional[List[StrictStr]] = Field(
default=None,
description="Slack members to send notifications to.",
examples=[["U01JYQZQX0Y"]],
)
message: Optional[StrictStr] = Field(
default=None,
description="Slack message to send notifications with.",
examples=["Hello World!"],
)
results: Optional[bool] = Field(
default=None,
description="Send slack notifications with the work results.",
)
products: Optional[bool] = Field(
default=None,
description="Send slack notifications with the work product links.",
)
plots: Optional[bool] = Field(
default=None,
description="Send slack notifications with the work plot links.",
)
blocks: Optional[Dict[str, Any]] = Field(
default=None,
description="Slack blocks to send notifications with.",
)
reply: Optional[Dict[str, Any]] = Field(
default=None,
description="Status of the slack notification.",
examples=[{"ok": True}],
)


class Notify(BaseModel):
"""Work Object Notification Configuration.
Args:
BaseModel (BaseModel): Pydantic BaseModel.
Attributes:
slack (Slack): Send slack notifications for the work.
"""

slack: Slack = Slack()
70 changes: 70 additions & 0 deletions workflow/work/settings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
"""Workflow Settings."""
from typing import List, Optional

from pydantic import AliasChoices, Field, SecretStr, field_validator
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
"""Workflow Settings.
Args:
BaseSettings (BaseSettings): Pydantic BaseSettings.
Returns:
Settings: Workflow settings.
"""

model_config = SettingsConfigDict(
env_prefix="WORKFLOW_",
env_file="workflow.env",
env_file_encoding="utf-8",
env_nested_delimiter="__",
secrets_dir="/run/secrets",
str_to_lower=True,
case_sensitive=True,
extra="ignore",
)

token: Optional[SecretStr] = Field(
default=None,
validation_alias=AliasChoices(
"WORKFLOW_TOKEN",
"GITHUB_TOKEN",
"GITHUB_PAT",
"GITHUB_ACCESS_TOKEN",
"GITHUB_PERSONAL_ACCESS_TOKEN",
),
)

sites: Optional[List[str]] = Field(
default=None,
validation_alias=AliasChoices(
"SITES",
"WORKFLOW_SITES",
),
)

tags: Optional[List[str]] = Field(
default=None,
validation_alias=AliasChoices(
"TAGS",
"WORKFLOW_TAGS",
),
)

@field_validator("sites", mode="before")
def sites_validator(cls, sites: str) -> List[str]:
"""Sites Validator.
Args:
sites (str): Value of the sites setting.
Returns:
List[str]: List of sites.
"""
return sites.split(",")


if __name__ == "__main__":
print(Settings().model_dump()) # type: ignore
Loading

0 comments on commit f77df8f

Please sign in to comment.