diff --git a/stacbuilder/stacapi/endpoints.py b/stacbuilder/stacapi/endpoints.py
index d807f4c..68699b8 100644
--- a/stacbuilder/stacapi/endpoints.py
+++ b/stacbuilder/stacapi/endpoints.py
@@ -191,21 +191,11 @@ def _add_authentication_section(self, collection: Collection) -> dict:
 class ItemsEndpoint:
     def __init__(self, rest_api: RestApi) -> None:
         self._rest_api: RestApi = rest_api
-        # self._stac_api_url = URL(stac_api_url)
-        # self._auth = auth or None
 
     @staticmethod
     def create_endpoint(stac_api_url: URL, auth: AuthBase | None) -> "ItemsEndpoint":
         return ItemsEndpoint(rest_api=RestApi(base_url=stac_api_url, auth=auth))
 
-    # @property
-    # def stac_api_url(self) -> URL:
-    #     return self._stac_api_url
-
-    # @property
-    # def rest_api(self) -> RestApi:
-    #     return self._rest_api
-
     @property
     def stac_api_url(self) -> URL:
         return self._rest_api.base_url
@@ -281,25 +271,48 @@ def exists(self, item: Item) -> bool:
 
     def create(self, item: Item) -> dict:
         item.validate()
+
         response = self._rest_api.post(self.get_items_url(item.collection_id), json=item.to_dict())
+        # Log the raw body: an error response is not guaranteed to be valid JSON.
+        _logger.info(f"HTTP response: {response.status_code} - {response.reason}: body: {response.text}")
+
         _check_response_status(response, _EXPECTED_STATUS_POST)
         return response.json()
 
+    def update(self, item: Item) -> dict:
+        """Validate ``item`` and PUT it to its item URL, returning the decoded JSON response body."""
+        item.validate()
+
+        response = self._rest_api.put(self.get_items_url_for_id(item.collection_id, item.id), json=item.to_dict())
+        # Log the raw body: an error response is not guaranteed to be valid JSON.
+        _logger.info(f"HTTP response: {response.status_code} - {response.reason}: body: {response.text}")
+
+        _check_response_status(response, _EXPECTED_STATUS_PUT)
+        return response.json()
+
     def ingest_bulk(self, items: Iterable[Item]) -> dict:
+        """POST ``items`` to the collection's ``bulk_items`` route as one "upsert" request.
+
+        All items must share one collection ID. An empty iterable sends no
+        request and returns an empty dict.
+        """
+        # Materialize once: an iterable may be a generator, which can be
+        # neither indexed nor iterated twice.
+        items = list(items)
+        if not items:
+            return {}
+
         collection_id = items[0].collection_id
         if not all(i.collection_id == collection_id for i in items):
             raise Exception("All collection IDs should be identical for bulk ingests")
 
-        url_path = str(self._stac_api_url / "collections" / str(collection_id) / "bulk_items")
-        data = {"items": {item.id: item.to_dict() for item in items}}
+        url_path = f"collections/{collection_id}/bulk_items"
+        data = {"method": "upsert", "items": {item.id: item.to_dict() for item in items}}
         response = self._rest_api.post(url_path, json=data)
-        _check_response_status(response, _EXPECTED_STATUS_POST)
-        return response.json()
+        # Log the raw body: an error response is not guaranteed to be valid JSON.
+        _logger.info(f"HTTP response: {response.status_code} - {response.reason}: body: {response.text}")
 
-    def update(self, item: Item) -> dict:
-        item.validate()
-        response = self._rest_api.put(self.get_items_url_for_id(item.collection_id, item.id), json=item.to_dict())
-        _check_response_status(response, _EXPECTED_STATUS_PUT)
+        _check_response_status(response, _EXPECTED_STATUS_POST)
         return response.json()
 
     def create_or_update(self, item: Item) -> dict:
@@ -318,7 +331,11 @@ def delete_by_id(self, collection_id: str, item_id: str) -> dict:
             raise InvalidOperation(
                 f"item_id must have a non-empty str value. Actual type and value: {type(item_id)=}, {item_id=!r}"
             )
+
         response = self._rest_api.delete(self.get_items_url_for_id(collection_id, item_id))
+        # Log the raw body: an error response is not guaranteed to be valid JSON.
+        _logger.info(f"HTTP response: {response.status_code} - {response.reason}: body: {response.text}")
+
         _check_response_status(response, _EXPECTED_STATUS_DELETE)
         return response.json()
 
diff --git a/stacbuilder/stacapi/upload.py b/stacbuilder/stacapi/upload.py
index 0833120..b651f2e 100644
--- a/stacbuilder/stacapi/upload.py
+++ b/stacbuilder/stacapi/upload.py
@@ -18,9 +18,11 @@
 
 
 class Uploader:
-    def __init__(self, collections_ep: CollectionsEndpoint, items_ep: ItemsEndpoint) -> None:
+    def __init__(self, collections_ep: CollectionsEndpoint, items_ep: ItemsEndpoint, bulk_size: int = 20) -> None:
         self._collections_endpoint = collections_ep
         self._items_endpoint = items_ep
+        # Assign through the property setter so the value is validated.
+        self.bulk_size = bulk_size
 
     @classmethod
     def from_settings(cls, settings: Settings) -> "Uploader":
@@ -38,9 +40,21 @@ def create_uploader(
             rest_api=rest_api,
             collection_auth_info=collection_auth_info,
         )
-        items_endpoint = ItemsEndpoint(stac_api_url=stac_api_url, auth=auth)
+        items_endpoint = ItemsEndpoint(rest_api)
         return Uploader(collections_ep=collections_endpoint, items_ep=items_endpoint)
 
+    @property
+    def bulk_size(self) -> int:
+        """Maximum number of items sent per bulk ingest request."""
+        return self._bulk_size
+
+    @bulk_size.setter
+    def bulk_size(self, value: int) -> None:
+        value = int(value)
+        if value < 1:
+            raise ValueError(f"bulk_size must be at least 1, got {value}")
+        self._bulk_size = value
+
     def upload_collection(self, collection: Path | Collection) -> dict:
         if isinstance(collection, Path):
             collection = Collection.from_file(str(collection))
@@ -56,10 +70,23 @@ def upload_item(self, item) -> dict:
         return self._items_endpoint.create_or_update(item)
 
     def upload_items_bulk(self, collection_id: str, items: Iterable[Item]) -> None:
+        """Prepare, validate and upload ``items`` in bulk requests of at most ``bulk_size`` items."""
+        chunk = []
         for item in items:
             self._prepare_item(item, collection_id)
             item.validate()
-            self.upload_item(item)
+
+            chunk.append(item)
+
+            # ``>=`` rather than ``==`` so a chunk is still flushed if
+            # ``bulk_size`` is lowered while items are being collected.
+            if len(chunk) >= self.bulk_size:
+                self._items_endpoint.ingest_bulk(chunk)
+                chunk = []
+
+        # Send the final, partially filled chunk.
+        if chunk:
+            self._items_endpoint.ingest_bulk(chunk)
 
     def upload_collection_and_items(
         self, collection: Path | Collection, items: Path | list[Item], max_items: int = -1