Skip to content

Search optimization and indexing based on datetime #405

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 10 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,10 @@ test-opensearch:

.PHONY: test
test:
-$(run_es) /bin/bash -c 'export && ./scripts/wait-for-it-es.sh elasticsearch:9200 && cd stac_fastapi/tests/ && pytest --cov=stac_fastapi --cov-report=term-missing'
-$(run_es) /bin/bash -c 'export && ./scripts/wait-for-it-es.sh elasticsearch:9200 && cd stac_fastapi/tests/ && pytest -s --cov=stac_fastapi --cov-report=term-missing'
docker compose down

-$(run_os) /bin/bash -c 'export && ./scripts/wait-for-it-es.sh opensearch:9202 && cd stac_fastapi/tests/ && pytest --cov=stac_fastapi --cov-report=term-missing'
-$(run_os) /bin/bash -c 'export && ./scripts/wait-for-it-es.sh opensearch:9202 && cd stac_fastapi/tests/ && pytest -s --cov=stac_fastapi --cov-report=term-missing'
docker compose down

.PHONY: run-database-es
Expand Down
49 changes: 26 additions & 23 deletions README.md

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ services:
- ES_USE_SSL=false
- ES_VERIFY_CERTS=false
- BACKEND=elasticsearch
- ENABLE_DATETIME_INDEX_FILTERING=true
ports:
- "8080:8080"
volumes:
Expand Down Expand Up @@ -55,6 +56,7 @@ services:
- ES_VERIFY_CERTS=false
- BACKEND=opensearch
- STAC_FASTAPI_RATE_LIMIT=200/minute
- ENABLE_DATETIME_INDEX_FILTERING=true
ports:
- "8082:8082"
volumes:
Expand All @@ -69,9 +71,11 @@ services:
elasticsearch:
container_name: es-container
image: docker.elastic.co/elasticsearch/elasticsearch:${ELASTICSEARCH_VERSION:-8.11.0}
platform: linux/amd64
hostname: elasticsearch
environment:
ES_JAVA_OPTS: -Xms512m -Xmx1g
action.destructive_requires_name: false
volumes:
- ./elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml
- ./elasticsearch/snapshots:/usr/share/elasticsearch/snapshots
Expand All @@ -81,6 +85,7 @@ services:
opensearch:
container_name: os-container
image: opensearchproject/opensearch:${OPENSEARCH_VERSION:-2.11.1}
platform: linux/amd64
hostname: opensearch
environment:
- discovery.type=single-node
Expand Down
9 changes: 7 additions & 2 deletions stac_fastapi/core/stac_fastapi/core/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
BulkTransactionMethod,
Items,
)
from stac_fastapi.sfeos_helpers.database import return_date
from stac_fastapi.types import stac as stac_types
from stac_fastapi.types.conformance import BASE_CONFORMANCE_CLASSES
from stac_fastapi.types.core import AsyncBaseCoreClient, AsyncBaseTransactionsClient
Expand Down Expand Up @@ -315,9 +316,10 @@ async def item_collection(
search=search, collection_ids=[collection_id]
)

datetime_search = return_date(datetime)
if datetime:
search = self.database.apply_datetime_filter(
search=search, interval=datetime
search=search, datetime_search=datetime_search
)

if bbox:
Expand All @@ -333,6 +335,7 @@ async def item_collection(
sort=None,
token=token,
collection_ids=[collection_id],
datetime_search=datetime_search,
)

items = [
Expand Down Expand Up @@ -491,9 +494,10 @@ async def post_search(
search=search, collection_ids=search_request.collections
)

datetime_search = return_date(search_request.datetime)
if search_request.datetime:
search = self.database.apply_datetime_filter(
search=search, interval=search_request.datetime
search=search, datetime_search=datetime_search
)

if search_request.bbox:
Expand Down Expand Up @@ -551,6 +555,7 @@ async def post_search(
token=search_request.token,
sort=sort,
collection_ids=search_request.collections,
datetime_search=datetime_search,
)

fields = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@
)
from stac_fastapi.sfeos_helpers import filter
from stac_fastapi.sfeos_helpers.database import (
AsyncIndexInserter,
IndexInsertionFactory,
IndexSelectionStrategy,
IndexSelectorFactory,
SyncIndexInserter,
apply_free_text_filter_shared,
apply_intersects_filter_shared,
create_index_templates_shared,
Expand All @@ -30,7 +35,6 @@
index_alias_by_collection_id,
index_by_collection_id,
indices,
mk_actions,
mk_item_id,
populate_sort_shared,
return_date,
Expand All @@ -45,7 +49,6 @@
Geometry,
)
from stac_fastapi.types.errors import ConflictError, NotFoundError
from stac_fastapi.types.rfc3339 import DateTimeType
from stac_fastapi.types.stac import Collection, Item

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -123,6 +126,10 @@ class DatabaseLogic(BaseDatabaseLogic):
sync_settings: SyncElasticsearchSettings = attr.ib(
factory=SyncElasticsearchSettings
)
async_index_selector: IndexSelectionStrategy = attr.ib(init=False)
sync_index_selector: IndexSelectionStrategy = attr.ib(init=False)
async_index_inserter: AsyncIndexInserter = attr.ib(init=False)
sync_index_inserter: SyncIndexInserter = attr.ib(init=False)

