diff --git a/raster_tools/rasterize.py b/raster_tools/rasterize.py index 877120c..097e083 100644 --- a/raster_tools/rasterize.py +++ b/raster_tools/rasterize.py @@ -314,7 +314,7 @@ def _reduce_stacked_feature_rasters( def _rasterize_spatial_matches( matches, - partitions, + dgdf, like_chunk_rasters, all_touched, fill=None, @@ -323,6 +323,24 @@ def _rasterize_spatial_matches( mask=False, mask_invert=False, ): + # NOTE: Convert the partitions to delayed objects to work around some + # flaky behavior in dask. The partitions are passed into map_blocks below + # as args, which should work just fine. A very small and random percentage + # of the time, however, this fails and one of the partitions will evaluate + # to a pandas.Series object with a single element containing a dask graph + # key(?). This started happening after dask-expr became the main dask + # dataframe backend so I believe the issue originates there. Converting to + # delayed objects avoids this behavior. + partitions_as_delayed = dgdf.to_delayed() + chunk_func = _mask_onto_chunk if mask else _rasterize_onto_chunk + target_dtype = U8 if mask else target_dtype + func_kwargs = {"all_touched": all_touched} + if mask: + func_kwargs["invert"] = mask_invert + else: + func_kwargs["out_dtype"] = target_dtype + func_kwargs["fill"] = fill + func_kwargs["overlap_resolve_method"] = overlap_resolve_method # Create a list for holding rasterization results. Each element corresponds # to a chunk in the like raster. All elements start as None. 
Elements will # will be replaced by a stack of dask arrays, if that chunk intersects a @@ -332,37 +350,21 @@ def _rasterize_spatial_matches( # Group by partition and iterate over the groups for ipart, grp in matches.groupby("part_idx"): # Get the vector partition - part = partitions[ipart] + part = partitions_as_delayed[ipart] # Iterate over the chunks that intersected the vector partition and # rasterize the partition to each intersecting chunk's grid for _, row in grp.iterrows(): little_like = like_chunk_rasters[row.flat_idx] - if not mask: - chunk = da.map_blocks( - _rasterize_onto_chunk, - dtype=target_dtype, - chunks=little_like.shape[1:], - meta=np.array((), dtype=target_dtype), - # func args - gdf=part, - transform=little_like.affine, - out_dtype=target_dtype, - fill=fill, - all_touched=all_touched, - overlap_resolve_method=overlap_resolve_method, - ) - else: - chunk = da.map_blocks( - _mask_onto_chunk, - dtype=U8, - chunks=little_like.shape[1:], - meta=np.array((), dtype=U8), - # Func args - gdf=part, - transform=little_like.affine, - all_touched=all_touched, - invert=mask_invert, - ) + func_kwargs["transform"] = little_like.affine + chunk = da.map_blocks( + chunk_func, + part, + dtype=target_dtype, + chunks=little_like.shape[1:], + meta=np.array((), dtype=target_dtype), + # func args + **func_kwargs, + ) if out_chunks[row.flat_idx] is None: out_chunks[row.flat_idx] = [] out_chunks[row.flat_idx].append(chunk) @@ -465,7 +467,7 @@ def _rasterize_spatial_aware( # Each element is either None or a list of 2D dask arrays raw_chunk_list = _rasterize_spatial_matches( matches, - dgdf.partitions, + dgdf, like_chunk_rasters, all_touched, fill=fill,