From 83a4751c64985b292247e197012547077ac1c549 Mon Sep 17 00:00:00 2001 From: Sam Van Kooten Date: Fri, 25 Oct 2024 17:08:39 -0400 Subject: [PATCH 1/6] Build dask array more directly The simpler task graph seems to run faster --- dkist/io/dask_utils.py | 65 +++++++++++++++++------------------------- 1 file changed, 26 insertions(+), 39 deletions(-) diff --git a/dkist/io/dask_utils.py b/dkist/io/dask_utils.py index 6fea1c3d..2028d6f0 100644 --- a/dkist/io/dask_utils.py +++ b/dkist/io/dask_utils.py @@ -1,12 +1,12 @@ -from functools import partial +from itertools import batched -import dask.array as da +import dask import numpy as np __all__ = ["stack_loader_array"] -def stack_loader_array(loader_array, chunksize): +def stack_loader_array(loader_array, chunksize=None, batch_size=1): """ Stack a loader array along each of its dimensions. @@ -22,39 +22,26 @@ def stack_loader_array(loader_array, chunksize): """ # If the chunksize isn't specified then use the whole array shape chunksize = chunksize or loader_array.flat[0].shape - - if loader_array.size == 1: - return tuple(loader_to_dask(loader_array, chunksize))[0] - if len(loader_array.shape) == 1: - return da.stack(loader_to_dask(loader_array, chunksize)) - stacks = [] - for i in range(loader_array.shape[0]): - stacks.append(stack_loader_array(loader_array[i], chunksize)) - return da.stack(stacks) - - -def _partial_to_array(loader, *, meta, chunks): - # Set the name of the array to the filename, that should be unique within the array - return da.from_array(loader, meta=meta, chunks=chunks, name=loader.fileuri) - - -def loader_to_dask(loader_array, chunksize): - """ - Map a call to `dask.array.from_array` onto all the elements in ``loader_array``. - - This is done so that an explicit ``meta=`` argument can be provided to - prevent loading data from disk. - """ - if loader_array.size != 1 and len(loader_array.shape) != 1: - raise ValueError("Can only be used on one dimensional arrays") - - loader_array = np.atleast_1d(loader_array) - - # The meta argument to from array is used to determine properties of the - # array, such as dtype. We explicitly specify it here to prevent dask - # trying to auto calculate it by reading from the actual array on disk. - meta = np.zeros((0,), dtype=loader_array[0].dtype) - - to_array = partial(_partial_to_array, meta=meta, chunks=chunksize) - - return map(to_array, loader_array) + file_shape = loader_array.flat[0].shape + + tasks = {} + batches = list(batched(loader_array.flat, batch_size)) + for i, loaders in enumerate(batches): + key = ("load_files", i) + key += (0,) * len(file_shape) + tasks[key] = (_load_batch, loaders) + + dsk = dask.highlevelgraph.HighLevelGraph.from_collections("load_files", tasks, dependencies=()) + chunks = (tuple(len(b) for b in batches),) + tuple((s,) for s in file_shape) + array = dask.array.Array(dsk, + name="load_files", + chunks=chunks, + dtype=loader_array.flat[0].dtype) + return array + + +def _load_batch(loaders): + arrays = [loader.data for loader in loaders] + if len(arrays) == 1: + return arrays[0] + return np.concatenate(arrays) From ccfc60059042bce432d096caaadbf301fa373056 Mon Sep 17 00:00:00 2001 From: Sam Van Kooten Date: Fri, 25 Oct 2024 18:02:15 -0400 Subject: [PATCH 2/6] Try handling non-default chunksizes --- dkist/io/dask_utils.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/dkist/io/dask_utils.py b/dkist/io/dask_utils.py index 2028d6f0..230cb21d 100644 --- a/dkist/io/dask_utils.py +++ b/dkist/io/dask_utils.py @@ -20,8 +20,6 @@ def stack_loader_array(loader_array, chunksize=None, batch_size=1): ------- array : `dask.array.Array` """ - # If the chunksize isn't specified then use the whole array shape - chunksize = chunksize or loader_array.flat[0].shape file_shape = loader_array.flat[0].shape tasks = {} @@ -37,6 +35,11 @@ def stack_loader_array(loader_array, chunksize=None, batch_size=1): name="load_files", chunks=chunks, dtype=loader_array.flat[0].dtype) + if chunksize is not None: + # If requested, re-chunk the array. Not sure this is optimal + new_chunks = (1,) * (array.ndim - len(chunksize)) + chunksize + array = array.rechunk(new_chunks) + # The calling function reshapes the array to the final, correct shape return array From 15d08ae3646d20529ad8e6a0574687e641a4a71c Mon Sep 17 00:00:00 2001 From: Sam Van Kooten Date: Fri, 25 Oct 2024 18:13:57 -0400 Subject: [PATCH 3/6] Move final shape responsibility into dask_utils --- dkist/io/dask_utils.py | 4 ++-- dkist/io/file_manager.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/dkist/io/dask_utils.py b/dkist/io/dask_utils.py index 230cb21d..85429f2e 100644 --- a/dkist/io/dask_utils.py +++ b/dkist/io/dask_utils.py @@ -6,7 +6,7 @@ __all__ = ["stack_loader_array"] -def stack_loader_array(loader_array, chunksize=None, batch_size=1): +def stack_loader_array(loader_array, output_shape, chunksize=None, batch_size=1): """ Stack a loader array along each of its dimensions. @@ -35,11 +35,11 @@ def stack_loader_array(loader_array, chunksize=None, batch_size=1): name="load_files", chunks=chunks, dtype=loader_array.flat[0].dtype) + array = array.reshape(output_shape) if chunksize is not None: # If requested, re-chunk the array. Not sure this is optimal new_chunks = (1,) * (array.ndim - len(chunksize)) + chunksize array = array.rechunk(new_chunks) - # The calling function reshapes the array to the final, correct shape return array diff --git a/dkist/io/file_manager.py b/dkist/io/file_manager.py index a2f3e2fc..a0983cc2 100644 --- a/dkist/io/file_manager.py +++ b/dkist/io/file_manager.py @@ -77,7 +77,7 @@ def _generate_array(self) -> dask.array.Array: still have a reference to this `~.FileManager` object, meaning changes to this object will be reflected in the data loaded by the array. """ - return stack_loader_array(self.loader_array, self.chunksize).reshape(self.output_shape) + return stack_loader_array(self.loader_array, self.output_shape, self.chunksize) class StripedExternalArray(BaseStripedExternalArray): From d376bea07b4721698d2201f1d54ce0e1d4419413 Mon Sep 17 00:00:00 2001 From: Sam Van Kooten Date: Fri, 25 Oct 2024 18:14:10 -0400 Subject: [PATCH 4/6] Docstring --- dkist/io/dask_utils.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/dkist/io/dask_utils.py b/dkist/io/dask_utils.py index 85429f2e..844c8ebc 100644 --- a/dkist/io/dask_utils.py +++ b/dkist/io/dask_utils.py @@ -8,13 +8,20 @@ def stack_loader_array(loader_array, output_shape, chunksize=None, batch_size=1): """ - Stack a loader array along each of its dimensions. + Converts an array of loaders to a dask array that loads a chunk from each loader This results in a dask array with the correct chunks and dimensions. Parameters ---------- - loader_array : `dkist.io.reference_collections.BaseFITSArrayContainer` + loader_array : `dkist.io.loaders.BaseFITSLoader` + An array of loader objects + output_shape : tuple[int] + The intended shape of the final array + chunksize : tuple[int] + Can be used to set a chunk size. If not provided, each batch is one chunk + batch_size : int + The number of files to load in each dask task Returns ------- From 4a6551d9a5c14ec0365d5861fabf190772808aef Mon Sep 17 00:00:00 2001 From: Sam Van Kooten Date: Fri, 25 Oct 2024 18:20:50 -0400 Subject: [PATCH 5/6] Removing 'batching' idea, which isn't giving me any speedups --- dkist/io/dask_utils.py | 24 ++++++++---------------- dkist/io/loaders.py | 4 ++++ 2 files changed, 12 insertions(+), 16 deletions(-) diff --git a/dkist/io/dask_utils.py b/dkist/io/dask_utils.py index 844c8ebc..c461b237 100644 --- a/dkist/io/dask_utils.py +++ b/dkist/io/dask_utils.py @@ -1,12 +1,9 @@ -from itertools import batched - import dask -import numpy as np __all__ = ["stack_loader_array"] -def stack_loader_array(loader_array, output_shape, chunksize=None, batch_size=1): +def stack_loader_array(loader_array, output_shape, chunksize=None): """ Converts an array of loaders to a dask array that loads a chunk from each loader @@ -20,8 +17,6 @@ def stack_loader_array(loader_array, output_shape, chunksize=None, batch_size=1) The intended shape of the final array chunksize : tuple[int] Can be used to set a chunk size. If not provided, each batch is one chunk - batch_size : int - The number of files to load in each dask task Returns ------- @@ -30,18 +25,21 @@ def stack_loader_array(loader_array, output_shape, chunksize=None, batch_size=1) file_shape = loader_array.flat[0].shape tasks = {} - batches = list(batched(loader_array.flat, batch_size)) - for i, loaders in enumerate(batches): + for i, loader in enumerate(loader_array.flat): + # The key identifies this chunk's position in the (partially-flattened) final data cube key = ("load_files", i) key += (0,) * len(file_shape) - tasks[key] = (_load_batch, loaders) + # Each task will be to call the loader, with no arguments + tasks[key] = (loader,) dsk = dask.highlevelgraph.HighLevelGraph.from_collections("load_files", tasks, dependencies=()) - chunks = (tuple(len(b) for b in batches),) + tuple((s,) for s in file_shape) + # Specifies that each chunk occupies a space of 1 pixel in the first dimension, and all the pixels in the others + chunks = ((1,) * loader_array.size,) + tuple((s,) for s in file_shape) array = dask.array.Array(dsk, name="load_files", chunks=chunks, dtype=loader_array.flat[0].dtype) + # Now impose the higher dimensions on the data cube array = array.reshape(output_shape) if chunksize is not None: # If requested, re-chunk the array. Not sure this is optimal @@ -49,9 +47,3 @@ def stack_loader_array(loader_array, output_shape, chunksize=None, batch_size=1) array = array.rechunk(new_chunks) return array - -def _load_batch(loaders): - arrays = [loader.data for loader in loaders] - if len(arrays) == 1: - return arrays[0] - return np.concatenate(arrays) diff --git a/dkist/io/loaders.py b/dkist/io/loaders.py index 18e3d710..3a31cdfd 100644 --- a/dkist/io/loaders.py +++ b/dkist/io/loaders.py @@ -64,6 +64,10 @@ def __str__(self): def data(self): return self[:] + def __call__(self): + # Support inserting loader directly into a Dask task + return self[:] + @abc.abstractmethod def __getitem__(self, slc): pass From 4a3fd5f963f115651048fe8898626b5f5a27cfb8 Mon Sep 17 00:00:00 2001 From: Sam Van Kooten Date: Fri, 25 Oct 2024 19:24:56 -0400 Subject: [PATCH 6/6] Make sure loaded files have a leading dimension --- dkist/io/dask_utils.py | 12 ++++++++++-- dkist/io/loaders.py | 4 ---- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/dkist/io/dask_utils.py b/dkist/io/dask_utils.py index c461b237..a9a5fa27 100644 --- a/dkist/io/dask_utils.py +++ b/dkist/io/dask_utils.py @@ -1,4 +1,5 @@ import dask +import numpy as np __all__ = ["stack_loader_array"] @@ -29,8 +30,8 @@ def stack_loader_array(loader_array, output_shape, chunksize=None): # The key identifies this chunk's position in the (partially-flattened) final data cube key = ("load_files", i) key += (0,) * len(file_shape) - # Each task will be to call the loader, with no arguments - tasks[key] = (loader,) + # Each task will be to call _call_loader, with the loader as an argument + tasks[key] = (_call_loader, loader) dsk = dask.highlevelgraph.HighLevelGraph.from_collections("load_files", tasks, dependencies=()) # Specifies that each chunk occupies a space of 1 pixel in the first dimension, and all the pixels in the others @@ -47,3 +48,10 @@ def stack_loader_array(loader_array, output_shape, chunksize=None): array = array.rechunk(new_chunks) return array + +def _call_loader(loader): + data = loader.data + # The data needs an extra dimension for the leading index of the intermediate data cube, which has a leading + # index for file number + data = np.expand_dims(data, 0) + return data \ No newline at end of file diff --git a/dkist/io/loaders.py b/dkist/io/loaders.py index 3a31cdfd..18e3d710 100644 --- a/dkist/io/loaders.py +++ b/dkist/io/loaders.py @@ -64,10 +64,6 @@ def __str__(self): def data(self): return self[:] - def __call__(self): - # Support inserting loader directly into a Dask task - return self[:] - @abc.abstractmethod def __getitem__(self, slc): pass