Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: One-pass column optimization #491

Draft
wants to merge 29 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
b18b3af
start
martindurant Mar 28, 2024
3cdc778
Remove unused
martindurant Mar 28, 2024
c57693c
Pass reports around
martindurant Apr 4, 2024
0c367df
remember to commit
martindurant Apr 5, 2024
9c654b1
first working (for parquet)
martindurant Apr 9, 2024
e6c5ce3
Merge branch 'main' into one-pass
martindurant Apr 11, 2024
f56f77a
stop
martindurant Apr 12, 2024
75b4416
most pass
martindurant Apr 15, 2024
a5c319d
fix most
martindurant Apr 18, 2024
243fbc1
probably better
martindurant Apr 19, 2024
6eebf87
Reinstate necessary_columns (needs doc)
martindurant Apr 22, 2024
8bbe409
Merge branch 'main' into one-pass
martindurant Apr 22, 2024
43b2e43
pass buffer names around, not columns
martindurant Apr 25, 2024
e5828fb
Clear cache between tests
martindurant May 13, 2024
df0cf2f
Merge branch 'main' into one-pass
martindurant May 13, 2024
b593f87
Another one squashed
martindurant May 21, 2024
e410b61
Squash errors that only show when uproot.dask and hist.dask are insta…
martindurant May 22, 2024
cd537e2
fix uproot
martindurant May 23, 2024
baf6f46
fix report
martindurant May 27, 2024
e29e929
if meta fails
martindurant Jun 6, 2024
f61fdd7
rev
martindurant Jul 23, 2024
2c3abd0
concat enforce condition
martindurant Jul 24, 2024
04abbc8
temp
martindurant Jul 29, 2024
d876f00
squashed some
martindurant Jul 30, 2024
8e1d507
add note
martindurant Jul 30, 2024
d1922ab
Merge branch 'main' into one-pass
martindurant Jul 30, 2024
fc9589b
Fix concat form comparison
martindurant Jul 31, 2024
961dd0c
one more squashed
martindurant Jul 31, 2024
c8b254b
fix IO report
martindurant Aug 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions src/dask_awkward/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@
)
from dask_awkward.lib.describe import fields
from dask_awkward.lib.inspect import sample

necessary_columns = None # TODO

from dask_awkward.lib.io.io import (
from_awkward,
from_dask_array,
Expand All @@ -38,6 +35,7 @@
from dask_awkward.lib.io.parquet import from_parquet, to_parquet
from dask_awkward.lib.io.text import from_text
from dask_awkward.lib.operations import concatenate
from dask_awkward.lib.optimize import necessary_columns
from dask_awkward.lib.reducers import (
all,
any,
Expand Down
1 change: 1 addition & 0 deletions src/dask_awkward/lib/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from dask_awkward.lib.io.parquet import from_parquet, to_parquet
from dask_awkward.lib.io.text import from_text
from dask_awkward.lib.operations import concatenate
from dask_awkward.lib.optimize import necessary_columns
from dask_awkward.lib.reducers import (
all,
any,
Expand Down
2 changes: 1 addition & 1 deletion src/dask_awkward/lib/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -719,8 +719,8 @@ def __getitem__(self, where):
token = tokenize(self, where)
new_name = f"{where}-{token}"
report = self.report
[_.commit(new_name) for _ in report]
new_meta = self._meta[where]
[_.commit(new_name) for _ in report]

# first check for array type return
if isinstance(new_meta, ak.Array):
Expand Down
67 changes: 48 additions & 19 deletions src/dask_awkward/lib/optimize.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,30 @@ def optimize_columns(dsk: HighLevelGraph, keys: Sequence[Key]) -> HighLevelGraph
all_reps.update(getattr(dsk.layers[ln].meta, "_report", ()))
name = tokenize("output", lays)
[_.commit(name) for _ in all_reps]
all_layers = tuple(dsk.layers) + (name,)

for k, lay, cols in _optimize_columns(dsk.layers, all_layers):
new_lay = lay.project(cols)
dsk2[k] = new_lay

return HighLevelGraph(dsk2, dsk.dependencies)


def _buf_to_col(s):
return (
s[2:]
.replace(".content", "")
.replace("-offsets", "")
.replace("-data", "")
.replace("-index", "")
.replace("-mask", "")
)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have time right now to look at the full PR here, but is this assumption about buffer name mappings enforced anywhere? Right now the design is such that input sources can use a non-canonical buffer naming scheme.

For coffea I think we made this possible so that Coffea could directly pass in their own buffer names with semantic meaning.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have access to the call to ak.from_buffers here? That function takes a callable that constructs the buffer names from the form_keys and these roles.

The roles are fixed words like this, and Coffea only has the power to change the form_key, not the role. This is almost the complete set of such names; only -tags (for UnionArray) is missing.



def _optimize_columns(dsk, all_layers):

# this loop is necessary_columns
all_layers = tuple(dsk.layers) + (name,)
for k, lay in dsk.layers.items():
for k, lay in dsk.copy().items():
if not isinstance(lay, AwkwardInputLayer) or not hasattr(
lay.io_func, "_column_report"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just like with _report, I'd like to formalise this as part of the API. This would become a part of the column-projection interface.

):
Expand All @@ -125,29 +145,38 @@ def optimize_columns(dsk: HighLevelGraph, keys: Sequence[Key]) -> HighLevelGraph
for ln in all_layers:
# this loop not required after next ak release
try:
cols |= set(rep.data_touched_in((ln,)))
for col in rep.shape_touched_in((ln,)):
if col in cols or any(_.startswith(col) for _ in cols):
# loopy loop?
cols |= {_buf_to_col(s) for s in rep.data_touched_in((ln,))}
for col in (_buf_to_col(s) for s in rep.shape_touched_in((ln,))):
if col in cols:
continue
if any(_.startswith(col) for _ in cols):
continue
col2 = (
col[2:]
.replace(".content", "")
.replace("-offsets", "")
.replace("-data", "")
.replace("-index", "")
.replace("-mask", "")
)
ll = list(_ for _ in all_cols if _.startswith(col2))
ll = list(_ for _ in all_cols if _.startswith(col))
if ll:
cols.add("@." + ll[0])
cols.add(ll[0])

except KeyError:
pass
new_lay = lay.project([c.replace("@.", "") for c in cols])
dsk2[k] = new_lay
yield k, lay, cols

return HighLevelGraph(dsk2, dsk.dependencies)

def necessary_columns(*args):
    """Determine which columns each input layer needs for the given arrays.

    For every dask-awkward collection passed in, the metadata is touched so
    its typetracer reports record all buffers required to produce the final
    output; those reports are then committed under a synthetic "output"
    layer name and replayed through ``_optimize_columns``.

    Parameters
    ----------
    *args
        Dask-awkward collections (objects exposing ``.dask``, ``.name`` and
        ``._meta``).

    Returns
    -------
    dict
        Mapping of input-layer key to the set/list of columns it must read.
    """
    graph_layers = {}
    reports = set()
    layer_names = set()
    for coll in args:
        graph_layers.update(coll.dask.layers)
        layer_names.add(coll.name)
        # touching the meta populates the typetracer reports
        touch_data(coll._meta)
        reports.update(getattr(coll.dask.layers[coll.name].meta, "_report", ()))

    out_name = tokenize("output", args)
    for rep in reports:
        rep.commit(out_name)
    layer_names = tuple(layer_names) + (out_name,)

    return {key: cols for key, _, cols in _optimize_columns(graph_layers, layer_names)}


def rewrite_layer_chains(dsk: HighLevelGraph, keys: Sequence[Key]) -> HighLevelGraph:
Expand Down