Skip to content

Commit

Permalink
Issue #16 Add offset and limit to make uploading large collections to…
Browse files Browse the repository at this point in the history
… STAC-API more manageable
  • Loading branch information
JohanKJSchreurs committed Mar 6, 2024
1 parent 8536413 commit 4b0db05
Show file tree
Hide file tree
Showing 4 changed files with 173 additions and 16 deletions.
14 changes: 8 additions & 6 deletions stacbuilder/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,21 +437,23 @@ def vpp_list_items(collection: str, max_products: int, frequency: str):


@cli.command
@click.option("-m", "--max-items", help="Limit number of items to upload to max-items.", default=-1)
@click.option("-l", "--limit", help="Limit number of items to upload to max-items.", default=-1)
@click.option("-o", "--offset", help="Start at items number 'offset' (count starting from 1).", default=-1)
@click.argument("collection_path")
def vpp_upload(collection_path: str, max_items: int):
def vpp_upload(collection_path: str, limit: int, offset: int):
"""Upload a collection to the STAC API."""
settings = get_stac_api_settings()
commandapi.upload_to_stac_api(Path(collection_path), settings=settings, max_items=max_items)
commandapi.upload_to_stac_api(Path(collection_path), settings=settings, limit=limit, offset=offset)


@cli.command
@click.option("-m", "--max-items", help="Limit number of items to upload to max-items.", default=-1)
@click.option("-l", "--limit", help="Limit number of items to upload to max-items.", default=-1)
@click.option("-o", "--offset", help="Start at items number 'offset' (count starting from 1).", default=-1)
@click.argument("collection_path")
def vpp_upload_items(collection_path: str, max_items: int):
def vpp_upload_items(collection_path: str, limit: int, offset: int):
"""Upload a collection to the STAC API."""
settings = get_stac_api_settings()
commandapi.upload_items_to_stac_api(Path(collection_path), settings=settings, max_items=max_items)
commandapi.upload_items_to_stac_api(Path(collection_path), settings=settings, limit=limit, offset=offset)


@cli.command
Expand Down
8 changes: 4 additions & 4 deletions stacbuilder/commandapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,24 +389,24 @@ def _check_tcc_collection_id(collection_id: Optional[str]) -> str:
return collection_id


def upload_to_stac_api(collection_path: Path, settings: Settings, max_items: int = -1) -> None:
def upload_to_stac_api(collection_path: Path, settings: Settings, limit: int = -1, offset: int = -1) -> None:
"""Upload a collection to the STAC API."""
if not isinstance(collection_path, Path):
collection_path = Path(collection_path)
collection_path = collection_path.expanduser().absolute()

uploader = Uploader.from_settings(settings)
uploader.upload_collection_and_items(collection_path, items=collection_path.parent, max_items=max_items)
uploader.upload_collection_and_items(collection_path, items=collection_path.parent, limit=limit, offset=offset)


def upload_items_to_stac_api(collection_path: Path, settings: Settings, max_items: int = -1) -> None:
def upload_items_to_stac_api(collection_path: Path, settings: Settings, limit: int = -1, offset: int = -1) -> None:
"""Upload a collection to the STAC API."""
if not isinstance(collection_path, Path):
collection_path = Path(collection_path)
collection_path = collection_path.expanduser().absolute()

uploader = Uploader.from_settings(settings)
uploader.upload_items(collection_path, items=collection_path.parent, max_items=max_items)
uploader.upload_items(collection_path, items=collection_path.parent, limit=limit, offset=offset)