client = attr.ib(init=False)
sync_client = attr.ib(init=False)
Expand All @@ -131,6 +138,14 @@ def __attrs_post_init__(self):
"""Initialize clients after the class is instantiated."""
self.client = self.async_settings.create_client
self.sync_client = self.sync_settings.create_client
self.async_index_inserter = IndexInsertionFactory.create_insertion_strategy(
self.client
)
self.sync_index_inserter = IndexInsertionFactory.create_sync_insertion_strategy(
self.sync_client
)
self.async_index_selector = IndexSelectorFactory.create_async_selector(self.client)
self.sync_index_selector = IndexSelectorFactory.create_sync_selector(self.sync_client)

item_serializer: Type[ItemSerializer] = attr.ib(default=ItemSerializer)
collection_serializer: Type[CollectionSerializer] = attr.ib(
Expand Down Expand Up @@ -244,19 +259,18 @@ def apply_collections_filter(search: Search, collection_ids: List[str]):

@staticmethod
def apply_datetime_filter(
search: Search, interval: Optional[Union[DateTimeType, str]]
search: Search, datetime_search: Dict[str, Optional[str]]
):
"""Apply a filter to search on datetime, start_datetime, and end_datetime fields.

Args:
search (Search): The search object to filter.
interval: Optional[Union[DateTimeType, str]]
datetime_search (Dict[str, Optional[str]]): Datetime bounds to filter on, as produced by `return_date`.

Returns:
Search: The filtered search object.
"""
should = []
datetime_search = return_date(interval)

# If the request is a single datetime return
# items with datetimes equal to the requested datetime OR
Expand Down Expand Up @@ -501,6 +515,7 @@ async def execute_search(
token: Optional[str],
sort: Optional[Dict[str, Dict[str, str]]],
collection_ids: Optional[List[str]],
datetime_search: Dict[str, Optional[str]],
ignore_unavailable: bool = True,
) -> Tuple[Iterable[Dict[str, Any]], Optional[int], Optional[str]]:
"""Execute a search query with limit and other optional parameters.
Expand All @@ -511,6 +526,7 @@ async def execute_search(
token (Optional[str]): The token used to return the next set of results.
sort (Optional[Dict[str, Dict[str, str]]]): Specifies how the results should be sorted.
collection_ids (Optional[List[str]]): The collection ids to search.
datetime_search (Dict[str, Optional[str]]): Datetime range used for index selection.
ignore_unavailable (bool, optional): Whether to ignore unavailable collections. Defaults to True.

Returns:
Expand All @@ -531,7 +547,9 @@ async def execute_search(

query = search.query.to_dict() if search.query else None

index_param = indices(collection_ids)
index_param = await self.async_index_selector.select_indexes(
collection_ids, datetime_search
)

max_result_window = MAX_LIMIT

Expand Down Expand Up @@ -595,6 +613,7 @@ async def aggregate(
geometry_geohash_grid_precision: int,
geometry_geotile_grid_precision: int,
datetime_frequency_interval: str,
datetime_search,
ignore_unavailable: Optional[bool] = True,
):
"""Return aggregations of STAC Items."""
Expand Down Expand Up @@ -630,7 +649,10 @@ def _fill_aggregation_parameters(name: str, agg: dict) -> dict:
if k in aggregations
}

index_param = indices(collection_ids)
index_param = await self.async_index_selector.select_indexes(
collection_ids, datetime_search
)

search_task = asyncio.create_task(
self.client.search(
index=index_param,
Expand Down Expand Up @@ -828,9 +850,12 @@ async def create_item(
item=item, base_url=base_url, exist_ok=exist_ok
)

target_index = await self.async_index_inserter.get_target_index(
collection_id, item
)
# Index the item in the database
await self.client.index(
index=index_alias_by_collection_id(collection_id),
index=target_index,
id=mk_item_id(item_id, collection_id),
document=item,
refresh=refresh,
Expand Down Expand Up @@ -866,10 +891,16 @@ async def delete_item(self, item_id: str, collection_id: str, **kwargs: Any):

try:
# Perform the delete operation
await self.client.delete(
await self.client.delete_by_query(
index=index_alias_by_collection_id(collection_id),
id=mk_item_id(item_id, collection_id),
refresh=refresh,
body={
"query": {
"term": {
"_id": mk_item_id(item_id, collection_id)
}
}
},
refresh=refresh
)
except ESNotFoundError:
# Raise a custom NotFoundError if the item does not exist
Expand Down Expand Up @@ -937,8 +968,10 @@ async def create_collection(self, collection: Collection, **kwargs: Any):
refresh=refresh,
)

# Create the item index for the collection
await create_item_index(collection_id)
if self.async_index_inserter.should_create_collection_index():
await self.async_index_inserter.create_simple_index(
self.client, collection_id
)

async def find_collection(self, collection_id: str) -> Collection:
"""Find and return a collection from the database.
Expand Down Expand Up @@ -1136,9 +1169,12 @@ async def bulk_async(

# Perform the bulk insert
raise_on_error = self.async_settings.raise_on_bulk_error
actions = await self.async_index_inserter.prepare_bulk_actions(
collection_id, processed_items
)
success, errors = await helpers.async_bulk(
self.client,
mk_actions(collection_id, processed_items),
actions,
refresh=refresh,
raise_on_error=raise_on_error,
)
Expand Down Expand Up @@ -1202,9 +1238,12 @@ def bulk_sync(

# Perform the bulk insert
raise_on_error = self.sync_settings.raise_on_bulk_error
actions = self.sync_index_inserter.prepare_bulk_actions(
collection_id, processed_items
)
success, errors = helpers.bulk(
self.sync_client,
mk_actions(collection_id, processed_items),
actions,
refresh=refresh,
raise_on_error=raise_on_error,
)
Expand Down
Loading