Skip to content

Commit

Permalink
Merge pull request #86 from pangeo-forge/injections
Browse files Browse the repository at this point in the history
Dynamically determine injections based on installed set of packages
  • Loading branch information
cisaacstern authored Oct 3, 2023
2 parents 9629083 + ae4c41c commit 241c167
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 12 deletions.
1 change: 1 addition & 0 deletions .github/workflows/dataflow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ jobs:
recipes-version: [
"pangeo-forge-recipes==0.9.4",
"pangeo-forge-recipes==0.10.0",
"git+https://github.com/yuvipanda/pangeo-forge-recipes@injections"
]

steps:
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/unit-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ jobs:
recipes-version: [
"pangeo-forge-recipes==0.9.4",
"pangeo-forge-recipes==0.10.0",
"git+https://github.com/yuvipanda/pangeo-forge-recipes@injections"
]

steps:
Expand Down
25 changes: 13 additions & 12 deletions pangeo_forge_runner/commands/bake.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from .. import Feedstock
from ..bakery.base import Bakery
from ..bakery.local import LocalDirectBakery
from ..plugin import get_injections, get_injectionspecs_from_entrypoints
from ..storage import InputCacheStorage, MetadataCacheStorage, TargetStorage
from ..stream_capture import redirect_stderr, redirect_stdout
from .base import BaseCommand, common_aliases, common_flags
Expand Down Expand Up @@ -168,30 +169,30 @@ def start(self):
extra={"status": "setup"},
)

injection_specs = get_injectionspecs_from_entrypoints()

with self.fetch() as checkout_dir:
if not self.job_name:
self.job_name = self.autogenerate_job_name()

callable_args_injections = {
"StoreToZarr": {
"target_root": target_storage.get_forge_target(
job_name=self.job_name
),
}
injection_values = {
"TARGET_STORAGE": target_storage.get_forge_target(
job_name=self.job_name
),
}

cache_target = input_cache_storage.get_forge_target(job_name=self.job_name)
if cache_target:
callable_args_injections |= {
# FIXME: a plugin/entrypoint system should handle injections.
# hardcoding object names here assumes too much.
"OpenURLWithFSSpec": {"cache": cache_target},
}
injection_values |= {"INPUT_CACHE_STORAGE": cache_target}
print(injection_values)
print(injection_specs)

feedstock = Feedstock(
Path(checkout_dir) / self.feedstock_subdir,
prune=self.prune,
callable_args_injections=callable_args_injections,
callable_args_injections=get_injections(
injection_specs, injection_values
),
)

self.log.info("Parsing recipes...", extra={"status": "running"})
Expand Down
151 changes: 151 additions & 0 deletions pangeo_forge_runner/plugin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
"""
Handle the plugin system for injections.
There are three parts of injections:
1. An "injection spec", provided by other installed packages (such as
pangeo_forge_recipes, pangeo_forge_cmr, etc). This specifies what
*values* exactly will be injected as args for *which* callables.
It is in the form of a dictionary, and looks like this:
```
{
"<callable-1-name>": {
"<argument-1-name>": "<value-spec>",
"<argument-2-name>": "<value-spec>"
},
"<callable-2-name>": {
"<argument-1-name>": "<value-spec>",
"<argument-2-name>": "<value-spec>"
}
}
```
`<value-spec>` specifies what value should be injected. Currently
supported are two strings:
1. `TARGET_STORAGE` - Storage that a pipeline *output* should be written to.
Will be passed as a `pangeo_forge_recipes.storage.FSSpecTarget` object.
2. `INPUT_CACHE_STORAGE` - (Optional) Storage used for caching inputs.
Will be passed as a `pangeo_forge_recipes.storage.CacheFSSpecTarget` object.
Additional values may be provided in the future.
An example is:
```
{
'StoreToZarr': {
'target_root': 'TARGET_STORAGE',
},
'OpenURLWithFSSpec': {
'cache': 'INPUT_CACHE_STORAGE'
}
}
```
We considered making this into an Enum, but that would have required all
packages that provide entrypoints also *import* pangeo_forge_runner. This
was deemed too complicated, and hence raw strings are used.
2. "Injection spec values", calculated by pangeo-forge-runner. This is simply a
mapping of "<value-spec>" to a specific value that will be injected for
that "<value-spec>" in this particular run. This might look like:
```
{
"TARGET_STORAGE": <A `pangeo_forge_recipes.storage.FSSpecTarget` object>,
"INPUT_CACHE_STORAGE": <A `pangeo_forge_recipes.storage.CacheFSSpecTarget` object>
}
```
3. "Injections", ready to be passed on to the rewriter! This merges (1) and (2),
and looks like:
```
{
'StoreToZarr': {
'target_root': <A `pangeo_forge_recipes.storage.FSSpecTarget` object>
},
'OpenURLWithFSSpec': {
'cache': <A `pangeo_forge_recipes.storage.CacheFSSpecTarget` object>
}
}
```
This is what is actually injected into the recipes in the end.
"""
# Use the backported importlib_metadata as we still support Python 3.9
# Once we're on 3.10 we can remove this dependency and use the built in
# importlib.metadata
from importlib_metadata import entry_points
from jsonschema import validate

# JSON Schema describing the dictionary an injection spec entrypoint must
# return. It is assembled bottom-up from the innermost piece outwards.

# Innermost level: the "<value-spec>" string naming what gets injected.
# Only these two value specs are currently accepted.
_VALUE_SPEC_SCHEMA = {
    "type": "string",
    "enum": ["TARGET_STORAGE", "INPUT_CACHE_STORAGE"],
}

# Second level: maps argument names of a callable to a value spec. The
# argument names are arbitrary, hence patternProperties with a catch-all regex.
_CALLABLE_ARGS_SCHEMA = {
    "type": "object",
    "patternProperties": {".+": _VALUE_SPEC_SCHEMA},
}

# Top level: maps callable names (also arbitrary) to their argument mapping.
INJECTION_SPEC_SCHEMA = {
    "type": "object",
    "patternProperties": {".+": _CALLABLE_ARGS_SCHEMA},
    "additionalProperties": False,
}


def get_injectionspecs_from_entrypoints():
    """
    Collect injection specs from installed packages.

    Looks for entrypoints defined in installed packages with the group
    "pangeo_forge_runner.injection_specs", and calls them all in an
    undefined order. Each is expected to return a dict specifying what
    exactly should be injected where. Every returned dict is validated
    against INJECTION_SPEC_SCHEMA and then merged into a single spec.

    Merging happens per callable: when several plugins provide arguments
    for the same callable, the arguments of all of them are kept. If they
    also name the same argument, the plugin that happens to be loaded last
    wins.

    If no plugin contributes anything, a built-in default spec is returned
    instead (see the comment on the fallback below).

    Returns:
        A dict mapping callable names to ``{argument name: value spec}``
        dicts.

    Raises:
        jsonschema.ValidationError: if a plugin returns a spec that does not
            conform to INJECTION_SPEC_SCHEMA. The error message names the
            offending entrypoint.
    """
    # Local import: only needed to annotate validation failures below.
    from jsonschema import ValidationError

    injection_specs = {}
    for ep in entry_points(group="pangeo_forge_runner.injection_specs"):
        specs = ep.load()()
        try:
            validate(specs, schema=INJECTION_SPEC_SCHEMA)
        except ValidationError as err:
            # Re-raise the same exception object (so callers still get the
            # full validation context), but tell the user which plugin
            # produced the invalid spec.
            err.message = (
                f"Injection spec provided by entrypoint {ep.name!r} "
                f"({ep.value}) failed validation: {err.message}"
            )
            raise

        # Merge one level deep so that two plugins targeting the same
        # callable both contribute their arguments. The schema guarantees the
        # spec is exactly two levels (callable -> argument -> value spec).
        for callable_name, arg_specs in specs.items():
            injection_specs.setdefault(callable_name, {}).update(arg_specs)

    if not injection_specs:
        # Handle the specific case of pangeo-forge-recipes==0.10.x,
        # which shipped with beam transforms that need injections, but without
        # entrypoint based injection specs.
        injection_specs = {
            "StoreToZarr": {
                "target_root": "TARGET_STORAGE",
            },
            "WriteCombinedReference": {
                "target_root": "TARGET_STORAGE",
            },
            "OpenURLWithFSSpec": {"cache": "INPUT_CACHE_STORAGE"},
        }

    return injection_specs


def get_injections(
    injection_spec: dict, injection_values: dict
) -> dict[str, dict[str, object]]:
    """
    Given an injection_spec and injection_values, provide actual injections.

    Args:
        injection_spec: Mapping of callable name to a mapping of argument
            name to value spec (e.g. ``"TARGET_STORAGE"``).
        injection_values: Mapping of value spec to the concrete value that
            should be injected for it in this run.

    Returns:
        Mapping of callable name to ``{argument name: concrete value}``.
        Arguments whose value spec has no entry in ``injection_values`` are
        left out, and callables that end up with no arguments at all are
        omitted entirely.
    """
    injections = {}

    for callable_name, arg_specs in injection_spec.items():
        # Resolve only those arguments for which a value is available.
        resolved = {
            arg_name: injection_values[value_spec]
            for arg_name, value_spec in arg_specs.items()
            if value_spec in injection_values
        }
        # Skip callables with nothing to inject rather than emitting an
        # empty dict for them.
        if resolved:
            injections[callable_name] = resolved

    return injections
2 changes: 2 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
"ruamel.yaml",
"pangeo-forge-recipes>=0.9.2",
"escapism",
"jsonschema",
"traitlets",
"importlib-metadata",
# Matches the version of apache_beam in the default image,
# specified in bake.py's container_image traitlet default
"apache-beam[gcp]==2.42.0",
Expand Down

0 comments on commit 241c167

Please sign in to comment.