Skip to content

Commit

Permalink
Succeed even when subset is empty (#83)
Browse files Browse the repository at this point in the history
Fixes #79
  • Loading branch information
chuckwondo authored Jul 10, 2024
1 parent 4586dd6 commit 3b28c68
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 51 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ The format is based on [Keep a Changelog], and this project adheres to
- Set default job queue to `maap-dps-worker-32vcpu-64gb` to improve performance
by running on 32 CPUs
([#78](https://github.com/MAAP-Project/gedi-subsetter/issues/78))
- Succeed even when the result is an empty subset
([#79](https://github.com/MAAP-Project/gedi-subsetter/issues/79))

### Added

Expand Down
5 changes: 0 additions & 5 deletions src/gedi_subset/gedi_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import requests
import shapely
from maap.Result import Granule
from returns.curry import curry
from returns.io import IOResultE, impure_safe

import gedi_subset.fp as fp
Expand All @@ -32,13 +31,11 @@ def pprint(value: Any) -> None:
print(json.dumps(value, indent=2))


# str -> str -> str
def chext(ext: str, path: Union[str, os.PathLike]) -> str:
    """Change the extension of a path.

    Parameters
    ----------
    ext
        New extension, including the leading dot (e.g., ``".gpq"``).  An empty
        string simply removes the existing extension.
    path
        Path whose extension should be replaced.  Only the final extension is
        replaced (``"a.tar.gz"`` becomes ``"a.tar" + ext``).  If the path has
        no extension, `ext` is appended.

    Returns
    -------
    str
        The path with its extension (if any) replaced by `ext`.

    Examples
    --------
    >>> chext(".gpq", "data/granule.h5")
    'data/granule.gpq'
    >>> chext(".txt", "noext")
    'noext.txt'
    """
    # os.fspath leaves str untouched and unwraps os.PathLike objects, so that
    # pathlib paths are accepted just like plain strings.
    root, _old_ext = os.path.splitext(os.fspath(path))

    return f"{root}{ext}"


@curry
def gdf_to_file(
file: Union[str, os.PathLike], props: Mapping[str, Any], gdf: gpd.GeoDataFrame
) -> IOResultE[None]:
Expand All @@ -51,7 +48,6 @@ def gdf_to_file(
return impure_safe(gdf.to_file)(file, **props)


@curry
def gdf_to_parquet(
path: Union[str, os.PathLike], gdf: gpd.GeoDataFrame
) -> IOResultE[None]:
Expand Down Expand Up @@ -108,7 +104,6 @@ def granule_geometry(granule: Granule) -> shapely.Geometry:
return shapely.union_all(polygons)


@curry
def granule_intersects(aoi: shapely.Geometry, granule: Granule):
"""Determines whether or not a granule intersects an Area of Interest
Expand Down
113 changes: 67 additions & 46 deletions src/gedi_subset/subset.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
Callable,
Iterable,
Mapping,
NoReturn,
Optional,
Sequence,
Tuple,
Expand All @@ -28,7 +27,7 @@
from maap.Result import Collection, Granule
from returns.curry import partial
from returns.functions import raise_exception, tap
from returns.io import IOFailure, IOResult, IOResultE, IOSuccess, impure_safe
from returns.io import IOResultE, IOSuccess
from returns.iterables import Fold
from returns.maybe import Maybe, Nothing, Some
from returns.pipeline import flow, is_successful, pipe
Expand Down Expand Up @@ -103,26 +102,44 @@ def is_gedi_collection(c: Collection) -> bool:
return c.get("ShortName", "").startswith("GEDI") and data_format == "HDF5"


def find_gedi_collection(
maap: MAAP, params: Mapping[str, str]
) -> IOResultE[Collection]:
"""Find a GEDI collection matching the given parameters.
Return `IOSuccess[Collection]` containing the collection upon successful
search; otherwise return `IOFailure[Exception]` containing the reason for
failure, which is a `ValueError` when there is no matching collection or
the collection is _not_ a GEDI collection.
def find_gedi_collection(maap: MAAP, params: Mapping[str, str]) -> Collection:
"""Find a GEDI collection matching search parameters.
Parameters
----------
maap
MAAP client to use for searching for the collection.
params
Search parameters to use when searching for the collection. For
available search parameters, see the
[CMR Search API documentation](https://cmr.earthdata.nasa.gov/search/site/docs/search/api.html). # noqa: E501
Returns
-------
collection
First GEDI collection that matches the search parameters.
Raises
------
ValueError
If the query failed, no GEDI collection was found, or multiple collections were
found.
Examples
--------
>>> maap = MAAP("api.maap-project.org")
>>> find_collection(maap, {"cloud_hosted": "true", "doi": "10.3334/ORNLDAAC/2056"}) # doctest: +SKIP # noqa: E501
{'concept-id': 'C2237824918-ORNL_CLOUD', 'revision-id': '28',
'format': 'application/echo10+xml',
'Collection': {'ShortName': 'GEDI_L4A_AGB_Density_V2_1_2056', ...}}
"""
return (
IOSuccess(c)
if is_gedi_collection(c := find_collection(maap, params))
else IOFailure(
ValueError(
f"Collection {c['Collection']['ShortName']} is not a GEDI"
" collection, or does not contain HDF5 data files."
)
if not is_gedi_collection(collection := find_collection(maap, params)):
raise ValueError(
f"Collection {collection['Collection']['ShortName']} is not a GEDI"
" collection, or does not contain HDF5 data files."
)
)

return collection


def beam_filter(beams: str) -> Callable[[h5py.Group], bool]:
Expand All @@ -135,7 +152,7 @@ def beam_filter(beams: str) -> Callable[[h5py.Group], bool]:
return beam_filter_from_names([item.strip() for item in beams.split(",")])


def check_beams_option(value: str) -> str | NoReturn:
def check_beams_option(value: str) -> str:
upper_value = value.upper()
suffixes = [name.strip().lstrip("BEAM") for name in upper_value.split(",")]
valid_suffixes = ["0000", "0001", "0010", "0011", "0101", "0110", "1000", "1011"]
Expand Down Expand Up @@ -265,8 +282,10 @@ def append_subset(src: str) -> IOResultE[str]:
# If we have at least 10 granules per process, use a chunksize of 10.
chunksize = 10 if processes and len(downloadable_granules) >= 10 * processes else 1

logger.info(f"Found {len(found_granules)} in the CMR")
logger.info(f"Total downloadable granules: {len(downloadable_granules)}")
logger.info(
f"Granules found in the CMR: {len(found_granules)}"
f" (downloadable: {len(downloadable_granules)})"
)

payloads = (
SubsetGranuleProps(
Expand Down Expand Up @@ -424,23 +443,30 @@ def main(
maap = MAAP("api.maap-project.org")
cmr_host = "cmr.earthdata.nasa.gov"

IOResult.do(
subsets
for aoi_gdf in impure_safe(gpd.read_file)(aoi)
# Use wildcards around DOI value because some collections have incorrect
# DOI values. For example, the L2B collection has the full DOI URL as
# the DOI value (i.e., https://doi.org/<DOI> rather than just <DOI>).
for collection in find_gedi_collection(
maap, dict(cmr_host=cmr_host, doi=f"*{doi}*", cloud_hosted="true")
)
for granules in impure_safe(maap.searchGranule)(
aoi_gdf = gpd.read_file(aoi)
aoi_geometry = aoi_gdf.unary_union
# Use wildcards around DOI value because some collections have incorrect
# DOI values. For example, the L2B collection has the full DOI URL as
# the DOI value (i.e., https://doi.org/<DOI> rather than just <DOI>).
collection = find_gedi_collection(
maap, dict(cmr_host=cmr_host, doi=f"*{doi}*", cloud_hosted="true")
)
granules = [
granule
for granule in maap.searchGranule(
cmr_host=cmr_host,
collection_concept_id=collection["concept-id"],
bounding_box=",".join(fp.map(str)(aoi_gdf.total_bounds)), # pyright: ignore
limit=limit,
**(dict(temporal=temporal) if temporal else {}),
)
for subsets in subset_granules(
if granule_intersects(aoi_geometry, granule)
]

if not granules:
logger.info("No granules intersect the AOI within the temporal range.")
elif gpq_paths := unsafe_perform_io(
subset_granules(
maap,
aoi_gdf,
lat,
Expand All @@ -451,21 +477,16 @@ def main(
output_dir,
dest,
(logging_level,),
fp.filter(granule_intersects(aoi_gdf.unary_union))(granules),
granules,
fsspec_kwargs,
processes,
)
).bind_ioresult(
lambda subsets: (
IOSuccess(subsets)
if subsets
else IOFailure(ValueError(f"No granules intersect the AOI: {aoi}"))
)
).map(
lambda subsets: logger.info(f"Subset {len(subsets)} granule(s) to {dest}")
).alt(
raise_exception
)
.alt(raise_exception)
.unwrap()
):
logger.info(f"Subset {len(gpq_paths)} granule(s) to {dest}.")
else:
logger.info(f"Empty subset: no rows satisfy the query {query!r}")


if __name__ == "__main__":
Expand Down

0 comments on commit 3b28c68

Please sign in to comment.