From 3b28c686d7358eb316a10e903a486e50c55d5ed5 Mon Sep 17 00:00:00 2001 From: Chuck Daniels Date: Wed, 10 Jul 2024 11:03:26 -0400 Subject: [PATCH] Succeed even when subset is empty (#83) Fixes #79 --- CHANGELOG.md | 2 + src/gedi_subset/gedi_utils.py | 5 -- src/gedi_subset/subset.py | 113 ++++++++++++++++++++-------------- 3 files changed, 69 insertions(+), 51 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 07a3ba3..0c02901 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,8 @@ The format is based on [Keep a Changelog], and this project adheres to - Set default job queue to `maap-dps-worker-32vcpu-64gb` to improve performance by running on 32 CPUs ([#78](https://github.com/MAAP-Project/gedi-subsetter/issues/78)) +- Succeed even when the result is an empty subset + ([#79](https://github.com/MAAP-Project/gedi-subsetter/issues/79)) ### Added diff --git a/src/gedi_subset/gedi_utils.py b/src/gedi_subset/gedi_utils.py index baadeff..8614c5c 100644 --- a/src/gedi_subset/gedi_utils.py +++ b/src/gedi_subset/gedi_utils.py @@ -10,7 +10,6 @@ import requests import shapely from maap.Result import Granule -from returns.curry import curry from returns.io import IOResultE, impure_safe import gedi_subset.fp as fp @@ -32,13 +31,11 @@ def pprint(value: Any) -> None: print(json.dumps(value, indent=2)) -# str -> str -> str def chext(ext: str, path: str) -> str: """Changes the extension of a path.""" return f"{os.path.splitext(path)[0]}{ext}" -@curry def gdf_to_file( file: Union[str, os.PathLike], props: Mapping[str, Any], gdf: gpd.GeoDataFrame ) -> IOResultE[None]: @@ -51,7 +48,6 @@ def gdf_to_file( return impure_safe(gdf.to_file)(file, **props) -@curry def gdf_to_parquet( path: Union[str, os.PathLike], gdf: gpd.GeoDataFrame ) -> IOResultE[None]: @@ -108,7 +104,6 @@ def granule_geometry(granule: Granule) -> shapely.Geometry: return shapely.union_all(polygons) -@curry def granule_intersects(aoi: shapely.Geometry, granule: Granule): """Determines whether or not a granule intersects an Area of Interest diff --git a/src/gedi_subset/subset.py b/src/gedi_subset/subset.py index 01c06d1..0558b33 100755 --- a/src/gedi_subset/subset.py +++ b/src/gedi_subset/subset.py @@ -14,7 +14,6 @@ Callable, Iterable, Mapping, - NoReturn, Optional, Sequence, Tuple, @@ -28,7 +27,7 @@ from maap.Result import Collection, Granule from returns.curry import partial from returns.functions import raise_exception, tap -from returns.io import IOFailure, IOResult, IOResultE, IOSuccess, impure_safe +from returns.io import IOResultE, IOSuccess from returns.iterables import Fold from returns.maybe import Maybe, Nothing, Some from returns.pipeline import flow, is_successful, pipe @@ -103,26 +102,44 @@ def is_gedi_collection(c: Collection) -> bool: return c.get("ShortName", "").startswith("GEDI") and data_format == "HDF5" -def find_gedi_collection( - maap: MAAP, params: Mapping[str, str] -) -> IOResultE[Collection]: - """Find a GEDI collection matching the given parameters. - - Return `IOSuccess[Collection]` containing the collection upon successful - search; otherwise return `IOFailure[Exception]` containing the reason for - failure, which is a `ValueError` when there is no matching collection or - the collection is _not_ a GEDI collection. +def find_gedi_collection(maap: MAAP, params: Mapping[str, str]) -> Collection: + """Find a GEDI collection matching search parameters. + + Parameters + ---------- + maap + MAAP client to use for searching for the collection. + params + Search parameters to use when searching for the collection. For + available search parameters, see the + [CMR Search API documentation](https://cmr.earthdata.nasa.gov/search/site/docs/search/api.html). # noqa: E501 + + Returns + ------- + collection + First GEDI collection that matches the search parameters. + + Raises + ------ + ValueError + If the query failed, no GEDI collection was found, or multiple collections were + found. + + Examples + -------- + >>> maap = MAAP("api.maap-project.org") + >>> find_collection(maap, {"cloud_hosted": "true", "doi": "10.3334/ORNLDAAC/2056"}) # doctest: +SKIP # noqa: E501 + {'concept-id': 'C2237824918-ORNL_CLOUD', 'revision-id': '28', + 'format': 'application/echo10+xml', + 'Collection': {'ShortName': 'GEDI_L4A_AGB_Density_V2_1_2056', ...}} """ - return ( - IOSuccess(c) - if is_gedi_collection(c := find_collection(maap, params)) - else IOFailure( - ValueError( - f"Collection {c['Collection']['ShortName']} is not a GEDI" - " collection, or does not contain HDF5 data files." - ) + if not is_gedi_collection(collection := find_collection(maap, params)): + raise ValueError( + f"Collection {collection['Collection']['ShortName']} is not a GEDI" + " collection, or does not contain HDF5 data files." ) - ) + + return collection def beam_filter(beams: str) -> Callable[[h5py.Group], bool]: @@ -135,7 +152,7 @@ def beam_filter(beams: str) -> Callable[[h5py.Group], bool]: return beam_filter_from_names([item.strip() for item in beams.split(",")]) -def check_beams_option(value: str) -> str | NoReturn: +def check_beams_option(value: str) -> str: upper_value = value.upper() suffixes = [name.strip().lstrip("BEAM") for name in upper_value.split(",")] valid_suffixes = ["0000", "0001", "0010", "0011", "0101", "0110", "1000", "1011"] @@ -265,8 +282,10 @@ def append_subset(src: str) -> IOResultE[str]: # If we have at least 10 granules per process, use a chunksize of 10. chunksize = 10 if processes and len(downloadable_granules) >= 10 * processes else 1 - logger.info(f"Found {len(found_granules)} in the CMR") - logger.info(f"Total downloadable granules: {len(downloadable_granules)}") + logger.info( + f"Granules found in the CMR: {len(found_granules)}" + f" (downloadable: {len(downloadable_granules)})" + ) payloads = ( SubsetGranuleProps( @@ -424,23 +443,30 @@ def main( maap = MAAP("api.maap-project.org") cmr_host = "cmr.earthdata.nasa.gov" - IOResult.do( - subsets - for aoi_gdf in impure_safe(gpd.read_file)(aoi) - # Use wildcards around DOI value because some collections have incorrect - # DOI values. For example, the L2B collection has the full DOI URL as - # the DOI value (i.e., https://doi.org/ rather than just ). - for collection in find_gedi_collection( - maap, dict(cmr_host=cmr_host, doi=f"*{doi}*", cloud_hosted="true") - ) - for granules in impure_safe(maap.searchGranule)( + aoi_gdf = gpd.read_file(aoi) + aoi_geometry = aoi_gdf.unary_union + # Use wildcards around DOI value because some collections have incorrect + # DOI values. For example, the L2B collection has the full DOI URL as + # the DOI value (i.e., https://doi.org/ rather than just ). + collection = find_gedi_collection( + maap, dict(cmr_host=cmr_host, doi=f"*{doi}*", cloud_hosted="true") + ) + granules = [ + granule + for granule in maap.searchGranule( cmr_host=cmr_host, collection_concept_id=collection["concept-id"], bounding_box=",".join(fp.map(str)(aoi_gdf.total_bounds)), # pyright: ignore limit=limit, **(dict(temporal=temporal) if temporal else {}), ) - for subsets in subset_granules( + if granule_intersects(aoi_geometry, granule) + ] + + if not granules: + logger.info("No granules intersect the AOI within the temporal range.") + elif gpq_paths := unsafe_perform_io( + subset_granules( maap, aoi_gdf, lat, @@ -451,21 +477,16 @@ def main( output_dir, dest, (logging_level,), - fp.filter(granule_intersects(aoi_gdf.unary_union))(granules), + granules, fsspec_kwargs, processes, ) - ).bind_ioresult( - lambda subsets: ( - IOSuccess(subsets) - if subsets - else IOFailure(ValueError(f"No granules intersect the AOI: {aoi}")) - ) - ).map( - lambda subsets: logger.info(f"Subset {len(subsets)} granule(s) to {dest}") - ).alt( - raise_exception - ) + .alt(raise_exception) + .unwrap() + ): + logger.info(f"Subset {len(gpq_paths)} granule(s) to {dest}.") + else: + logger.info(f"Empty subset: no rows satisfy the query {query!r}") if __name__ == "__main__":