Skip to content

Commit

Permalink
Issue #16 Implement bulk insert of STAC items to STAC API.
Browse files Browse the repository at this point in the history
  • Loading branch information
JohanKJSchreurs committed Mar 6, 2024
1 parent 24562f8 commit c994a7d
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 21 deletions.
41 changes: 23 additions & 18 deletions stacbuilder/stacapi/endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,21 +191,11 @@ def _add_authentication_section(self, collection: Collection) -> dict:
class ItemsEndpoint:
def __init__(self, rest_api: RestApi) -> None:
    """Create an items endpoint that sends its requests through ``rest_api``.

    :param rest_api: REST client stored on the instance and used by the
        endpoint's methods for their HTTP calls.
    """
    self._rest_api: RestApi = rest_api

@staticmethod
def create_endpoint(stac_api_url: URL, auth: AuthBase | None) -> "ItemsEndpoint":
    """Build an ``ItemsEndpoint`` from a base URL and optional authentication.

    A ``RestApi`` client is created from the two arguments and handed to the
    ``ItemsEndpoint`` constructor.

    :param stac_api_url: passed to ``RestApi`` as ``base_url``.
    :param auth: passed to ``RestApi`` as ``auth``; may be ``None``.
    :return: the newly constructed endpoint.
    """
    api_client = RestApi(base_url=stac_api_url, auth=auth)
    return ItemsEndpoint(rest_api=api_client)

# @property
# def stac_api_url(self) -> URL:
# return self._stac_api_url

# @property
# def rest_api(self) -> RestApi:
# return self._rest_api

@property
def stac_api_url(self) -> URL:
    """Return the ``base_url`` of the REST client this endpoint was built with."""
    api_client = self._rest_api
    return api_client.base_url
Expand Down Expand Up @@ -281,25 +271,36 @@ def exists(self, item: Item) -> bool:

def create(self, item: Item) -> dict:
    """Validate ``item`` and POST it to the items URL of its collection.

    The response is logged and echoed to stdout *before* the status check, so
    the body of a failed request is still visible.

    :param item: the item to create; ``item.collection_id`` selects the URL.
    :return: the decoded JSON body of the response.
    """
    item.validate()

    response = self._rest_api.post(self.get_items_url(item.collection_id), json=item.to_dict())

    # Decode the body once and reuse it for the log line, the console echo
    # and the return value, instead of re-parsing it three times.
    # NOTE(review): decoding happens before the status check, so a non-JSON
    # error body would raise here first -- confirm whether that is acceptable.
    body = response.json()
    message = f"HTTP response: {response.status_code} - {response.reason}: body: {body}"
    _logger.info(message)
    # NOTE(review): this print duplicates the log record above; kept because
    # callers may rely on the console output.
    print(message)

    _check_response_status(response, _EXPECTED_STATUS_POST)
    return body

def update(self, item: Item) -> dict:
    """Validate ``item`` and PUT it to the URL of the item with the same ID.

    The response is logged and echoed to stdout *before* the status check, so
    the body of a failed request is still visible.

    :param item: the item to update; ``item.collection_id`` and ``item.id``
        select the URL.
    :return: the decoded JSON body of the response.
    """
    item.validate()

    response = self._rest_api.put(self.get_items_url_for_id(item.collection_id, item.id), json=item.to_dict())

    # Decode the body once and reuse it for the log line, the console echo
    # and the return value, instead of re-parsing it three times.
    # NOTE(review): decoding happens before the status check, so a non-JSON
    # error body would raise here first -- confirm whether that is acceptable.
    body = response.json()
    message = f"HTTP response: {response.status_code} - {response.reason}: body: {body}"
    _logger.info(message)
    # NOTE(review): this print duplicates the log record above; kept because
    # callers may rely on the console output.
    print(message)

    _check_response_status(response, _EXPECTED_STATUS_PUT)
    return body

