From f8328c4a185087c0d9df82dbca457bbecea878b7 Mon Sep 17 00:00:00 2001 From: Charles Stern <62192187+cisaacstern@users.noreply.github.com> Date: Tue, 4 Jul 2023 12:46:46 -0700 Subject: [PATCH 01/13] defer target storage imports --- pangeo_forge_runner/commands/bake.py | 21 +++++++++++++++++++-- pangeo_forge_runner/storage.py | 19 ++++++++++++------- 2 files changed, 31 insertions(+), 9 deletions(-) diff --git a/pangeo_forge_runner/commands/bake.py b/pangeo_forge_runner/commands/bake.py index 6583a664..03e31913 100644 --- a/pangeo_forge_runner/commands/bake.py +++ b/pangeo_forge_runner/commands/bake.py @@ -170,6 +170,8 @@ def start(self): cache_target = input_cache_storage.get_forge_target(job_name=self.job_name) if cache_target: callable_args_injections |= { + # FIXME: a plugin/entrypoint system should handle injections. + # hardcoding object names here assumes too much. "OpenURLWithFSSpec": {"cache": cache_target}, } @@ -225,13 +227,28 @@ def start(self): elif hasattr(recipe, "to_beam"): # We are in pangeo-forge-recipes <=0.9 # The import has to be here, as this import is not valid in pangeo-forge-recipes>=0.9 + # NOTE: `StorageConfig` only requires a target; input and metadata caches are optional, + # so those are handled conditionally if provided. from pangeo_forge_recipes.storage import StorageConfig recipe.storage_config = StorageConfig( target_storage.get_forge_target(job_name=self.job_name), - input_cache_storage.get_forge_target(job_name=self.job_name), - metadata_cache_storage.get_forge_target(job_name=self.job_name), ) + for attrname, optional_storage in zip( + ("cache", "metadata"), + (input_cache_storage, metadata_cache_storage), + ): + # `.root_path` is an empty string by default, so if the user has not setup this + # optional storage type in config, this block is skipped. + if optional_storage.root_path: + setattr( + recipe.storage_config, + attrname, + optional_storage.get_forge_target( + job_name=self.job_name + ), + ) + # with configured storage now attached, compile recipe to beam pipeline | recipe.to_beam() # Some bakeries are blocking - if Beam is configured to use them, calling diff --git a/pangeo_forge_runner/storage.py b/pangeo_forge_runner/storage.py index 2536d9cd..2a4cb011 100644 --- a/pangeo_forge_runner/storage.py +++ b/pangeo_forge_runner/storage.py @@ -1,5 +1,4 @@ from fsspec import AbstractFileSystem -from pangeo_forge_recipes.storage import CacheFSSpecTarget, FSSpecTarget, MetadataTarget from traitlets import Dict, Type, Unicode from traitlets.config import LoggingConfigurable @@ -36,10 +35,10 @@ class StorageTargetConfig(LoggingConfigurable): """, ) - pangeo_forge_target_class = Type( + pangeo_forge_target_class = Unicode( config=False, help=""" - StorageConfig class from pangeo_forge_recipes to instantiate. + Name of StorageConfig class from pangeo_forge_recipes to instantiate. Should be set by subclasses. """, @@ -51,7 +50,13 @@ def get_forge_target(self, job_name: str): If {job_name} is present in `root_path`, it is expanded with the given job_name """ - return self.pangeo_forge_target_class( + # import dynamically on call, because different versions of `pangeo-forge-recipes.storage` + # contain different objects, so a static top-level import cannot be used. + from pangeo_forge_recipes import storage + + cls = getattr(storage, self.pangeo_forge_target_class) + + return cls( self.fsspec_class(**self.fsspec_args), root_path=self.root_path.format(job_name=job_name), ) @@ -72,7 +77,7 @@ class TargetStorage(StorageTargetConfig): Storage configuration for where the baked data should be stored """ - pangeo_forge_target_class = FSSpecTarget + pangeo_forge_target_class = "FSSpecTarget" class InputCacheStorage(StorageTargetConfig): @@ -80,7 +85,7 @@ class InputCacheStorage(StorageTargetConfig): Storage configuration for caching input files during recipe baking """ - pangeo_forge_target_class = CacheFSSpecTarget + pangeo_forge_target_class = "CacheFSSpecTarget" class MetadataCacheStorage(StorageTargetConfig): @@ -88,4 +93,4 @@ class MetadataCacheStorage(StorageTargetConfig): Storage configuration for caching metadata during recipe baking """ - pangeo_forge_target_class = MetadataTarget + pangeo_forge_target_class = "MetadataTarget" From 21797e16769614ba7ed92487131e2e70784044f3 Mon Sep 17 00:00:00 2001 From: Charles Stern <62192187+cisaacstern@users.noreply.github.com> Date: Tue, 4 Jul 2023 13:10:55 -0700 Subject: [PATCH 02/13] fix storage base class str method --- pangeo_forge_runner/storage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pangeo_forge_runner/storage.py b/pangeo_forge_runner/storage.py index 2a4cb011..b988497d 100644 --- a/pangeo_forge_runner/storage.py +++ b/pangeo_forge_runner/storage.py @@ -69,7 +69,7 @@ def __str__(self): fsspec_args_filtered = ", ".join( f"{k}=<{type(v).__name__}>" for k, v in self.fsspec_args.items() ) - return f'{self.pangeo_forge_target_class.__name__}({self.fsspec_class.__name__}({fsspec_args_filtered}, root_path="{self.root_path}")' + return f'{self.pangeo_forge_target_class}({self.fsspec_class.__name__}({fsspec_args_filtered}, root_path="{self.root_path}")' class TargetStorage(StorageTargetConfig): From 38760075ae6f5592cc414e55e2c5ed98590b5a33 Mon Sep 17 00:00:00 2001 From: Charles Stern <62192187+cisaacstern@users.noreply.github.com> Date: Tue, 4 Jul 2023 13:15:54 -0700 Subject: [PATCH 03/13] testing fixes for 0.10.0 release --- .github/workflows/unit-test.yml | 4 ++-- tests/unit/test_bake.py | 9 ++------- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/.github/workflows/unit-test.yml b/.github/workflows/unit-test.yml index 2e837093..d75ab475 100644 --- a/.github/workflows/unit-test.yml +++ b/.github/workflows/unit-test.yml @@ -15,8 +15,8 @@ jobs: matrix: python-version: ["3.9"] recipes-version: [ - "pangeo-forge-recipes==0.9.2", - "git+https://github.com/pangeo-forge/pangeo-forge-recipes.git@beam-refactor", + "pangeo-forge-recipes==0.9.4", + "pangeo-forge-recipes==0.10.0", ] steps: diff --git a/tests/unit/test_bake.py b/tests/unit/test_bake.py index 3db53e09..51fd0107 100644 --- a/tests/unit/test_bake.py +++ b/tests/unit/test_bake.py @@ -19,13 +19,8 @@ def recipes_version_ref(): ][0] return ( "0.9.x" - # FIXME: for now, beam-refactor is unreleased, so installing from the dev branch - # gives something like "0.9.1.dev86+g6e9c341" as the version. So we just assume any - # version which includes "dev" is the "beam-refactor" branch, because we're not - # installing from any other upstream dev branch at this point. After beam-refactor - # release, we can figure this out based on an explicit version tag, i.e. "0.10.*". - if "dev" not in recipes_version - else "beam-refactor" + if int(recipes_version.split(".")[1]) < 10 + else "beam-refactor" # FIXME: change branch name on test feedstock ) From df4edc984fe8061d61457bcda6981e9afa6d959f Mon Sep 17 00:00:00 2001 From: Charles Stern <62192187+cisaacstern@users.noreply.github.com> Date: Tue, 4 Jul 2023 13:27:12 -0700 Subject: [PATCH 04/13] parametrize integration test for recipes versions --- .github/workflows/dataflow.yaml | 6 ++++++ tests/conftest.py | 15 +++++++++++++++ tests/integration/test_dataflow_integration.py | 6 ++++-- tests/unit/test_bake.py | 15 --------------- 4 files changed, 25 insertions(+), 17 deletions(-) diff --git a/.github/workflows/dataflow.yaml b/.github/workflows/dataflow.yaml index c8a38ea7..40e6a8ea 100644 --- a/.github/workflows/dataflow.yaml +++ b/.github/workflows/dataflow.yaml @@ -27,6 +27,10 @@ jobs: fail-fast: false matrix: python-version: ["3.9"] + recipes-version: [ + "pangeo-forge-recipes==0.9.4", + "pangeo-forge-recipes==0.10.0", + ] steps: - uses: actions/checkout@v3 @@ -43,6 +47,8 @@ jobs: python -m pip install --upgrade pip python -m pip install -r dev-requirements.txt python -m pip install -e . + python -m pip install -U ${{ matrix.recipes-version }} + # FIXME: should gcsfs actually be part of some optional installs in setup.py? - name: Install gcsfs run: | diff --git a/tests/conftest.py b/tests/conftest.py index 66049dfa..4470be79 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -57,3 +57,18 @@ def minio(local_ip): proc.wait() assert proc.returncode == 0 + + +@pytest.fixture +def recipes_version_ref(): + # FIXME: recipes version matrix is currently determined by github workflows matrix + # in the future, it should be set by pangeo-forge-runner venv feature? + pip_list = subprocess.check_output("pip list".split()).decode("utf-8").splitlines() + recipes_version = [ + p.split()[-1] for p in pip_list if p.startswith("pangeo-forge-recipes") + ][0] + return ( + "0.9.x" + if int(recipes_version.split(".")[1]) < 10 + else "beam-refactor" # FIXME: change branch name on test feedstock + ) diff --git a/tests/integration/test_dataflow_integration.py b/tests/integration/test_dataflow_integration.py index bbc0bb62..b24c20ab 100644 --- a/tests/integration/test_dataflow_integration.py +++ b/tests/integration/test_dataflow_integration.py @@ -7,7 +7,7 @@ import xarray as xr -def test_dataflow_integration(): +def test_dataflow_integration(recipes_version_ref): bucket = "gs://pangeo-forge-runner-ci-testing" config = { "Bake": { @@ -38,7 +38,9 @@ def test_dataflow_integration(): "--repo", "https://github.com/pforgetest/gpcp-from-gcs-feedstock.git", "--ref", - "0.9.x", + # in the test feedstock, tags are named for the recipes version + # which was used to write the recipe module + recipes_version_ref, "--json", "-f", f.name, diff --git a/tests/unit/test_bake.py b/tests/unit/test_bake.py index 51fd0107..545b899f 100644 --- a/tests/unit/test_bake.py +++ b/tests/unit/test_bake.py @@ -9,21 +9,6 @@ from pangeo_forge_runner.commands.bake import Bake -@pytest.fixture -def recipes_version_ref(): - # FIXME: recipes version matrix is currently determined by github workflows matrix - # in the future, it should be set by pangeo-forge-runner venv feature? - pip_list = subprocess.check_output("pip list".split()).decode("utf-8").splitlines() - recipes_version = [ - p.split()[-1] for p in pip_list if p.startswith("pangeo-forge-recipes") - ][0] - return ( - "0.9.x" - if int(recipes_version.split(".")[1]) < 10 - else "beam-refactor" # FIXME: change branch name on test feedstock - ) - - @pytest.mark.parametrize( "job_name, raises", ( From 74fca2b5ca45d69d36bd62742b69cde141993f97 Mon Sep 17 00:00:00 2001 From: Charles Stern <62192187+cisaacstern@users.noreply.github.com> Date: Tue, 4 Jul 2023 13:57:40 -0700 Subject: [PATCH 05/13] fallback to default job name in dataflow test --- .github/workflows/dataflow.yaml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/dataflow.yaml b/.github/workflows/dataflow.yaml index 40e6a8ea..36f95d2d 100644 --- a/.github/workflows/dataflow.yaml +++ b/.github/workflows/dataflow.yaml @@ -11,7 +11,6 @@ on: jobs: test-dataflow: - name: test-dataflow-py${{ matrix.python-version }} # run on: # - all pushes to main # - schedule defined above From 120fb8f825036a3d1a9ffe510bf0f6e8ecb9ad35 Mon Sep 17 00:00:00 2001 From: Charles Stern <62192187+cisaacstern@users.noreply.github.com> Date: Tue, 4 Jul 2023 15:44:26 -0700 Subject: [PATCH 06/13] set beam python in dataflow integration test --- .github/workflows/dataflow.yaml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/.github/workflows/dataflow.yaml b/.github/workflows/dataflow.yaml index 36f95d2d..95e6772b 100644 --- a/.github/workflows/dataflow.yaml +++ b/.github/workflows/dataflow.yaml @@ -52,6 +52,13 @@ jobs: - name: Install gcsfs run: | python -m pip install 'gcsfs==2022.8.2' + + # https://github.com/apache/beam/blob/e8d8043f44113aeeeb181bacc8662c2a2f0f642b/sdks/python/apache_beam/runners/portability/stager.py#L654 + - name: Set beam python + run: | + BEAM_PYTHON=`which python` + echo "BEAM_PYTHON=${BEAM_PYTHON}" >> $GITHUB_ENV + - name: 'Run Dataflow Integration Test' run: | pytest -vvvxs tests/integration/test_dataflow_integration.py From e55c0c3235e307581e198387cfc9d5f3922085c3 Mon Sep 17 00:00:00 2001 From: Charles Stern <62192187+cisaacstern@users.noreply.github.com> Date: Tue, 4 Jul 2023 15:58:09 -0700 Subject: [PATCH 07/13] try with python3.9 executable --- .github/workflows/dataflow.yaml | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/.github/workflows/dataflow.yaml b/.github/workflows/dataflow.yaml index 95e6772b..fe4d1e8c 100644 --- a/.github/workflows/dataflow.yaml +++ b/.github/workflows/dataflow.yaml @@ -43,20 +43,20 @@ jobs: credentials_json: '${{ secrets.GCP_DATAFLOW_SERVICE_KEY }}' - name: Install dependencies & our package run: | - python -m pip install --upgrade pip - python -m pip install -r dev-requirements.txt - python -m pip install -e . - python -m pip install -U ${{ matrix.recipes-version }} + python3.9 -m pip install --upgrade pip + python3.9 -m pip install -r dev-requirements.txt + python3.9 -m pip install -e . + python3.9 -m pip install -U ${{ matrix.recipes-version }} # FIXME: should gcsfs actually be part of some optional installs in setup.py? - name: Install gcsfs run: | - python -m pip install 'gcsfs==2022.8.2' + python3.9 -m pip install 'gcsfs==2022.8.2' # https://github.com/apache/beam/blob/e8d8043f44113aeeeb181bacc8662c2a2f0f642b/sdks/python/apache_beam/runners/portability/stager.py#L654 - name: Set beam python run: | - BEAM_PYTHON=`which python` + BEAM_PYTHON=`which python3.9` echo "BEAM_PYTHON=${BEAM_PYTHON}" >> $GITHUB_ENV - name: 'Run Dataflow Integration Test' From 94a9df6b775be79d817ad131de7e76ff3e4456f5 Mon Sep 17 00:00:00 2001 From: Charles Stern <62192187+cisaacstern@users.noreply.github.com> Date: Tue, 4 Jul 2023 16:09:22 -0700 Subject: [PATCH 08/13] try using venv --- .github/workflows/dataflow.yaml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/.github/workflows/dataflow.yaml b/.github/workflows/dataflow.yaml index fe4d1e8c..c683f31f 100644 --- a/.github/workflows/dataflow.yaml +++ b/.github/workflows/dataflow.yaml @@ -41,6 +41,13 @@ jobs: uses: 'google-github-actions/auth@v1' with: credentials_json: '${{ secrets.GCP_DATAFLOW_SERVICE_KEY }}' + + - name: Setup venv + run: | + python3.9 -m venv env + source env/bin/activate + echo PATH=$PATH >> $GITHUB_ENV + - name: Install dependencies & our package run: | python3.9 -m pip install --upgrade pip From bf085c8bea28480a2d6a1a16a8b318af3784df99 Mon Sep 17 00:00:00 2001 From: Charles Stern <62192187+cisaacstern@users.noreply.github.com> Date: Tue, 4 Jul 2023 16:38:16 -0700 Subject: [PATCH 09/13] set sdk_location='container' in dataflow bakery --- pangeo_forge_runner/bakery/dataflow.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pangeo_forge_runner/bakery/dataflow.py b/pangeo_forge_runner/bakery/dataflow.py index 7c6f5694..19118872 100644 --- a/pangeo_forge_runner/bakery/dataflow.py +++ b/pangeo_forge_runner/bakery/dataflow.py @@ -177,6 +177,7 @@ def get_pipeline_options( # https://cloud.google.com/dataflow/docs/guides/using-custom-containers#usage experiments=["use_runner_v2"], sdk_container_image=container_image, + sdk_location="container", # https://cloud.google.com/dataflow/docs/resources/faq#how_do_i_handle_nameerrors save_main_session=True, # this might solve serialization issues; cf. https://beam.apache.org/blog/beam-2.36.0/ From 2a4e7bda4928283568734f2fa029d78e087d0de4 Mon Sep 17 00:00:00 2001 From: Charles Stern <62192187+cisaacstern@users.noreply.github.com> Date: Tue, 4 Jul 2023 16:45:02 -0700 Subject: [PATCH 10/13] revert unecessary workflow changes --- .github/workflows/dataflow.yaml | 22 +++++----------------- 1 file changed, 5 insertions(+), 17 deletions(-) diff --git a/.github/workflows/dataflow.yaml b/.github/workflows/dataflow.yaml index c683f31f..12195f23 100644 --- a/.github/workflows/dataflow.yaml +++ b/.github/workflows/dataflow.yaml @@ -42,29 +42,17 @@ jobs: with: credentials_json: '${{ secrets.GCP_DATAFLOW_SERVICE_KEY }}' - - name: Setup venv - run: | - python3.9 -m venv env - source env/bin/activate - echo PATH=$PATH >> $GITHUB_ENV - - name: Install dependencies & our package run: | - python3.9 -m pip install --upgrade pip - python3.9 -m pip install -r dev-requirements.txt - python3.9 -m pip install -e . - python3.9 -m pip install -U ${{ matrix.recipes-version }} + python -m pip install --upgrade pip + python -m pip install -r dev-requirements.txt + python -m pip install -e . + python -m pip install -U ${{ matrix.recipes-version }} # FIXME: should gcsfs actually be part of some optional installs in setup.py? - name: Install gcsfs run: | - python3.9 -m pip install 'gcsfs==2022.8.2' - - # https://github.com/apache/beam/blob/e8d8043f44113aeeeb181bacc8662c2a2f0f642b/sdks/python/apache_beam/runners/portability/stager.py#L654 - - name: Set beam python - run: | - BEAM_PYTHON=`which python3.9` - echo "BEAM_PYTHON=${BEAM_PYTHON}" >> $GITHUB_ENV + python -m pip install 'gcsfs==2022.8.2' - name: 'Run Dataflow Integration Test' run: | From c32bb14a8280cf9bedcf29173779a9e75f70e25f Mon Sep 17 00:00:00 2001 From: Charles Stern <62192187+cisaacstern@users.noreply.github.com> Date: Tue, 4 Jul 2023 19:43:20 -0700 Subject: [PATCH 11/13] append store_name value in dataflow integration test --- tests/integration/test_dataflow_integration.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/tests/integration/test_dataflow_integration.py b/tests/integration/test_dataflow_integration.py index b24c20ab..20a4f146 100644 --- a/tests/integration/test_dataflow_integration.py +++ b/tests/integration/test_dataflow_integration.py @@ -92,10 +92,13 @@ def test_dataflow_integration(recipes_version_ref): pytest.fail(f"{state = } is neither 'Done' nor 'Running'") # open the generated dataset with xarray! - gpcp = xr.open_dataset( - config["TargetStorage"]["root_path"].format(job_name=job_name), - engine="zarr", - ) + target_path = config["TargetStorage"]["root_path"].format(job_name=job_name) + if recipes_version_ref == "0.10.x": + # in pangeo-forge-recipes>=0.10.0, an additional `StoreToZarr.store_name` kwarg + # is appended to the formatted root path at execution time. for ref `0.10.x`, + # the value of that kwarg is "gpcp", so we append that here. + target_path += "/gpcp" + gpcp = xr.open_dataset(target_path, engine="zarr") assert ( gpcp.title From 8abc6d3e3b1883d8ba1077bb645a3e17d40d7ff7 Mon Sep 17 00:00:00 2001 From: Charles Stern <62192187+cisaacstern@users.noreply.github.com> Date: Tue, 4 Jul 2023 19:50:44 -0700 Subject: [PATCH 12/13] update recipes_version_ref for 0.10.x tag --- tests/conftest.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 4470be79..a67ba59d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -67,8 +67,8 @@ def recipes_version_ref(): recipes_version = [ p.split()[-1] for p in pip_list if p.startswith("pangeo-forge-recipes") ][0] - return ( - "0.9.x" - if int(recipes_version.split(".")[1]) < 10 - else "beam-refactor" # FIXME: change branch name on test feedstock - ) + # the recipes_version is a 3-element semantic version of form `0.A.B` where A is either minor + # version `9` or `10`. the test feedstock (pforgetest/gpcp-from-gcs-feedstock) has tags for + # each of these minor versions, of the format `0.A.x`, so we translate the installed version + # of pangeo-forge-recipes to one of the valid tags (either `0.9.x` or `0.10.x`) here. + return f"0.{recipes_version.split('.')[1]}.x" From e84eac29535267d491783c6a4d93543b5483e85f Mon Sep 17 00:00:00 2001 From: Charles Stern <62192187+cisaacstern@users.noreply.github.com> Date: Tue, 4 Jul 2023 19:53:08 -0700 Subject: [PATCH 13/13] fix test_bake recipes_version_ref conditional --- tests/unit/test_bake.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/unit/test_bake.py b/tests/unit/test_bake.py index 545b899f..6f3752bf 100644 --- a/tests/unit/test_bake.py +++ b/tests/unit/test_bake.py @@ -121,12 +121,12 @@ def test_gpcp_bake( else: assert job_name.startswith("gh-pforgetest-gpcp-from-gcs-") - # In beam-refactor, the actual zarr store is produced in a + # In pangeo-forge-recipes>=0.10.0, the actual zarr store is produced in a # *subpath* of target_storage.rootpath, rather than in the # root path itself. This is a compatibility break vs the previous # versions of pangeo-forge-recipes. https://github.com/pangeo-forge/pangeo-forge-recipes/pull/495 # has more information - if recipes_version_ref == "beam-refactor": + if recipes_version_ref == "0.10.x": zarr_store_path = config["TargetStorage"]["root_path"] + "gpcp/" else: zarr_store_path = config["TargetStorage"]["root_path"]