Skip to content

Commit

Permalink
Attempt to break circularity
Browse files Browse the repository at this point in the history
  • Loading branch information
moradology committed Feb 21, 2024
1 parent 20ebde6 commit 96d9afa
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 36 deletions.
8 changes: 4 additions & 4 deletions pangeo_forge_runner/bakery/base.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from typing import List

from apache_beam.pipeline import Pipeline, PipelineOptions
from traitlets import TraitError
from traitlets import HasTraits, TraitError
from traitlets.config import LoggingConfigurable

from ..commands.bake import Bake, ExecutionMetadata
from .execution_metadata import ExecutionMetadata


class Bakery(LoggingConfigurable):
Expand Down Expand Up @@ -40,12 +40,12 @@ def bake(self, pipeline: Pipeline, meta: ExecutionMetadata) -> None:
result = pipeline.run()
job_id = result.job_id()
self.log.info(
f"Submitted job {meta.job_id} for recipe {meta.name}",
f"Submitted job {meta.job_id} for recipe {meta.recipe_name}",
extra=meta.to_dict() | {"job_id": job_id, "status": "submitted"},
)

@classmethod
def validate_bake_command(cls, bake_command: Bake) -> List[TraitError]:
def validate_bake_command(cls, bake_command: HasTraits) -> List[TraitError]:
"""
Validates the given bake_command and collects any validation errors.
Expand Down
18 changes: 18 additions & 0 deletions pangeo_forge_runner/bakery/execution_metadata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from dataclasses import asdict, dataclass


@dataclass
class ExecutionMetadata:
    """
    Holds metadata for an execution instance, including recipe and job names.

    Attributes:
        recipe_name (str): Name of the recipe being executed.
        job_name (str): Unique name for the job execution.
    """

    recipe_name: str
    job_name: str

    def to_dict(self) -> dict:
        """
        Return the fields of this instance as a new dictionary.

        The result maps each field name to its value and is built with
        ``dataclasses.asdict``, so mutating it does not affect the instance.
        """
        return asdict(self)
10 changes: 5 additions & 5 deletions pangeo_forge_runner/bakery/flink.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@

import escapism
from apache_beam.pipeline import Pipeline, PipelineOptions
from traitlets import Bool, Dict, Integer, TraitError, Unicode
from traitlets import Bool, Dict, HasTraits, Integer, TraitError, Unicode

from ..commands.bake import Bake, ExecutionMetadata
from .base import Bakery
from .execution_metadata import ExecutionMetadata


# Copied from https://github.com/jupyterhub/kubespawner/blob/7d6d82c2be469dd76f770d6f6ed0d1dade6b24a7/kubespawner/utils.py#L8
Expand Down Expand Up @@ -380,15 +380,15 @@ def bake(self, pipeline: Pipeline, meta: ExecutionMetadata) -> None:
meta (ExecutionMetadata): An instance of ExecutionMetadata containing metadata about the bake process.
"""
self.log.info(
f"Running flink job for recipe {meta.name}\n",
f"Running flink job for recipe {meta.recipe_name}\n",
extra=meta.to_dict() | {"status": "running"},
)
pipeline.run()