def ingest_bulk(self, items: Iterable[Item]) -> dict:
    """POST several items of one collection in a single bulk request.

    All items must share the same ``collection_id``. The request goes to
    ``collections/<collection_id>/bulk_items`` with ``"method": "upsert"`` and
    the items keyed by their ID.

    :param items: the items to ingest; any iterable is accepted.
    :return: the decoded JSON body of the response.
    :raises ValueError: when ``items`` is empty.
    :raises Exception: when the items do not all have the same collection ID.
    """
    # Materialize once: the items are indexed and then iterated twice below,
    # which a one-shot iterable such as a generator would not survive.
    items = list(items)
    if not items:
        raise ValueError("Cannot bulk ingest an empty sequence of items")

    collection_id = items[0].collection_id
    if not all(i.collection_id == collection_id for i in items):
        raise Exception("All collection IDs should be identical for bulk ingests")

    url_path = f"collections/{collection_id}/bulk_items"
    data = {"method": "upsert", "items": {item.id: item.to_dict() for item in items}}
    response = self._rest_api.post(url_path, json=data)

    # Decode the body once and reuse it for the log line, the console echo
    # and the return value.
    body = response.json()
    message = f"HTTP response: {response.status_code} - {response.reason}: body: {body}"
    _logger.info(message)
    # NOTE(review): this print duplicates the log record above; kept because
    # callers may rely on the console output.
    print(message)

    _check_response_status(response, _EXPECTED_STATUS_POST)
    return body

def create_or_update(self, item: Item) -> dict:
Expand All @@ -318,7 +319,11 @@ def delete_by_id(self, collection_id: str, item_id: str) -> dict:
raise InvalidOperation(
f"item_id must have a non-empty str value. Actual type and value: {type(item_id)=}, {item_id=!r}"
)

response = self._rest_api.delete(self.get_items_url_for_id(collection_id, item_id))
_logger.info(f"HTTP response: {response.status_code} - {response.reason}: body: {response.json()}")
print(f"HTTP response: {response.status_code} - {response.reason}: body: {response.json()}")

_check_response_status(response, _EXPECTED_STATUS_DELETE)
return response.json()

Expand Down
24 changes: 21 additions & 3 deletions stacbuilder/stacapi/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@


class Uploader:
def __init__(self, collections_ep: CollectionsEndpoint, items_ep: ItemsEndpoint, bulk_size: int = 20) -> None:
    """Create an uploader on top of a collections endpoint and an items endpoint.

    :param collections_ep: endpoint stored for collection operations.
    :param items_ep: endpoint stored for item operations.
    :param bulk_size: initial value of the ``bulk_size`` property; defaults to 20.
    """
    self._collections_endpoint = collections_ep
    self._items_endpoint = items_ep
    self._bulk_size = bulk_size

@classmethod
def from_settings(cls, settings: Settings) -> "Uploader":
Expand All @@ -38,9 +39,17 @@ def create_uploader(
rest_api=rest_api,
collection_auth_info=collection_auth_info,
)
items_endpoint = ItemsEndpoint(stac_api_url=stac_api_url, auth=auth)
items_endpoint = ItemsEndpoint(rest_api)
return Uploader(collections_ep=collections_endpoint, items_ep=items_endpoint)

@property
def bulk_size(self) -> int:
    """Return the currently configured bulk size."""
    return self._bulk_size

@bulk_size.setter
def bulk_size(self, value: int) -> None:
    """Set the bulk size; ``value`` is coerced with ``int()`` before storing."""
    self._bulk_size = int(value)

def upload_collection(self, collection: Path | Collection) -> dict:
if isinstance(collection, Path):
collection = Collection.from_file(str(collection))
Expand All @@ -56,10 +65,19 @@ def upload_item(self, item) -> dict:
return self._items_endpoint.create_or_update(item)

def upload_items_bulk(self, collection_id: str, items: Iterable[Item]) -> None:
    """Upload ``items`` through the items endpoint's bulk ingest, in chunks.

    Each item is prepared for ``collection_id`` and validated, then collected
    into a chunk. A chunk is sent with ``ingest_bulk`` once it holds
    ``bulk_size`` items; whatever is left after the loop is sent as a final,
    smaller chunk.

    :param collection_id: passed to ``_prepare_item`` for every item.
    :param items: the items to upload; consumed in a single pass.
    """
    chunk = []
    for item in items:
        self._prepare_item(item, collection_id)
        item.validate()
        chunk.append(item)

        # ">=" rather than "==": with a bulk_size below 1 the chunk is still
        # flushed instead of growing into one unbounded request.
        if len(chunk) >= self.bulk_size:
            self._items_endpoint.ingest_bulk(chunk)
            chunk = []

    # Send the remainder that did not fill a complete chunk.
    if chunk:
        self._items_endpoint.ingest_bulk(chunk)

def upload_collection_and_items(
self, collection: Path | Collection, items: Path | list[Item], max_items: int = -1
Expand Down

0 comments on commit c994a7d

Please sign in to comment.