
[WIP] Allow expressions to be shipped to the scheduler #294

Draft · wants to merge 3 commits into base: main

Conversation

@fjetter (Member) commented Sep 8, 2023

Given the following two PRs, these minor changes would allow us to pass the expressions to the scheduler via pickle.

cc @phofl
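A rough sketch of the idea (hedged; this is not the client/scheduler code from the PR, and the exact call sites are assumptions based on the __dask_graph_factory__ / optimize / materialize names that appear below):

import pickle

# Hedged sketch: the client pickles the unmaterialized expression and the
# scheduler materializes it; no low-level task graph crosses the wire.
def client_side(collection):
    expr = collection.__dask_graph_factory__()   # the expression, no task graph yet
    return pickle.dumps(expr.optimize())         # ship the optimized expression

def scheduler_side(payload):
    expr = pickle.loads(payload)                 # reconstruct the expression server-side
    return expr.materialize()                    # only now expand into low-level tasks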

Comment on lines +107 to +230
def __dask_tokenize__(self):
    return self.expr._name
@fjetter (Member, Author):

This is just defined as part of the collections protocol. Not sure if it is actually required.
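For context, __dask_tokenize__ is the hook that dask.base.tokenize consults for deterministic hashing. A minimal, self-contained illustration (not dask-expr code; FakeCollection is made up):

from dask.base import tokenize

class FakeCollection:
    # stand-in for a collection whose token is derived from its expression name
    def __init__(self, name):
        self._name = name

    def __dask_tokenize__(self):
        # tokenize() defers to this method when it is present
        return self._name

# two instances wrapping the same expression name produce the same token
assert tokenize(FakeCollection("expr-abc")) == tokenize(FakeCollection("expr-abc"))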

Comment on lines -161 to -332
def __dask_keys__(self):
    out = self.expr
    out = out.lower_completely()
    return out.__dask_keys__()
@fjetter (Member, Author):

Having keys defined at the collections level feels like an abstraction leak. Among other things, this is what threw me off for a while when I first implemented this: I had a hard time distinguishing collections from graphs in the existing code. I find this a bit clearer now in the above PRs.

Comment on lines 178 to 182
def finalize_compute(self) -> FrameBase:
    from ._repartition import RepartitionToFewer

    if self.npartitions > 1:
        return new_collection(RepartitionToFewer(self.expr, 1))
    return self
@fjetter (Member, Author):

This is not really new, but instead of performing the graph mutations ourselves by calling postcompute, etc. in the client or in dask, we now effectively just modify the expression into what the end result should look like. In this case, it's just a concat of all partitions.
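A hedged usage sketch of what this means for compute (df is an assumed dask-expr DataFrame; this is not code from the PR):

# finalize_compute() returns a collection whose expression already describes
# the final, single-partition result of compute(); the usual optimize/materialize
# path then runs on that rewritten expression, with no post-hoc graph mutation.
finalized = df.finalize_compute()      # RepartitionToFewer(df.expr, 1) when df.npartitions > 1
assert finalized.npartitions == 1      # the single output partition is the concatenated result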

Comment on lines 184 to 185
def postpersist(self, futures: dict) -> NewDaskCollection:
    return from_graph(futures, self._meta, self.divisions, self._name)
@fjetter (Member, Author):

This is also not really new, but I moved away from the "method that returns a callable [...]" API to just calling the callable. I haven't checked with any downstream projects yet whether this complex callable construct is actually required. Not married to anything here.
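For comparison, a hedged sketch of the "method that returns a callable" shape in the existing dask collections protocol (__dask_postpersist__); the argument list here just mirrors the postpersist call above and is an assumption:

def __dask_postpersist__(self):
    # old style: return (rebuild, extra_args); dask later calls
    # rebuild(futures_graph, *extra_args) to build the persisted collection
    return from_graph, (self._meta, self.divisions, self._name)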

dask_expr/_expr.py (outdated; resolved)
@@ -699,8 +702,10 @@ def dtypes(self):
     def _meta(self):
         raise NotImplementedError()

-    def __dask_graph__(self):
+    def _materialize(self):
@fjetter (Member, Author):

The underscore is probably a bit silly. Will likely change this again.

Comment on lines 38 to 41
from dask.typing import DaskGraph
# Note: subclassing isn't required. This is just for the prototype to have a
# check for abstractmethods but the runtime checks for duck-typing/protocol only
class Expr(DaskGraph):
@fjetter (Member, Author):

As the note says, there is no need for subclassing; this is just for prototyping (or we keep it, no strong preference).
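A self-contained illustration of the duck-typing point (this is the generic typing.Protocol mechanism, not the actual dask.typing definition):

from typing import Protocol, runtime_checkable

@runtime_checkable
class GraphFactory(Protocol):
    def __dask_graph_factory__(self): ...

class MyExpr:
    # no subclassing of the protocol class
    def __dask_graph_factory__(self):
        return self

# the runtime check matches on the method being present, not on inheritance
assert isinstance(MyExpr(), GraphFactory)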

@fjetter (Member, Author) commented Sep 8, 2023

CI is obviously failing because I haven't set up CI to run against dask / distributed. I'll clean this up soon on the other PRs as well.

@fjetter (Member, Author) commented Sep 8, 2023

There is only one caveat so far: since we're delaying materialization until the graph is on the scheduler, and since we don't have a concept of merge(collection1, collection2) in dask-expr yet (basically what dask.base.collections_to_dsk is doing), we have to submit expressions sequentially to the scheduler, i.e.

dask.compute(collection1, collection2) will generate two requests to the scheduler.

This has the shortcoming that we will not be performing a "multi-collections" optimization and we will not order the entire graph at once. We will still deduplicate keys that are in both collections.

I find this approach a decent compromise to get started. If we notice this to be a severe problem, we can think about a merge operation of some sort.

Edit: The fact that we're calling dask.order on the two above collections separately may make the performance of the combined operation sensitive to ordering. I do suspect, though, that this is mostly an academic problem and that real-world use cases should not be impacted. This requires a bit of testing.

@mrocklin (Member) commented Sep 8, 2023

There is only one caveat so far: Since we're delaying materialization until the graph is on the scheduler and since we don't have a concept of merge(collection1, collection2) in dask-expr

What if we made a Tuple(Expr) term? Would that resolve this issue? (I haven't thought deeply about what's going on here yet, just throwing out ideas)

Comment on lines 281 to 287
     @property
     def dask(self):
-        return self.__dask_graph__()
+        # FIXME: This is highly problematic. Defining this as a property can
+        # cause very unfortunate materializations. Even a mere hasattr(obj,
+        # "dask") check already triggers this since it's a property, not even a
+        # method.
+        return self.__dask_graph_factory__().optimize().materialize()
Collaborator:

What happens if we remove it completely? I guess some bits in xarray and pint will break, but there will be work to be done in xarray/pint anyway, so I don't see a problem.

        return self.__dask_graph_factory__().optimize().materialize()

    def finalize_compute(self):
        return new_collection(Repartition(self.expr, 1))
@crusaderky (Collaborator) commented Dec 22, 2023:

This is very problematic in the fairly common use case where the client has a lot more memory than a single worker. This forces the whole object to be unnecessarily collected onto a single worker and then sent to the client, whereas we could just have the client fetch separate partitions from separate workers (which may or may not happen all at once if it needs to transit through the scheduler).

This replicates the issue with the current finalizer methods in dask/dask, which are created by dask.compute(df) but are skipped by df.compute().

Memory considerations aside, bouncing through a single worker instead of collecting it on the client directly is adding latency.

@fjetter (Member, Author) commented Dec 22, 2023:

This is exactly how it is done right now, and I don't intend to touch that behavior now.

@fjetter (Member, Author) left a review comment:

Had some staged comments...

Comment on lines 2460 to 3088
class Tuple(Expr):
    def __getitem__(self, other):
        return self.operands[other]

    def _layer(self) -> dict:
        return toolz.merge(op._layer() for op in self.operands)

    def __dask_output_keys__(self) -> list:
        return list(flatten(op.__dask_output_keys__() for op in self.operands))

    def __len__(self):
        return len(self.operands)

    def __iter__(self):
        return iter(self.operands)
@fjetter (Member, Author):

From what I can tell, the tuple works out well enough for our purpose here.
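A hedged usage sketch of how the Tuple could combine two expressions into one submission (df1.expr and df2.expr are assumed dask-expr expressions; not code from this PR):

combined = Tuple(df1.expr, df2.expr)

graph = combined._layer()                   # one merged task dict covering both operands
keys = combined.__dask_output_keys__()      # flattened output keys of both operands, in order
first, second = combined                    # tuple-like unpacking back into the operands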

Comment on lines +95 to +108
@classmethod
def combine_factories(cls, *exprs: Expr, **kwargs) -> Expr:
    return Tuple(*exprs)
@fjetter (Member, Author):

This is mostly syntactic sugar and I don't know if I want to keep it. I see a way forward to just move HLGs and old-style collections to the new protocol and nuke a lot of compat code. In that case, such a hook here would be useful. For now, you can ignore this.

Comment on lines 282 to 287
     def dask(self):
-        return self.__dask_graph__()
+        # FIXME: This is highly problematic. Defining this as a property can
+        # cause very unfortunate materializations. Even a mere hasattr(obj,
+        # "dask") check already triggers this since it's a property, not even a
+        # method.
+        return self.__dask_graph_factory__().optimize().materialize()
@fjetter (Member, Author):

I guess this is the most controversial thing. I would actually like to throw this out since it triggers materialization soooo easily. Even an attribute lookup is sufficient to do this.

FWIW I encountered similar problems with the divisions property, which triggers not only materialization but quantile computation!
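A minimal, self-contained demonstration of why a plain attribute probe is enough to trigger the property (plain Python, not dask code):

class Collection:
    @property
    def dask(self):
        print("materializing graph ...")   # stand-in for an expensive materialization
        return {}

c = Collection()
hasattr(c, "dask")   # hasattr() calls getattr(), so the property body runs here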

        return self.__dask_graph_factory__().optimize().materialize()

    def finalize_compute(self):
        return new_collection(Repartition(self.expr, 1))
@fjetter (Member, Author) commented Dec 22, 2023:

This is exactly how it is done right now, and I don't intend to touch that behavior now.

@fjetter (Member, Author) commented Dec 22, 2023

Brief update: this is mostly working. There are still failing tests I have to track down. Most of the things I had to fix so far are somehow coupled to me adding a Repartition expression as a postcompute, which changes how the graph is handled compared to how it is done right now. As a reminder, right now the graph is materialized and the concat/repartition task is added to the low-level graph.

This reveals stuff like dask/dask#10739

I will likely want to nuke the dask property. In the test suite this does all kinds of weird things, like triggering quantile computations during a hasattr check. I don't think this is user friendly. This isn't a decision that has to be made on this PR, though, and I suggest postponing it to a follow-up PR. This one, together with its siblings in dask and distributed, will be big enough once everything is done.
