feat: One-pass column optimization #491

Draft: martindurant wants to merge 29 commits into base: main from one-pass.
Changes from 7 commits shown.
Commits:

b18b3af  start (martindurant, Mar 28, 2024)
3cdc778  Remove unused (martindurant, Mar 28, 2024)
c57693c  Pass reports around (martindurant, Apr 4, 2024)
0c367df  remember to commit (martindurant, Apr 5, 2024)
9c654b1  first working (for parquet) (martindurant, Apr 9, 2024)
e6c5ce3  Merge branch 'main' into one-pass (martindurant, Apr 11, 2024)
f56f77a  stop (martindurant, Apr 12, 2024)
75b4416  most pass (martindurant, Apr 15, 2024)
a5c319d  fix most (martindurant, Apr 18, 2024)
243fbc1  probably better (martindurant, Apr 19, 2024)
6eebf87  Reinstate necessary_columns (needs doc) (martindurant, Apr 22, 2024)
8bbe409  Merge branch 'main' into one-pass (martindurant, Apr 22, 2024)
43b2e43  pass buffer names around, not columns (martindurant, Apr 25, 2024)
e5828fb  Clear cache between tests (martindurant, May 13, 2024)
df0cf2f  Merge branch 'main' into one-pass (martindurant, May 13, 2024)
b593f87  Another one squashed (martindurant, May 21, 2024)
e410b61  Squash errors that only show when uproot.dask and hist.dask are insta… (martindurant, May 22, 2024)
cd537e2  fix uproot (martindurant, May 23, 2024)
baf6f46  fix report (martindurant, May 27, 2024)
e29e929  if meta fails (martindurant, Jun 6, 2024)
f61fdd7  rev (martindurant, Jul 23, 2024)
2c3abd0  concat enforce condition (martindurant, Jul 24, 2024)
04abbc8  temp (martindurant, Jul 29, 2024)
d876f00  squached some (martindurant, Jul 30, 2024)
8e1d507  add note (martindurant, Jul 30, 2024)
d1922ab  Merge branch 'main' into one-pass (martindurant, Jul 30, 2024)
fc9589b  Fix concat form comparison (martindurant, Jul 31, 2024)
961dd0c  one more squashed (martindurant, Jul 31, 2024)
c8b254b  fix IO report (martindurant, Aug 2, 2024)
src/dask_awkward/layers/layers.py (185 changes: 9 additions & 176 deletions)
@@ -30,13 +30,6 @@ def from_blockwise(cls, layer: Blockwise) -> AwkwardBlockwiseLayer:
ob.__dict__.update(layer.__dict__)
return ob

def mock(self) -> AwkwardBlockwiseLayer:
layer = copy.copy(self)
nb = layer.numblocks
layer.numblocks = {k: tuple(1 for _ in v) for k, v in nb.items()}
layer.__dict__.pop("_dims", None)
return layer

def __getstate__(self) -> dict:
# Indicator that this layer has been serialised
state = self.__dict__.copy()
@@ -54,10 +47,6 @@ def __call__(self, *args, **kwargs): ...
T = TypeVar("T")


class ImplementsMocking(ImplementsIOFunction, Protocol):
def mock(self) -> AwkwardArray: ...


class ImplementsMockEmpty(ImplementsIOFunction, Protocol):
def mock_empty(self, backend: BackendT) -> AwkwardArray: ...

@@ -67,10 +56,8 @@ class ImplementsReport(ImplementsIOFunction, Protocol):
def return_report(self) -> bool: ...


class ImplementsProjection(ImplementsMocking, Protocol[T]):
def prepare_for_projection(self) -> tuple[AwkwardArray, TypeTracerReport, T]: ...

def project(self, report: TypeTracerReport, state: T) -> ImplementsIOFunction: ...
class ImplementsProjection(Protocol[T]):
def project(self, columns: list[str]) -> ImplementsIOFunction: ...


class ImplementsNecessaryColumns(ImplementsProjection[T], Protocol):
@@ -79,7 +66,7 @@ def necessary_columns(
) -> frozenset[str]: ...


class IOFunctionWithMocking(ImplementsMocking, ImplementsIOFunction):
class IOFunctionWithMocking(ImplementsIOFunction):
def __init__(self, meta: AwkwardArray, io_func: ImplementsIOFunction):
self._meta = meta
self._io_func = io_func
@@ -92,21 +79,9 @@ def __getstate__(self) -> dict:
def __call__(self, *args, **kwargs):
return self._io_func(*args, **kwargs)

def mock(self) -> AwkwardArray:
assert self._meta is not None
return self._meta


def io_func_implements_projection(func: ImplementsIOFunction) -> bool:
return hasattr(func, "prepare_for_projection")


def io_func_implements_mocking(func: ImplementsIOFunction) -> bool:
return hasattr(func, "mock")


def io_func_implements_mock_empty(func: ImplementsIOFunction) -> bool:
return hasattr(func, "mock_empty")
return hasattr(func, "project")


def io_func_implements_columnar(func: ImplementsIOFunction) -> bool:
@@ -179,87 +154,14 @@ def is_projectable(self) -> bool:
io_func_implements_projection(self.io_func) and not self.has_been_unpickled
)

@property
def is_mockable(self) -> bool:
# isinstance(self.io_func, ImplementsMocking)
return io_func_implements_mocking(self.io_func)

@property
def is_columnar(self) -> bool:
return io_func_implements_columnar(self.io_func)

def mock(self) -> AwkwardInputLayer:
assert self.is_mockable
return AwkwardInputLayer(
name=self.name,
inputs=[None][: int(list(self.numblocks.values())[0][0])],
io_func=lambda *_, **__: cast(ImplementsMocking, self.io_func).mock(),
label=self.label,
produces_tasks=self.produces_tasks,
creation_info=self.creation_info,
annotations=self.annotations,
)

def prepare_for_projection(self) -> tuple[AwkwardInputLayer, TypeTracerReport, T]:
"""Mock the input layer as starting with a data-less typetracer.
This method is used to create new dask task graphs that
operate purely on typetracer Arrays (that is, array with
awkward structure but without real data buffers). This allows
us to test which parts of a real awkward array will be used in
a real computation. We do this by running a graph which starts
with mocked AwkwardInputLayers.

We mock an AwkwardInputLayer in these steps:
1. Ask the IO function to prepare a new meta array, and return
any transient state.
2. Build a new AwkwardInputLayer whose IO function just returns
this meta (typetracer) array
3. Return the new input layer and the transient state

When this new layer is added to a dask task graph and that
graph is computed, the report object will be mutated.
Inspecting the report object after the compute tells us which
buffers from the original form would be required for a real
compute with the same graph.
Returns
-------
AwkwardInputLayer
Copy of the input layer with data-less input.
TypeTracerReport
The report object used to track touched buffers.
Any
The black-box state object returned by the IO function.
"""
assert self.is_projectable
new_meta_array, report, state = cast(
ImplementsProjection, self.io_func
).prepare_for_projection()

new_return = new_meta_array
if io_func_implements_report(self.io_func):
if cast(ImplementsReport, self.io_func).return_report:
new_return = (new_meta_array, type(new_meta_array)([]))

new_input_layer = AwkwardInputLayer(
name=self.name,
inputs=[None][: int(list(self.numblocks.values())[0][0])],
io_func=AwkwardTokenizable(new_return, self.name),
label=self.label,
produces_tasks=self.produces_tasks,
creation_info=self.creation_info,
annotations=self.annotations,
)
return new_input_layer, report, state

def project(
self,
report: TypeTracerReport,
state: T,
) -> AwkwardInputLayer:
def project(self, columns: list[str]) -> AwkwardInputLayer:
assert self.is_projectable
io_func = cast(ImplementsProjection, self.io_func).project(
report=report, state=state
)
io_func = self.io_func.project(columns)
return AwkwardInputLayer(
name=self.name,
inputs=self.inputs,
@@ -270,12 +172,6 @@ def project(
annotations=self.annotations,
)

def necessary_columns(self, report: TypeTracerReport, state: T) -> frozenset[str]:
assert self.is_columnar
return cast(ImplementsNecessaryColumns, self.io_func).necessary_columns(
report=report, state=state
)


class AwkwardMaterializedLayer(MaterializedLayer):
def __init__(
@@ -290,68 +186,5 @@ def __init__(
self.fn = fn
super().__init__(mapping, **kwargs)

def mock(self) -> MaterializedLayer:
mapping = copy.copy(self.mapping)
if not mapping:
# no partitions at all
return self
name = next(iter(mapping))[0]

npln = len(self.previous_layer_names)
# one previous layer name
#
# this case is used for mocking repartition or slicing where
# we maybe have multiple partitions that need to be included
# in a task.
if npln == 1:
prev_name: str = self.previous_layer_names[0]
if (name, 0) in mapping:
task = mapping[(name, 0)]
task = tuple(
(
(prev_name, 0)
if isinstance(v, tuple) and len(v) == 2 and v[0] == prev_name
else v
)
for v in task
)

# when using Array.partitions we need to mock that we
# just want the first partition.
if len(task) == 2 and isinstance(task[1], int) and task[1] > 0:
task = (task[0], 0)
return MaterializedLayer({(name, 0): task})
return self

# zero previous layers; this is likely a known scalar.
#
# we just use the existing mapping
elif npln == 0:
return MaterializedLayer({(name, 0): mapping[(name, 0)]})

# more than one previous_layer_names
#
# this case is needed for dak.concatenate on axis=0; we need
# the first partition of _each_ of the previous layer names!
else:
if self.fn is None:
raise ValueError(
"For multiple previous layers the fn argument cannot be None."
)
name0s = tuple((name, 0) for name in self.previous_layer_names)
task = (self.fn, *name0s)
return MaterializedLayer({(name, 0): task})


class AwkwardTreeReductionLayer(DataFrameTreeReduction):
def mock(self) -> AwkwardTreeReductionLayer:
return AwkwardTreeReductionLayer(
name=self.name,
name_input=self.name_input,
npartitions_input=1,
concat_func=self.concat_func,
tree_node_func=self.tree_node_func,
finalize_func=self.finalize_func,
split_every=self.split_every,
tree_node_name=self.tree_node_name,
)

class AwkwardTreeReductionLayer(DataFrameTreeReduction): ...
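
For orientation: after this change, an IO function opts into column projection by exposing a single project(columns) method (io_func_implements_projection now just checks hasattr(func, "project")), rather than the old prepare_for_projection/project(report, state) pair. A minimal sketch of a conforming IO function follows; the class name and reader logic are hypothetical, not code from this PR.

from __future__ import annotations


class ColumnarReader:
    """Hypothetical IO function satisfying the new ImplementsProjection protocol."""

    def __init__(self, path: str, columns: list[str] | None = None):
        self.path = path
        self.columns = columns  # None means "read all columns"

    def __call__(self, partition: int):
        # A real reader would load only self.columns for this partition.
        raise NotImplementedError

    def project(self, columns: list[str]) -> "ColumnarReader":
        # Return a copy restricted to the columns the optimizer found to be
        # necessary; the input layer then swaps this in for the original.
        return ColumnarReader(self.path, columns=list(columns))
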
src/dask_awkward/lib/core.py (39 changes: 31 additions & 8 deletions)
@@ -386,6 +386,10 @@ def _rebuild(self, dsk, *, rename=None):
def __reduce__(self):
return (Scalar, (self.dask, self.name, None, self.dtype, self.known_value))

@property
def report(self):
return getattr(self._meta, "_report", set())

@property
def dask(self) -> HighLevelGraph:
return self._dask
@@ -398,6 +402,10 @@ def name(self) -> str:
def key(self) -> Key:
return (self._name, 0)

@property
def report(self):
return getattr(self._meta, "_report", set())

Review comment (Collaborator):

Can we lift _report from the meta to the Layer? I know that it's more state to keep in sync, but I don't love the act of tagging non-reserved attributes onto an Awkward Array, and this will better separate the stateful part of typetracer from the stateless part.
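
For concreteness, the two placements under discussion look roughly like this; both classes are sketches with hypothetical names, not the real dask-awkward code.

class MetaCarriesReports:
    """Current approach in this PR: tag the reports onto the meta array."""

    def __init__(self, meta, reports):
        meta._report = reports  # non-reserved attribute on an ak.Array
        self._meta = meta

    @property
    def report(self):
        return getattr(self._meta, "_report", set())


class LayerCarriesReports:
    """Suggested alternative: the layer owns the reports; the meta stays stateless."""

    def __init__(self, meta, reports):
        self._meta = meta
        self._report = reports  # explicit, first-class layer state

    @property
    def report(self):
        return self._report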

Review comment (Collaborator):

@jpivarski I wonder whether we should add a new merge function to TypeTracerReport. Due to the uniqueness of layer IDs, and the existing association of reports with layouts, we can replace the need to carry a set of reports with a single report via

merge(A, B) -> A

I haven't yet thought about the memory implications of effectively having "larger" reports, but this might be a nicer UX.
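
A minimal sketch of such a merge, assuming a report tracks sets of touched buffer labels; the class and attribute names below are assumptions, not the actual TypeTracerReport API.

class Report:
    """Stand-in for TypeTracerReport; real attribute names may differ."""

    def __init__(self):
        self.shape_touched: set[str] = set()
        self.data_touched: set[str] = set()


def merge(a: Report, b: Report) -> Report:
    # Buffer labels are unique (they embed the layer ID), so a plain union
    # loses nothing and lets one report stand in for a whole set of them.
    a.shape_touched |= b.shape_touched
    a.data_touched |= b.data_touched
    return a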

Review comment (Collaborator):

These Awkward Arrays are not accessible to users ("_meta" has an underscore); they're internal, bookkeeping objects. There's a similar use of temporary attributes in conversions from Arrow:

https://github.com/scikit-hep/awkward/blob/727ff9ca4759aed52e0b4f21d3daa66724582c61/src/awkward/_connect/pyarrow/conversions.py#L69-L71

and

https://github.com/scikit-hep/awkward/blob/727ff9ca4759aed52e0b4f21d3daa66724582c61/src/awkward/operations/ak_from_arrow.py#L75-L79

We're using the convenience of Python objects as dicts to handle a bookkeeping problem that would get more complicated if we were to try to do it in a statically typable way (although we could also make these "backdoors" into nullable, private attributes that are unused in almost all other circumstances).

An attempt to do this a bit more by-the-book with Indexes unfortunately made too much noise. The Index class has an arbitrary metadata field that can be used to stash whatever we need:

https://github.com/scikit-hep/awkward/blob/727ff9ca4759aed52e0b4f21d3daa66724582c61/src/awkward/index.py#L43-L52

although it was only ever used for one thing: associating a multidimensional integer array slice with an Index, which is one-dimensional.

https://github.com/scikit-hep/awkward/blob/727ff9ca4759aed52e0b4f21d3daa66724582c61/src/awkward/_slicing.py#L143-L145

That's pretty obscure, but Index now has this mechanism everywhere, and it must make people wonder why it's there.

A hacky solution, like attaching temporary metadata to a Python object as though it were a dict, can prevent an implementation detail about a corner case from getting "noisy," alerting all parts of the code about its existence. If the hacky solution is implemented incorrectly and the hack spreads, then we'd be wishing we had that noise to help understand what's going wrong, but if it's implemented correctly and no one finds out what happens in Vegas, making it formal would obfuscate the normal case with an outlier.

So I'd be in favor of leaving in the _report attribute handling, especially if something ensures that it never leaves the graph-optimization phase. The example above with a __pyarrow_original attribute (which includes its context in its name, by the way) has an explicit, recursive procedure to ensure that no layouts escape ak.from_arrow with that attribute intact. I don't know if there's a similar guarantee for _report.
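
Such a guarantee could take the form of a recursive sweep run at the end of graph optimization, analogous to the __pyarrow_original cleanup; the helper below is purely a sketch with an assumed traversal, not part of this PR.

def strip_report(layout) -> None:
    # Hypothetical cleanup pass: remove the transient _report attribute so
    # it never escapes the graph-optimization phase.
    if hasattr(layout, "_report"):
        del layout._report
    for child in getattr(layout, "contents", ()):  # recurse if the node has children
        strip_report(child)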


def _check_meta(self, m):
if isinstance(m, MaybeNone):
return ak.Array(m.content)
@@ -713,6 +721,8 @@ def _check_meta(self, m: Any | None) -> Any | None:
def __getitem__(self, where):
token = tokenize(self, where)
new_name = f"{where}-{token}"
report = self.report
[_.commit(new_name) for _ in report]
new_meta = self._meta[where]

# first check for array type return
@@ -723,6 +733,7 @@ def __getitem__(self, where):
graphlayer,
dependencies=[self],
)
new_meta._report = report
return new_array_object(hlg, new_name, meta=new_meta, npartitions=1)

# then check for scalar (or record) type
@@ -733,6 +744,7 @@ def __getitem__(self, where):
dependencies=[self],
)
if isinstance(new_meta, ak.Record):
new_meta._report = report
return new_record_object(hlg, new_name, meta=new_meta)
else:
return new_scalar_object(hlg, new_name, meta=new_meta)
@@ -806,7 +818,7 @@ def new_record_object(dsk: HighLevelGraph, name: str, *, meta: Any) -> Record:
raise TypeError(
f"meta Record must have a typetracer backend, not {ak.backend(meta)}"
)
return Record(dsk, name, meta)
return out


def _is_numpy_or_cupy_like(arr: Any) -> bool:
@@ -937,6 +949,10 @@ def reset_meta(self) -> None:
"""Assign an empty typetracer array as the collection metadata."""
self._meta = empty_typetracer()

@property
def report(self):
return getattr(self._meta, "_report", set())

def repartition(
self,
npartitions: int | None = None,
@@ -972,6 +988,7 @@ def repartition(
new_graph = HighLevelGraph.from_collections(
key, new_layer, dependencies=(self,)
)
[_.commit(key) for _ in self.report]
return new_array_object(
new_graph,
key,
@@ -1173,7 +1190,7 @@ def _partitions(self, index: Any) -> Array:
# otherwise nullify the known divisions
else:
new_divisions = (None,) * (len(new_keys) + 1) # type: ignore

[_.commit(name) for _ in self.report]
return new_array_object(
graph, name, meta=self._meta, divisions=tuple(new_divisions)
)
@@ -1395,6 +1412,7 @@ def _getitem_slice_on_zero(self, where):
AwkwardMaterializedLayer(dask, previous_layer_names=[self.name]),
dependencies=[self],
)
[_.commit(name) for _ in self.report]
return new_array_object(
hlg,
name,
@@ -1943,6 +1961,15 @@ def _map_partitions(
if meta is None:
meta = map_meta(fn, *args, **kwargs)

reps = set()
for dep in to_meta(deps):
rep = getattr(dep, "_report", None)
if rep:
[_.commit(name) for _ in rep]
[reps.add(_) for _ in rep]

meta._report = reps

hlg = HighLevelGraph.from_collections(
name,
lay,
@@ -1964,7 +1991,6 @@ def _map_partitions(
new_divisions = tuple(map(lambda x: x * output_divisions, in_divisions))
else:
new_divisions = in_divisions

if output_divisions is not None:
return new_array_object(
hlg,
@@ -2195,10 +2221,6 @@ def non_trivial_reduction(
if combiner is None:
combiner = reducer

# is_positional == True is not implemented
# if is_positional:
# assert combiner is reducer

# For `axis=None`, we prepare each array to have the following structure:
# [[[ ... [x1 x2 x3 ... xN] ... ]]] (length-1 outer lists)
# This makes the subsequent reductions an `axis=-1` reduction
@@ -2273,14 +2295,15 @@ def non_trivial_reduction(
)

graph = HighLevelGraph.from_collections(name_finalize, trl, dependencies=(chunked,))

[_.commit(name_finalize) for _ in array.report]
meta = reducer(
array._meta,
axis=axis,
keepdims=keepdims,
mask_identity=mask_identity,
)
if isinstance(meta, ak.highlevel.Array):
meta._report = array.report
return new_array_object(graph, name_finalize, meta=meta, npartitions=1)
else:
return new_scalar_object(graph, name_finalize, meta=meta)
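
Taken together, the core.py changes implement the bookkeeping that gives this PR its name: every collection's meta carries a set of typetracer reports, each graph-building operation (getitem, repartition, map_partitions, reductions) commits its new layer name to those reports, and at optimization time the touched buffer names can be read off directly and handed to AwkwardInputLayer.project(columns), with no second typetracer pass over the graph. A condensed sketch of that flow follows; the Report class and helper names are illustrative, not the real API.

class Report:
    """Illustrative stand-in for the typetracer report used in this PR."""

    def __init__(self):
        self.touched: set[str] = set()

    def commit(self, layer_name: str) -> None:
        # Record that this layer participates in the traced computation.
        self.touched.add(layer_name)


def propagate(meta_out, inherited_reports: set, new_layer_name: str):
    # Mirrors the pattern in _map_partitions and __getitem__: commit the new
    # layer's name to every inherited report, then forward the whole set on
    # the output meta.
    for report in inherited_reports:
        report.commit(new_layer_name)
    meta_out._report = inherited_reports
    return meta_out


def project_inputs(input_layer, final_meta):
    # At optimization time the touched buffer names are already known, so
    # the input layer can be projected without re-tracing the graph.
    reports = getattr(final_meta, "_report", set())
    columns = sorted(set().union(*(r.touched for r in reports)) if reports else [])
    return input_layer.project(columns)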