From 194b3560f52a02c4b97c86b61bd3c14d9a897bce Mon Sep 17 00:00:00 2001 From: Johan Schreurs Date: Wed, 21 Feb 2024 14:51:19 +0100 Subject: [PATCH] Issue #16 Improve progress messages, and working on fix for large slow collections: saving 10k to 100k STAC items without linking to collection --- conda-environment.yaml | 2 + configs-datasets/HRLVPP/.gitignore | 4 +- stacbuilder/__main__.py | 19 ++++++-- stacbuilder/builder.py | 78 ++++++++++++++++++++++++++++-- stacbuilder/commandapi.py | 17 ++++++- stacbuilder/metadata.py | 1 + stacbuilder/terracatalog.py | 72 ++++++++++++++++++--------- 7 files changed, 157 insertions(+), 36 deletions(-) diff --git a/conda-environment.yaml b/conda-environment.yaml index bc62aae..734e5cf 100644 --- a/conda-environment.yaml +++ b/conda-environment.yaml @@ -11,6 +11,7 @@ dependencies: - pyproj=3.6.* - pystac[validation]=1.8.* - rasterio=1.3.* + - requests==2.31.* - rio-stac=0.8.* - shapely=2.0.* - stac-validator=3.3.* @@ -22,3 +23,4 @@ dependencies: - pip: - --extra-index-url https://artifactory.vgt.vito.be/artifactory/api/pypi/python-packages/simple - terracatalogueclient==0.1.14 + - requests_auth==7.0.* diff --git a/configs-datasets/HRLVPP/.gitignore b/configs-datasets/HRLVPP/.gitignore index 71952e9..72e8ffc 100644 --- a/configs-datasets/HRLVPP/.gitignore +++ b/configs-datasets/HRLVPP/.gitignore @@ -1,3 +1 @@ -STAC_wip/* -STAC_publish/* -openeo-test-out/* +* diff --git a/stacbuilder/__main__.py b/stacbuilder/__main__.py index 3c7572b..4c05658 100644 --- a/stacbuilder/__main__.py +++ b/stacbuilder/__main__.py @@ -428,11 +428,23 @@ def vpp_upload_to_stac_api(collection_path: str): @cli.command -def vpp_show_collection_configs(): +@click.option("-o", "--output-file", type=click.File(mode="w", encoding="utf8")) +@click.argument("collection_id", type=click.STRING) +def vpp_get_collection_config(collection_id: str, output_file): + """Display the CollectionConfig for the specified collection in HRL VPP.""" + coll_cfg: CollectionConfig = 
commandapi.vpp_get_collection_config(collection_id) + pprint.pprint(coll_cfg.model_dump()) + + if output_file: + output_file.write(coll_cfg.model_dump_json(indent=2)) + + +@cli.command +def vpp_show_all_collection_configs(): """Display the CollectionConfig for each of the collections in HRL VPP.""" coll_cfg: CollectionConfig - for coll_cfg in commandapi.vpp_get_collection_configs(): + for coll_cfg in commandapi.vpp_get_all_collection_configs(): pprint.pprint(coll_cfg.model_dump()) print() @@ -446,9 +458,6 @@ def vpp_list_tcc_collections(properties): if properties: pprint.pprint(coll.properties) - # num_prods = catalogue.get_product_count(collection.id) - # pprint(f"product count for coll_id {collection.id}: {num_prods}") - @cli.command def vpp_count_products(): diff --git a/stacbuilder/builder.py b/stacbuilder/builder.py index 7a24da0..bf8b325 100644 --- a/stacbuilder/builder.py +++ b/stacbuilder/builder.py @@ -6,6 +6,7 @@ # Standard libraries import datetime as dt from http.client import RemoteDisconnected +import inspect import json import logging import shutil @@ -442,17 +443,23 @@ def build_collection( self, stac_items: Iterable[Item], group: Optional[str | int] = None, + link_items: bool = True, ) -> None: """Create and save the STAC collection.""" self.reset() self._stac_items = list(stac_items) or [] self.create_empty_collection(group=group) - self.add_items_to_collection() + if link_items: + self.add_items_to_collection() + else: + self.save_items_outside_collection() def add_items_to_collection( self, ): """Fills the collection with stac items.""" + self._log_progress_message("START: add_items_to_collection") + if self._collection is None: raise InvalidOperation("Can not add items to a collection that has not been created yet.") @@ -464,8 +471,40 @@ def add_items_to_collection( continue self._collection.add_item(item) + self._log_progress_message("updating collection extent") self._collection.update_extent_from_items() + self._log_progress_message("DONE: 
add_items_to_collection")
+
+    def save_items_outside_collection(
+        self,
+    ):
+        """Save the STAC items to their own file(s) without adding them to the collection."""
+        self._log_progress_message("START: save_items_outside_collection")
+
+        if self._collection is None:
+            raise InvalidOperation("Can not add items to a collection that has not been created yet.")
+
+        items = [i for i in self._stac_items if i is not None]
+        item: Item
+        for item in items:
+            for asset in item.assets.values():
+                asset.owner = self._collection
+
+        stac_item_dir = self.output_dir / self.collection.id
+        if not stac_item_dir.exists():
+            stac_item_dir.mkdir(parents=True)
+
+        from pystac import ItemCollection
+
+        item_collection = ItemCollection(items)
+        item_collection.save_object(dest_href=stac_item_dir)
+
+        self._log_progress_message("updating collection extent")
+        self._collection.extent = Extent.from_items(items)
+
+        self._log_progress_message("DONE: save_items_outside_collection")
+
     def normalize_hrefs(self, skip_unresolved: bool = False):
         layout_template = self._collection_config.layout_strategy_item_template
         strategy = TemplateLayoutStrategy(item_template=layout_template)
