Issue #16 Improve progress messages, and working on fix for large slow collections: saving 10k to 100k STAC items without linking to collection
JohanKJSchreurs committed Feb 27, 2024
1 parent f1581c1 commit 6fa75b2
Showing 7 changed files with 157 additions and 36 deletions.
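
The commit does two things: it adds PROGRESS log lines around the long-running builder steps, and it starts on a faster save path for very large collections (10k to 100k items) that skips linking each item to the collection. Below is a small, self-contained sketch of the logging pattern used in builder.py; it is illustrative only — the demo class is made up, and it uses `inspect.stack()[1]` (the direct caller), whereas the helper added in this commit uses index `[2]`, one frame higher.

```python
import inspect
import logging

logging.basicConfig(level=logging.INFO, format="%(message)s")
_logger = logging.getLogger(__name__)


class DemoBuilder:
    """Stand-in class; only the logging pattern mirrors the commit."""

    def _log_progress_message(self, message: str) -> None:
        # Look up the name of the method that called this helper so the log
        # line reads "PROGRESS: <Class>.<method>: <message>".
        calling_method_name = inspect.stack()[1][3]
        _logger.info(f"PROGRESS: {self.__class__.__name__}.{calling_method_name}: {message}")

    def build_collection(self) -> None:
        self._log_progress_message("START: build_collection")
        # ... long-running work would happen here ...
        self._log_progress_message("DONE: build_collection")


DemoBuilder().build_collection()
# PROGRESS: DemoBuilder.build_collection: START: build_collection
# PROGRESS: DemoBuilder.build_collection: DONE: build_collection
```
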
2 changes: 2 additions & 0 deletions conda-environment.yaml
@@ -11,6 +11,7 @@ dependencies:
- pyproj=3.6.*
- pystac[validation]=1.8.*
- rasterio=1.3.*
- requests==2.31.*
- rio-stac=0.8.*
- shapely=2.0.*
- stac-validator=3.3.*
@@ -22,3 +23,4 @@ dependencies:
- pip:
- --extra-index-url https://artifactory.vgt.vito.be/artifactory/api/pypi/python-packages/simple
- terracatalogueclient==0.1.14
- requests_auth==7.0.*
4 changes: 1 addition & 3 deletions configs-datasets/HRLVPP/.gitignore
@@ -1,3 +1 @@
STAC_wip/*
STAC_publish/*
openeo-test-out/*
*
19 changes: 14 additions & 5 deletions stacbuilder/__main__.py
@@ -428,11 +428,23 @@ def vpp_upload_to_stac_api(collection_path: str):


@cli.command
def vpp_show_collection_configs():
@click.option("-o", "--output-file", type=click.File(mode="w", encoding="utf8"))
@click.argument("collection_id", type=click.STRING)
def vpp_get_collection_config(collection_id: str, output_file):
"""Display the CollectionConfig for the specified collection in HRL VPP."""
coll_cfg: CollectionConfig = commandapi.vpp_get_collection_config(collection_id)
pprint.pprint(coll_cfg.model_dump())

if output_file:
output_file.write(coll_cfg.model_dump_json(indent=2))


@cli.command
def vpp_show_all_collection_configs():
"""Display the CollectionConfig for each of the collections in HRL VPP."""

coll_cfg: CollectionConfig
for coll_cfg in commandapi.vpp_get_collection_configs():
for coll_cfg in commandapi.vpp_get_all_collection_configs():
pprint.pprint(coll_cfg.model_dump())
print()

@@ -446,9 +458,6 @@ def vpp_list_tcc_collections(properties):
if properties:
pprint.pprint(coll.properties)

# num_prods = catalogue.get_product_count(collection.id)
# pprint(f"product count for coll_id {collection.id}: {num_prods}")


@cli.command
def vpp_count_products():
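
For reference, the new `vpp_get_collection_config` command can also be driven from Python through `commandapi`, which is what the CLI wrapper above does. A hedged usage sketch, assuming the package exposes `stacbuilder.commandapi` as imported here; the collection id and output file name are placeholders, not real HRL VPP values:

```python
from pathlib import Path

# Assumed import path; the CLI module above refers to this as `commandapi`.
from stacbuilder import commandapi

# Placeholder id: substitute a real HRL VPP collection id, for example one
# listed by the vpp_list_tcc_collections command.
coll_cfg = commandapi.vpp_get_collection_config("some-hrl-vpp-collection-id")

# Same result as the CLI's --output-file option: a pretty-printed JSON dump
# of the pydantic CollectionConfig model.
Path("collection-config.json").write_text(coll_cfg.model_dump_json(indent=2), encoding="utf8")
```
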
78 changes: 73 additions & 5 deletions stacbuilder/builder.py
@@ -6,6 +6,7 @@
# Standard libraries
import datetime as dt
from http.client import RemoteDisconnected
import inspect
import json
import logging
import shutil
@@ -442,17 +443,23 @@ def build_collection(
self,
stac_items: Iterable[Item],
group: Optional[str | int] = None,
link_items: bool = True,
) -> None:
"""Create and save the STAC collection."""
self.reset()
self._stac_items = list(stac_items) or []
self.create_empty_collection(group=group)
self.add_items_to_collection()
if link_items:
self.add_items_to_collection()
else:
self.save_items_outside_collection()

def add_items_to_collection(
self,
):
"""Fills the collection with stac items."""
self._log_progress_message("START: add_items_to_collection")

if self._collection is None:
raise InvalidOperation("Can not add items to a collection that has not been created yet.")

@@ -464,8 +471,40 @@ def add_items_to_collection(
continue
self._collection.add_item(item)

self._log_progress_message("updating collection extent")
self._collection.update_extent_from_items()

self._log_progress_message("DONE: add_items_to_collection")

def save_items_outside_collection(
self,
):
"""Fills the collection with stac items."""
self._log_progress_message("START: save_items_outside_collection")

if self._collection is None:
raise InvalidOperation("Can not add items to a collection that has not been created yet.")

items = [i for i in self._stac_items if i is not None]
item: Item
for item in items:
# item.assets maps asset keys to Asset objects, so iterate over the values.
for asset in item.assets.values():
asset.owner = self._collection

stac_item_dir = self.output_dir / self.collection.id
if not stac_item_dir.exists():
stac_item_dir.mkdir(parents=True)

from pystac import ItemCollection

item_collection = ItemCollection(items)
# save_object expects a file href, not a directory; the file name here is illustrative.
item_collection.save_object(dest_href=str(stac_item_dir / "items.json"))

self._log_progress_message("updating collection extent")
self._collection.extent = Extent.from_items(items)

self._log_progress_message("DONE: save_items_outside_collection")

def normalize_hrefs(self, skip_unresolved: bool = False):
layout_template = self._collection_config.layout_strategy_item_template
strategy = TemplateLayoutStrategy(item_template=layout_template)
@@ -477,6 +516,7 @@ def normalize_hrefs(self, skip_unresolved: bool = False):

def validate_collection(self, collection: Collection):
"""Run STAC validation on the collection."""
self._log_progress_message("START: validate_collection")
try:
num_items_validated = collection.validate_all(recursive=True)
except STACValidationError as exc:
@@ -487,9 +527,11 @@
else:
print(f"Collection valid: number of items validated: {num_items_validated}")

self._log_progress_message("DONE: validate_collection")

def save_collection(self) -> None:
"""Save the STAC collection to file."""
_logger.info("Saving files ...")
self._log_progress_message("START: Saving files ...")

if not self.output_dir.exists():
self.output_dir.mkdir(parents=True)
@@ -499,13 +541,16 @@ def save_collection(self) -> None:
# The href links to asset files also have to be relative (to the location of the STAC item)
# This needs to be done via the href_modifier
self._collection.save(catalog_type=CatalogType.SELF_CONTAINED)
self._log_progress_message("DONE: Saving files ...")

@property
def providers(self):
return [p.to_provider() for p in self._collection_config.providers]

def create_empty_collection(self, group: Optional[str | int] = None) -> None:
"""Creates a STAC Collection with no STAC items."""
self._log_progress_message("START: create_empty_collection")

coll_config: CollectionConfig = self._collection_config

if group:
@@ -544,6 +589,7 @@ def create_empty_collection(self, group: Optional[str | int] = None) -> None:
## )

self._collection = collection
self._log_progress_message("DONE: create_empty_collection")

def get_default_extent(self):
end_dt = dt.datetime.utcnow()
@@ -577,6 +623,10 @@ def _get_item_assets_definitions(self) -> List[AssetDefinition]:

return asset_definitions

def _log_progress_message(self, message: str) -> None:
calling_method_name = inspect.stack()[2][3]
_logger.info(f"PROGRESS: {self.__class__.__name__}.{calling_method_name}: {message}")


class PostProcessSTACCollectionFile:
"""Takes an existing STAC collection file and runs optional postprocessing steps.
@@ -720,7 +770,7 @@ def __init__(
# Settings: these are just data, not components we delegate work to.
self._output_base_dir: Path = self._get_output_dir_or_default(output_dir)
self._collection_dir: Path = None
self._overwrite: bool = overwrite
self._overwrite: bool = bool(overwrite)

# Components / dependencies that must be provided
self._metadata_collector: IMetadataCollector = metadata_collector
@@ -870,6 +920,7 @@ def group_stac_items_by(self) -> Dict[int, List[Item]]:

def collect_stac_items(self):
"""Generate the intermediate STAC Item objects."""
self._log_progress_message("START: collect_stac_items")

groups = self.group_metadata_by_item_id(self.get_metadata())
for assets in groups.values():
@@ -883,6 +934,8 @@
print(f"Skipped validation of {stac_item.get_self_href()} due to RemoteDisconnected.")
yield stac_item

self._log_progress_message("DONE: collect_stac_items")

# TODO: [simplify] [refactor] Merge this into collect_stac_items once it works well and it has tests.
@staticmethod
def group_metadata_by_item_id(iter_metadata) -> Dict[int, List[Item]]:
@@ -914,11 +967,16 @@ def get_stac_items_as_dataframe(self) -> pd.DataFrame:
"""Return a pandas DataFrame representing the STAC Items, without the geometry."""
return GeodataframeExporter.stac_items_to_dataframe(list(self.collect_stac_items()))

def build_collection(self):
def build_collection(
self,
link_items: bool = True,
):
"""Build the entire STAC collection."""
self._log_progress_message("START: build_collection")

self.reset()

self._collection_builder.build_collection(self.collect_stac_items())
self._collection_builder.build_collection(self.collect_stac_items(), link_items=link_items)
self._collection_builder.normalize_hrefs()
self._collection_builder.save_collection()
self._collection = self._collection_builder.collection
@@ -927,10 +985,14 @@ def build_collection(self):
post_processor = PostProcessSTACCollectionFile(collection_overrides=self._collection_config.overrides)
post_processor.process_collection(coll_file)

self._log_progress_message("DONE: build_collection")

def get_collection_file_for_group(self, group: str | int):
return self._output_base_dir / str(group)

def build_grouped_collections(self):
self._log_progress_message("START: build_grouped_collections")

self.reset()

if not self.uses_collection_groups:
@@ -961,6 +1023,12 @@ def build_grouped_collections(self):
coll_file = Path(self._collection_groups[group].self_href)
post_processor.process_collection(coll_file)

self._log_progress_message("DONE: build_grouped_collections")

def _log_progress_message(self, message: str) -> None:
calling_method_name = inspect.stack()[2][3]
_logger.info(f"PROGRESS: {self.__class__.__name__}.{calling_method_name}: {message}")


class GeoTiffPipeline:
"""A pipeline to generate a STAC collection from a directory containing GeoTIFF files.
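
To make the builder.py change above concrete: with `link_items=True` every item is added as a child link of the collection and written to its own JSON file, while the new `save_items_outside_collection` path writes all items into one ItemCollection file. A minimal, self-contained pystac sketch of the two save strategies; the dummy items and output paths are illustrative only and not part of the commit:

```python
import datetime as dt

import pystac

# A few dummy items; the real pipelines handle 10k-100k of these.
items = [
    pystac.Item(
        id=f"item-{i}",
        geometry={"type": "Point", "coordinates": [4.35, 50.85]},
        bbox=[4.35, 50.85, 4.35, 50.85],
        datetime=dt.datetime(2024, 1, 1, tzinfo=dt.timezone.utc),
        properties={},
    )
    for i in range(3)
]

collection = pystac.Collection(
    id="demo-collection",
    description="Demo collection",
    extent=pystac.Extent.from_items(items),
)

# Linked variant (link_items=True): each item becomes a child link and is
# saved as its own JSON file, which gets slow for very large collections.
for item in items:
    collection.add_item(item)
collection.normalize_hrefs("./stac-linked")
collection.save(catalog_type=pystac.CatalogType.SELF_CONTAINED)

# Unlinked variant (link_items=False): keep the collection itself empty and
# write all items to a single ItemCollection file instead.
item_collection = pystac.ItemCollection(items)
item_collection.save_object(dest_href="./stac-unlinked/items.json")
```
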
17 changes: 16 additions & 1 deletion stacbuilder/commandapi.py
@@ -381,7 +381,10 @@ def vpp_build_all_collections(
pipeline.build_collection()


# TODO: remove _get_tcc_collection_id; it makes the commands too complicated and we have a better solution now:
# now that we can list the HRL VPP collection names, we can copy-paste an id from that output.
def _get_tcc_collection_id(collection_id: Optional[str], collection_number: Optional[int]) -> str:
"""DEPRECATED Helper method to select the collection without dealing with long names"""
if not collection_id and not collection_number:
raise ValueError(
"No collection was specified. "
@@ -444,7 +447,19 @@ def vpp_count_products() -> list[tcc.Collection]:
return {c.id: catalogue.get_product_count(c.id) for c in collections}


def vpp_get_collection_configs() -> list[CollectionConfig]:
def vpp_get_collection_config(collection_id: str) -> CollectionConfig:
"""Return the CollectionConfig for the specified collection in HRL VPP."""
if not collection_id:
raise ValueError(f'Argument "collection_id" must have a value. {collection_id=!r}, {type(collection_id)=}')
collector = HRLVPPMetadataCollector()

collector.collection_id = collection_id
tcc_coll = collector.get_tcc_collection()
conf_builder = CollectionConfigBuilder(tcc_coll)
return conf_builder.get_collection_config()


def vpp_get_all_collection_configs() -> list[CollectionConfig]:
"""Display the CollectionConfig for each of the collections in HRL VPP."""
collector = HRLVPPMetadataCollector()

1 change: 1 addition & 0 deletions stacbuilder/metadata.py
@@ -727,6 +727,7 @@ def save_geodataframe(gdf: gpd.GeoDataFrame, out_dir: Path, table_name: str) ->
print(f"Saving pipe-separated CSV file to: {csv_path}")
gdf.to_csv(csv_path, sep="|")

# TODO: Shapefile has too many problems with unsupported column types. Going to remove it (but in a separate branch/PR).
print(f"Saving shapefile to: {shapefile_path }")
gdf.to_file(shapefile_path)
