Skip to content

Commit

Permalink
feat: add support for Delayed in partitionwise layer creation. (#449)
Browse files Browse the repository at this point in the history
  • Loading branch information
douglasdavis authored Jan 12, 2024
1 parent f2dbb80 commit 01777b6
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 4 deletions.
17 changes: 13 additions & 4 deletions src/dask_awkward/lib/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1830,6 +1830,8 @@ def partitionwise_layer(
elif isinstance(arg, Scalar):
pairs.extend([arg.name, "i"])
numblocks[arg.name] = (1,)
elif isinstance(arg, Delayed):
pairs.extend([arg.key, None])
elif is_dask_collection(arg):
raise DaskAwkwardNotImplemented(
"Use of Array with other Dask collections is currently unsupported."
Expand Down Expand Up @@ -2027,13 +2029,20 @@ def map_partitions(
dependencies=flat_deps,
)

dak_arrays = tuple(filter(lambda x: isinstance(x, Array), flat_deps))
if len(dak_arrays) == 0:
raise TypeError(
"at least one argument passed to map_partitions "
"should be a dask_awkward.Array collection."
)
in_npartitions = dak_arrays[0].npartitions
in_divisions = dak_arrays[0].divisions

if output_divisions is not None:
if output_divisions == 1:
new_divisions = flat_deps[0].divisions
else:
new_divisions = tuple(
map(lambda x: x * output_divisions, flat_deps[0].divisions)
)
new_divisions = tuple(map(lambda x: x * output_divisions, in_divisions))
return new_array_object(
hlg,
name=name,
Expand All @@ -2045,7 +2054,7 @@ def map_partitions(
hlg,
name=name,
meta=meta,
npartitions=flat_deps[0].npartitions,
npartitions=in_npartitions,
)


Expand Down
52 changes: 52 additions & 0 deletions tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,17 @@
import fsspec
import numpy as np
import pytest
from dask.delayed import delayed

import dask_awkward as dak
from dask_awkward.lib.core import (
Record,
Scalar,
calculate_known_divisions,
compute_typetracer,
empty_typetracer,
is_typetracer,
map_partitions,
meta_or_identity,
new_array_object,
new_known_scalar,
Expand Down Expand Up @@ -886,3 +889,52 @@ def test_shape_only_ops(fn: Callable, tmp_path_factory: pytest.TempPathFactory)
result = fn(lazy.b) # type: ignore
with dask.config.set({"awkward.optimization.enabled": True}):
result.compute()


@delayed
def a_delayed_array():
    """Build the awkward array ``[2, 4]``.

    Because of the ``@delayed`` decorator, calling this function returns a
    Delayed object; the array itself is only produced on ``.compute()``.
    """
    return ak.Array([2, 4])


def test_partitionwise_op_with_delayed():
    """Check that map_partitions accepts a Delayed next to a dask-awkward Array.

    The Delayed argument is tried both after and before the Array argument;
    both orderings must produce the same result.
    """
    array = ak.Array([[1, 2, 3], [4], [5, 6, 7], [8]])
    dak_array = dak.from_awkward(array, npartitions=2)

    # Delayed as the second argument, with divisions carried over unchanged.
    result = map_partitions(
        operator.mul,
        dak_array,
        a_delayed_array(),
        meta=dak_array._meta,
        output_divisions=1,
    )

    # Expected value: each two-row slice of the source is multiplied by the
    # full computed delayed array, then the pieces are stitched back together.
    concrete_result = ak.concatenate(
        [
            array[:2] * a_delayed_array().compute(),
            array[2:] * a_delayed_array().compute(),
        ],
    )
    assert_eq(result, concrete_result)

    # Delayed as the first argument; the Array is found further along the
    # argument list.
    result = map_partitions(
        operator.mul,
        a_delayed_array(),
        dak_array,
        meta=dak_array._meta,
    )
    assert_eq(result, concrete_result)


def multiply(a, b, c):
    """Return the product ``a * b * c``, evaluated left to right.

    Works for any operands that support ``*`` with each other.
    """
    return a * b * c


def test_map_partitions_bad_arguments():
    """map_partitions must raise TypeError when given no dask-awkward Array.

    The arguments here are a Delayed and two concrete ``ak.Array`` objects,
    none of which is a dask-awkward collection.
    """
    array1 = ak.Array([[1, 2, 3], [4], [5, 6, 7], [8]])
    array2 = ak.Array([4, 5, 6, 7])
    with pytest.raises(TypeError, match="at least one"):
        map_partitions(
            multiply,
            a_delayed_array(),
            array1,
            array2,
            meta=empty_typetracer(),
        )

0 comments on commit 01777b6

Please sign in to comment.