Organize, document, and simplify transforms.py #726
Conversation
@moradology Very awesome insight re: the type union, and elegant fixes! I'll freely confess some of the fixes fly above my current level of understanding of Beam composition, but they generally feel quite elegant, and if the tests pass, they pass!
I made a few comments. One is just a gut check on function naming (the bytes thing), and the only other of real importance, I think, is the question about blocking behavior in `StoreToZarr`.

Really inspiring work! I always learn something from reading your code. Thanks so much.
```diff
@@ -63,12 +63,13 @@ class MergeDim(CombineDim):
     operation: ClassVar[CombineOp] = CombineOp.MERGE


-def augment_index_with_start_stop(
+def augment_index_with_byte_range(
```
I believe this is indexing logical dimensions in the dataset, not bytes.
It looks to me as though this function uses the logical position of the provided index to calculate start/stop in terms of bytes, no? The sum of item lengths calculated prior to it is determined by logical position, but the resulting integer is supposed to be the byte-range start, as I read it.
```diff
     position: Position,
     item_lens: List[int],
     append_offset: int = 0,
 ) -> IndexedPosition:
-    """Take an index _without_ start / stop and add them based on the lens defined in sequence_lens.
+    """Take an index _without_ start / stop (byte range) and add them based on the lens defined in
```
Same comment as above re: bytes.
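For discussion's sake, here is roughly how I read the renamed function. This is a hypothetical simplification, not the package's actual implementation: the real signature uses the `Position`/`IndexedPosition` types, and whether `item_lens` counts bytes or logical elements is exactly the question raised above.

```python
from typing import List, Tuple

def augment_index_with_byte_range(
    position: int,
    item_lens: List[int],
    append_offset: int = 0,
) -> Tuple[int, int]:
    # Start offset: the sum of all item lengths before `position`,
    # shifted by `append_offset` (used when appending to an existing store).
    start = sum(item_lens[:position]) + append_offset
    # Stop offset: start plus this item's own length.
    stop = start + item_lens[position]
    return start, stop
```

For example, the third item (position 2) of lengths `[10, 20, 30]` would get the range `(30, 60)`.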
```diff
     )
-    | beam.GroupByKey()  # this has major performance implication
     | beam.MapTuple(combine_fragments)
+    | "group by write chunk key" >> beam.GroupByKey()  # group by key ensures locality
```
Nice! This is very helpful developer commentary 😃
```diff
-# TODO: make it so we don't have to explicitly specify combine_dims
-# Could be inferred from the pattern instead
```
I guess we're abandoning this idea? I have had my doubts that it's possible, since it would be a meta-operation over the pipeline. But I've been wrong about what's possible in Beam before.
The way this reads to me right now, we'd need to pass the same argument to a couple of different transforms (which isn't a bad pattern, AFAICT). To reuse such a value, we'd need to thread the input through as some kind of computed result, and I doubt that juice is worth the squeeze.

I'm not opposed to keeping the TODO open, if that's desirable. I just couldn't quite imagine how this could be facilitated at the level of recipes.
```diff
+    | "make template dataset" >> beam.Map(schema_to_template_ds)
+    | "generate chunks dynamically" >> beam.Map(self.dynamic_chunking_fn)
```
these labels really help readability! tysm!
pangeo_forge_recipes/transforms.py (outdated)
```diff
-n_target_stores = rechunked_datasets | StoreDatasetFragments(target_store=target_store)
-singleton_target_store = (
-    n_target_stores
-    | beam.combiners.Sample.FixedSizeGlobally(1)
-    | beam.FlatMap(lambda x: x)  # https://stackoverflow.com/a/47146582
-).expand(schema, beam.pvalue.AsSingleton(target_chunks_pcoll))

+# Actually attempt to write datasets to their target bytes/files
+rechunked_datasets | "write chunks" >> beam.Map(
+    store_dataset_fragment, target_store=beam.pvalue.AsSingleton(target_store)
+)

-return singleton_target_store
+# return the target store pcollection (a singleton of the fsspec target)
+return target_store
```
The reason we did this `n_target_stores -> FixedSizeGlobally(1)` nonsense before (and sorry for not documenting this in comments or otherwise!) was that, without it, Beam will not wait on all chunks to finish writing before yielding back the target store. And in this case, I'm pretty sure we do want to enforce blocking behavior, because if we don't, then if we try to do:

```python
... | StoreToZarr() | ConsolidateMetadata()
... | StoreToZarr() | ValidateStoredZarr()
```

or what-have-you, the transform chained onto `StoreToZarr` will receive its input before the chunks have finished writing, which will make it impossible to consolidate or validate, etc.

So basically, `n_target_stores -> FixedSizeGlobally(1)` ensures we block and don't let the output of `StoreToZarr` be emitted downstream until all chunks are written. Does that make sense?
Great catch! Makes total sense
```diff
+    dynamic_chunking_fn: Optional[Callable[[xr.Dataset], dict]] = None
+    dynamic_chunking_fn_kwargs: Optional[dict] = field(default_factory=dict)
```
Is there an advantage to lower arity here aside from an aesthetically tidier signature?
The tidy signature is much of the benefit I see, but there's also the hope of passing responsibility downstream to pipeline writers/users in a way that can't hide any magic we might be applying. On this conception of things, the user has exactly one thing to worry about: a function that produces the chunking they desire, given a template dataset.
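For what it's worth, `functools.partial` gives users the same low-arity callable without writing a bespoke wrapper. A sketch, where `calculate_dynamic_chunks` is a hypothetical stand-in for any higher-arity chunking function (its toy logic is invented for illustration):

```python
from functools import partial

def calculate_dynamic_chunks(ds, target_chunk_size: int, dim_name: str) -> dict:
    # Toy logic: pretend every element along `dim_name` occupies 1000 bytes,
    # so the chunk length is however many elements fit in the target size.
    return {dim_name: max(1, target_chunk_size // 1000)}

# Preset everything except the dataset, yielding the unary
# (xr.Dataset) -> dict signature that dynamic_chunking_fn expects.
dynamic_chunking_fn = partial(
    calculate_dynamic_chunks,
    target_chunk_size=1024 * 1024 * 10,
    dim_name="time",
)
```

The resulting `dynamic_chunking_fn` takes one argument, the template dataset, which is essentially what the `dynamic_chunking_fn_kwargs` field accomplishes on the user's behalf.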
pangeo_forge_recipes/transforms.py (outdated)
Example of using a wrapper function to reduce the arity of a more complex dynamic_chunking_fn:

Suppose there's a function `calculate_dynamic_chunks` that requires extra parameters: an
`xarray.Dataset`, a `target_chunk_size` in bytes, and a `dim_name` along which to chunk.
To fit the expected signature for `dynamic_chunking_fn`, we can define a wrapper function
that presets `target_chunk_size` and `dim_name`:

```python
def calculate_dynamic_chunks(ds, target_chunk_size, dim_name) -> Dict[str, int]:
    ...

def dynamic_chunking_wrapper(ds: xarray.Dataset) -> Dict[str, int]:
    target_chunk_size = 1024 * 1024 * 10
    dim_name = 'time'
    return calculate_dynamic_chunks(ds, target_chunk_size, dim_name)

StoreToZarr(..., dynamic_chunking_fn=dynamic_chunking_wrapper, ...)
```
Looks like this is not rendering correctly. Not sure what your intended design is here, but maybe the paragraph starting with "Suppose..." should be dedented, and the ```python fence is not necessary if you're indented?
See https://pangeo-forge--726.org.readthedocs.build/en/726/api_reference.html#ptransforms
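To illustrate the suggestion above, here is a hedged sketch of a docstring shape that Sphinx renders cleanly, assuming the docs build uses reStructuredText (where a double-colon introduces an indented literal block, making a markdown fence unnecessary):

```python
def example():
    """Example of reducing the arity of a more complex dynamic_chunking_fn.

    Suppose ``calculate_dynamic_chunks`` requires extra parameters. To fit the
    expected unary signature, preset them in a wrapper::

        def dynamic_chunking_wrapper(ds):
            return calculate_dynamic_chunks(ds, 1024 * 1024 * 10, "time")
    """
```

The prose paragraph stays at the base indentation level, and only the literal block following `::` is indented.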
I should have called this out in the PR description, but I somehow failed to include it. The one piece I'm not yet sure about is this small change to tests: the argument works as expected, but it isn't obvious to me why it's now required. Before +1, I'll need to double-check expected behavior.
What I've attempted to do here:
`expand` method and constructing classes with compile-time values.