Defer metadata target import #77

Merged: 13 commits, Jul 5, 2023
9 changes: 8 additions & 1 deletion .github/workflows/dataflow.yaml
@@ -11,7 +11,6 @@ on:

jobs:
test-dataflow:
name: test-dataflow-py${{ matrix.python-version }}
# run on:
# - all pushes to main
# - schedule defined above
@@ -27,6 +26,10 @@
fail-fast: false
matrix:
python-version: ["3.9"]
recipes-version: [
"pangeo-forge-recipes==0.9.4",
"pangeo-forge-recipes==0.10.0",
]

steps:
- uses: actions/checkout@v3
@@ -38,15 +41,19 @@
uses: 'google-github-actions/auth@v1'
with:
credentials_json: '${{ secrets.GCP_DATAFLOW_SERVICE_KEY }}'

- name: Install dependencies & our package
run: |
python -m pip install --upgrade pip
python -m pip install -r dev-requirements.txt
python -m pip install -e .
python -m pip install -U ${{ matrix.recipes-version }}

# FIXME: should gcsfs actually be part of some optional installs in setup.py?
- name: Install gcsfs
run: |
python -m pip install 'gcsfs==2022.8.2'

- name: 'Run Dataflow Integration Test'
run: |
pytest -vvvxs tests/integration/test_dataflow_integration.py
4 changes: 2 additions & 2 deletions .github/workflows/unit-test.yml
@@ -15,8 +15,8 @@ jobs:
matrix:
python-version: ["3.9"]
recipes-version: [
"pangeo-forge-recipes==0.9.2",
"git+https://github.com/pangeo-forge/pangeo-forge-recipes.git@beam-refactor",
"pangeo-forge-recipes==0.9.4",
"pangeo-forge-recipes==0.10.0",
]

steps:
1 change: 1 addition & 0 deletions pangeo_forge_runner/bakery/dataflow.py
@@ -177,6 +177,7 @@ def get_pipeline_options(
# https://cloud.google.com/dataflow/docs/guides/using-custom-containers#usage
experiments=["use_runner_v2"],
sdk_container_image=container_image,
sdk_location="container",
# https://cloud.google.com/dataflow/docs/resources/faq#how_do_i_handle_nameerrors
save_main_session=True,
# this might solve serialization issues; cf. https://beam.apache.org/blog/beam-2.36.0/
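
The one-line change above adds `sdk_location="container"` to the Dataflow pipeline options, which tells Beam to use the SDK already installed in the custom worker container instead of staging one at job submission, keeping the worker SDK consistent with whichever pangeo-forge-recipes version is baked into the image. A rough standalone sketch of equivalent options follows; the project, bucket, and image names are placeholders, not values from this repository.

from apache_beam.options.pipeline_options import PipelineOptions

# placeholder values for illustration only
options = PipelineOptions(
    flags=[],
    runner="DataflowRunner",
    project="example-gcp-project",
    temp_location="gs://example-bucket/tmp",
    # https://cloud.google.com/dataflow/docs/guides/using-custom-containers#usage
    experiments=["use_runner_v2"],
    sdk_container_image="gcr.io/example-gcp-project/example-image:latest",
    # use the Beam SDK baked into the container rather than staging one
    sdk_location="container",
    # https://cloud.google.com/dataflow/docs/resources/faq#how_do_i_handle_nameerrors
    save_main_session=True,
)
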
21 changes: 19 additions & 2 deletions pangeo_forge_runner/commands/bake.py
@@ -170,6 +170,8 @@ def start(self):
cache_target = input_cache_storage.get_forge_target(job_name=self.job_name)
if cache_target:
callable_args_injections |= {
# FIXME: a plugin/entrypoint system should handle injections.
# hardcoding object names here assumes too much.
"OpenURLWithFSSpec": {"cache": cache_target},
}
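
Here `callable_args_injections` maps a callable name from the recipe module to extra keyword arguments supplied when that callable is constructed; only `OpenURLWithFSSpec` and its `cache` argument come from this diff. Purely as a hypothetical illustration of how such a mapping could be consumed (this is not the runner's actual injection mechanism, and `apply_injections`/`recipe_namespace` are made-up names):

import functools

def apply_injections(recipe_namespace: dict, injections: dict) -> None:
    # wrap each named callable so the injected kwargs are passed by default
    for name, extra_kwargs in injections.items():
        if name in recipe_namespace:
            recipe_namespace[name] = functools.partial(
                recipe_namespace[name], **extra_kwargs
            )

# after apply_injections(ns, {"OpenURLWithFSSpec": {"cache": cache_target}}),
# calling ns["OpenURLWithFSSpec"](...) behaves like
# OpenURLWithFSSpec(..., cache=cache_target)
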

@@ -225,13 +227,28 @@ def start(self):
elif hasattr(recipe, "to_beam"):
# We are in pangeo-forge-recipes <=0.9
# The import has to be here, as it is not valid in pangeo-forge-recipes>=0.10
# NOTE: `StorageConfig` only requires a target; input and metadata caches are optional,
# so those are handled conditionally if provided.
from pangeo_forge_recipes.storage import StorageConfig

recipe.storage_config = StorageConfig(
target_storage.get_forge_target(job_name=self.job_name),
input_cache_storage.get_forge_target(job_name=self.job_name),
metadata_cache_storage.get_forge_target(job_name=self.job_name),
)
for attrname, optional_storage in zip(
("cache", "metadata"),
(input_cache_storage, metadata_cache_storage),
):
# `.root_path` is an empty string by default, so if the user has not set up this
# optional storage type in config, this block is skipped.
if optional_storage.root_path:
setattr(
recipe.storage_config,
attrname,
optional_storage.get_forge_target(
job_name=self.job_name
),
)
# with configured storage now attached, compile recipe to beam
pipeline | recipe.to_beam()

# Some bakeries are blocking - if Beam is configured to use them, calling
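
For reference, the pre-0.10 storage wiring that this branch of `start()` reproduces can be sketched in isolation as below, assuming pangeo-forge-recipes<=0.9 is installed; the local filesystem and paths are placeholders. Only the target is required at construction time, and `cache`/`metadata` are attached afterwards, mirroring the conditional `setattr` above.

from fsspec.implementations.local import LocalFileSystem
from pangeo_forge_recipes.storage import (
    CacheFSSpecTarget,
    FSSpecTarget,
    MetadataTarget,
    StorageConfig,
)

fs = LocalFileSystem()

# the target is mandatory; cache and metadata default to None
storage_config = StorageConfig(FSSpecTarget(fs, root_path="/tmp/example-target"))

# attach optional stores only when a root path has been configured for them
cache_root = "/tmp/example-cache"  # placeholder; "" would mean "not configured"
if cache_root:
    storage_config.cache = CacheFSSpecTarget(fs, root_path=cache_root)

metadata_root = ""  # not configured, so storage_config.metadata stays None
if metadata_root:
    storage_config.metadata = MetadataTarget(fs, root_path=metadata_root)
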
21 changes: 13 additions & 8 deletions pangeo_forge_runner/storage.py
@@ -1,5 +1,4 @@
from fsspec import AbstractFileSystem
from pangeo_forge_recipes.storage import CacheFSSpecTarget, FSSpecTarget, MetadataTarget
from traitlets import Dict, Type, Unicode
from traitlets.config import LoggingConfigurable

@@ -36,10 +35,10 @@ class StorageTargetConfig(LoggingConfigurable):
""",
)

pangeo_forge_target_class = Type(
pangeo_forge_target_class = Unicode(
config=False,
help="""
StorageConfig class from pangeo_forge_recipes to instantiate.
Name of the storage target class from pangeo_forge_recipes.storage to instantiate.

Should be set by subclasses.
""",
@@ -51,7 +50,13 @@ def get_forge_target(self, job_name: str):

If {job_name} is present in `root_path`, it is expanded with the given job_name
"""
return self.pangeo_forge_target_class(
# Import dynamically at call time, because different versions of `pangeo_forge_recipes.storage`
# contain different objects, so a static top-level import cannot be used.
from pangeo_forge_recipes import storage

cls = getattr(storage, self.pangeo_forge_target_class)

return cls(
self.fsspec_class(**self.fsspec_args),
root_path=self.root_path.format(job_name=job_name),
)
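
Holding the class name as a string and resolving it with `getattr` at call time means importing `pangeo_forge_runner.storage` no longer fails on recipes releases that have dropped one of these classes (per the PR title, `MetadataTarget` is the symbol at issue). A minimal sketch of the same deferral pattern, with `resolve_target_class` as a hypothetical helper for illustration:

def resolve_target_class(name: str):
    # defer the import to call time, so this module can be imported even when
    # pangeo_forge_recipes.storage does not provide every storage class by name
    from pangeo_forge_recipes import storage

    return getattr(storage, name)

# resolve_target_class("FSSpecTarget") works on any release that ships it;
# a lookup only fails for classes the installed version has actually removed.
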
@@ -64,28 +69,28 @@ def __str__(self):
fsspec_args_filtered = ", ".join(
f"{k}=<{type(v).__name__}>" for k, v in self.fsspec_args.items()
)
return f'{self.pangeo_forge_target_class.__name__}({self.fsspec_class.__name__}({fsspec_args_filtered}, root_path="{self.root_path}")'
return f'{self.pangeo_forge_target_class}({self.fsspec_class.__name__}({fsspec_args_filtered}, root_path="{self.root_path}")'


class TargetStorage(StorageTargetConfig):
"""
Storage configuration for where the baked data should be stored
"""

pangeo_forge_target_class = FSSpecTarget
pangeo_forge_target_class = "FSSpecTarget"


class InputCacheStorage(StorageTargetConfig):
"""
Storage configuration for caching input files during recipe baking
"""

pangeo_forge_target_class = CacheFSSpecTarget
pangeo_forge_target_class = "CacheFSSpecTarget"


class MetadataCacheStorage(StorageTargetConfig):
"""
Storage configuration for caching metadata during recipe baking
"""

pangeo_forge_target_class = MetadataTarget
pangeo_forge_target_class = "MetadataTarget"
15 changes: 15 additions & 0 deletions tests/conftest.py
@@ -57,3 +57,18 @@ def minio(local_ip):
proc.wait()

assert proc.returncode == 0


@pytest.fixture
def recipes_version_ref():
# FIXME: the recipes version matrix is currently determined by the GitHub workflows matrix;
# in the future, it should perhaps be set by a pangeo-forge-runner venv feature.
pip_list = subprocess.check_output("pip list".split()).decode("utf-8").splitlines()
recipes_version = [
p.split()[-1] for p in pip_list if p.startswith("pangeo-forge-recipes")
][0]
# the recipes_version is a 3-element semantic version of form `0.A.B` where A is either minor
# version `9` or `10`. the test feedstock (pforgetest/gpcp-from-gcs-feedstock) has tags for
# each of these minor versions, of the format `0.A.x`, so we translate the installed version
# of pangeo-forge-recipes to one of the valid tags (either `0.9.x` or `0.10.x`) here.
return f"0.{recipes_version.split('.')[1]}.x"
17 changes: 11 additions & 6 deletions tests/integration/test_dataflow_integration.py
@@ -7,7 +7,7 @@
import xarray as xr


def test_dataflow_integration():
def test_dataflow_integration(recipes_version_ref):
bucket = "gs://pangeo-forge-runner-ci-testing"
config = {
"Bake": {
@@ -38,7 +38,9 @@ def test_dataflow_integration():
"--repo",
"https://github.com/pforgetest/gpcp-from-gcs-feedstock.git",
"--ref",
"0.9.x",
# in the test feedstock, tags are named for the recipes version
# which was used to write the recipe module
recipes_version_ref,
"--json",
"-f",
f.name,
@@ -90,10 +92,13 @@ def test_dataflow_integration():
pytest.fail(f"{state = } is neither 'Done' nor 'Running'")

# open the generated dataset with xarray!
gpcp = xr.open_dataset(
config["TargetStorage"]["root_path"].format(job_name=job_name),
engine="zarr",
)
target_path = config["TargetStorage"]["root_path"].format(job_name=job_name)
if recipes_version_ref == "0.10.x":
# in pangeo-forge-recipes>=0.10.0, the value of the `StoreToZarr.store_name` kwarg
# is appended to the formatted root path at execution time. For ref `0.10.x`,
# the value of that kwarg is "gpcp", so we append it here.
target_path += "/gpcp"
gpcp = xr.open_dataset(target_path, engine="zarr")

assert (
gpcp.title
24 changes: 2 additions & 22 deletions tests/unit/test_bake.py
@@ -9,26 +9,6 @@
from pangeo_forge_runner.commands.bake import Bake


@pytest.fixture
def recipes_version_ref():
# FIXME: recipes version matrix is currently determined by github workflows matrix
# in the future, it should be set by pangeo-forge-runner venv feature?
pip_list = subprocess.check_output("pip list".split()).decode("utf-8").splitlines()
recipes_version = [
p.split()[-1] for p in pip_list if p.startswith("pangeo-forge-recipes")
][0]
return (
"0.9.x"
# FIXME: for now, beam-refactor is unreleased, so installing from the dev branch
# gives something like "0.9.1.dev86+g6e9c341" as the version. So we just assume any
# version which includes "dev" is the "beam-refactor" branch, because we're not
# installing from any other upstream dev branch at this point. After beam-refactor
# release, we can figure this out based on an explicit version tag, i.e. "0.10.*".
if "dev" not in recipes_version
else "beam-refactor"
)


@pytest.mark.parametrize(
"job_name, raises",
(
@@ -141,12 +121,12 @@ def test_gpcp_bake(
else:
assert job_name.startswith("gh-pforgetest-gpcp-from-gcs-")

# In beam-refactor, the actual zarr store is produced in a
# In pangeo-forge-recipes>=0.10.0, the actual zarr store is produced in a
# *subpath* of target_storage.root_path, rather than in the
# root path itself. This is a compatibility break with previous
# versions of pangeo-forge-recipes; https://github.com/pangeo-forge/pangeo-forge-recipes/pull/495
# has more information.
if recipes_version_ref == "beam-refactor":
if recipes_version_ref == "0.10.x":
zarr_store_path = config["TargetStorage"]["root_path"] + "gpcp/"
else:
zarr_store_path = config["TargetStorage"]["root_path"]