def vpp_get_tcc_collections() -> list[tcc.Collection]:
Expand Down
135 changes: 135 additions & 0 deletions stacbuilder/stacapi/endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,38 +48,119 @@ def _check_response_status(response: requests.Response, expected_status_codes: l


class RestApi:
    """Helper class to execute the typical HTTP requests for a REST API.

    Delegates the authentication in a consistent way with less code duplication.
    """

    def __init__(self, base_url: URL | str, auth: AuthBase | None = None) -> None:
        """Constructor.

        :param base_url: the base URL of the API,
            i.e. the part to which we concatenate URL paths.
            For example https://stac-api.my-organisation.com/api/
        :param auth: if present (= not None), this object takes care of authentication.
            This is the same as the auth parameter in `requests.request(method, url, **kwargs)`
            from the requests library.
            See also: https://requests.readthedocs.io/en/latest/api/#requests.request
        """
        self.base_url = URL(base_url)
        self.auth = auth or None

    def join_path(self, *url_path: str | list[str]) -> str:
        """Create a URL path out of strings and/or lists of strings.

        :param url_path:
            One or more path parts. Each argument is either a single string
            or a list of strings; lists are flattened in order.
        :return:
            The URL path, i.e. the individual path parts joined with '/'.
            To get the full URL use join_url instead.
        """
        parts: list[str] = []
        for part in url_path:
            # A lone string is one path part; anything else is treated as a
            # sequence of parts. Without this, a list passed through join_url
            # would reach str.join as a single non-string item and raise TypeError.
            if isinstance(part, str):
                parts.append(part)
            else:
                parts.extend(part)
        return "/".join(parts)

    def join_url(self, url_path: str | list[str]) -> str:
        """Create a URL from the base_url and the url_path.

        :param url_path: a path string, or a list of path parts, as in join_path.
        :return: the full URL, converted to a string.
        """
        return str(self.base_url / self.join_path(url_path))

    def get(self, url_path: str | list[str], *args, **kwargs) -> requests.Response:
        """Execute an HTTP GET request.

        Authentication is delegated to the AuthBase object self.auth, if it has a value,
        as per the requests library.
        If self.auth is None then no authentication is done.

        :param url_path: path or path parts to build the full URL.
        :return: the HTTP response.
        """
        return requests.get(self.join_url(url_path), *args, auth=self.auth, **kwargs)

    def post(self, url_path: str | list[str], *args, **kwargs) -> requests.Response:
        """Execute an HTTP POST request.

        Authentication is delegated to the AuthBase object self.auth, if it has a value,
        as per the requests library.
        If self.auth is None then no authentication is done.

        :param url_path: path or path parts to build the full URL.
        :return: the HTTP response.
        """
        return requests.post(self.join_url(url_path), *args, auth=self.auth, **kwargs)

    def put(self, url_path: str | list[str], *args, **kwargs) -> requests.Response:
        """Execute an HTTP PUT request.

        Authentication is delegated to the AuthBase object self.auth, if it has a value,
        as per the requests library.
        If self.auth is None then no authentication is done.

        :param url_path: path or path parts to build the full URL.
        :return: the HTTP response.
        """
        return requests.put(self.join_url(url_path), *args, auth=self.auth, **kwargs)

    def delete(self, url_path: str | list[str], *args, **kwargs) -> requests.Response:
        """Execute an HTTP DELETE request.

        Authentication is delegated to the AuthBase object self.auth, if it has a value,
        as per the requests library.
        If self.auth is None then no authentication is done.

        :param url_path: path or path parts to build the full URL.
        :return: the HTTP response.
        """
        return requests.delete(self.join_url(url_path), *args, auth=self.auth, **kwargs)


class CollectionsEndpoint:
def __init__(self, rest_api: RestApi, collection_auth_info: dict | None = None) -> None:
"""Constructor.
Follows dependency injection so you have to provide the objects it needs
(or mock implementation in a test) See parameters below.
:param rest_api: the RestApi to delegate HTTP requests to.
:param collection_auth_info: a dictionary that describe who can read or write the collection after creation.
This dictionary is added to the collection's dictionary in the POST/POT request's body.
"""
self._rest_api = rest_api
self._collection_auth_info: dict | None = collection_auth_info or None

@staticmethod
def create_endpoint(
stac_api_url: URL, auth: AuthBase | None, collection_auth_info: dict | None = None
) -> "CollectionsEndpoint":
"""Convenience method to create a CollectionsEndpoint object from basic information.
This creates the dependencies for you, but that also means you can't pick another implementation here.
If you need that (in a test) you should construct those objects yourself, and pass them directly to the constructor.
"""
rest_api = RestApi(base_url=stac_api_url, auth=auth)
return CollectionsEndpoint(
rest_api=rest_api,
Expand All @@ -88,9 +169,14 @@ def create_endpoint(

@property
def stac_api_url(self) -> URL:
    """Return the base URL of the underlying RestApi object."""
    rest_api = self._rest_api
    return rest_api.base_url

def get_all(self) -> list[Collection]:
"""Get all collections.
TODO: Implement paging: If there are many collections then the API will likely limit the number or collections returns, via paging.
"""
response = self._rest_api.get("collections")

_check_response_status(response, _EXPECTED_STATUS_GET)
Expand All @@ -101,6 +187,14 @@ def get_all(self) -> list[Collection]:
return [Collection.from_dict(j) for j in data.get("collections", [])]

def get(self, collection_id: str) -> Collection:
"""Get the collection with ID collection_id.
:param collection_id: the collection ID to look for.
:raises TypeError: when collection_id is not type str (string).
:raises ValueError: when collection_id is an empty string.
:raises HTTP: when the HTTP response status is 404 or any other error status.
:return: a Collection object if it was found
"""
if not isinstance(collection_id, str):
raise TypeError(f'Argument "collection_id" must be of type str, but its type is {type(collection_id)=}')

Expand All @@ -115,6 +209,14 @@ def get(self, collection_id: str) -> Collection:
return Collection.from_dict(response.json())

def exists(self, collection_id: str) -> bool:
"""Query if a collection with ID collection_id exists.
:param collection_id: the collection ID to look for.
:raises TypeError: when collection_id is not type str (string).
:raises ValueError: when collection_id is an empty string.
:raises HTTP: when the HTTP response status any error status other than "404 Not found".
:return: True if found, false if not fount.
"""
if not isinstance(collection_id, str):
raise TypeError(f'Argument "collection_id" must be of type str, but its type is {type(collection_id)=}')

Expand All @@ -133,6 +235,13 @@ def exists(self, collection_id: str) -> bool:
return True

def create(self, collection: Collection) -> dict:
"""Create a new collection.
:param collection: pystac.Collection object to create in the STAC API backend (or upload if you will)
:raises TypeError: if collection is not a pystac.Collection.
:return: dict that contains the JSON body of the HTTP response.
"""

if not isinstance(collection, Collection):
raise TypeError(
f'Argument "collection" must be of type pystac.Collection, but its type is {type(collection)=}'
Expand All @@ -146,6 +255,13 @@ def create(self, collection: Collection) -> dict:
return response.json()

def update(self, collection: Collection) -> dict:
"""Update an existing collection.
:param collection: pystac.Collection object to update in the STAC API backend.
:raises TypeError: if collection is not a pystac.Collection.
:return: dict that contains the JSON body of the HTTP response.
"""

if not isinstance(collection, Collection):
raise TypeError(
f'Argument "collection" must be of type pystac.Collection, but its type is {type(collection)=}'
Expand All @@ -159,16 +275,35 @@ def update(self, collection: Collection) -> dict:
return response.json()

def create_or_update(self, collection: Collection) -> dict:
    """'Upsert' a collection: update it when it already exists, otherwise create it.

    Existence is determined with self.exists(collection.id).

    :param collection: the collection to create/update
    :return: whatever self.update or self.create returns for this collection.
    """
    # TODO: decide: Another strategy could be to handle HTTP 409 conflict and the fall back to a self.update / PUT request
    upsert = self.update if self.exists(collection.id) else self.create
    return upsert(collection)

def delete(self, collection: Collection) -> dict:
    """Delete this collection.

    :param collection: pystac.Collection object to delete from the STAC API backend.
    :raises TypeError: if collection is not a pystac.Collection.
    :return: the result of self.delete_by_id for the collection's ID.
    """
    # Validate the argument the same way create() and update() do, so the
    # documented TypeError is actually raised instead of an AttributeError
    # on objects that have no `id`.
    if not isinstance(collection, Collection):
        raise TypeError(
            f'Argument "collection" must be of type pystac.Collection, but its type is {type(collection)=}'
        )
    return self.delete_by_id(collection.id)

def delete_by_id(self, collection_id: str) -> dict:
"""Delete the collection that has the specified ID.
:param collection_id: the collection ID to look for.
:raises TypeError: when collection_id is not a string.
:raises ValueError: when collection_id is an empty string.
:return: dict that contains the JSON body of the HTTP response.
"""
if not isinstance(collection_id, str):
raise TypeError(f'Argument "collection_id" must be of type str, but its type is {type(collection_id)=}')

Expand Down
32 changes: 26 additions & 6 deletions stacbuilder/stacapi/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,20 @@ def upload_items_bulk(self, collection_id: str, items: Iterable[Item]) -> None:
self._items_endpoint.ingest_bulk(chunk)

def upload_collection_and_items(
self, collection: Path | Collection, items: Path | list[Item], max_items: int = -1
self,
collection: Path | Collection,
items: Path | list[Item],
limit: int = -1,
offset: int = -1,
) -> None:
collection_out = self.upload_collection(collection)
_logger.info(f"Uploaded collections, result={collection_out}")

self.upload_items(collection, items, max_items)
self.upload_items(collection, items, limit=limit, offset=offset)

def upload_items(self, collection: Path | Collection, items: Path | list[Item], max_items: int = -1) -> None:
def upload_items(
self, collection: Path | Collection, items: Path | list[Item], limit: int = -1, offset: int = -1
) -> None:
if isinstance(collection, Path):
collection = Collection.from_file(collection)

Expand All @@ -121,12 +127,26 @@ def upload_items(self, collection: Path | Collection, items: Path | list[Item],
_logger.info(f"Number of STAC item files found: {len(item_paths)}")
items_out = (Item.from_file(path) for path in item_paths)

if max_items >= 0:
_logger.info(f"User requested to limit the number of items to {max_items=}")
items_out = itertools.islice(items_out, max_items)
start = None
stop = None
if offset > 0:
start = offset
_logger.info(f"User requested to start item upload at offset {offset=}")

if limit > 0:
_logger.info(f"User requested to limit the number of items to {limit=}")
if offset > 0:
stop = offset + limit
else:
stop = limit

self._log_progress_message(f"START upload of items from {start=} to {stop=}. ({offset=}, {limit=})")

items_out = itertools.islice(items_out, start, stop)
self.upload_items_bulk(collection.id, items_out)

self._log_progress_message(f"DONE upload of items from {start=} to {stop=}. ({offset=}, {limit=})")

def _prepare_item(self, item: Item, collection_id: str):
item.collection_id = collection_id

Expand Down

0 comments on commit 4b0db05

Please sign in to comment.