@@ -477,6 +516,7 @@ def normalize_hrefs(self, skip_unresolved: bool = False):
 
     def validate_collection(self, collection: Collection):
         """Run STAC validation on the collection."""
+        self._log_progress_message("START: validate_collection")
         try:
             num_items_validated = collection.validate_all(recursive=True)
         except STACValidationError as exc:
@@ -487,9 +527,11 @@ def validate_collection(self, collection: Collection):
         else:
             print(f"Collection valid: number of items validated: {num_items_validated}")
 
+        self._log_progress_message("DONE: validate_collection")
+
     def save_collection(self) -> None:
         """Save the STAC collection to file."""
-        _logger.info("Saving files ...")
+        self._log_progress_message("START: Saving files ...")
 
         if not self.output_dir.exists():
            self.output_dir.mkdir(parents=True)
@@ -499,6 +541,7 @@ def save_collection(self) -> None:
         # The href links to 
asset files also have the be relative (to the location of the STAC item) # This needs to be done via the href_modifier self._collection.save(catalog_type=CatalogType.SELF_CONTAINED) + self._log_progress_message("DONE: Saving files ...") @property def providers(self): @@ -506,6 +549,8 @@ def providers(self): def create_empty_collection(self, group: Optional[str | int] = None) -> None: """Creates a STAC Collection with no STAC items.""" + self._log_progress_message("START: create_empty_collection") + coll_config: CollectionConfig = self._collection_config if group: @@ -544,6 +589,7 @@ def create_empty_collection(self, group: Optional[str | int] = None) -> None: ## ) self._collection = collection + self._log_progress_message("DONE: create_empty_collection") def get_default_extent(self): end_dt = dt.datetime.utcnow() @@ -577,6 +623,10 @@ def _get_item_assets_definitions(self) -> List[AssetDefinition]: return asset_definitions + def _log_progress_message(self, message: str) -> None: + calling_method_name = inspect.stack()[2][3] + _logger.info(f"PROGRESS: {self.__class__.__name__}.{calling_method_name}: {message}") + class PostProcessSTACCollectionFile: """Takes an existing STAC collection file and runs optional postprocessing steps. @@ -720,7 +770,7 @@ def __init__( # Settings: these are just data, not components we delegate work to. 
self._output_base_dir: Path = self._get_output_dir_or_default(output_dir) self._collection_dir: Path = None - self._overwrite: bool = overwrite + self._overwrite: bool = bool(overwrite) # Components / dependencies that must be provided self._metadata_collector: IMetadataCollector = metadata_collector @@ -870,6 +920,7 @@ def group_stac_items_by(self) -> Dict[int, List[Item]]: def collect_stac_items(self): """Generate the intermediate STAC Item objects.""" + self._log_progress_message("START: collect_stac_items") groups = self.group_metadata_by_item_id(self.get_metadata()) for assets in groups.values(): @@ -883,6 +934,8 @@ def collect_stac_items(self): print(f"Skipped validation of {stac_item.get_self_href()} due to RemoteDisconnected.") yield stac_item + self._log_progress_message("DONE: collect_stac_items") + # TODO: [simplify] [refactor] Merge this into collect_stac_items once it works well and it has tests. @staticmethod def group_metadata_by_item_id(iter_metadata) -> Dict[int, List[Item]]: @@ -914,11 +967,16 @@ def get_stac_items_as_dataframe(self) -> pd.DataFrame: """Return a pandas DataFrame representing the STAC Items, without the geometry.""" return GeodataframeExporter.stac_items_to_dataframe(list(self.collect_stac_items())) - def build_collection(self): + def build_collection( + self, + link_items: Optional[bool] = True, + ): """Build the entire STAC collection.""" + self._log_progress_message("START: build_collection") + self.reset() - self._collection_builder.build_collection(self.collect_stac_items()) + self._collection_builder.build_collection(self.collect_stac_items(), link_items=link_items) self._collection_builder.normalize_hrefs() self._collection_builder.save_collection() self._collection = self._collection_builder.collection @@ -927,10 +985,14 @@ def build_collection(self): post_processor = PostProcessSTACCollectionFile(collection_overrides=self._collection_config.overrides) post_processor.process_collection(coll_file) + 
self._log_progress_message("DONE: build_collection") + def get_collection_file_for_group(self, group: str | int): return self._output_base_dir / str(group) def build_grouped_collections(self): + self._log_progress_message("START: build_grouped_collections") + self.reset() if not self.uses_collection_groups: @@ -961,6 +1023,12 @@ def build_grouped_collections(self): coll_file = Path(self._collection_groups[group].self_href) post_processor.process_collection(coll_file) + self._log_progress_message("DONE: build_grouped_collections") + + def _log_progress_message(self, message: str) -> None: + calling_method_name = inspect.stack()[2][3] + _logger.info(f"PROGRESS: {self.__class__.__name__}.{calling_method_name}: {message}") + class GeoTiffPipeline: """A pipeline to generate a STAC collection from a directory containing GeoTIFF files. diff --git a/stacbuilder/commandapi.py b/stacbuilder/commandapi.py index 416e664..7df4040 100644 --- a/stacbuilder/commandapi.py +++ b/stacbuilder/commandapi.py @@ -381,7 +381,10 @@ def vpp_build_all_collections( pipeline.build_collection() +# TODO: remove _get_tcc_collection_id, makes the commands too complicated and we have a better solution now. +# Now that we can list the HRL VPP collection names we can copy paste from that output. def _get_tcc_collection_id(collection_id: Optional[str], collection_number: Optional[int]) -> str: + """DEPRECATED Helper method to select the collection without dealing with long names""" if not collection_id and not collection_number: raise ValueError( "No collection was specified. 
"
@@ -444,7 +447,19 @@ def vpp_count_products() -> list[tcc.Collection]:
     return {c.id: catalogue.get_product_count(c.id) for c in collections}
 
 
-def vpp_get_collection_configs() -> list[CollectionConfig]:
+def vpp_get_collection_config(collection_id: str) -> CollectionConfig:
+    """Return the CollectionConfig for the specified collection in HRL VPP."""
+    if not collection_id:
+        raise ValueError(f'Argument "collection_id" must have a value. {collection_id=!r}, {type(collection_id)=}')
+    collector = HRLVPPMetadataCollector()
+
+    collector.collection_id = collection_id
+    tcc_coll = collector.get_tcc_collection()
+    conf_builder = CollectionConfigBuilder(tcc_coll)
+    return conf_builder.get_collection_config()
+
+
+def vpp_get_all_collection_configs() -> list[CollectionConfig]:
     """Display the CollectionConfig for each of the collections in HRL VPP."""
     collector = HRLVPPMetadataCollector()
 
diff --git a/stacbuilder/metadata.py b/stacbuilder/metadata.py
index 24bbb3e..8a126e8 100644
--- a/stacbuilder/metadata.py
+++ b/stacbuilder/metadata.py
@@ -727,6 +727,7 @@ def save_geodataframe(gdf: gpd.GeoDataFrame, out_dir: Path, table_name: str) ->
     print(f"Saving pipe-separated CSV file to: {csv_path}")
     gdf.to_csv(csv_path, sep="|")
 
+    # TODO: Shapefile has too many problems with unsupported column types. Going to remove it (but in a separate branch/PR).
     print(f"Saving shapefile to: {shapefile_path }")
     gdf.to_file(shapefile_path)
 
diff --git a/stacbuilder/terracatalog.py b/stacbuilder/terracatalog.py
index 83602da..feaf3fb 100644
--- a/stacbuilder/terracatalog.py
+++ b/stacbuilder/terracatalog.py
@@ -37,18 +37,48 @@ def get_coll_temporal_extent(collection: tcc.Collection) -> Tuple[dt.datetime |
     acquisitionInformation = collection.properties["acquisitionInformation"]
     pprint(acquisitionInformation)
+
+    # acquisitionInformation contains one or more acquisitionParameters, and each has a start + end datetime. 
+    # It looks like this is mainly used to describe the period for each platform,
+    # for example Sentinel S2A, but also S2B.
+    #
+    # Example:
+    # 'acquisitionInformation': [{'acquisitionParameters': {'beginningDateTime': '2017-01-01T00:00:00Z',
+    #                                                        'endingDateTime': '2023-09-30T23:59:59Z'},
+    #                             'instrument': {'instrumentShortName': 'MSI',
+    #                                            'sensorType': 'OPTICAL'},
+    #                             'platform': {'platformSerialIdentifier': 'S2A',
+    #                                          'platformShortName': 'SENTINEL-2'}},
+    #                            {'acquisitionParameters': {'beginningDateTime': '2017-03-07T01:49:00Z',
+    #                                                       'endingDateTime': '2023-09-30T23:59:59Z'},
+    #                             'instrument': {'instrumentShortName': 'MSI',
+    #                                            'sensorType': 'OPTICAL'},
+    #                             'platform': {'platformSerialIdentifier': 'S2B',
+    #                                          'platformShortName': 'SENTINEL-2'}}],
+    dt_start = None
+    dt_end = None
     for info in acquisitionInformation:
         print(info.get("acquisitionParameters", {}))
-        dt_start = info.get("acquisitionParameters", {}).get("beginningDateTime")
-        dt_end = info.get("acquisitionParameters", {}).get("endingDateTime")
+        new_dt_start = info.get("acquisitionParameters", {}).get("beginningDateTime")
+        new_dt_end = info.get("acquisitionParameters", {}).get("endingDateTime")
+
+        print(new_dt_start, new_dt_end)
+        print(dt.datetime.fromisoformat(new_dt_start))
+        print(dt.datetime.fromisoformat(new_dt_end))
+
+        new_dt_start = dt.datetime.fromisoformat(new_dt_start)
+        new_dt_end = dt.datetime.fromisoformat(new_dt_end)
 
-    print(dt_start, dt_end)
-    print(dt.datetime.fromisoformat(dt_start))
-    print(dt.datetime.fromisoformat(dt_end))
+        if not dt_start:
+            dt_start = new_dt_start
+        elif new_dt_start < dt_start:
+            dt_start = new_dt_start
 
-    dt_start = dt.datetime.fromisoformat(dt_start)
-    dt_end = dt.datetime.fromisoformat(dt_end)
+        if not dt_end:
+            dt_end = new_dt_end
+        elif dt_end < new_dt_end:
+            dt_end = new_dt_end
 
     return dt_start, dt_end
 
@@ -265,9 +295,10 @@ def collect(self):
         self._df_products = self.get_products_as_dataframe()
 
         if self.temp_dir:
-            self.geodataframe_path = self.temp_dir 
/ f"{self.collection_id}.parquet" if not self.temp_dir.exists(): self.temp_dir.mkdir(parents=True) + + self.geodataframe_path = self.temp_dir / f"{self.collection_id}.parquet" self._df_products.to_parquet(path=self.geodataframe_path, index=True) _logger.info("PROGRESS: converting GeoDataFrame to list of AssetMetadata objects") @@ -353,21 +384,8 @@ def get_products_as_dataframe(self) -> gpd.GeoDataFrame: if self._max_products > 0 and num_products_processed > self._max_products: break - # print("-" * 50) - # print(product.id) - # print(product.title) - # print("product properties:") - # pprint(product.properties) - # print("... end properties ...") - asset_metadata = self.create_asset_metadata(product) assets_md.append(asset_metadata) - # pprint(asset_metadata.to_dict()) - - # asset_bbox: BoundingBox = asset_metadata.bbox_lat_lon - # print(f"{asset_bbox.as_polygon()=}") - # print(f"{product.geometry}") - # print("-" * 50) # The extra break statements are needed so we don't end up with # an empty dataframe here, which is something we cannot process. @@ -393,10 +411,20 @@ def get_products_as_dataframe(self) -> gpd.GeoDataFrame: def _convert_to_asset_metadata(df: pd.DataFrame) -> List[AssetMetadata]: """Convert the pandas dataframe to a list of AssetMetadata objects.""" md_list = [] - for i in range(len(df)): + + # Log some progress every 1000 records. Without this output it is hard to see what is happening. + progress_chunk_size = 1000 + num_products = len(df) + for i in range(num_products): + if i % progress_chunk_size == 0: + _logger.info(f"PROGRESS: {i} of {num_products} converted to AssetMetadata") + record = df.iloc[i, :] metadata = AssetMetadata.from_geoseries(record) md_list.append(metadata) + + _logger.info(f"PROGRESS: {i+1} of {num_products} converted to AssetMetadata") + return md_list def create_asset_metadata(self, product: tcc.Product) -> AssetMetadata: