PgSTAC: API hydration of search result items (#397)
* Upgrade to pgstac 0.5.1

Initial changes to get most tests passing.

* Add option to hydrate pgstac search results in API

* Support fields extension in nohydrate mode

* Updates to hydrate and filter functionality.

This was done in a pairing session with @mmcfarland

* Fix fields extensions and reduce number of loops

* Tolerate missing required attributes with fields extension

Use of the fields extension can result in invalid STAC items being returned if `exclude` is applied to required attributes. When injecting item links, don't attempt to build links whose needed attributes aren't available. When API hydrate is enabled, the required attributes are preserved prior to filtering and are used in link generation.

* Run pgstac tests in db and api hydrate mode

* Merge dicts within lists during hydration

In practice, an asset on a base_item and an item may have mergeable
dicts (e.g., raster bands).
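The merge behavior described above can be sketched as follows. This is an illustrative sketch only, not pypgstac's actual hydration code, and `deep_merge` is a hypothetical name:

```python
from typing import Any


def deep_merge(base: Any, item: Any) -> Any:
    """Recursively merge an item's value over a base value.

    Dicts merge key-by-key; lists merge element-by-element so that,
    e.g., per-band raster metadata on a base item combines with the
    bands on a concrete item.
    """
    if isinstance(base, dict) and isinstance(item, dict):
        merged = dict(base)
        for key, value in item.items():
            merged[key] = deep_merge(base.get(key), value) if key in base else value
        return merged
    if isinstance(base, list) and isinstance(item, list):
        merged_list = [deep_merge(b, i) for b, i in zip(base, item)]
        # Keep whichever tail extends past the zipped prefix.
        longer = item if len(item) >= len(base) else base
        return merged_list + longer[len(merged_list):]
    # For scalars or mismatched types, the item's value wins.
    return item


base_asset = {"bands": [{"nodata": 0, "unit": "m"}]}
item_asset = {"bands": [{"statistics": {"min": 1}}]}
print(deep_merge(base_asset, item_asset))
```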

* Add note on settings in readme

* Pass request to base_item_cache

This will be used by implementers who need app state that is stored on the
request.

* Upgrade pypgstac and use included hydrate function

The hydrate function was improved and moved to pypgstac so it could be
used in other projects outside of stac-fastapi. It was developed with a
corresponding dehydrate function to ensure parity between the two.

The version of pypgstac is unpublished and pinned to a draft commit at
this point; it will be upgraded subsequently.

* Improve fields extension implementation

Correctly supports deeply nested property keys in both include and
exclude, as well as improves variable naming, comments, and test cases.
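The nested-key handling mentioned above can be sketched for the include case. This is illustrative only, not the project's actual `filter_fields` helper (which also handles excludes and preserves required attributes); `include_fields` is a hypothetical name:

```python
from typing import Any, Dict, Set


def include_fields(item: Dict[str, Any], includes: Set[str]) -> Dict[str, Any]:
    """Keep only the dotted, possibly deeply nested keys listed in includes."""
    result: Dict[str, Any] = {}
    for path in includes:
        keys = path.split(".")
        source: Any = item
        target: Dict[str, Any] = result
        for i, key in enumerate(keys):
            # Stop silently if the path doesn't exist on this item.
            if not isinstance(source, dict) or key not in source:
                break
            if i == len(keys) - 1:
                target[key] = source[key]
            else:
                source = source[key]
                target = target.setdefault(key, {})
    return result


item = {"id": "i1", "properties": {"datetime": "2022-05-14", "eo:cloud_cover": 3}}
print(include_fields(item, {"id", "properties.datetime"}))
```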

* Remove unused error type

* adjust tests for changes in api

* remove print statements

* add bbox back to items in tests

* Upgrade pgstac

* Fix conformance test fixtures

* Fix sqlalchemy test with new status for FK error

* Align fields ext behavior for invalid includes

* Lint

* Changelog

* Remove psycopg install dependency

* Relax dependency version of pgstac to 0.6.* series

* Update dev environment to pgstac 0.6.2

* Changelog fix

Co-authored-by: Rob Emanuele <[email protected]>
Co-authored-by: Ubuntu <planetarycomputer@pct-bitner-vm.kko0dpzi4g3udak2ovyb5nsdte.ax.internal.cloudapp.net>
3 people authored May 14, 2022
1 parent 526501b commit 162a1a2
Showing 16 changed files with 975 additions and 77 deletions.
4 changes: 3 additions & 1 deletion CHANGES.md
@@ -17,6 +17,7 @@
* Bulk Transactions object Items iterator now returns the Item objects rather than the string IDs of the Item objects
([#355](https://github.com/stac-utils/stac-fastapi/issues/355))
* docker-compose now runs uvicorn with hot-reloading enabled
* Bump version of PGStac to 0.6.2, which includes support for hydrating results in the API backend ([#397](https://github.com/stac-utils/stac-fastapi/pull/397))

### Removed

@@ -27,7 +28,8 @@
* Fixes issues (and adds tests) caused by a regression in pgstac ([#345](https://github.com/stac-utils/stac-fastapi/issues/345))
* Update error response payloads to match the API spec. ([#361](https://github.com/stac-utils/stac-fastapi/pull/361))
* Fixed stray `/` before the `#` in several extension conformance class strings ([#383](https://github.com/stac-utils/stac-fastapi/pull/383))
* SQLAlchemy backend bulk item insert now works ([#356]https://github.com/stac-utils/stac-fastapi/issues/356))
* SQLAlchemy backend bulk item insert now works ([#356](https://github.com/stac-utils/stac-fastapi/issues/356))
* PGStac Backend has stricter implementation of Fields Extension syntax ([#397](https://github.com/stac-utils/stac-fastapi/pull/397))

## [2.3.0]

3 changes: 2 additions & 1 deletion docker-compose.yml
@@ -50,6 +50,7 @@ services:
- GDAL_DISABLE_READDIR_ON_OPEN=EMPTY_DIR
- DB_MIN_CONN_SIZE=1
- DB_MAX_CONN_SIZE=1
- USE_API_HYDRATE=${USE_API_HYDRATE:-false}
ports:
- "8082:8082"
volumes:
@@ -62,7 +63,7 @@ services:

database:
container_name: stac-db
image: ghcr.io/stac-utils/pgstac:v0.4.5
image: ghcr.io/stac-utils/pgstac:v0.6.2
environment:
- POSTGRES_USER=username
- POSTGRES_PASSWORD=password
2 changes: 1 addition & 1 deletion stac_fastapi/api/stac_fastapi/api/errors.py
@@ -23,7 +23,7 @@
DEFAULT_STATUS_CODES = {
NotFoundError: status.HTTP_404_NOT_FOUND,
ConflictError: status.HTTP_409_CONFLICT,
ForeignKeyError: status.HTTP_422_UNPROCESSABLE_ENTITY,
ForeignKeyError: status.HTTP_424_FAILED_DEPENDENCY,
DatabaseError: status.HTTP_424_FAILED_DEPENDENCY,
Exception: status.HTTP_500_INTERNAL_SERVER_ERROR,
InvalidQueryParameter: status.HTTP_400_BAD_REQUEST,
5 changes: 5 additions & 0 deletions stac_fastapi/pgstac/README.md
@@ -46,7 +46,12 @@ pip install -e \
stac_fastapi/pgstac[dev,server]
```

### Settings

To configure PGStac stac-fastapi to [hydrate search result items in the API](https://github.com/stac-utils/pgstac#runtime-configurations), set the `USE_API_HYDRATE` environment variable to `true` or explicitly set the option in the PGStac Settings object.
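A minimal sketch of how a boolean environment toggle like this behaves (illustrative only; in stac-fastapi the pydantic-based Settings class parses `USE_API_HYDRATE` automatically):

```python
import os


def use_api_hydrate() -> bool:
    """Interpret the USE_API_HYDRATE environment variable as a boolean."""
    return os.environ.get("USE_API_HYDRATE", "false").lower() in ("1", "true", "yes")


os.environ["USE_API_HYDRATE"] = "true"
print(use_api_hydrate())
```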

### Migrations

PGStac is an external project and may be used by multiple front ends.
For Stac FastAPI development, a docker image (which is pulled as part of the docker-compose) is available at
bitner/pgstac:[version] that has the full database already set up for PGStac.
3 changes: 2 additions & 1 deletion stac_fastapi/pgstac/setup.py
@@ -17,16 +17,17 @@
"buildpg",
"brotli_asgi",
"pygeofilter @ git+https://github.com/geopython/[email protected]#egg=pygeofilter",
"pypgstac==0.6.*",
]

extra_reqs = {
"dev": [
"pypgstac[psycopg]==0.6.*",
"pytest",
"pytest-cov",
"pytest-asyncio>=0.17",
"pre-commit",
"requests",
"pypgstac==0.4.5",
"httpx",
],
"docs": ["mkdocs", "mkdocs-material", "pdocs"],
10 changes: 10 additions & 0 deletions stac_fastapi/pgstac/stac_fastapi/pgstac/config.py
@@ -1,5 +1,11 @@
"""Postgres API configuration."""

from typing import Type

from stac_fastapi.pgstac.types.base_item_cache import (
BaseItemCache,
DefaultBaseItemCache,
)
from stac_fastapi.types.config import ApiSettings


@@ -13,6 +19,7 @@ class Settings(ApiSettings):
postgres_host_writer: hostname for the writer connection.
postgres_port: database port.
postgres_dbname: database name.
use_api_hydrate: perform hydration of stac items within stac-fastapi.
"""

postgres_user: str
@@ -27,6 +34,9 @@
db_max_queries: int = 50000
db_max_inactive_conn_lifetime: float = 300

use_api_hydrate: bool = False
base_item_cache: Type[BaseItemCache] = DefaultBaseItemCache

testing: bool = False

@property
104 changes: 87 additions & 17 deletions stac_fastapi/pgstac/stac_fastapi/pgstac/core.py
@@ -12,12 +12,15 @@
from pydantic import ValidationError
from pygeofilter.backends.cql2_json import to_cql2
from pygeofilter.parsers.cql2_text import parse as parse_cql2_text
from pypgstac.hydration import hydrate
from stac_pydantic.links import Relations
from stac_pydantic.shared import MimeTypes
from starlette.requests import Request

from stac_fastapi.pgstac.config import Settings
from stac_fastapi.pgstac.models.links import CollectionLinks, ItemLinks, PagingLinks
from stac_fastapi.pgstac.types.search import PgstacSearch
from stac_fastapi.pgstac.utils import filter_fields
from stac_fastapi.types.core import AsyncBaseCoreClient
from stac_fastapi.types.errors import InvalidQueryParameter, NotFoundError
from stac_fastapi.types.stac import Collection, Collections, Item, ItemCollection
@@ -103,8 +106,38 @@ async def get_collection(self, collection_id: str, **kwargs) -> Collection:

return Collection(**collection)

async def _get_base_item(
self, collection_id: str, request: Request
) -> Dict[str, Any]:
"""Get the base item of a collection for use in rehydrating full item collection properties.
Args:
collection_id: ID of the collection.
Returns:
The base item for the collection.
"""
item: Optional[Dict[str, Any]]

pool = request.app.state.readpool
async with pool.acquire() as conn:
q, p = render(
"""
SELECT * FROM collection_base_item(:collection_id::text);
""",
collection_id=collection_id,
)
item = await conn.fetchval(q, *p)

if item is None:
raise NotFoundError(f"A base item for {collection_id} does not exist.")

return item

async def _search_base(
self, search_request: PgstacSearch, **kwargs: Any
self,
search_request: PgstacSearch,
**kwargs: Any,
) -> ItemCollection:
"""Cross catalog search (POST).
@@ -119,9 +152,11 @@ async def _search_base(
items: Dict[str, Any]

request: Request = kwargs["request"]
settings: Settings = request.app.state.settings
pool = request.app.state.readpool

# pool = kwargs["request"].app.state.readpool
search_request.conf = search_request.conf or {}
search_request.conf["nohydrate"] = settings.use_api_hydrate
req = search_request.json(exclude_none=True, by_alias=True)

try:
@@ -141,30 +176,65 @@
next: Optional[str] = items.pop("next", None)
prev: Optional[str] = items.pop("prev", None)
collection = ItemCollection(**items)
cleaned_features: List[Item] = []

for feature in collection.get("features") or []:
feature = Item(**feature)
exclude = search_request.fields.exclude
if exclude and len(exclude) == 0:
exclude = None
include = search_request.fields.include
if include and len(include) == 0:
include = None

async def _add_item_links(
feature: Item,
collection_id: Optional[str] = None,
item_id: Optional[str] = None,
) -> None:
"""Add ItemLinks to the Item.
If the fields extension is excluding links, then don't add them.
Also skip links if the item doesn't provide collection and item ids.
"""
collection_id = feature.get("collection") or collection_id
item_id = feature.get("id") or item_id

if (
search_request.fields.exclude is None
or "links" not in search_request.fields.exclude
and all([collection_id, item_id])
):
# TODO: feature.collection is not always included
# This code fails if it's left outside of the fields expression
# I've updated the fields extension test cases to always include feature.collection
feature["links"] = await ItemLinks(
collection_id=feature["collection"],
item_id=feature["id"],
collection_id=collection_id,
item_id=item_id,
request=request,
).get_links(extra_links=feature.get("links"))

exclude = search_request.fields.exclude
if exclude and len(exclude) == 0:
exclude = None
include = search_request.fields.include
if include and len(include) == 0:
include = None
cleaned_features.append(feature)
cleaned_features: List[Item] = []

if settings.use_api_hydrate:

async def _get_base_item(collection_id: str) -> Dict[str, Any]:
return await self._get_base_item(collection_id, request)

base_item_cache = settings.base_item_cache(
fetch_base_item=_get_base_item, request=request
)

for feature in collection.get("features") or []:
base_item = await base_item_cache.get(feature.get("collection"))
feature = hydrate(base_item, feature)

# Grab ids needed for links that may be removed by the fields extension.
collection_id = feature.get("collection")
item_id = feature.get("id")

feature = filter_fields(feature, include, exclude)
await _add_item_links(feature, collection_id, item_id)

cleaned_features.append(feature)
else:
for feature in collection.get("features") or []:
await _add_item_links(feature)
cleaned_features.append(feature)

collection["features"] = cleaned_features
collection["links"] = await PagingLinks(
55 changes: 55 additions & 0 deletions stac_fastapi/pgstac/stac_fastapi/pgstac/types/base_item_cache.py
@@ -0,0 +1,55 @@
"""base_item_cache classes for pgstac fastapi."""
import abc
from typing import Any, Callable, Coroutine, Dict

from starlette.requests import Request


class BaseItemCache(abc.ABC):
"""
A cache that returns a base item for a collection.
If no base item is found in the cache, use the fetch_base_item function
to fetch the base item from pgstac.
"""

def __init__(
self,
fetch_base_item: Callable[[str], Coroutine[Any, Any, Dict[str, Any]]],
request: Request,
):
"""
Initialize the base item cache.
Args:
fetch_base_item: A function that fetches the base item for a collection.
request: The request object containing app state that may be used by caches.
"""
self._fetch_base_item = fetch_base_item
self._request = request

@abc.abstractmethod
async def get(self, collection_id: str) -> Dict[str, Any]:
"""Return the base item for the collection and cache by collection id."""
pass


class DefaultBaseItemCache(BaseItemCache):
"""Implementation of the BaseItemCache that holds base items in a dict."""

def __init__(
self,
fetch_base_item: Callable[[str], Coroutine[Any, Any, Dict[str, Any]]],
request: Request,
):
"""Initialize the base item cache."""
self._base_items = {}
super().__init__(fetch_base_item, request)

async def get(self, collection_id: str):
"""Return the base item for the collection and cache by collection id."""
if collection_id not in self._base_items:
self._base_items[collection_id] = await self._fetch_base_item(
collection_id,
)
return self._base_items[collection_id]
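The caching pattern above can be exercised in isolation. The sketch below substitutes a stub fetch function for the pgstac round trip and omits the starlette `Request` dependency, so it is illustrative rather than a drop-in replacement:

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict


class DictBaseItemCache:
    """Minimal stand-in for DefaultBaseItemCache: fetch once per collection id."""

    def __init__(self, fetch_base_item: Callable[[str], Awaitable[Dict[str, Any]]]):
        self._fetch = fetch_base_item
        self._base_items: Dict[str, Dict[str, Any]] = {}

    async def get(self, collection_id: str) -> Dict[str, Any]:
        if collection_id not in self._base_items:
            self._base_items[collection_id] = await self._fetch(collection_id)
        return self._base_items[collection_id]


async def main() -> int:
    calls = 0

    async def fetch(collection_id: str) -> Dict[str, Any]:
        nonlocal calls
        calls += 1
        return {"collection": collection_id, "assets": {}}

    cache = DictBaseItemCache(fetch)
    await cache.get("sentinel-2")
    await cache.get("sentinel-2")  # served from the cache, no second fetch
    return calls


print(asyncio.run(main()))
```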