Skip to content

Commit

Permalink
Merge branch 'dev'
Browse files Browse the repository at this point in the history
  • Loading branch information
fbunt committed Jan 17, 2025
2 parents 3d2d57d + c07ba76 commit b7a9383
Showing 1 changed file with 31 additions and 29 deletions.
60 changes: 31 additions & 29 deletions raster_tools/rasterize.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ def _reduce_stacked_feature_rasters(

def _rasterize_spatial_matches(
matches,
partitions,
dgdf,
like_chunk_rasters,
all_touched,
fill=None,
Expand All @@ -323,6 +323,24 @@ def _rasterize_spatial_matches(
mask=False,
mask_invert=False,
):
# NOTE: Convert the partitions to delayed objects to work around some
# flaky behavior in dask. The partitions are passed into map_blocks below
# as args, which should work just fine. A very small and random percentage
# of the time, however, this fails and one of the partitions will evaluate
# to a pandas.Series object with a single element containing a dask graph
# key(?). This started happening after dask-expr became the main dask
# dataframe backend so I believe the issue originates there. Converting to
# delayed objects avoids this behavior.
partitions_as_delayed = dgdf.to_delayed()
chunk_func = _mask_onto_chunk if mask else _rasterize_onto_chunk
target_dtype = U8 if mask else target_dtype
func_kwargs = {"all_touched": all_touched}
if mask:
func_kwargs["invert"] = mask_invert
else:
func_kwargs["out_dtype"] = target_dtype
func_kwargs["fill"] = fill
func_kwargs["overlap_resolve_method"] = overlap_resolve_method
# Create a list for holding rasterization results. Each element corresponds
# to a chunk in the like raster. All elements start as None. Elements
# will be replaced by a stack of dask arrays, if that chunk intersects a
Expand All @@ -332,37 +350,21 @@ def _rasterize_spatial_matches(
# Group by partition and iterate over the groups
for ipart, grp in matches.groupby("part_idx"):
# Get the vector partition
part = partitions[ipart]
part = partitions_as_delayed[ipart]
# Iterate over the chunks that intersected the vector partition and
# rasterize the partition to each intersecting chunk's grid
for _, row in grp.iterrows():
little_like = like_chunk_rasters[row.flat_idx]
if not mask:
chunk = da.map_blocks(
_rasterize_onto_chunk,
dtype=target_dtype,
chunks=little_like.shape[1:],
meta=np.array((), dtype=target_dtype),
# func args
gdf=part,
transform=little_like.affine,
out_dtype=target_dtype,
fill=fill,
all_touched=all_touched,
overlap_resolve_method=overlap_resolve_method,
)
else:
chunk = da.map_blocks(
_mask_onto_chunk,
dtype=U8,
chunks=little_like.shape[1:],
meta=np.array((), dtype=U8),
# Func args
gdf=part,
transform=little_like.affine,
all_touched=all_touched,
invert=mask_invert,
)
func_kwargs["transform"] = little_like.affine
chunk = da.map_blocks(
chunk_func,
part,
dtype=target_dtype,
chunks=little_like.shape[1:],
meta=np.array((), dtype=target_dtype),
# func args
**func_kwargs,
)
if out_chunks[row.flat_idx] is None:
out_chunks[row.flat_idx] = []
out_chunks[row.flat_idx].append(chunk)
Expand Down Expand Up @@ -465,7 +467,7 @@ def _rasterize_spatial_aware(
# Each element is either None or a list of 2D dask arrays
raw_chunk_list = _rasterize_spatial_matches(
matches,
dgdf.partitions,
dgdf,
like_chunk_rasters,
all_touched,
fill=fill,
Expand Down

0 comments on commit b7a9383

Please sign in to comment.