From 66b29a02a1738e22ee24c9573c9561447b2b2dad Mon Sep 17 00:00:00 2001 From: Fred Bunt Date: Thu, 16 Jan 2025 17:14:11 -0700 Subject: [PATCH 1/2] Refactor to cleanup map_blocks call --- raster_tools/rasterize.py | 45 +++++++++++++++++---------------------- 1 file changed, 19 insertions(+), 26 deletions(-) diff --git a/raster_tools/rasterize.py b/raster_tools/rasterize.py index 877120c..02c52d6 100644 --- a/raster_tools/rasterize.py +++ b/raster_tools/rasterize.py @@ -323,6 +323,15 @@ def _rasterize_spatial_matches( mask=False, mask_invert=False, ): + chunk_func = _mask_onto_chunk if mask else _rasterize_onto_chunk + target_dtype = U8 if mask else target_dtype + func_kwargs = {"all_touched": all_touched} + if mask: + func_kwargs["invert"] = mask_invert + else: + func_kwargs["out_dtype"] = target_dtype + func_kwargs["fill"] = fill + func_kwargs["overlap_resolve_method"] = overlap_resolve_method # Create a list for holding rasterization results. Each element corresponds # to a chunk in the like raster. All elements start as None. 
Elements will # will be replaced by a stack of dask arrays, if that chunk intersects a @@ -337,32 +346,16 @@ def _rasterize_spatial_matches( # rasterize the partition to each intersecting chunk's grid for _, row in grp.iterrows(): little_like = like_chunk_rasters[row.flat_idx] - if not mask: - chunk = da.map_blocks( - _rasterize_onto_chunk, - dtype=target_dtype, - chunks=little_like.shape[1:], - meta=np.array((), dtype=target_dtype), - # func args - gdf=part, - transform=little_like.affine, - out_dtype=target_dtype, - fill=fill, - all_touched=all_touched, - overlap_resolve_method=overlap_resolve_method, - ) - else: - chunk = da.map_blocks( - _mask_onto_chunk, - dtype=U8, - chunks=little_like.shape[1:], - meta=np.array((), dtype=U8), - # Func args - gdf=part, - transform=little_like.affine, - all_touched=all_touched, - invert=mask_invert, - ) + func_kwargs["transform"] = little_like.affine + chunk = da.map_blocks( + chunk_func, + part, + dtype=target_dtype, + chunks=little_like.shape[1:], + meta=np.array((), dtype=target_dtype), + # func args + **func_kwargs, + ) if out_chunks[row.flat_idx] is None: out_chunks[row.flat_idx] = [] out_chunks[row.flat_idx].append(chunk) From c07ba7655e9d7583fe6f82af8f05655bc010fe76 Mon Sep 17 00:00:00 2001 From: Fred Bunt Date: Thu, 16 Jan 2025 17:38:49 -0700 Subject: [PATCH 2/2] Add workaround to avoid flakey Dask behavior This commit adds a workaround that avoids failures caused by dask computing invalid results. This would happen randomly and very rarely. The issue is that when a dask-dataframe partition is passed to a map_blocks call as one of the args, it will sometimes evaluate to a pandas.Series object with a single element containing a dask graph key-tuple(?). Something like: ``` 0 ("getitem-to_frame-b2fa15abf...", 0) dtype: object ``` I believe the issue originates in the dask-expr backend but I don't know how to debug it. 
This commit works around that by converting dataframe partitions to delayed objects before passing them to map_blocks. --- raster_tools/rasterize.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/raster_tools/rasterize.py b/raster_tools/rasterize.py index 02c52d6..097e083 100644 --- a/raster_tools/rasterize.py +++ b/raster_tools/rasterize.py @@ -314,7 +314,7 @@ def _reduce_stacked_feature_rasters( def _rasterize_spatial_matches( matches, - partitions, + dgdf, like_chunk_rasters, all_touched, fill=None, @@ -323,6 +323,15 @@ def _rasterize_spatial_matches( mask=False, mask_invert=False, ): + # NOTE: Convert the partitions to delayed objects to work around some + # flakey behavior in dask. The partitions are passed into map_blocks below + # as args, which should work just fine. A very small and random percentage + # of the time, however, this fails and one of the partitions will evaluate + # to a pandas.Series object with a single element containing a dask graph + # key(?). This started happening after dask-expr became the main dask + # dataframe backend so I believe the issue originates there. Converting to + # delayed objects avoids this behavior. 
+ partitions_as_delayed = dgdf.to_delayed() chunk_func = _mask_onto_chunk if mask else _rasterize_onto_chunk target_dtype = U8 if mask else target_dtype func_kwargs = {"all_touched": all_touched} @@ -341,7 +350,7 @@ def _rasterize_spatial_matches( # Group by partition and iterate over the groups for ipart, grp in matches.groupby("part_idx"): # Get the vector partition - part = partitions[ipart] + part = partitions_as_delayed[ipart] # Iterate over the chunks that intersected the vector partition and # rasterize the partition to each intersecting chunk's grid for _, row in grp.iterrows(): @@ -458,7 +467,7 @@ def _rasterize_spatial_aware( # Each element is either None or a list of 2D dask arrays raw_chunk_list = _rasterize_spatial_matches( matches, - dgdf.partitions, + dgdf, like_chunk_rasters, all_touched, fill=fill,