Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expr as singleton #798

Merged
merged 8 commits into from
Feb 2, 2024
Merged

Expr as singleton #798

merged 8 commits into from
Feb 2, 2024

Conversation

fjetter
Copy link
Member

@fjetter fjetter commented Jan 23, 2024

Closes #796
Closes #800

  1. the attribute checkup in Expr.rewrite is expensive since it checks for an element in a list. This cae4df1 commit removes that check and simply ensures the two rewrites that are currently used exists. I think this would be a sensible change but a proper PR should do this in a clean fashion. This removes all issues with the rewrite stack in my testing
  2. This one is a little more controversial. I'm ensuring that the Expr objects are in fact singletons by implementing a __new__ method. This ensures that there can always ever by one Expr with a given _name (that is based on tokenization)

A couple of observations

  1. The performance of optimize improves by roughly an order of magnitude down from 10-12s to 1.12s on my machine.
  2. There are two tests that fail with this. I strongly suspect that this is due to improper tokenization since I haven't finished Non deterministic tokenization for empty numpy arrays after pickle roundtrip dask#10799 yet. I will use this test case to root out another broken token...
  3. 1.12s is about as good as it gets with this but this still feels way too slow for such a small example... (I'll post more detailed measurements shortly)

@fjetter fjetter changed the title Expr as singleton + Ensure rewrite methods always exist [DNM] Expr as singleton + Ensure rewrite methods always exist Jan 23, 2024
@mrocklin
Copy link
Member

In SymPy we switched to using __new__ for performance reasons. There's good prior art here I think.

if self._required_attribute:
dep = next(iter(self.dependencies()))._meta
if not hasattr(dep, self._required_attribute):
_dependencies = [operand for operand in operands if isinstance(operand, Expr)]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not make this a cached property? I think that ideally we have as little computed state as possible.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree that we shouldn't need to "attach" _dependencies to the Expr class here, but that information is being used to check _required_attribute below anyway.

I don't think we really need the _required_attribute check anymore if we update the Expr.__getattr__ to avoid throwing an AttributeError when any metadata calculation (._meta, ._meta_chunk, etc.) fails. I'll submit a simple PR to strip out the _required_attribute stuff.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was just lazy and don't really care what we do with _dependencies. Personally, I consider the cached property an indirection and a little more difficult to grasp but I don't have strong opinions here. We have to compute it either way at this point so it's just a matter of code organization.

Getting rid of the _required_attribute check would of course simplify this quite a bit

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR removes _required_atttribute: #799

(not that removing that attribute solves anything here - just nice to remove unnecessary code)

@fjetter
Copy link
Member Author

fjetter commented Jan 23, 2024

Pushed a version of the caching as I proposed in #797 (comment) but I'm using an attribute instead of a list with the key. With this, the TPC Q1 optimization is down to 200ms

Edit: removed that again since the caching is not straight forward, see #798 (comment) for a discussion

dask_expr/_core.py Outdated Show resolved Hide resolved
@fjetter
Copy link
Member Author

fjetter commented Jan 24, 2024

Well, the singleton approach as suggested here will clash pretty hard with the way caching is implemented on the ReadParquet expression.

In essence, the tokenization of the expression class does not reliably tie the expression to the dataset to be read (the path is not sufficient, a modified timestamp or etag should be part of this as well). If we reuse instances, this will fail hard as soon as the dataset is mutated.
To a lesser extend, the same is of course true for all IO layers but it is particularly bad since the cached properties are also backed by a global cache for the case of parquet.

@rjzamora
Copy link
Member

rjzamora commented Jan 24, 2024

In essence, the tokenization of the expression class does not reliably tie the expression to the dataset to be read (the path is not sufficient, a modified timestamp or etag should be part of this as well). If we reuse instances, this will fail hard as soon as the dataset is mutated.

Ah, interesting. The global caching component of ReadParquet is actually not a problem at all. The problem is entirely caused by in the cached _dataset_info property. The global cache is always cleared when to_parquet is called, but the objects local _dataset_info cache is not cleared. (sorry if this is obvious to you - just sharing my understanding)

EDIT: I guess the global cache is only cleared if overwrite is specified. My mistake

@fjetter fjetter changed the title [DNM] Expr as singleton + Ensure rewrite methods always exist Expr as singleton Feb 1, 2024
Comment on lines +546 to +551
for file in files_for_checksum:
# The checksum / file info is usually already cached by the fsspec
# FileSystem dir_cache since this info was already asked for in
# _collect_dataset_info
checksum.append(fs.checksum(file))
dataset_info["checksum"] = tokenize(checksum)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To deal with the cache consistency problem described in #800 I am calculating a checksum here. For s3 this falls back to using the ETag provided in the listdir response. This should not add any overhead since this stuff is already cached by fsspec.
We're either taking the checksum of the metadata file or of all files that we iterate over. At this point this listdir operation is already done so the checksum identifies every dataset uniquely. Since adding this checksum to the dataset_info, this also guarantees that the cache for the plan is invalidated if the dataset changes.

@@ -432,6 +416,7 @@ class ReadParquet(PartitionsFiltered, BlockwiseIO):
"kwargs": None,
"_partitions": None,
"_series": False,
"_dataset_info_cache": list,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new concept here is that I am moving off a global cache. The dataset_info is always calculated whenever a user calls read_parquet(foo) and will therefore always receive an accurate representation of the dataset at the time this is called.
This dataset_info is cached in this paramter. I am choosing a list as a container but this could be anything. I could also just set the operand and mutate the expression in place.

The benefit of using a paramter for this cache is that the cache will naturally propagate to all derived instances, e.g. whenever we rewrite the expression using Expr.substitute_parameters. This allows us to maintain the cache during optimization and it ties the lifetime of the cache to the lifetime of the expression ancestry removing any need for us to invalidate the cache ever.

Comment on lines +462 to +467
def _name(self):
return (
funcname(type(self)).lower()
+ "-"
+ _tokenize_deterministic(self.checksum, *self.operands)
)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this checksum is part of the _name allowing us to differentiate expressions that point to modified states of the dataset. It also allows us to reuse already cached "plans / divisions" if the dataset did not change which is the most common case

Comment on lines 470 to 476
def checksum(self):
return self._dataset_info["checksum"]

@property
def _dataset_info(self):
if rv := self.operand("_dataset_info_cache"):
return rv[0]
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the ReadParquet expression is initialized first during a read_parquet this cache is empty and we'll fetch the dataset_info essentially during construction time of the expression object.

Subsequent expressions that are derived which are inheriting the cache will just access this making the __new__ call instantaneous.

@fjetter fjetter marked this pull request as ready for review February 1, 2024 11:24
@fjetter
Copy link
Member Author

fjetter commented Feb 1, 2024

interesting, the dask/dask tests are running into tokenization issues while the dask-expr tests are all green. Will have to dig into this a little

@fjetter
Copy link
Member Author

fjetter commented Feb 1, 2024

ok, this one requires two minor fixes in dask/dask, see dask/dask#10880

@fjetter
Copy link
Member Author

fjetter commented Feb 1, 2024

Well, the performance is better but still not where I'd like it to be

image

image

We have a very consistent improvement with this PR but some of the queries appear to still scratch the 1s mark.

The reason for this is that the current branch uses a weakvalue dictionary to determine whether an object already exists but it looks like the expr objects are going out of scope too quickly such that the instances are indeed being recreated frequently. Looking at Q1, the main branch creates about 20k new expression instances and with this branch I can half this. However, over the lifetime of the optimization there are only about 500 unique expressions.
Switching from a weakvalue dictionary to something more persistent is currently also not easily possible since the collect_dependents mechanism does rely on the weakref semantics.

@phofl
Copy link
Collaborator

phofl commented Feb 1, 2024

We should be able to remove the weak refs if we keep track of the "number of references" ourselves on top of this PR

@fjetter
Copy link
Member Author

fjetter commented Feb 1, 2024

If we combine this approach with #831 we'll get down further

image

(higher is better here)
image

The one thing that is not budging is Q21. Looking into that, it appears that there is an insane amount of substitution happening during blockwise fusion
Screenshot 2024-02-01 at 17 55 54

@phofl
Copy link
Collaborator

phofl commented Feb 1, 2024

21 is because the 3 filters blow up the number of expressions, similar to what assign did in merge in the past. I am almost done with the filter pushdown in merges which will remove those problems entirely

@fjetter
Copy link
Member Author

fjetter commented Feb 1, 2024

With both changes, Q1 optimize is about as fast as it can get at around 500ms. Further speedup is possible by improving tokenization, simplifying the attribute lookup for Expr and speeding up convergence of the optimizer overall to require fewer steps. Even the meta computation is rather negligible at this point. I think 500ms is fine (for now) but I'd like to understand the substitude thing a little better.

@fjetter
Copy link
Member Author

fjetter commented Feb 1, 2024

21 is because the 3 filters blow up the number of expressions, similar to what assign did in merge in the past. I am almost done with the filter pushdown in merges which will remove those problems entirely

There are around ~700 unique expressions in that query during optimization. That's not that much (compared to ~500 for Q1)

@fjetter
Copy link
Member Author

fjetter commented Feb 2, 2024

ok, so I looked into the Q21 performance which has nothing to do with caching but is an algorithmic problem in the recursion of substitute. I documented this in #835

@@ -36,7 +36,7 @@ class FromGraph(IO):
conversion from legacy dataframes.
"""

_parameters = ["layer", "_meta", "divisions", "_name"]
_parameters = ["layer", "_meta", "divisions", "keys", "name_prefix"]
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The FromGraph thing got a little odd. For context, this thing is used in two ways

  1. As a wrapper for futures when persist is called
  2. As a wrapper for legacy dataframes

The previous implementation accepted the _name as an input argument. For the persisted dataframe, this was the name of the original expression. For the wrapped one, it is the name of the legacy dataframe.

The problem is that the legacy dataframe does not have the same strict uniqueness guarantees as the expressions do so setting duplicate names is very easy. In fact, our tests where doing just that! (and still are). This caused the Expr.__new__ to deduplicate and effectively ignore the second dataframe... oops.

For persist it is also a little odd since if the FromGraph expression is inheriting the exact name of it's ancestor, there exist now two expressions of a different type with the same name. This is odd.

Effectively, with this chosen singleton implementation, setting the _name explicitly instead of calculating it using a hash is a cardinal sin and will cause all sorts of weird things to happen.

Now, this can be fixed but that has a rather big caveat. If I have to redefine the name of the expression, I actually also have to rewrite the graph! Many places in dask-expr are (imo incorrectly) assuming that output key names of a dataframe layer/expression are universally built as (df._name, i) and are hard coding this when implementing their own layer (instead of iterating over i, iterating over df.__dask_keys__() would maintain the abstraction). This rewrite adds effectively another layer of keys. In reality this is really ugly since when computing something on top of a persisted dataframe, there will always be this dummy key in between.

Alternatively, I could make the singleton deduplication type aware to give the FromGraph thing an excuse to overwrite the name. However, if we truly stick with singletons that are based on the name, I would prefer the name to actually be unique which required all implementations to stop hard coding keys of another expression/dataframe and iterate properly over the __dask_keys__

Copy link
Collaborator

@phofl phofl Feb 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now, this can be fixed but that has a rather big caveat. If I have to redefine the name of the expression, I actually also have to rewrite the graph! Many places in dask-expr are (imo incorrectly) assuming that output key names of a dataframe layer/expression are universally built as (df._name, i) and are hard coding this when implementing their own layer (instead of iterating over i, iterating over df.dask_keys() would maintain the abstraction). This rewrite adds effectively another layer of keys. In reality this is really ugly since when computing something on top of a persisted dataframe, there will always be this dummy key in between.

This is a good point, we should fix this instead of relying on df._name and i

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will investigate if it is possible to just use __dask_keys__ everywhere but I'd prefer doing this in a follow up

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes totally agree, this should definitely be a follow up (also doesn't have to be you, I could pick this up as well)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I got started on this, see

c3c01ed

That commit replaces all occurrences that match \((\w+(_\w+)?)(?<!self)\._name, but something appears to be still missing. It is possible and not as ugly as I thought it would be

Comment on lines 54 to +60
def _layer(self):
return dict(self.operand("layer"))
dsk = dict(self.operand("layer"))
# The name may not actually match the layers name therefore rewrite this
# using an alias
for part, k in enumerate(self.operand("keys")):
dsk[(self._name, part)] = k
return dsk
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The intuitive fix for me would've been to overwrite __dask_keys__ instead to point to the appropriate keys but as I explained above, the implementation of Expr.__dask_keys__ is hard coded in many places.

@fjetter
Copy link
Member Author

fjetter commented Feb 2, 2024

I opened #838 which points the dask/dask tests to dask/dask#10880 such that we can see if CI is happy

@fjetter
Copy link
Member Author

fjetter commented Feb 2, 2024

CI when pointing to dask/dask with the fixes #838 is green

@phofl phofl merged commit 8f99b0b into dask:main Feb 2, 2024
5 of 7 checks passed
@phofl
Copy link
Collaborator

phofl commented Feb 2, 2024

thx @fjetter

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Parquet Dataset cache not reliable [Bug] Optimization is now much slower in TPCh benchmarks
4 participants