@classmethod
def validate_bake_command(cls, bake_command: Bake) -> List[TraitError]:
def validate_bake_command(cls, bake_command: HasTraits) -> List[TraitError]:
errors = []
if not bake_command.container_image:
if not bake_command._trait_values["container_image"]:
errors.append(
TraitError(
"'container_image' is required when using the 'FlinkOperatorBakery' "
Expand Down
4 changes: 2 additions & 2 deletions pangeo_forge_runner/bakery/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
from apache_beam.pipeline import Pipeline, PipelineOptions
from traitlets import Integer

from ..commands.bake import ExecutionMetadata
from .base import Bakery
from .execution_metadata import ExecutionMetadata


class LocalDirectBakery(Bakery):
Expand Down Expand Up @@ -54,7 +54,7 @@ def bake(self, pipeline: Pipeline, meta: ExecutionMetadata) -> None:
meta (ExecutionMetadata): An instance of ExecutionMetadata containing metadata about the bake process.
"""
self.log.info(
f"Running local job for recipe {meta.name}\n",
f"Running local job for recipe {meta.recipe_name}\n",
extra=meta.to_dict() | {"status": "running"},
)
pipeline.run()
30 changes: 9 additions & 21 deletions pangeo_forge_runner/commands/bake.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import re
import string
import time
from dataclasses import asdict, dataclass
from importlib.metadata import distributions
from pathlib import Path

Expand All @@ -17,30 +16,14 @@

from .. import Feedstock
from ..bakery.base import Bakery
from ..bakery.execution_metadata import ExecutionMetadata
from ..bakery.local import LocalDirectBakery
from ..plugin import get_injections, get_injectionspecs_from_entrypoints
from ..storage import InputCacheStorage, TargetStorage
from ..stream_capture import redirect_stderr, redirect_stdout
from .base import BaseCommand, common_aliases, common_flags


@dataclass
class ExecutionMetadata:
    """
    Holds metadata for an execution instance, including recipe and job names.

    Attributes:
        recipe_name (str): Name of the recipe being executed.
        job_name (str): Unique name for the job execution.
    """

    recipe_name: str
    job_name: str

    def to_dict(self) -> dict:
        """
        Return the fields of this instance as a new dictionary.

        The result maps each field name to its value and is built with
        ``dataclasses.asdict``, so mutating it does not affect the instance.
        """
        return asdict(self)


class Bake(BaseCommand):
"""
Command to bake a pangeo forge recipe in a given bakery
Expand Down Expand Up @@ -138,8 +121,7 @@ def _validate_job_name(self, proposal):
)
return proposal.value

@validate("bakery_class")
def _bakery_impl_validation(self, proposal):
def bakery_impl_validation(self):
"""
Validates the 'bakery_class' trait using class-specific validation logic.
Expand All @@ -150,8 +132,11 @@ def _bakery_impl_validation(self, proposal):
Raises:
- TraitError: If multiple validation errors are encountered, a single TraitError
is raised containing a summary of all error messages.
Note: traitlets does not appear to provide a convenient post-initialization hook, so
this validation has to be invoked manually (it is called from `start`).
"""
klass = proposal["value"]
klass = self.bakery_class
if errors := klass.validate_bake_command(self):
if len(errors) == 1:
raise errors[0]
Expand Down Expand Up @@ -202,6 +187,9 @@ def start(self):
"""
Start the baking process
"""
# Validate bakery-specific requirements of this class. Yes, we have to do it here.
self.bakery_impl_validation()

if not "pangeo-forge-recipes" in [d.metadata["Name"] for d in distributions()]:
raise ValueError(
"To use the `bake` command, `pangeo-forge-recipes` must be installed."
Expand Down
11 changes: 7 additions & 4 deletions tests/unit/test_bake.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import pytest
import xarray as xr
from packaging.version import parse as parse_version
from traitlets import TraitError

from pangeo_forge_runner.commands.bake import Bake

Expand Down Expand Up @@ -72,12 +73,13 @@ def test_job_name_validation(job_name, raises):
bake = Bake()
if raises:
with pytest.raises(
ValueError,
TraitError,
match=re.escape(
f"job_name must match the regex ^[a-z][-_0-9a-z]{{0,62}}$, instead found {job_name}"
),
):
bake.job_name = job_name
bake.bakery_impl_validation()
else:
bake.job_name = job_name
assert bake.job_name == job_name
Expand All @@ -90,15 +92,16 @@ def test_job_name_validation(job_name, raises):
["apache/beam_python3.10_sdk:2.51.0", False],
),
)
def test_container_name_validation(container_image, raises):
def test_container_image_validation(container_image, raises):
bake = Bake()
if raises:
with pytest.raises(
ValueError,
match=r"^'container_name' is required.*",
TraitError,
match=r"^'container_image' is required.*",
):
bake.bakery_class = "pangeo_forge_runner.bakery.flink.FlinkOperatorBakery"
bake.container_image = container_image
bake.bakery_impl_validation()
else:
bake.bakery_class = "pangeo_forge_runner.bakery.flink.FlinkOperatorBakery"
bake.container_image = container_image
Expand Down

0 comments on commit 96d9afa

Please sign in to comment.