
DataFrame subclass type lost when assigning with unknown divisions #1180

Open
TomAugspurger opened this issue Dec 19, 2024 · 6 comments

TomAugspurger (Member)
Describe the issue:

When assigning a new column to a dask object, it seems like the concrete subtype (e.g. geopandas.GeoDataFrame) is lost.

Minimal Complete Verifiable Example:

import dask.array
import dask.dataframe
import dask_geopandas
import geopandas
import pandas as pd

df = geopandas.GeoDataFrame({"geometry": geopandas.points_from_xy([0, 0], [0, 1])})
ddf = dask_geopandas.from_geopandas(df, npartitions=2)
ddf = ddf.clear_divisions()  # this is important

b = dask.dataframe.from_dask_array(dask.array.zeros((2,), chunks=(1, 1)), index=ddf.index)
ddf.assign(a=b).geometry.x.compute()  ## error

that raises

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
/var/folders/x7/__bs9yvx21qbvzb17sj4qsh40000gn/T/ipykernel_95282/3433075730.py in ?()
      8 ddf = dask_geopandas.from_geopandas(df, npartitions=2)
      9 ddf = ddf.clear_divisions()  # this is important
     10 
     11 b = dask.dataframe.from_dask_array(dask.array.zeros((2,), chunks=(1, 1)), index=ddf.index)
---> 12 ddf.assign(a=b).geometry.x.compute()  ## error

~/gh/TomAugspurger/dask-geopandas-spatial-partitioning/.direnv/python-3.12/lib/python3.12/site-packages/dask_expr/_collection.py in ?(self, fuse, concatenate, **kwargs)
    476         out = self
    477         if not isinstance(out, Scalar) and concatenate:
    478             out = out.repartition(npartitions=1)
    479         out = out.optimize(fuse=fuse)
--> 480         return DaskMethodsMixin.compute(out, **kwargs)

~/gh/TomAugspurger/dask-geopandas-spatial-partitioning/.direnv/python-3.12/lib/python3.12/site-packages/dask/base.py in ?(self, **kwargs)
    368         See Also
    369         --------
    370         dask.compute
    371         """
--> 372         (result,) = compute(self, traverse=False, **kwargs)
    373         return result

~/gh/TomAugspurger/dask-geopandas-spatial-partitioning/.direnv/python-3.12/lib/python3.12/site-packages/dask/base.py in ?(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    656         keys.append(x.__dask_keys__())
    657         postcomputes.append(x.__dask_postcompute__())
    658 
    659     with shorten_traceback():
--> 660         results = schedule(dsk, keys, **kwargs)
    661 
    662     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

~/gh/TomAugspurger/dask-geopandas-spatial-partitioning/.direnv/python-3.12/lib/python3.12/site-packages/pandas/core/generic.py in ?(self, name)
   6295             and name not in self._accessors
   6296             and self._info_axis._can_hold_identifiers_and_holds_name(name)
   6297         ):
   6298             return self[name]
-> 6299         return object.__getattribute__(self, name)

AttributeError: 'Series' object has no attribute 'x'

geopandas.GeoSeries objects automatically provide .x and .y accessors on geometry columns. Here we're getting back a regular pandas.Series instead, which causes the error.
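The mechanics of the failure can be sketched without geopandas at all. TaggedSeries below is a hypothetical stand-in for geopandas.GeoSeries: pandas subclasses keep their type through operations via the _constructor hook, and once an operation hands back a plain Series, subclass-only accessors like .x disappear.

```python
import pandas as pd

class TaggedSeries(pd.Series):
    """Toy subclass standing in for geopandas.GeoSeries (hypothetical)."""

    @property
    def _constructor(self):
        # pandas uses this to keep the subclass through ops like .head()
        return TaggedSeries

    @property
    def x(self):
        # Analogue of GeoSeries.x: first coordinate of each point.
        return pd.Series([p[0] for p in self], index=self.index)

s = TaggedSeries([(0, 0), (0, 1)])
print(s.x.tolist())               # the subclass accessor works
print(type(s.head()).__name__)    # TaggedSeries: subclass preserved

plain = pd.Series(list(s))        # downcast to a plain Series, as in the bug
print(hasattr(plain, "x"))        # False: the accessor is gone
```

This is exactly the symptom in the traceback above: somewhere in the graph the GeoSeries was rebuilt as a plain pandas.Series, so `.x` raises AttributeError.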

Anything else we need to know?:

Having unknown divisions does seem to be necessary: commenting out the ddf = ddf.clear_divisions() line makes the error go away. So I think we can narrow the search to AssignAlign (and not Assign).

Environment:

  • Dask version: 2024.12.0

  • dask-expr from main @ d7577a2

  • Python version:

  • Operating System:

  • Install method (conda, pip, source):

phofl (Collaborator) commented Dec 19, 2024

Thanks for the report!

The Align case triggers a shuffle under the hood which is responsible for losing the dtype.

More specific reproducer:

import dask.array
import dask.dataframe
import dask_geopandas
import geopandas

df = geopandas.GeoDataFrame({"geometry": geopandas.points_from_xy([0, 0], [0, 1])})
ddf = dask_geopandas.from_geopandas(df, npartitions=2)
ddf = ddf.clear_divisions()  # this is important

b = dask.dataframe.from_dask_array(dask.array.zeros((2,), chunks=(1, 1)), index=ddf.index)
print(type(ddf.shuffle(on=ddf.columns[0]).compute()))

This is using disk-based shuffling, so not even p2p related.

Do you know if this used to work with the legacy backend? Turning query planning off gives back a plain pandas DataFrame as well.

TomAugspurger (Member, Author) commented Dec 19, 2024

> The Align case triggers a shuffle under the hood which is responsible for losing the dtype.

Ah, then this might be the same root cause as #1024.

> Do you know if this used to work with the legacy backend?

It seems to:

In [1]: import dask.config
   ...:
   ...: dask.config.set(**{"dataframe.query-planning": False})
   ...:
   ...: import dask.array
   ...: import dask.dataframe
   ...: import dask_geopandas
   ...: import geopandas
   ...: import pandas as pd
   ...:
   ...: df = geopandas.GeoDataFrame({"geometry": geopandas.points_from_xy([0, 0], [0, 1])})
   ...: ddf = dask_geopandas.from_geopandas(df, npartitions=2)
   ...: ddf = ddf.clear_divisions()  # this is important
   ...:
   ...: ddf.assign(a=1).geometry.x.compute()  # OK
   ...:
   ...: ddf.assign(a=dask.dataframe.from_dask_array(dask.array.zeros((2,), chunks=(1, 1)), index=ddf.index)).geometry.x.compute()  ## error
   ...:

which gives

/Users/tom/gh/TomAugspurger/dask-geopandas-spatial-partitioning/.direnv/python-3.12/lib/python3.12/site-packages/dask/dataframe/__init__.py:31: FutureWarning: The legacy Dask DataFrame implementation is deprecated and will be removed in a future version. Set the configuration option `dataframe.query-planning` to `True` or None to enable the new Dask Dataframe implementation and silence this warning.
  warnings.warn(
Out[1]:
0    0.0
1    0.0
dtype: float64

phofl (Collaborator) commented Dec 19, 2024

Ah no, that is not equivalent. The old implementation just assumed that the indices match if the divisions are equal (even if they are all None), so no shuffle was triggered, but you could get incorrect results with unknown divisions and different indexes.

You'd have to check

ddf.shuffle(on=ddf.columns[0]).compute()

TomAugspurger (Member, Author)

Gotcha, that makes sense. In this case I happen to know that the indexes are identical, so it would be safe to assume that the all-None divisions are OK, but I see why a shuffle is needed in the general case.

Doing the shuffle does indeed lose the subclass, with or without query planning.

phofl (Collaborator) commented Dec 19, 2024

Yeah, the assign case is interesting. We generally check whether both dataframes originate from the same root and have the same index, to avoid the shuffle. Your DataFrame clearly has a different root than the assigned series, but we could check only the index here; that is enough information.

The from-dask-array case is an edge case though (I think); generally the index will probably be different.

phofl (Collaborator) commented Dec 19, 2024

Something like:

import dask_geopandas
import geopandas

df = geopandas.GeoDataFrame({"geometry": geopandas.points_from_xy([0, 0], [0, 1])})
ddf = dask_geopandas.from_geopandas(df, npartitions=2)
ddf = ddf.clear_divisions()  # this is important

ddf = ddf.assign(a=1)
ddf.assign(b=ddf.a).geometry.x.compute()  ## error

Here we can look at the expression structure and identify that left and right both have a compatible index, falling back to a trivial blockwise op.
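Why the blockwise fallback would fix the subclass loss can be illustrated with plain pandas. TaggedFrame is a hypothetical stand-in for geopandas.GeoDataFrame: once the indexes are known to be compatible, each partition can run an ordinary .assign, which round-trips through _constructor and keeps the subclass, with no shuffle involved.

```python
import pandas as pd

class TaggedFrame(pd.DataFrame):
    """Toy subclass standing in for geopandas.GeoDataFrame (hypothetical)."""

    @property
    def _constructor(self):
        # pandas rebuilds results through this, preserving the subclass
        return TaggedFrame

left = TaggedFrame({"geometry": [(0, 0), (0, 1)]})
extra = pd.Series([1.0, 2.0], index=left.index)

# The per-partition blockwise path: a plain .assign, which keeps the
# subclass intact because no shuffle ever rebuilds the frame.
out = left.assign(a=extra)
print(type(out).__name__)  # TaggedFrame
```

The shuffle path, by contrast, reconstructs partitions inside dask's shuffle machinery, which is where the subclass information is dropped in the reported bug.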
