Prototype of beam pipeline for fluxes #63

Open
cisaacstern opened this issue Oct 18, 2023 · 2 comments
@cisaacstern

Based on pair session with @jbusecke today:

```python
# airseabeam.py

from dataclasses import dataclass

import apache_beam as beam


def append_val(t: tuple, val: str) -> tuple:
    k, v = t
    return (k, v + val)


class PangeoForgeRecipe(beam.PTransform):
    def expand(self, pcoll: beam.PCollection[tuple]):
        return pcoll | beam.Map(append_val, val="_arco")


class XBeamFilter(beam.PTransform):
    def expand(self, pcoll: beam.PCollection[tuple]):
        return pcoll | beam.Map(append_val, val="_filtered")


def add_spec(t: tuple, spec: str) -> tuple:
    k, v = t
    merged = "+".join(list(v))
    return (k, f"${merged}$_{spec}")


@dataclass
class MixVariables(beam.PTransform):
    spec: str

    def expand(self, pcoll: beam.PCollection[tuple]):
        return pcoll | beam.Map(add_spec, spec=self.spec)


def flatten_tuple(t: tuple) -> tuple:
    # After CoGroupByKey, the value is a pair of single-element
    # iterables; unpack it into a flat (arco, filtered) pair.
    k, v = t
    arco, filtered = v
    return (k, (arco[0], filtered[0]))


class XBeamComputeFluxes(beam.PTransform):
    def expand(self, pcoll: beam.PCollection[tuple]):
        # return pcoll | xbeam.Something()
        return pcoll | beam.Map(append_val, val="_flux")


class AirSeaPaper(beam.PTransform):
    def expand(self, pcoll):
        arco = pcoll | PangeoForgeRecipe()  # -> Zarr(data_vars={a, b, c})
        filtered = arco | XBeamFilter()  # -> Zarr(data_vars={a_f, b_f, c_f})
        nested = (arco, filtered) | beam.CoGroupByKey() | beam.Map(flatten_tuple)

        a_b_c = nested | "mix 0" >> MixVariables(spec="a,b,c")
        a_b_cf = nested | "mix 1" >> MixVariables(spec="a,b,cf")
        a_bf_cf = nested | "mix 2" >> MixVariables(spec="a,bf,cf")
        fluxes = (a_b_c, a_b_cf, a_bf_cf) | beam.Flatten() | XBeamComputeFluxes()
        return fluxes


if __name__ == "__main__":
    input_data = [("cesm", "cesm_ds"), ("cm26", "cm26_ds")]
    with beam.Pipeline() as p:
        (
            p
            | beam.Create(input_data)
            | AirSeaPaper()
            | beam.Map(print)
        )
```
```
$ python airseabeam.py
('cesm', '$cesm_ds_arco+cesm_ds_arco_filtered$_a,b,c_flux')
('cm26', '$cm26_ds_arco+cm26_ds_arco_filtered$_a,b,c_flux')
('cesm', '$cesm_ds_arco+cesm_ds_arco_filtered$_a,bf,cf_flux')
('cm26', '$cm26_ds_arco+cm26_ds_arco_filtered$_a,bf,cf_flux')
('cesm', '$cesm_ds_arco+cesm_ds_arco_filtered$_a,b,cf_flux')
('cm26', '$cm26_ds_arco+cm26_ds_arco_filtered$_a,b,cf_flux')
```
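The semantics of the two key helpers can be checked in plain Python, without running a Beam pipeline (the grouped input below mimics the single-element iterables that `CoGroupByKey` emits):

```python
def flatten_tuple(t: tuple) -> tuple:
    # Unpack the pair of single-element iterables produced by
    # CoGroupByKey into a flat (arco, filtered) pair.
    k, (arco, filtered) = t
    return (k, (arco[0], filtered[0]))


def add_spec(t: tuple, spec: str) -> tuple:
    k, v = t
    merged = "+".join(list(v))
    return (k, f"${merged}$_{spec}")


# Shape of one grouped element after CoGroupByKey:
grouped = ("cesm", (["cesm_ds_arco"], ["cesm_ds_arco_filtered"]))
flat = flatten_tuple(grouped)
print(add_spec(flat, "a,b,c"))
# -> ('cesm', '$cesm_ds_arco+cesm_ds_arco_filtered$_a,b,c')
```

This matches the pipeline output above, minus the trailing `_flux` appended by `XBeamComputeFluxes`.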
@cisaacstern commented Oct 18, 2023

Next steps:

  • Rework the prototype using synthetic xarray data (rather than strings)
  • Apply a basic xbeam operation in place of the real flux computation (e.g. some mapped operation)
  • Group the current output into two xarray Datasets, one for cesm_fluxes and one for cm26_fluxes, with the spec as a new dimension
  • Aggregate each of the datasets over time
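For the first step, a minimal sketch of a synthetic-data factory that could replace the string placeholders in `input_data` (the dimension names, sizes, and seed handling here are all hypothetical, not from the issue):

```python
import numpy as np
import xarray as xr


def make_synthetic_ds(seed: int = 0) -> xr.Dataset:
    # Hypothetical stand-in for a real model dataset, carrying the
    # three variables (a, b, c) referenced in the prototype.
    rng = np.random.default_rng(seed)
    dims = {"time": 4, "lat": 3, "lon": 5}
    data = {
        name: (list(dims), rng.standard_normal(tuple(dims.values())))
        for name in ("a", "b", "c")
    }
    return xr.Dataset(data)


# Keyed inputs in the same shape the prototype's beam.Create expects:
input_data = [("cesm", make_synthetic_ds(0)), ("cm26", make_synthetic_ds(1))]
```

With Datasets in place of strings, `append_val`/`add_spec` would then become real xarray operations rather than string concatenations.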

Other topics:

  • Understanding parallelism in Beam pre-deployment (e.g. by visualizing the pipeline graph)
  • Checkpointing all transforms to Zarr, so that we don't need to re-compute ARCO, filtered, or flux outputs if they already exist in some cache. This will require that cached Zarr stores pass some sanity check / test functions.
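One possible shape for the checkpointing idea above, sketched with stdlib-only logic (the `compute` and `validate` callables, and the notion of a store path, are assumptions; a real version would open and sanity-check an actual Zarr store):

```python
import os


def cached_or_compute(store_path: str, compute, validate) -> str:
    """Reuse a checkpointed store if it exists and passes a sanity check.

    `compute(store_path)` is a hypothetical callable that writes the
    store; `validate(store_path)` returns True if an existing store
    is usable (e.g. opens cleanly and has the expected variables).
    """
    if os.path.exists(store_path) and validate(store_path):
        return store_path  # cache hit: skip recomputation
    compute(store_path)  # cache miss or failed check: (re)compute
    return store_path
```

Wrapping each stage (`arco`, `filtered`, `fluxes`) in a check like this would let re-runs of the pipeline skip stages whose outputs already exist in the cache.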

@jbusecke (Collaborator) commented:

I am getting back into the processing and just made a sketch of the workflow for a single model. I thought that might be helpful here:
[Attached image: workflow sketch for a single model ("Blank diagram-10")]
