From 7c9ef448d6438297fb60efa4766943c02e1c9ede Mon Sep 17 00:00:00 2001 From: Angus Hollands Date: Sat, 2 Dec 2023 16:19:02 +0000 Subject: [PATCH 01/10] fix: support multiple keys --- src/dask_awkward/lib/inspect.py | 11 ++++++----- src/dask_awkward/lib/optimize.py | 31 ++++++++++--------------------- 2 files changed, 16 insertions(+), 26 deletions(-) diff --git a/src/dask_awkward/lib/inspect.py b/src/dask_awkward/lib/inspect.py index e5f62602..860fcaae 100644 --- a/src/dask_awkward/lib/inspect.py +++ b/src/dask_awkward/lib/inspect.py @@ -4,7 +4,6 @@ import numpy as np from dask.base import unpack_collections -from dask.highlevelgraph import HighLevelGraph from dask_awkward.layers import AwkwardInputLayer @@ -81,8 +80,9 @@ def report_necessary_buffers( name_to_necessary_buffers: dict[str, NecessaryBuffers | None] = {} for obj in collections: - dsk = obj if isinstance(obj, HighLevelGraph) else obj.dask - projection_data = o._prepare_buffer_projection(dsk) + dsk = obj.__dask_graph__() + keys = obj.__dask_keys__() + projection_data = o._prepare_buffer_projection(dsk, keys) # If the projection failed, or there are no input layers if projection_data is None: @@ -178,8 +178,9 @@ def report_necessary_columns( name_to_necessary_columns: dict[str, frozenset | None] = {} for obj in collections: - dsk = obj if isinstance(obj, HighLevelGraph) else obj.dask - projection_data = o._prepare_buffer_projection(dsk) + dsk = obj.__dask_graph__() + keys = obj.__dask_keys__() + projection_data = o._prepare_buffer_projection(dsk, keys) # If the projection failed, or there are no input layers if projection_data is None: diff --git a/src/dask_awkward/lib/optimize.py b/src/dask_awkward/lib/optimize.py index 2f971873..37b38d41 100644 --- a/src/dask_awkward/lib/optimize.py +++ b/src/dask_awkward/lib/optimize.py @@ -3,7 +3,7 @@ import copy import logging import warnings -from collections.abc import Hashable, Iterable, Mapping +from collections.abc import Hashable, Iterable, Mapping, Sequence from typing import TYPE_CHECKING, Any, cast import dask.config @@ -17,6 +17,7 @@ if TYPE_CHECKING: from awkward._nplikes.typetracer import TypeTracerReport + from dask.typing import Key log = logging.getLogger(__name__) @@ -65,7 +66,7 @@ def all_optimizations( def optimize( dsk: HighLevelGraph, - keys: Hashable | list[Hashable] | set[Hashable], + keys: Sequence[Key], **_: Any, ) -> Mapping: """Run optimizations specific to dask-awkward. @@ -77,7 +78,7 @@ def optimize( if dask.config.get("awkward.optimization.enabled"): which = dask.config.get("awkward.optimization.which") if "columns" in which: - dsk = optimize_columns(dsk) + dsk = optimize_columns(dsk, keys) if "layer-chains" in which: dsk = rewrite_layer_chains(dsk, keys) @@ -85,7 +86,7 @@ def optimize( def _prepare_buffer_projection( - dsk: HighLevelGraph, + dsk: HighLevelGraph, keys: Sequence[Key] ) -> tuple[dict[str, TypeTracerReport], dict[str, Any]] | None: """Pair layer names with lists of necessary columns.""" import awkward as ak @@ -117,18 +118,6 @@ def _prepare_buffer_projection( hlg = HighLevelGraph(projection_layers, dsk.dependencies) - # this loop builds up what are the possible final leaf nodes by - # inspecting the dependents dictionary. If something does not have - # a dependent, it must be the end of a graph. These are the things - # we need to compute for; we only use a single partition (the - # first). for a single collection `.compute()` this list will just - # be length 1; but if we are using `dask.compute` to pass in - # multiple collections to be computed simultaneously, this list - # will increase in length. - leaf_layers_keys = [ - (k, 0) for k, v in dsk.dependents.items() if isinstance(v, set) and len(v) == 0 - ] - # now we try to compute for each possible output layer key (leaf # node on partition 0); this will cause the typetacer reports to # get correct fields/columns touched. If the result is a record or @@ -136,7 +125,7 @@ def _prepare_buffer_projection( try: for layer in hlg.layers.values(): layer.__dict__.pop("_cached_dict", None) - results = get_sync(hlg, leaf_layers_keys) + results = get_sync(hlg, list(keys)) for out in results: if isinstance(out, (ak.Array, ak.Record)): touch_data(out) @@ -163,7 +152,7 @@ def _prepare_buffer_projection( return layer_to_reports, layer_to_projection_state -def optimize_columns(dsk: HighLevelGraph) -> HighLevelGraph: +def optimize_columns(dsk: HighLevelGraph, keys: Sequence[Key]) -> HighLevelGraph: """Run column projection optimization. This optimization determines which columns from an @@ -192,7 +181,7 @@ def optimize_columns(dsk: HighLevelGraph) -> HighLevelGraph: New, optimized task graph with column-projected ``AwkwardInputLayer``. """ - projection_data = _prepare_buffer_projection(dsk) + projection_data = _prepare_buffer_projection(dsk, keys) if projection_data is None: return dsk @@ -258,7 +247,7 @@ def _mock_output(layer): return new_layer -def rewrite_layer_chains(dsk: HighLevelGraph, keys: Any) -> HighLevelGraph: +def rewrite_layer_chains(dsk: HighLevelGraph, keys: Sequence[Key]) -> HighLevelGraph: """Smush chains of blockwise layers into a single layer. The logic here identifies chains by popping layers (in arbitrary @@ -292,7 +281,7 @@ def rewrite_layer_chains(dsk: HighLevelGraph, keys: Any) -> HighLevelGraph: chains = [] deps = copy.copy(dsk.dependencies) - required_layers = {k[0] for k in keys} + required_layers = {k[0] for k in keys if isinstance(k, tuple)} layers = {} # find chains; each chain list is at least two keys long dependents = dsk.dependents From 1661bbb8bdd25b2cd4857bb8333538f60c8c415b Mon Sep 17 00:00:00 2001 From: Angus Hollands Date: Tue, 12 Dec 2023 13:50:56 +0000 Subject: [PATCH 02/10] refactor: use clearer names --- src/dask_awkward/lib/optimize.py | 46 ++++++++++++++++---------------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/src/dask_awkward/lib/optimize.py b/src/dask_awkward/lib/optimize.py index 37b38d41..58c5866b 100644 --- a/src/dask_awkward/lib/optimize.py +++ b/src/dask_awkward/lib/optimize.py @@ -287,48 +287,48 @@ def rewrite_layer_chains(dsk: HighLevelGraph, keys: Sequence[Key]) -> HighLevelG dependents = dsk.dependents all_layers = set(dsk.layers) while all_layers: - lay = all_layers.pop() - val = dsk.layers[lay] - if not isinstance(val, AwkwardBlockwiseLayer): + layer_key = all_layers.pop() + layer = dsk.layers[layer_key] + if not isinstance(layer, AwkwardBlockwiseLayer): # shortcut to avoid making comparisons - layers[lay] = val # passthrough unchanged + layers[layer_key] = layer # passthrough unchanged continue - children = dependents[lay] - chain = [lay] - lay0 = lay + children = dependents[layer_key] + chain = [layer_key] + current_layer_key = layer_key while ( len(children) == 1 - and dsk.dependencies[list(children)[0]] == {lay} + and dsk.dependencies[list(children)[0]] == {current_layer_key} and isinstance(dsk.layers[list(children)[0]], AwkwardBlockwiseLayer) - and len(dsk.layers[lay]) == len(dsk.layers[list(children)[0]]) - and lay not in required_layers + and len(dsk.layers[current_layer_key]) == len(dsk.layers[list(children)[0]]) + and current_layer_key not in required_layers ): # walk forwards - lay = list(children)[0] - chain.append(lay) - all_layers.remove(lay) - children = dependents[lay] - lay = lay0 - parents = dsk.dependencies[lay] + current_layer_key = list(children)[0] + chain.append(current_layer_key) + all_layers.remove(current_layer_key) + children = dependents[current_layer_key] + + parents = dsk.dependencies[layer_key] while ( len(parents) == 1 - and dependents[list(parents)[0]] == {lay} + and dependents[list(parents)[0]] == {layer_key} and isinstance(dsk.layers[list(parents)[0]], AwkwardBlockwiseLayer) - and len(dsk.layers[lay]) == len(dsk.layers[list(parents)[0]]) + and len(dsk.layers[layer_key]) == len(dsk.layers[list(parents)[0]]) and list(parents)[0] not in required_layers ): # walk backwards - lay = list(parents)[0] - chain.insert(0, lay) - all_layers.remove(lay) - parents = dsk.dependencies[lay] + layer_key = list(parents)[0] + chain.insert(0, layer_key) + all_layers.remove(layer_key) + parents = dsk.dependencies[layer_key] if len(chain) > 1: chains.append(chain) layers[chain[-1]] = copy.copy( dsk.layers[chain[-1]] ) # shallow copy to be mutated else: - layers[lay] = val # passthrough unchanged + layers[layer_key] = layer # passthrough unchanged # do rewrite for chain in chains: From 18b5c7b9d1206132611ddc8d4c484f64c540c76a Mon Sep 17 00:00:00 2001 From: Angus Hollands Date: Tue, 12 Dec 2023 13:53:00 +0000 Subject: [PATCH 03/10] refactor: use walrus for readability --- src/dask_awkward/lib/optimize.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/src/dask_awkward/lib/optimize.py b/src/dask_awkward/lib/optimize.py index 58c5866b..5d6f4915 100644 --- a/src/dask_awkward/lib/optimize.py +++ b/src/dask_awkward/lib/optimize.py @@ -298,13 +298,14 @@ def rewrite_layer_chains(dsk: HighLevelGraph, keys: Sequence[Key]) -> HighLevelG current_layer_key = layer_key while ( len(children) == 1 - and dsk.dependencies[list(children)[0]] == {current_layer_key} - and isinstance(dsk.layers[list(children)[0]], AwkwardBlockwiseLayer) - and len(dsk.layers[current_layer_key]) == len(dsk.layers[list(children)[0]]) + and dsk.dependencies[first_child := next(iter(children))] + == {current_layer_key} + and isinstance(dsk.layers[first_child], AwkwardBlockwiseLayer) + and len(dsk.layers[current_layer_key]) == len(dsk.layers[first_child]) and current_layer_key not in required_layers ): # walk forwards - current_layer_key = list(children)[0] + current_layer_key = first_child chain.append(current_layer_key) all_layers.remove(current_layer_key) children = dependents[current_layer_key] @@ -312,13 +313,13 @@ def rewrite_layer_chains(dsk: HighLevelGraph, keys: Sequence[Key]) -> HighLevelG parents = dsk.dependencies[layer_key] while ( len(parents) == 1 - and dependents[list(parents)[0]] == {layer_key} - and isinstance(dsk.layers[list(parents)[0]], AwkwardBlockwiseLayer) - and len(dsk.layers[layer_key]) == len(dsk.layers[list(parents)[0]]) - and list(parents)[0] not in required_layers + and dependents[first_parent := next(iter(parents))] == {layer_key} + and isinstance(dsk.layers[first_parent], AwkwardBlockwiseLayer) + and len(dsk.layers[layer_key]) == len(dsk.layers[first_parent]) + and next(iter(parents)) not in required_layers ): # walk backwards - layer_key = list(parents)[0] + layer_key = first_parent chain.insert(0, layer_key) all_layers.remove(layer_key) parents = dsk.dependencies[layer_key] From 554826127d3808ea86169978fe903e7d20060578 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Thu, 18 Jan 2024 14:12:19 -0600 Subject: [PATCH 04/10] fix a bad json test; use minimal keys in optimization graph --- src/dask_awkward/lib/optimize.py | 21 ++++++++++----------- tests/test_io_json.py | 2 +- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/src/dask_awkward/lib/optimize.py b/src/dask_awkward/lib/optimize.py index 5d6f4915..994250f1 100644 --- a/src/dask_awkward/lib/optimize.py +++ b/src/dask_awkward/lib/optimize.py @@ -31,11 +31,7 @@ """ -def all_optimizations( - dsk: Mapping, - keys: Hashable | list[Hashable] | set[Hashable], - **_: Any, -) -> Mapping: +def all_optimizations(dsk: Mapping, keys: Sequence[Key], **_: Any) -> Mapping: """Run all optimizations that benefit dask-awkward computations. This function will run both dask-awkward specific and upstream @@ -64,11 +60,7 @@ def all_optimizations( return dsk -def optimize( - dsk: HighLevelGraph, - keys: Sequence[Key], - **_: Any, -) -> Mapping: +def optimize(dsk: HighLevelGraph, keys: Sequence[Key], **_: Any) -> Mapping: """Run optimizations specific to dask-awkward. This is currently limited to determining the necessary columns for @@ -118,6 +110,13 @@ def _prepare_buffer_projection( hlg = HighLevelGraph(projection_layers, dsk.dependencies) + minimal_keys = set() + for k in keys: + if isinstance(k, tuple) and len(k) == 2: + minimal_keys.add((k[0], 0)) + else: + minimal_keys.add(k) + # now we try to compute for each possible output layer key (leaf # node on partition 0); this will cause the typetacer reports to # get correct fields/columns touched. If the result is a record or @@ -125,7 +124,7 @@ def _prepare_buffer_projection( try: for layer in hlg.layers.values(): layer.__dict__.pop("_cached_dict", None) - results = get_sync(hlg, list(keys)) + results = get_sync(hlg, list(minimal_keys)) for out in results: if isinstance(out, (ak.Array, ak.Record)): touch_data(out) diff --git a/tests/test_io_json.py b/tests/test_io_json.py index 9e7ca8cc..9b1dd644 100644 --- a/tests/test_io_json.py +++ b/tests/test_io_json.py @@ -91,7 +91,7 @@ def input_layer_array_partition0(collection: Array) -> ak.Array: """ with dask.config.set({"awkward.optimization.which": ["columns"]}): - optimized_hlg = dak_optimize(collection.dask, []) + optimized_hlg = dak_optimize(collection.dask, collection.keys) layers = list(optimized_hlg.layers) # type: ignore layer_name = [name for name in layers if name.startswith("from-json")][0] sgc, arg = optimized_hlg[(layer_name, 0)] From 492efffd2aaac3ee6fc7148bcab7e553176a08ef Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Thu, 18 Jan 2024 14:13:13 -0600 Subject: [PATCH 05/10] lint --- src/dask_awkward/lib/optimize.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dask_awkward/lib/optimize.py b/src/dask_awkward/lib/optimize.py index 994250f1..57664564 100644 --- a/src/dask_awkward/lib/optimize.py +++ b/src/dask_awkward/lib/optimize.py @@ -3,7 +3,7 @@ import copy import logging import warnings -from collections.abc import Hashable, Iterable, Mapping, Sequence +from collections.abc import Iterable, Mapping, Sequence from typing import TYPE_CHECKING, Any, cast import dask.config From cf9dbec661c9d89ed26a82948f1314f85b837bcd Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Thu, 18 Jan 2024 14:19:29 -0600 Subject: [PATCH 06/10] mypy --- src/dask_awkward/lib/optimize.py | 4 +--- tests/test_io_json.py | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/src/dask_awkward/lib/optimize.py b/src/dask_awkward/lib/optimize.py index 57664564..e3176a7e 100644 --- a/src/dask_awkward/lib/optimize.py +++ b/src/dask_awkward/lib/optimize.py @@ -38,8 +38,6 @@ def all_optimizations(dsk: Mapping, keys: Sequence[Key], **_: Any) -> Mapping: general optimizations from core dask. """ - if not isinstance(keys, (list, set)): - keys = (keys,) # pragma: no cover keys = tuple(flatten(keys)) if not isinstance(dsk, HighLevelGraph): @@ -110,7 +108,7 @@ def _prepare_buffer_projection( hlg = HighLevelGraph(projection_layers, dsk.dependencies) - minimal_keys = set() + minimal_keys: set[Key] = set() for k in keys: if isinstance(k, tuple) and len(k) == 2: minimal_keys.add((k[0], 0)) diff --git a/tests/test_io_json.py b/tests/test_io_json.py index 9b1dd644..4837602e 100644 --- a/tests/test_io_json.py +++ b/tests/test_io_json.py @@ -91,7 +91,7 @@ def input_layer_array_partition0(collection: Array) -> ak.Array: """ with dask.config.set({"awkward.optimization.which": ["columns"]}): - optimized_hlg = dak_optimize(collection.dask, collection.keys) + optimized_hlg = dak_optimize(collection.dask, collection.keys) # type: ignore layers = list(optimized_hlg.layers) # type: ignore layer_name = [name for name in layers if name.startswith("from-json")][0] sgc, arg = optimized_hlg[(layer_name, 0)] From 7d502e3a2470c7f5f8f431611804211e757e66da Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Thu, 18 Jan 2024 14:29:52 -0600 Subject: [PATCH 07/10] something with the walrus not working in earlier python versions --- src/dask_awkward/lib/optimize.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/dask_awkward/lib/optimize.py b/src/dask_awkward/lib/optimize.py index e3176a7e..87afe88e 100644 --- a/src/dask_awkward/lib/optimize.py +++ b/src/dask_awkward/lib/optimize.py @@ -14,6 +14,7 @@ from dask.local import get_sync from dask_awkward.layers import AwkwardBlockwiseLayer, AwkwardInputLayer +from dask_awkward.utils import first if TYPE_CHECKING: from awkward._nplikes.typetracer import TypeTracerReport @@ -295,14 +296,13 @@ def rewrite_layer_chains(dsk: HighLevelGraph, keys: Sequence[Key]) -> HighLevelG current_layer_key = layer_key while ( len(children) == 1 - and dsk.dependencies[first_child := next(iter(children))] - == {current_layer_key} - and isinstance(dsk.layers[first_child], AwkwardBlockwiseLayer) - and len(dsk.layers[current_layer_key]) == len(dsk.layers[first_child]) + and dsk.dependencies[first(children)] == {current_layer_key} + and isinstance(dsk.layers[first(children)], AwkwardBlockwiseLayer) + and len(dsk.layers[current_layer_key]) == len(dsk.layers[first(children)]) and current_layer_key not in required_layers ): # walk forwards - current_layer_key = first_child + current_layer_key = first(children) chain.append(current_layer_key) all_layers.remove(current_layer_key) children = dependents[current_layer_key] @@ -310,13 +310,13 @@ def rewrite_layer_chains(dsk: HighLevelGraph, keys: Sequence[Key]) -> HighLevelG parents = dsk.dependencies[layer_key] while ( len(parents) == 1 - and dependents[first_parent := next(iter(parents))] == {layer_key} - and isinstance(dsk.layers[first_parent], AwkwardBlockwiseLayer) - and len(dsk.layers[layer_key]) == len(dsk.layers[first_parent]) + and dependents[first(parents)] == {layer_key} + and isinstance(dsk.layers[first(parents)], AwkwardBlockwiseLayer) + and len(dsk.layers[layer_key]) == len(dsk.layers[first(parents)]) and next(iter(parents)) not in required_layers ): # walk backwards - layer_key = first_parent + layer_key = first(parents) chain.insert(0, layer_key) all_layers.remove(layer_key) parents = dsk.dependencies[layer_key] From fbf5e21541bcf7d980b1b995c05d9107a04c870f Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Thu, 18 Jan 2024 16:01:09 -0600 Subject: [PATCH 08/10] test that mimics lindsey's issue found in coffea --- tests/test_optimize.py | 75 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 75 insertions(+) diff --git a/tests/test_optimize.py b/tests/test_optimize.py index c9cf8ec0..ffafb423 100644 --- a/tests/test_optimize.py +++ b/tests/test_optimize.py @@ -76,3 +76,78 @@ def test_multiple_computes_multiple_incapsulated(daa, caa): (opt4_alone,) = dask.optimize(dstep4) assert len(opt4_alone.dask.layers) == 1 assert_eq(opt4_alone, opt4) + + +def test_optimization_runs_on_multiple_collections(tmp_path_factory): + d = tmp_path_factory.mktemp("opt") + array1 = ak.Array( + [ + { + "points": [ + {"x": 3.0, "y": 4.0, "z": 1.0}, + {"x": 2.0, "y": 5.0, "z": 2.0}, + ], + }, + { + "points": [ + {"x": 2.0, "y": 5.0, "z": 2.0}, + {"x": 3.0, "y": 4.0, "z": 1.0}, + ], + }, + { + "points": [], + }, + { + "points": [ + {"x": 2.0, "y": 6.0, "z": 2.0}, + ], + }, + ] + ) + ak.to_parquet(array1, d / "p0.parquet", extensionarray=False) + + array2 = ak.Array( + [ + { + "points": [ + {"x": 1.0, "y": 4.0, "z": 1.0}, + ], + }, + { + "points": [ + {"x": 7.0, "y": 5.0, "z": 2.0}, + {"x": 3.0, "y": 4.0, "z": 1.0}, + {"x": 5.0, "y": 5.0, "z": 2.0}, + ], + }, + { + "points": [ + {"x": 2.0, "y": 6.0, "z": 2.0}, + ], + }, + { + "points": [], + }, + ] + ) + ak.to_parquet(array2, d / "p1.parquet", extensionarray=False) + + ds = dak.from_parquet(d) + a1 = ds.partitions[0].points + a2 = ds.partitions[1].points + a, b = ak.unzip(ak.cartesian([a1, a2], axis=1, nested=True)) + + def something(j, k): + return j.x + k.x + + a_compute = something(a, b) + nc1 = dak.necessary_columns(a_compute) + assert sorted(list(nc1.items())[0][1]) == ["points.x"] + + nc2 = dak.necessary_columns(a_compute, (a, b)) + assert sorted(list(nc2.items())[0][1]) == ["points.x", "points.y", "points.z"] + + x, (y, z) = dask.compute(a_compute, (a, b)) + assert str(x) + assert str(y) + assert str(z) From f2bd8b7dc87816663b7540494987e941c17a52de Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Thu, 18 Jan 2024 16:01:51 -0600 Subject: [PATCH 09/10] rename test --- tests/test_optimize.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_optimize.py b/tests/test_optimize.py index ffafb423..0cfc02ac 100644 --- a/tests/test_optimize.py +++ b/tests/test_optimize.py @@ -78,7 +78,7 @@ def test_multiple_computes_multiple_incapsulated(daa, caa): assert_eq(opt4_alone, opt4) -def test_optimization_runs_on_multiple_collections(tmp_path_factory): +def test_optimization_runs_on_multiple_collections_gh430(tmp_path_factory): d = tmp_path_factory.mktemp("opt") array1 = ak.Array( [ From 6b4c3b0f0880cb8a00dc76d4b19f3331aa9cb224 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Thu, 18 Jan 2024 16:04:31 -0600 Subject: [PATCH 10/10] importorskip --- tests/test_optimize.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/test_optimize.py b/tests/test_optimize.py index 0cfc02ac..7f67d836 100644 --- a/tests/test_optimize.py +++ b/tests/test_optimize.py @@ -2,6 +2,7 @@ import awkward as ak import dask +import pytest import dask_awkward as dak from dask_awkward.lib.testutils import assert_eq @@ -79,6 +80,7 @@ def test_multiple_computes_multiple_incapsulated(daa, caa): def test_optimization_runs_on_multiple_collections_gh430(tmp_path_factory): + pytest.importorskip("pyarrow") d = tmp_path_factory.mktemp("opt") array1 = ak.Array( [