diff --git a/google/cloud/firestore/__init__.py b/google/cloud/firestore/__init__.py index 79095778db..314a138cbc 100644 --- a/google/cloud/firestore/__init__.py +++ b/google/cloud/firestore/__init__.py @@ -38,6 +38,7 @@ from google.cloud.firestore_v1 import DocumentSnapshot from google.cloud.firestore_v1 import DocumentTransform from google.cloud.firestore_v1 import ExistsOption +from google.cloud.firestore_v1 import ExplainOptions from google.cloud.firestore_v1 import FieldFilter from google.cloud.firestore_v1 import GeoPoint from google.cloud.firestore_v1 import Increment @@ -78,6 +79,7 @@ "DocumentSnapshot", "DocumentTransform", "ExistsOption", + "ExplainOptions", "FieldFilter", "GeoPoint", "Increment", diff --git a/google/cloud/firestore_v1/__init__.py b/google/cloud/firestore_v1/__init__.py index 1aff5ec740..049eb4183f 100644 --- a/google/cloud/firestore_v1/__init__.py +++ b/google/cloud/firestore_v1/__init__.py @@ -50,6 +50,7 @@ from google.cloud.firestore_v1.collection import CollectionReference from google.cloud.firestore_v1.document import DocumentReference from google.cloud.firestore_v1.query import CollectionGroup, Query +from google.cloud.firestore_v1.query_profile import ExplainOptions from google.cloud.firestore_v1.transaction import Transaction, transactional from google.cloud.firestore_v1.transforms import ( DELETE_FIELD, @@ -131,6 +132,7 @@ "DocumentSnapshot", "DocumentTransform", "ExistsOption", + "ExplainOptions", "FieldFilter", "GeoPoint", "Increment", diff --git a/google/cloud/firestore_v1/aggregation.py b/google/cloud/firestore_v1/aggregation.py index 65106122ab..f0e3f94baf 100644 --- a/google/cloud/firestore_v1/aggregation.py +++ b/google/cloud/firestore_v1/aggregation.py @@ -30,12 +30,14 @@ BaseAggregationQuery, _query_response_to_result, ) -from google.cloud.firestore_v1.base_document import DocumentSnapshot +from google.cloud.firestore_v1.query_results import QueryResultsList from google.cloud.firestore_v1.stream_generator import 
StreamGenerator # Types needed only for Type Hints -if TYPE_CHECKING: - from google.cloud.firestore_v1 import transaction # pragma: NO COVER +if TYPE_CHECKING: # pragma: NO COVER + from google.cloud.firestore_v1 import transaction + from google.cloud.firestore_v1.query_profile import ExplainMetrics + from google.cloud.firestore_v1.query_profile import ExplainOptions class AggregationQuery(BaseAggregationQuery): @@ -54,10 +56,14 @@ def get( retries.Retry, None, gapic_v1.method._MethodDefault ] = gapic_v1.method.DEFAULT, timeout: float | None = None, - ) -> List[AggregationResult]: + *, + explain_options: Optional[ExplainOptions] = None, + ) -> QueryResultsList[AggregationResult]: """Runs the aggregation query. - This sends a ``RunAggregationQuery`` RPC and returns a list of aggregation results in the stream of ``RunAggregationQueryResponse`` messages. + This sends a ``RunAggregationQuery`` RPC and returns a list of + aggregation results in the stream of ``RunAggregationQueryResponse`` + messages. Args: transaction @@ -70,20 +76,39 @@ def get( should be retried. Defaults to a system-specified policy. timeout (float): The timeout for this request. Defaults to a system-specified value. + explain_options + (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]): + Options to enable query profiling for this query. When set, + explain_metrics will be available on the returned list. Returns: - list: The aggregation query results + QueryResultsList[AggregationResult]: The aggregation query results. 
""" - result = self.stream(transaction=transaction, retry=retry, timeout=timeout) - return list(result) # type: ignore + explain_metrics: ExplainMetrics | None = None - def _get_stream_iterator(self, transaction, retry, timeout): + result = self.stream( + transaction=transaction, + retry=retry, + timeout=timeout, + explain_options=explain_options, + ) + result_list = list(result) + + if explain_options is None: + explain_metrics = None + else: + explain_metrics = result.get_explain_metrics() + + return QueryResultsList(result_list, explain_options, explain_metrics) + + def _get_stream_iterator(self, transaction, retry, timeout, explain_options=None): """Helper method for :meth:`stream`.""" request, kwargs = self._prep_stream( transaction, retry, timeout, + explain_options, ) return self._client._firestore_api.run_aggregation_query( @@ -106,9 +131,12 @@ def _retry_query_after_exception(self, exc, retry, transaction): def _make_stream( self, transaction: Optional[transaction.Transaction] = None, - retry: Optional[retries.Retry] = gapic_v1.method.DEFAULT, + retry: Union[ + retries.Retry, None, gapic_v1.method._MethodDefault + ] = gapic_v1.method.DEFAULT, timeout: Optional[float] = None, - ) -> Union[Generator[List[AggregationResult], Any, None]]: + explain_options: Optional[ExplainOptions] = None, + ) -> Generator[List[AggregationResult], Any, Optional[ExplainMetrics]]: """Internal method for stream(). Runs the aggregation query. This sends a ``RunAggregationQuery`` RPC and then returns a generator @@ -127,16 +155,27 @@ def _make_stream( system-specified policy. timeout (Optional[float]): The timeout for this request. Defaults to a system-specified value. + explain_options + (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]): + Options to enable query profiling for this query. When set, + explain_metrics will be available on the returned generator. 
Yields: - :class:`~google.cloud.firestore_v1.base_aggregation.AggregationResult`: + List[AggregationResult]: The result of aggregations of this query. + + Returns: + (Optional[google.cloud.firestore_v1.types.query_profile.ExplainMetrics]): + The results of query profiling, if received from the service. + """ + metrics: ExplainMetrics | None = None response_iterator = self._get_stream_iterator( transaction, retry, timeout, + explain_options, ) while True: try: @@ -154,15 +193,26 @@ def _make_stream( if response is None: # EOI break + + if metrics is None and response.explain_metrics: + metrics = response.explain_metrics + result = _query_response_to_result(response) - yield result + if result: + yield result + + return metrics def stream( self, transaction: Optional["transaction.Transaction"] = None, - retry: Optional[retries.Retry] = gapic_v1.method.DEFAULT, + retry: Union[ + retries.Retry, None, gapic_v1.method._MethodDefault + ] = gapic_v1.method.DEFAULT, timeout: Optional[float] = None, - ) -> "StreamGenerator[DocumentSnapshot]": + *, + explain_options: Optional[ExplainOptions] = None, + ) -> StreamGenerator[List[AggregationResult]]: """Runs the aggregation query. This sends a ``RunAggregationQuery`` RPC and then returns a generator @@ -181,13 +231,19 @@ def stream( system-specified policy. timeout (Optinal[float]): The timeout for this request. Defaults to a system-specified value. + explain_options + (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]): + Options to enable query profiling for this query. When set, + explain_metrics will be available on the returned generator. Returns: - `StreamGenerator[DocumentSnapshot]`: A generator of the query results. + `StreamGenerator[List[AggregationResult]]`: + A generator of the query results. 
""" inner_generator = self._make_stream( transaction=transaction, retry=retry, timeout=timeout, + explain_options=explain_options, ) - return StreamGenerator(inner_generator) + return StreamGenerator(inner_generator, explain_options) diff --git a/google/cloud/firestore_v1/async_aggregation.py b/google/cloud/firestore_v1/async_aggregation.py index 1c75f0cfd8..6ae42ac266 100644 --- a/google/cloud/firestore_v1/async_aggregation.py +++ b/google/cloud/firestore_v1/async_aggregation.py @@ -53,7 +53,7 @@ async def get( retries.AsyncRetry, None, gapic_v1.method._MethodDefault ] = gapic_v1.method.DEFAULT, timeout: float | None = None, - ) -> List[AggregationResult]: + ) -> List[List[AggregationResult]]: """Runs the aggregation query. This sends a ``RunAggregationQuery`` RPC and returns a list of aggregation results in the stream of ``RunAggregationQueryResponse`` messages. @@ -71,7 +71,7 @@ async def get( system-specified value. Returns: - list: The aggregation query results + List[List[AggregationResult]]: The aggregation query results """ stream_result = self.stream( diff --git a/google/cloud/firestore_v1/async_stream_generator.py b/google/cloud/firestore_v1/async_stream_generator.py index ca0481c0d1..e575a59d21 100644 --- a/google/cloud/firestore_v1/async_stream_generator.py +++ b/google/cloud/firestore_v1/async_stream_generator.py @@ -16,25 +16,28 @@ Firestore API. 
""" -from collections import abc +from typing import Any, AsyncGenerator, Awaitable, TypeVar -class AsyncStreamGenerator(abc.AsyncGenerator): +T = TypeVar("T") + + +class AsyncStreamGenerator(AsyncGenerator[T, Any]): """Asynchronous generator for the streamed results.""" - def __init__(self, response_generator): + def __init__(self, response_generator: AsyncGenerator[T, Any]): self._generator = response_generator - def __aiter__(self): - return self._generator + def __aiter__(self) -> AsyncGenerator[T, Any]: + return self - def __anext__(self): + def __anext__(self) -> Awaitable[T]: return self._generator.__anext__() - def asend(self, value=None): + def asend(self, value=None) -> Awaitable[Any]: return self._generator.asend(value) - def athrow(self, exp=None): + def athrow(self, exp=None) -> Awaitable[Any]: return self._generator.athrow(exp) def aclose(self): diff --git a/google/cloud/firestore_v1/base_aggregation.py b/google/cloud/firestore_v1/base_aggregation.py index f922663791..a3b0e4e760 100644 --- a/google/cloud/firestore_v1/base_aggregation.py +++ b/google/cloud/firestore_v1/base_aggregation.py @@ -24,17 +24,7 @@ import abc from abc import ABC -from typing import ( - TYPE_CHECKING, - Any, - AsyncGenerator, - Coroutine, - Generator, - List, - Optional, - Tuple, - Union, -) +from typing import TYPE_CHECKING, Any, Coroutine, List, Optional, Tuple, Union from google.api_core import gapic_v1 from google.api_core import retry as retries @@ -47,8 +37,14 @@ ) # Types needed only for Type Hints -if TYPE_CHECKING: - from google.cloud.firestore_v1 import transaction # pragma: NO COVER +if TYPE_CHECKING: # pragma: NO COVER + from google.cloud.firestore_v1 import transaction + from google.cloud.firestore_v1.async_stream_generator import AsyncStreamGenerator + from google.cloud.firestore_v1.query_profile import ExplainOptions + from google.cloud.firestore_v1.query_results import QueryResultsList + from google.cloud.firestore_v1.stream_generator import ( + StreamGenerator, 
+ ) class AggregationResult(object): @@ -62,7 +58,7 @@ class AggregationResult(object): :param value: The resulting read_time """ - def __init__(self, alias: str, value: int, read_time=None): + def __init__(self, alias: str, value: float, read_time=None): self.alias = alias self.value = value self.read_time = read_time @@ -211,6 +207,7 @@ def _prep_stream( transaction=None, retry: Union[retries.Retry, None, gapic_v1.method._MethodDefault] = None, timeout: float | None = None, + explain_options: Optional[ExplainOptions] = None, ) -> Tuple[dict, dict]: parent_path, expected_prefix = self._collection_ref._parent_info() request = { @@ -218,6 +215,8 @@ def _prep_stream( "structured_aggregation_query": self._to_protobuf(), "transaction": _helpers.get_transaction_id(transaction), } + if explain_options: + request["explain_options"] = explain_options._to_dict() kwargs = _helpers.make_retry_timeout_kwargs(retry, timeout) return request, kwargs @@ -230,10 +229,17 @@ def get( retries.Retry, None, gapic_v1.method._MethodDefault ] = gapic_v1.method.DEFAULT, timeout: float | None = None, - ) -> List[AggregationResult] | Coroutine[Any, Any, List[AggregationResult]]: + *, + explain_options: Optional[ExplainOptions] = None, + ) -> ( + QueryResultsList[AggregationResult] + | Coroutine[Any, Any, List[List[AggregationResult]]] + ): """Runs the aggregation query. - This sends a ``RunAggregationQuery`` RPC and returns a list of aggregation results in the stream of ``RunAggregationQueryResponse`` messages. + This sends a ``RunAggregationQuery`` RPC and returns a list of + aggregation results in the stream of ``RunAggregationQueryResponse`` + messages. Args: transaction @@ -246,22 +252,27 @@ def get( should be retried. Defaults to a system-specified policy. timeout (float): The timeout for this request. Defaults to a system-specified value. + explain_options + (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]): + Options to enable query profiling for this query. 
When set, + explain_metrics will be available on the returned generator. Returns: - list: The aggregation query results - + (QueryResultsList[List[AggregationResult]] | Coroutine[Any, Any, List[List[AggregationResult]]]): + The aggregation query results. """ @abc.abstractmethod def stream( self, transaction: Optional[transaction.Transaction] = None, - retry: Optional[retries.Retry] = gapic_v1.method.DEFAULT, + retry: Union[ + retries.Retry, None, gapic_v1.method._MethodDefault + ] = gapic_v1.method.DEFAULT, timeout: Optional[float] = None, - ) -> ( - Generator[List[AggregationResult], Any, None] - | AsyncGenerator[List[AggregationResult], None] - ): + *, + explain_options: Optional[ExplainOptions] = None, + ) -> StreamGenerator[List[AggregationResult]] | AsyncStreamGenerator: """Runs the aggregation query. This sends a``RunAggregationQuery`` RPC and returns a generator in the stream of ``RunAggregationQueryResponse`` messages. @@ -274,8 +285,13 @@ def stream( errors, if any, should be retried. Defaults to a system-specified policy. timeout (Optinal[float]): The timeout for this request. Defaults - to a system-specified value. + to a system-specified value. + explain_options + (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]): + Options to enable query profiling for this query. When set, + explain_metrics will be available on the returned generator. Returns: + StreamGenerator[List[AggregationResult]] | AsyncStreamGenerator: A generator of the query results. 
""" diff --git a/google/cloud/firestore_v1/base_collection.py b/google/cloud/firestore_v1/base_collection.py index 18c62aa33b..865638c431 100644 --- a/google/cloud/firestore_v1/base_collection.py +++ b/google/cloud/firestore_v1/base_collection.py @@ -35,19 +35,23 @@ from google.api_core import retry as retries from google.cloud.firestore_v1 import _helpers -from google.cloud.firestore_v1.base_aggregation import BaseAggregationQuery from google.cloud.firestore_v1.base_query import QueryType -from google.cloud.firestore_v1.base_vector_query import BaseVectorQuery, DistanceMeasure -from google.cloud.firestore_v1.document import DocumentReference -from google.cloud.firestore_v1.vector import Vector if TYPE_CHECKING: # pragma: NO COVER # Types needed only for Type Hints - from firestore_v1.vector_query import VectorQuery - + from google.cloud.firestore_v1.base_aggregation import BaseAggregationQuery from google.cloud.firestore_v1.base_document import DocumentSnapshot + from google.cloud.firestore_v1.base_vector_query import ( + BaseVectorQuery, + DistanceMeasure, + ) + from google.cloud.firestore_v1.document import DocumentReference from google.cloud.firestore_v1.field_path import FieldPath + from google.cloud.firestore_v1.query_profile import ExplainOptions + from google.cloud.firestore_v1.query_results import QueryResultsList from google.cloud.firestore_v1.transaction import Transaction + from google.cloud.firestore_v1.vector import Vector + from google.cloud.firestore_v1.vector_query import VectorQuery _AUTO_ID_CHARS = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789" @@ -492,9 +496,9 @@ def get( transaction: Optional[Transaction] = None, retry: Optional[retries.Retry] = None, timeout: Optional[float] = None, - ) -> Union[ - Generator[DocumentSnapshot, Any, Any], AsyncGenerator[DocumentSnapshot, Any] - ]: + *, + explain_options: Optional[ExplainOptions] = None, + ) -> QueryResultsList[DocumentSnapshot]: raise NotImplementedError def stream( @@ -502,6 
+506,8 @@ def stream( transaction: Optional[Transaction] = None, retry: Optional[retries.Retry] = None, timeout: Optional[float] = None, + *, + explain_options: Optional[ExplainOptions] = None, ) -> Union[Iterator[DocumentSnapshot], AsyncIterator[DocumentSnapshot]]: raise NotImplementedError diff --git a/google/cloud/firestore_v1/base_document.py b/google/cloud/firestore_v1/base_document.py index 1418ea34d0..ada42acb3e 100644 --- a/google/cloud/firestore_v1/base_document.py +++ b/google/cloud/firestore_v1/base_document.py @@ -13,17 +13,29 @@ # limitations under the License. """Classes for representing documents for the Google Cloud Firestore API.""" +from __future__ import annotations import copy -from typing import Any, Dict, Iterable, NoReturn, Optional, Tuple, Union +from typing import ( + TYPE_CHECKING, + Any, + Dict, + Iterable, + NoReturn, + Optional, + Tuple, + Union, +) from google.api_core import retry as retries from google.cloud.firestore_v1 import _helpers from google.cloud.firestore_v1 import field_path as field_path_module +from google.cloud.firestore_v1.types import common # Types needed only for Type Hints -from google.cloud.firestore_v1.types import Document, common, firestore, write +if TYPE_CHECKING: # pragma: NO COVER + from google.cloud.firestore_v1.types import Document, firestore, write class BaseDocumentReference(object): diff --git a/google/cloud/firestore_v1/base_query.py b/google/cloud/firestore_v1/base_query.py index cfed454b93..a1b8ee187b 100644 --- a/google/cloud/firestore_v1/base_query.py +++ b/google/cloud/firestore_v1/base_query.py @@ -61,6 +61,7 @@ if TYPE_CHECKING: # pragma: NO COVER from google.cloud.firestore_v1.base_vector_query import BaseVectorQuery from google.cloud.firestore_v1.field_path import FieldPath + from google.cloud.firestore_v1.query_profile import ExplainMetrics, ExplainOptions _BAD_DIR_STRING: str _BAD_OP_NAN_NULL: str @@ -1008,6 +1009,8 @@ def get( transaction=None, retry: Optional[retries.Retry] = None, 
timeout: Optional[float] = None, + *, + explain_options: Optional[ExplainOptions] = None, ) -> Iterable[DocumentSnapshot]: raise NotImplementedError @@ -1016,6 +1019,7 @@ def _prep_stream( transaction=None, retry: Optional[retries.Retry] = None, timeout: Optional[float] = None, + explain_options: Optional[ExplainOptions] = None, ) -> Tuple[dict, str, dict]: """Shared setup for async / sync :meth:`stream`""" if self._limit_to_last: @@ -1030,6 +1034,8 @@ def _prep_stream( "structured_query": self._to_protobuf(), "transaction": _helpers.get_transaction_id(transaction), } + if explain_options is not None: + request["explain_options"] = explain_options._to_dict() kwargs = _helpers.make_retry_timeout_kwargs(retry, timeout) return request, expected_prefix, kwargs @@ -1039,7 +1045,9 @@ def stream( transaction=None, retry: Optional[retries.Retry] = None, timeout: Optional[float] = None, - ) -> Generator[document.DocumentSnapshot, Any, None]: + *, + explain_options: Optional[ExplainOptions] = None, + ) -> Generator[document.DocumentSnapshot, Any, Optional[ExplainMetrics]]: raise NotImplementedError def on_snapshot(self, callback) -> NoReturn: diff --git a/google/cloud/firestore_v1/base_transaction.py b/google/cloud/firestore_v1/base_transaction.py index 09f0c1fb9a..3b9cd479be 100644 --- a/google/cloud/firestore_v1/base_transaction.py +++ b/google/cloud/firestore_v1/base_transaction.py @@ -13,13 +13,18 @@ # limitations under the License. 
"""Helpers for applying Google Cloud Firestore changes in a transaction.""" +from __future__ import annotations -from typing import Any, Coroutine, NoReturn, Optional, Union +from typing import TYPE_CHECKING, Any, Coroutine, NoReturn, Optional, Union from google.api_core import retry as retries from google.cloud.firestore_v1 import types +# Types needed only for Type Hints +if TYPE_CHECKING: # pragma: NO COVER + from google.cloud.firestore_v1.query_profile import ExplainOptions + _CANT_BEGIN: str _CANT_COMMIT: str _CANT_RETRY_READ_ONLY: str @@ -150,6 +155,8 @@ def get( ref_or_query, retry: retries.Retry = None, timeout: float = None, + *, + explain_options: Optional[ExplainOptions] = None, ) -> NoReturn: raise NotImplementedError diff --git a/google/cloud/firestore_v1/base_vector_query.py b/google/cloud/firestore_v1/base_vector_query.py index 26cd5b1997..e7607bd478 100644 --- a/google/cloud/firestore_v1/base_vector_query.py +++ b/google/cloud/firestore_v1/base_vector_query.py @@ -14,19 +14,23 @@ """Classes for representing vector queries for the Google Cloud Firestore API. 
""" +from __future__ import annotations import abc from abc import ABC from enum import Enum -from typing import Iterable, Optional, Tuple, Union +from typing import TYPE_CHECKING, Any, Generator, Iterable, Optional, Tuple, Union from google.api_core import gapic_v1 from google.api_core import retry as retries -from google.cloud.firestore_v1 import _helpers, document -from google.cloud.firestore_v1.base_document import DocumentSnapshot +from google.cloud.firestore_v1 import _helpers from google.cloud.firestore_v1.types import query -from google.cloud.firestore_v1.vector import Vector + +if TYPE_CHECKING: # pragma: NO COVER + from google.cloud.firestore_v1.base_document import DocumentSnapshot + from google.cloud.firestore_v1.query_profile import ExplainMetrics, ExplainOptions + from google.cloud.firestore_v1.vector import Vector class DistanceMeasure(Enum): @@ -94,6 +98,7 @@ def _prep_stream( transaction=None, retry: Union[retries.Retry, None, gapic_v1.method._MethodDefault] = None, timeout: Optional[float] = None, + explain_options: Optional[ExplainOptions] = None, ) -> Tuple[dict, str, dict]: parent_path, expected_prefix = self._collection_ref._parent_info() request = { @@ -103,6 +108,9 @@ def _prep_stream( } kwargs = _helpers.make_retry_timeout_kwargs(retry, timeout) + if explain_options is not None: + request["explain_options"] = explain_options._to_dict() + return request, expected_prefix, kwargs @abc.abstractmethod @@ -111,6 +119,8 @@ def get( transaction=None, retry: retries.Retry = gapic_v1.method.DEFAULT, timeout: Optional[float] = None, + *, + explain_options: Optional[ExplainOptions] = None, ) -> Iterable[DocumentSnapshot]: """Runs the vector query.""" @@ -138,5 +148,7 @@ def stream( transaction=None, retry: retries.Retry = gapic_v1.method.DEFAULT, timeout: float = None, - ) -> Iterable[document.DocumentSnapshot]: + *, + explain_options: Optional[ExplainOptions] = None, + ) -> Generator[DocumentSnapshot, Any, Optional[ExplainMetrics]]: """Reads the 
documents in the collection that match this query.""" diff --git a/google/cloud/firestore_v1/collection.py b/google/cloud/firestore_v1/collection.py index 96dadf2e70..5e2f23811e 100644 --- a/google/cloud/firestore_v1/collection.py +++ b/google/cloud/firestore_v1/collection.py @@ -13,6 +13,7 @@ # limitations under the License. """Classes for representing collections for the Google Cloud Firestore API.""" +from __future__ import annotations from typing import TYPE_CHECKING, Any, Callable, Generator, Optional, Tuple, Union @@ -26,10 +27,12 @@ BaseCollectionReference, _item_to_document_ref, ) +from google.cloud.firestore_v1.query_results import QueryResultsList from google.cloud.firestore_v1.watch import Watch if TYPE_CHECKING: # pragma: NO COVER from google.cloud.firestore_v1.base_document import DocumentSnapshot + from google.cloud.firestore_v1.query_profile import ExplainOptions from google.cloud.firestore_v1.stream_generator import StreamGenerator @@ -169,7 +172,9 @@ def get( transaction: Union[transaction.Transaction, None] = None, retry: retries.Retry = gapic_v1.method.DEFAULT, timeout: Union[float, None] = None, - ) -> list: + *, + explain_options: Optional[ExplainOptions] = None, + ) -> QueryResultsList[DocumentSnapshot]: """Read the documents in this collection. This sends a ``RunQuery`` RPC and returns a list of documents @@ -183,15 +188,22 @@ def get( should be retried. Defaults to a system-specified policy. timeout (float): The timeout for this request. Defaults to a system-specified value. + explain_options + (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]): + Options to enable query profiling for this query. When set, + explain_metrics will be available on the returned list. If a ``transaction`` is used and it already has write operations added, this method cannot be used (i.e. read-after-write is not allowed). Returns: - list: The documents in this collection that match the query. 
+ QueryResultsList[DocumentSnapshot]: The documents in this collection + that match the query. """ query, kwargs = self._prep_get_or_stream(retry, timeout) + if explain_options is not None: + kwargs["explain_options"] = explain_options return query.get(transaction=transaction, **kwargs) @@ -200,6 +212,8 @@ def stream( transaction: Optional[transaction.Transaction] = None, retry: Optional[retries.Retry] = gapic_v1.method.DEFAULT, timeout: Optional[float] = None, + *, + explain_options: Optional[ExplainOptions] = None, ) -> "StreamGenerator[DocumentSnapshot]": """Read the documents in this collection. @@ -227,11 +241,17 @@ def stream( system-specified policy. timeout (Optional[float]): The timeout for this request. Defaults to a system-specified value. + explain_options + (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]): + Options to enable query profiling for this query. When set, + explain_metrics will be available on the returned generator. Returns: `StreamGenerator[DocumentSnapshot]`: A generator of the query results. 
""" query, kwargs = self._prep_get_or_stream(retry, timeout) + if explain_options: + kwargs["explain_options"] = explain_options return query.stream(transaction=transaction, **kwargs) diff --git a/google/cloud/firestore_v1/query.py b/google/cloud/firestore_v1/query.py index eb8f51dc8d..8677ea0d04 100644 --- a/google/cloud/firestore_v1/query.py +++ b/google/cloud/firestore_v1/query.py @@ -27,7 +27,10 @@ from google.cloud import firestore_v1 from google.cloud.firestore_v1 import aggregation, transaction -from google.cloud.firestore_v1.base_document import DocumentSnapshot +from google.cloud.firestore_v1.query_results import QueryResultsList +from google.cloud.firestore_v1.base_document import ( + DocumentSnapshot, +) from google.cloud.firestore_v1.base_query import ( BaseCollectionGroup, BaseQuery, @@ -36,14 +39,15 @@ _enum_from_direction, _query_response_to_snapshot, ) -from google.cloud.firestore_v1.base_vector_query import DistanceMeasure from google.cloud.firestore_v1.stream_generator import StreamGenerator from google.cloud.firestore_v1.vector import Vector from google.cloud.firestore_v1.vector_query import VectorQuery from google.cloud.firestore_v1.watch import Watch if TYPE_CHECKING: # pragma: NO COVER + from google.cloud.firestore_v1.base_vector_query import DistanceMeasure from google.cloud.firestore_v1.field_path import FieldPath + from google.cloud.firestore_v1.query_profile import ExplainMetrics, ExplainOptions class Query(BaseQuery): @@ -135,7 +139,9 @@ def get( transaction=None, retry: retries.Retry = gapic_v1.method.DEFAULT, timeout: float = None, - ) -> List[DocumentSnapshot]: + *, + explain_options: Optional[ExplainOptions] = None, + ) -> QueryResultsList[DocumentSnapshot]: """Read the documents in the collection that match this query. This sends a ``RunQuery`` RPC and returns a list of documents @@ -152,10 +158,17 @@ def get( should be retried. Defaults to a system-specified policy. timeout (float): The timeout for this request. 
Defaults to a system-specified value. + explain_options + (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]): + Options to enable query profiling for this query. When set, + explain_metrics will be available on the returned list. Returns: - list: The documents in the collection that match this query. + QueryResultsList[DocumentSnapshot]: The documents in the collection + that match this query. """ + explain_metrics: ExplainMetrics | None = None + is_limited_to_last = self._limit_to_last if self._limit_to_last: @@ -174,11 +187,18 @@ def get( transaction=transaction, retry=retry, timeout=timeout, + explain_options=explain_options, ) + result_list = list(result) if is_limited_to_last: - result = reversed(list(result)) + result_list = list(reversed(result_list)) - return list(result) + if explain_options is None: + explain_metrics = None + else: + explain_metrics = result.get_explain_metrics() + + return QueryResultsList(result_list, explain_options, explain_metrics) def _chunkify( self, chunk_size: int @@ -218,12 +238,13 @@ def _chunkify( ): return - def _get_stream_iterator(self, transaction, retry, timeout): + def _get_stream_iterator(self, transaction, retry, timeout, explain_options=None): """Helper method for :meth:`stream`.""" request, expected_prefix, kwargs = self._prep_stream( transaction, retry, timeout, + explain_options, ) response_iterator = self._client._firestore_api.run_query( @@ -331,7 +352,8 @@ def _make_stream( transaction: Optional[transaction.Transaction] = None, retry: Optional[retries.Retry] = gapic_v1.method.DEFAULT, timeout: Optional[float] = None, - ) -> Generator[DocumentSnapshot, Any, None]: + explain_options: Optional[ExplainOptions] = None, + ) -> Generator[DocumentSnapshot, Any, Optional[ExplainMetrics]]: """Internal method for stream(). Read the documents in the collection that match this query. @@ -360,15 +382,26 @@ def _make_stream( system-specified policy. 
timeout (Optional[float]): The timeout for this request. Defaults to a system-specified value. + explain_options + (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]): + Options to enable query profiling for this query. When set, + explain_metrics will be available on the returned generator. Yields: - :class:`~google.cloud.firestore_v1.document.DocumentSnapshot`: + DocumentSnapshot: The next document that fulfills the query. + + Returns: + ([google.cloud.firestore_v1.types.query_profile.ExplainMetrics | None]): + The results of query profiling, if received from the service. """ + metrics: ExplainMetrics | None = None + response_iterator, expected_prefix = self._get_stream_iterator( transaction, retry, timeout, + explain_options, ) last_snapshot = None @@ -391,6 +424,9 @@ def _make_stream( if response is None: # EOI break + if metrics is None and response.explain_metrics: + metrics = response.explain_metrics + if self._all_descendants: snapshot = _collection_group_query_response_to_snapshot( response, self._parent @@ -403,12 +439,16 @@ def _make_stream( last_snapshot = snapshot yield snapshot + return metrics + def stream( self, transaction: Optional[transaction.Transaction] = None, retry: Optional[retries.Retry] = gapic_v1.method.DEFAULT, timeout: Optional[float] = None, - ) -> "StreamGenerator[DocumentSnapshot]": + *, + explain_options: Optional[ExplainOptions] = None, + ) -> StreamGenerator[DocumentSnapshot]: """Read the documents in the collection that match this query. This sends a ``RunQuery`` RPC and then returns a generator which @@ -434,7 +474,11 @@ def stream( errors, if any, should be retried. Defaults to a system-specified policy. timeout (Optinal[float]): The timeout for this request. Defaults - to a system-specified value. + to a system-specified value. + explain_options + (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]): + Options to enable query profiling for this query. 
When set, + explain_metrics will be available on the returned generator. Returns: `StreamGenerator[DocumentSnapshot]`: A generator of the query results. @@ -443,8 +487,9 @@ def stream( transaction=transaction, retry=retry, timeout=timeout, + explain_options=explain_options, ) - return StreamGenerator(inner_generator) + return StreamGenerator(inner_generator, explain_options) def on_snapshot(self, callback: Callable) -> Watch: """Monitor the documents in this collection that match this query. diff --git a/google/cloud/firestore_v1/query_profile.py b/google/cloud/firestore_v1/query_profile.py new file mode 100644 index 0000000000..6925f83ffa --- /dev/null +++ b/google/cloud/firestore_v1/query_profile.py @@ -0,0 +1,145 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from __future__ import annotations + +from typing import Any + +import datetime + +from dataclasses import dataclass +from google.protobuf.json_format import MessageToDict + + +@dataclass(frozen=True) +class ExplainOptions: + """ + Explain options for the query. + Set on a query object using the explain_options attribute at query + construction time. + + :type analyze: bool + :param analyze: Optional. Whether to execute this query. When false + (the default), the query will be planned, returning only metrics from the + planning stages. When true, the query will be planned and executed, + returning the full query results along with both planning and execution + stage metrics. 
+ """ + + analyze: bool = False + + def _to_dict(self): + return {"analyze": self.analyze} + + +@dataclass(frozen=True) +class PlanSummary: + """ + Contains planning phase information about a query. + + :type indexes_used: list[dict[str, Any]] + :param indexes_used: The indexes selected for this query. + """ + + indexes_used: list[dict[str, Any]] + + +@dataclass(frozen=True) +class ExecutionStats: + """ + Execution phase information about a query. + + Only available when explain_options.analyze is True. + + :type results_returned: int + :param results_returned: Total number of results returned, including + documents, projections, aggregation results, keys. + :type execution_duration: datetime.timedelta + :param execution_duration: Total time to execute the query in the backend. + :type read_operations: int + :param read_operations: Total billable read operations. + :type debug_stats: dict[str, Any] + :param debug_stats: Debugging statistics from the execution of the query. + Note that the debugging stats are subject to change as Firestore evolves. + """ + + results_returned: int + execution_duration: datetime.timedelta + read_operations: int + debug_stats: dict[str, Any] + + +@dataclass(frozen=True) +class ExplainMetrics: + """ + ExplainMetrics contains information about the planning and execution of a query. + + When explain_options.analyze is false, only plan_summary is available. + When explain_options.analyze is true, execution_stats is also available. + + :type plan_summary: PlanSummary + :param plan_summary: Planning phase information about the query. + :type execution_stats: ExecutionStats + :param execution_stats: Execution phase information about the query.
+ """ + + plan_summary: PlanSummary + + @staticmethod + def _from_pb(metrics_pb): + dict_repr = MessageToDict(metrics_pb._pb, preserving_proto_field_name=True) + plan_summary = PlanSummary( + indexes_used=dict_repr.get("plan_summary", {}).get("indexes_used", []) + ) + if "execution_stats" in dict_repr: + stats_dict = dict_repr.get("execution_stats", {}) + execution_stats = ExecutionStats( + results_returned=int(stats_dict.get("results_returned", 0)), + execution_duration=metrics_pb.execution_stats.execution_duration, + read_operations=int(stats_dict.get("read_operations", 0)), + debug_stats=stats_dict.get("debug_stats", {}), + ) + return _ExplainAnalyzeMetrics( + plan_summary=plan_summary, _execution_stats=execution_stats + ) + else: + return ExplainMetrics(plan_summary=plan_summary) + + @property + def execution_stats(self) -> ExecutionStats: + raise QueryExplainError( + "execution_stats not available when explain_options.analyze=False." + ) + + +@dataclass(frozen=True) +class _ExplainAnalyzeMetrics(ExplainMetrics): + """ + Subclass of ExplainMetrics that includes execution_stats. + Only available when explain_options.analyze is True. + """ + + plan_summary: PlanSummary + _execution_stats: ExecutionStats + + @property + def execution_stats(self) -> ExecutionStats: + return self._execution_stats + + +class QueryExplainError(Exception): + """ + Error returned when there is a problem accessing query profiling information. + """ + + pass diff --git a/google/cloud/firestore_v1/query_results.py b/google/cloud/firestore_v1/query_results.py new file mode 100644 index 0000000000..47dddf9de7 --- /dev/null +++ b/google/cloud/firestore_v1/query_results.py @@ -0,0 +1,87 @@ +# Copyright 2024 Google LLC All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from google.cloud.firestore_v1.query_profile import ( + ExplainMetrics, + ExplainOptions, + QueryExplainError, +) + +from typing import List, Optional, TypeVar + +T = TypeVar("T") + + +class QueryResultsList(List[T]): + """A list of received query results from the query call. + + This is a subclass of the built-in list. A new property `explain_metrics` + is added to return the query profile results. + + Args: + docs (list): + The list of query results. + explain_options + (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]): + Options to enable query profiling for this query. When set, + explain_metrics will be available on the returned generator. + explain_metrics (Optional[ExplainMetrics]): + Query profile results. + """ + + def __init__( + self, + docs: List, + explain_options: Optional[ExplainOptions] = None, + explain_metrics: Optional[ExplainMetrics] = None, + ): + super().__init__(docs) + + # When explain_options is set, explain_metrics should be non-empty too. + if explain_options is not None and explain_metrics is None: + raise ValueError( + "If explain_options is set, explain_metrics must be non-empty." + ) + elif explain_options is None and explain_metrics is not None: + raise ValueError( + "If explain_options is empty, explain_metrics must be empty." 
+ ) + + self._explain_options = explain_options + self._explain_metrics = explain_metrics + + @property + def explain_options(self) -> Optional[ExplainOptions]: + """Query profiling options for getting these query results.""" + return self._explain_options + + def get_explain_metrics(self) -> ExplainMetrics: + """ + Get the metrics associated with the query execution. + Metrics are only available when explain_options is set on the query. If + ExplainOptions.analyze is False, only plan_summary is available. If it is + True, execution_stats is also available. + :rtype: :class:`~google.cloud.firestore_v1.query_profile.ExplainMetrics` + :returns: The metrics associated with the query execution. + :raises: :class:`~google.cloud.firestore_v1.query_profile.QueryExplainError` + if explain_metrics is not available on the query. + """ + if self._explain_options is None: + raise QueryExplainError("explain_options not set on query.") + elif self._explain_metrics is None: + raise QueryExplainError( + "explain_metrics is empty despite explain_options is set." + ) + else: + return self._explain_metrics diff --git a/google/cloud/firestore_v1/stream_generator.py b/google/cloud/firestore_v1/stream_generator.py index 0a95af8d1f..7e39a3fbab 100644 --- a/google/cloud/firestore_v1/stream_generator.py +++ b/google/cloud/firestore_v1/stream_generator.py @@ -14,27 +14,98 @@ """Classes for iterating over stream results for the Google Cloud Firestore API. 
""" +from __future__ import annotations -from collections import abc +from typing import TYPE_CHECKING, Any, Generator, Optional, TypeVar +from google.cloud.firestore_v1.query_profile import ( + ExplainMetrics, + QueryExplainError, +) -class StreamGenerator(abc.Generator): - """Generator for the streamed results.""" +if TYPE_CHECKING: # pragma: NO COVER + from google.cloud.firestore_v1.query_profile import ExplainOptions - def __init__(self, response_generator): + +T = TypeVar("T") + + +class StreamGenerator(Generator[T, Any, Optional[ExplainMetrics]]): + """Generator for the streamed results. + + Args: + response_generator (Generator[T, Any, Optional[ExplainMetrics]]): + The inner generator that yields the returned document in the stream. + explain_options + (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]): + Query profiling options for this stream request. + """ + + def __init__( + self, + response_generator: Generator[T, Any, Optional[ExplainMetrics]], + explain_options: Optional[ExplainOptions] = None, + ): self._generator = response_generator + self._explain_options = explain_options + self._explain_metrics = None - def __iter__(self): - return self._generator + def __iter__(self) -> StreamGenerator: + return self - def __next__(self): - return self._generator.__next__() + def __next__(self) -> T: + try: + return self._generator.__next__() + except StopIteration as e: + # If explain_metrics is available, it would be returned. 
+ if e.value: + self._explain_metrics = ExplainMetrics._from_pb(e.value) + raise - def send(self, value=None): + def send(self, value: Any = None) -> T: return self._generator.send(value) - def throw(self, exp=None): - return self._generator.throw(exp) + def throw(self, *args, **kwargs) -> T: + return self._generator.throw(*args, **kwargs) def close(self): return self._generator.close() + + @property + def explain_options(self) -> ExplainOptions | None: + """Query profiling options for this stream request.""" + return self._explain_options + + def get_explain_metrics(self) -> ExplainMetrics: + """ + Get the metrics associated with the query execution. + Metrics are only available when explain_options is set on the query. If + ExplainOptions.analyze is False, only plan_summary is available. If it is + True, execution_stats is also available. + :rtype: :class:`~google.cloud.firestore_v1.query_profile.ExplainMetrics` + :returns: The metrics associated with the query execution. + :raises: :class:`~google.cloud.firestore_v1.query_profile.QueryExplainError` + if explain_metrics is not available on the query. + """ + if self._explain_metrics is not None: + return self._explain_metrics + elif self._explain_options is None: + raise QueryExplainError("explain_options not set on query.") + elif self._explain_options.analyze is False: + # We need to run the query to get the explain_metrics. Since no + # query results are returned, it's ok to discard the returned value. + try: + next(self) + except StopIteration: + pass + + if self._explain_metrics is None: + raise QueryExplainError( + "Did not receive explain_metrics for this query, despite " + "explain_options is set and analyze = False." + ) + else: + return self._explain_metrics + raise QueryExplainError( + "explain_metrics not available until query is complete." 
+ ) diff --git a/google/cloud/firestore_v1/transaction.py b/google/cloud/firestore_v1/transaction.py index 8f92ddaf0d..ab79061efe 100644 --- a/google/cloud/firestore_v1/transaction.py +++ b/google/cloud/firestore_v1/transaction.py @@ -13,17 +13,15 @@ # limitations under the License. """Helpers for applying Google Cloud Firestore changes in a transaction.""" +from __future__ import annotations - -from typing import Any, Callable, Generator +from typing import TYPE_CHECKING, Any, Callable, Generator, Optional +import warnings from google.api_core import exceptions, gapic_v1 from google.api_core import retry as retries from google.cloud.firestore_v1 import _helpers, batch - -# Types needed only for Type Hints -from google.cloud.firestore_v1.base_document import DocumentSnapshot from google.cloud.firestore_v1.base_transaction import ( _CANT_BEGIN, _CANT_COMMIT, @@ -37,6 +35,12 @@ from google.cloud.firestore_v1.document import DocumentReference from google.cloud.firestore_v1.query import Query +# Types needed only for Type Hints +if TYPE_CHECKING: # pragma: NO COVER + from google.cloud.firestore_v1.base_document import DocumentSnapshot + from google.cloud.firestore_v1.query_profile import ExplainOptions + from google.cloud.firestore_v1.stream_generator import StreamGenerator + class Transaction(batch.WriteBatch, BaseTransaction): """Accumulate read-and-write operations to be sent in a transaction. @@ -172,7 +176,9 @@ def get( ref_or_query, retry: retries.Retry = gapic_v1.method.DEFAULT, timeout: float = None, - ) -> Generator[DocumentSnapshot, Any, None]: + *, + explain_options: Optional[ExplainOptions] = None, + ) -> StreamGenerator[DocumentSnapshot]: """Retrieve a document or a query result from the database. Args: @@ -181,6 +187,11 @@ def get( should be retried. Defaults to a system-specified policy. timeout (float): The timeout for this request. Defaults to a system-specified value. 
+ explain_options + (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]): + Options to enable query profiling for this query. When set, + explain_metrics will be available on the returned generator. + Can only be used when running a query. Yields: .DocumentSnapshot: The next document snapshot that fulfills the @@ -188,8 +199,16 @@ def get( """ kwargs = _helpers.make_retry_timeout_kwargs(retry, timeout) if isinstance(ref_or_query, DocumentReference): + if explain_options is not None: + warnings.warn( + "explain_options not supported in transanction with " + "document references and will be ignored. To use " + "explain_options, use transaction with query instead." + ) return self._client.get_all([ref_or_query], transaction=self, **kwargs) elif isinstance(ref_or_query, Query): + if explain_options is not None: + kwargs["explain_options"] = explain_options return ref_or_query.stream(transaction=self, **kwargs) else: raise ValueError( diff --git a/google/cloud/firestore_v1/transforms.py b/google/cloud/firestore_v1/transforms.py index ae061f6b30..5ec15b3dc2 100644 --- a/google/cloud/firestore_v1/transforms.py +++ b/google/cloud/firestore_v1/transforms.py @@ -102,7 +102,7 @@ class _NumericValue(object): """Hold a single integer / float value. Args: - value (int | float): value held in the helper. + value (float): value held in the helper. """ def __init__(self, value) -> None: @@ -116,7 +116,7 @@ def value(self): """Value used by the transform. Returns: - (Integer | Float) value passed in the constructor. + (float) value passed in the constructor. """ return self._value @@ -133,7 +133,7 @@ class Increment(_NumericValue): https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.DocumentTransform.FieldTransform.FIELDS.google.firestore.v1.ArrayValue.google.firestore.v1.DocumentTransform.FieldTransform.increment Args: - value (int | float): value used to increment the field.
+ value (float): value used to increment the field. """ @@ -144,7 +144,7 @@ class Maximum(_NumericValue): https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.DocumentTransform.FieldTransform.FIELDS.google.firestore.v1.ArrayValue.google.firestore.v1.DocumentTransform.FieldTransform.maximum Args: - value (int | float): value used to bound the field. + value (float): value used to bound the field. """ @@ -155,5 +155,5 @@ class Minimum(_NumericValue): https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.DocumentTransform.FieldTransform.FIELDS.google.firestore.v1.ArrayValue.google.firestore.v1.DocumentTransform.FieldTransform.minimum Args: - value (int | float): value used to bound the field. + value (float): value used to bound the field. """ diff --git a/google/cloud/firestore_v1/vector_query.py b/google/cloud/firestore_v1/vector_query.py index a419dba63a..9e2d4ad0f0 100644 --- a/google/cloud/firestore_v1/vector_query.py +++ b/google/cloud/firestore_v1/vector_query.py @@ -14,12 +14,14 @@ """Classes for representing vector queries for the Google Cloud Firestore API. 
""" +from __future__ import annotations -from typing import TYPE_CHECKING, Any, Generator, Iterable, Optional, TypeVar, Union +from typing import TYPE_CHECKING, Any, Generator, Optional, TypeVar, Union from google.api_core import gapic_v1 from google.api_core import retry as retries +from google.cloud.firestore_v1.query_results import QueryResultsList from google.cloud.firestore_v1.base_query import ( BaseQuery, _collection_group_query_response_to_snapshot, @@ -32,6 +34,8 @@ if TYPE_CHECKING: # pragma: NO COVER from google.cloud.firestore_v1 import transaction from google.cloud.firestore_v1.base_document import DocumentSnapshot + from google.cloud.firestore_v1.query_profile import ExplainMetrics + from google.cloud.firestore_v1.query_profile import ExplainOptions TVectorQuery = TypeVar("TVectorQuery", bound="VectorQuery") @@ -55,7 +59,9 @@ def get( transaction=None, retry: retries.Retry = gapic_v1.method.DEFAULT, timeout: Optional[float] = None, - ) -> Iterable["DocumentSnapshot"]: + *, + explain_options: Optional[ExplainOptions] = None, + ) -> QueryResultsList[DocumentSnapshot]: """Runs the vector query. This sends a ``RunQuery`` RPC and returns a list of document messages. @@ -71,20 +77,38 @@ def get( should be retried. Defaults to a system-specified policy. timeout (float): The timeout for this request. Defaults to a system-specified value. + explain_options + (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]): + Options to enable query profiling for this query. When set, + explain_metrics will be available on the returned generator. Returns: - list: The vector query results. + QueryResultsList[DocumentSnapshot]: The vector query results. 
""" - result = self.stream(transaction=transaction, retry=retry, timeout=timeout) + explain_metrics: ExplainMetrics | None = None - return list(result) + result = self.stream( + transaction=transaction, + retry=retry, + timeout=timeout, + explain_options=explain_options, + ) + result_list = list(result) + + if explain_options is None: + explain_metrics = None + else: + explain_metrics = result.get_explain_metrics() + + return QueryResultsList(result_list, explain_options, explain_metrics) - def _get_stream_iterator(self, transaction, retry, timeout): + def _get_stream_iterator(self, transaction, retry, timeout, explain_options=None): """Helper method for :meth:`stream`.""" request, expected_prefix, kwargs = self._prep_stream( transaction, retry, timeout, + explain_options, ) response_iterator = self._client._firestore_api.run_query( @@ -100,7 +124,8 @@ def _make_stream( transaction: Optional["transaction.Transaction"] = None, retry: Optional[retries.Retry] = gapic_v1.method.DEFAULT, timeout: Optional[float] = None, - ) -> Generator["DocumentSnapshot", Any, None]: + explain_options: Optional[ExplainOptions] = None, + ) -> Generator[DocumentSnapshot, Any, Optional[ExplainMetrics]]: """Reads the documents in the collection that match this query. This sends a ``RunQuery`` RPC and then returns a generator which @@ -120,15 +145,26 @@ def _make_stream( system-specified policy. timeout (Optional[float]): The timeout for this request. Defaults to a system-specified value. + explain_options + (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]): + Options to enable query profiling for this query. When set, + explain_metrics will be available on the returned generator. Yields: - :class:`~google.cloud.firestore_v1.document.DocumentSnapshot`: + DocumentSnapshot: The next document that fulfills the query. + + Returns: + ([google.cloud.firestore_v1.types.query_profile.ExplainMetrics | None]): + The results of query profiling, if received from the service.
""" + metrics: ExplainMetrics | None = None + response_iterator, expected_prefix = self._get_stream_iterator( transaction, retry, timeout, + explain_options, ) while True: @@ -137,6 +173,9 @@ def _make_stream( if response is None: # EOI break + if metrics is None and response.explain_metrics: + metrics = response.explain_metrics + if self._nested_query._all_descendants: snapshot = _collection_group_query_response_to_snapshot( response, self._nested_query._parent @@ -148,12 +187,16 @@ def _make_stream( if snapshot is not None: yield snapshot + return metrics + def stream( self, transaction: Optional["transaction.Transaction"] = None, retry: Optional[retries.Retry] = gapic_v1.method.DEFAULT, timeout: Optional[float] = None, - ) -> "StreamGenerator[DocumentSnapshot]": + *, + explain_options: Optional[ExplainOptions] = None, + ) -> StreamGenerator[DocumentSnapshot]: """Reads the documents in the collection that match this query. This sends a ``RunQuery`` RPC and then returns a generator which @@ -173,6 +216,10 @@ def stream( system-specified policy. timeout (Optinal[float]): The timeout for this request. Defaults to a system-specified value. + explain_options + (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]): + Options to enable query profiling for this query. When set, + explain_metrics will be available on the returned generator. Returns: `StreamGenerator[DocumentSnapshot]`: A generator of the query results. 
@@ -181,5 +228,6 @@ def stream( transaction=transaction, retry=retry, timeout=timeout, + explain_options=explain_options, ) - return StreamGenerator(inner_generator) + return StreamGenerator(inner_generator, explain_options) diff --git a/tests/system/test_system.py b/tests/system/test_system.py index b67b8aecca..0ea52ea791 100644 --- a/tests/system/test_system.py +++ b/tests/system/test_system.py @@ -99,6 +99,124 @@ def test_collections_w_import(database): assert isinstance(collections, list) +@pytest.mark.skipif( + FIRESTORE_EMULATOR, reason="Query profile not supported in emulator." +) +@pytest.mark.parametrize("method", ["stream", "get"]) +@pytest.mark.parametrize("database", [None, FIRESTORE_OTHER_DB], indirect=True) +def test_collection_stream_or_get_w_no_explain_options(database, query_docs, method): + from google.cloud.firestore_v1.query_profile import QueryExplainError + + collection, _, _ = query_docs + + # Tests either `stream()` or `get()`. + method_under_test = getattr(collection, method) + results = method_under_test() + + # verify explain_metrics isn't available + with pytest.raises( + QueryExplainError, + match="explain_options not set on query.", + ): + results.get_explain_metrics() + + +@pytest.mark.skipif( + FIRESTORE_EMULATOR, reason="Query profile not supported in emulator." +) +@pytest.mark.parametrize("method", ["get", "stream"]) +@pytest.mark.parametrize("database", [None, FIRESTORE_OTHER_DB], indirect=True) +def test_collection_stream_or_get_w_explain_options_analyze_false( + database, method, query_docs +): + from google.cloud.firestore_v1.query_profile import ( + ExplainMetrics, + ExplainOptions, + PlanSummary, + QueryExplainError, + ) + + collection, _, _ = query_docs + + # Tests either `stream()` or `get()`. + method_under_test = getattr(collection, method) + results = method_under_test(explain_options=ExplainOptions(analyze=False)) + + # Verify explain_metrics and plan_summary. 
+ explain_metrics = results.get_explain_metrics() + assert isinstance(explain_metrics, ExplainMetrics) + plan_summary = explain_metrics.plan_summary + assert isinstance(plan_summary, PlanSummary) + assert len(plan_summary.indexes_used) > 0 + assert plan_summary.indexes_used[0]["properties"] == "(__name__ ASC)" + assert plan_summary.indexes_used[0]["query_scope"] == "Collection" + + # Verify execution_stats isn't available. + with pytest.raises( + QueryExplainError, + match="execution_stats not available when explain_options.analyze=False", + ): + explain_metrics.execution_stats + + +@pytest.mark.skipif( + FIRESTORE_EMULATOR, reason="Query profile not supported in emulator." +) +@pytest.mark.parametrize("method", ["get", "stream"]) +@pytest.mark.parametrize("database", [None, FIRESTORE_OTHER_DB], indirect=True) +def test_collection_stream_or_get_w_explain_options_analyze_true( + database, method, query_docs +): + from google.cloud.firestore_v1.query_profile import ( + ExecutionStats, + ExplainMetrics, + ExplainOptions, + PlanSummary, + QueryExplainError, + ) + + collection, _, _ = query_docs + + # Tests either `stream()` or `get()`. + method_under_test = getattr(collection, method) + results = method_under_test(explain_options=ExplainOptions(analyze=True)) + + # In the case of `stream()`, an exception should be raised when accessing + # explain_metrics before query finishes. + if method == "stream": + with pytest.raises( + QueryExplainError, + match="explain_metrics not available until query is complete", + ): + results.get_explain_metrics() + + # Finish iterating results, and explain_metrics should be available. + num_results = len(list(results)) + + # Verify explain_metrics and plan_summary. 
+ explain_metrics = results.get_explain_metrics() + assert isinstance(explain_metrics, ExplainMetrics) + plan_summary = explain_metrics.plan_summary + assert isinstance(plan_summary, PlanSummary) + assert len(plan_summary.indexes_used) > 0 + assert plan_summary.indexes_used[0]["properties"] == "(__name__ ASC)" + assert plan_summary.indexes_used[0]["query_scope"] == "Collection" + + # Verify execution_stats. + execution_stats = explain_metrics.execution_stats + assert isinstance(execution_stats, ExecutionStats) + assert execution_stats.results_returned == num_results + assert execution_stats.read_operations == num_results + duration = execution_stats.execution_duration.total_seconds() + assert duration > 0 + assert duration < 1 # we expect a number closer to 0.05 + assert isinstance(execution_stats.debug_stats, dict) + assert "billing_details" in execution_stats.debug_stats + assert "documents_scanned" in execution_stats.debug_stats + assert "index_entries_scanned" in execution_stats.debug_stats + assert len(execution_stats.debug_stats) > 0 + + @pytest.mark.parametrize("database", [None, FIRESTORE_OTHER_DB], indirect=True) def test_create_document(client, cleanup, database): now = datetime.datetime.now(tz=datetime.timezone.utc) @@ -414,6 +532,156 @@ def test_vector_search_collection_group_with_distance_parameters_cosine( } +@pytest.mark.skipif( + FIRESTORE_EMULATOR, reason="Query profile not supported in emulator." 
+) +@pytest.mark.parametrize("method", ["stream", "get"]) +@pytest.mark.parametrize("database", [None, FIRESTORE_OTHER_DB], indirect=True) +def test_vector_query_stream_or_get_w_no_explain_options(client, database, method): + from google.cloud.firestore_v1.query_profile import QueryExplainError + + collection_id = "vector_search" + collection_group = client.collection_group(collection_id) + + vector_query = collection_group.where("color", "==", "red").find_nearest( + vector_field="embedding", + query_vector=Vector([1.0, 2.0, 3.0]), + distance_measure=DistanceMeasure.EUCLIDEAN, + limit=1, + ) + + # Tests either `stream()` or `get()`. + method_under_test = getattr(vector_query, method) + results = method_under_test() + + # verify explain_metrics isn't available + with pytest.raises( + QueryExplainError, + match="explain_options not set on query.", + ): + results.get_explain_metrics() + + +@pytest.mark.skipif( + FIRESTORE_EMULATOR, reason="Query profile not supported in emulator." +) +@pytest.mark.parametrize("method", ["stream", "get"]) +@pytest.mark.parametrize("database", [None, FIRESTORE_OTHER_DB], indirect=True) +def test_vector_query_stream_or_get_w_explain_options_analyze_true( + client, database, method +): + from google.cloud.firestore_v1.query_profile import ( + ExecutionStats, + ExplainMetrics, + ExplainOptions, + PlanSummary, + QueryExplainError, + ) + + collection_id = "vector_search" + collection_group = client.collection_group(collection_id) + + vector_query = collection_group.where("color", "==", "red").find_nearest( + vector_field="embedding", + query_vector=Vector([1.0, 2.0, 3.0]), + distance_measure=DistanceMeasure.EUCLIDEAN, + limit=1, + ) + + # Tests either `stream()` or `get()`. + method_under_test = getattr(vector_query, method) + results = method_under_test(explain_options=ExplainOptions(analyze=True)) + + # With `stream()`, an exception should be raised when accessing + # explain_metrics before query finishes. 
+ if method == "stream": + with pytest.raises( + QueryExplainError, + match="explain_metrics not available until query is complete", + ): + results.get_explain_metrics() + + # Finish iterating results, and explain_metrics should be available. + num_results = len(list(results)) + + # Verify explain_metrics and plan_summary. + explain_metrics = results.get_explain_metrics() + assert isinstance(explain_metrics, ExplainMetrics) + plan_summary = explain_metrics.plan_summary + assert isinstance(plan_summary, PlanSummary) + assert len(plan_summary.indexes_used) > 0 + assert ( + plan_summary.indexes_used[0]["properties"] + == "(color ASC, __name__ ASC, embedding VECTOR<3>)" + ) + assert plan_summary.indexes_used[0]["query_scope"] == "Collection group" + + # Verify execution_stats. + execution_stats = explain_metrics.execution_stats + assert isinstance(execution_stats, ExecutionStats) + assert execution_stats.results_returned == num_results + assert execution_stats.read_operations > 0 + duration = execution_stats.execution_duration.total_seconds() + assert duration > 0 + assert duration < 1 # we expect a number closer to 0.05 + assert isinstance(execution_stats.debug_stats, dict) + assert "billing_details" in execution_stats.debug_stats + assert "documents_scanned" in execution_stats.debug_stats + assert "index_entries_scanned" in execution_stats.debug_stats + assert len(execution_stats.debug_stats) > 0 + + +@pytest.mark.skipif( + FIRESTORE_EMULATOR, reason="Query profile not supported in emulator." 
+) +@pytest.mark.parametrize("method", ["stream", "get"]) +@pytest.mark.parametrize("database", [None, FIRESTORE_OTHER_DB], indirect=True) +def test_vector_query_stream_or_get_w_explain_options_analyze_false( + client, database, method +): + from google.cloud.firestore_v1.query_profile import ( + ExplainMetrics, + ExplainOptions, + PlanSummary, + QueryExplainError, + ) + + collection_id = "vector_search" + collection_group = client.collection_group(collection_id) + + vector_query = collection_group.where("color", "==", "red").find_nearest( + vector_field="embedding", + query_vector=Vector([1.0, 2.0, 3.0]), + distance_measure=DistanceMeasure.EUCLIDEAN, + limit=1, + ) + # Tests either `stream()` or `get()`. + method_under_test = getattr(vector_query, method) + results = method_under_test(explain_options=ExplainOptions(analyze=False)) + + results_list = list(results) + assert len(results_list) == 0 + + # Verify explain_metrics and plan_summary. + explain_metrics = results.get_explain_metrics() + assert isinstance(explain_metrics, ExplainMetrics) + plan_summary = explain_metrics.plan_summary + assert isinstance(plan_summary, PlanSummary) + assert len(plan_summary.indexes_used) > 0 + assert ( + plan_summary.indexes_used[0]["properties"] + == "(color ASC, __name__ ASC, embedding VECTOR<3>)" + ) + assert plan_summary.indexes_used[0]["query_scope"] == "Collection group" + + # Verify execution_stats isn't available. + with pytest.raises( + QueryExplainError, + match="execution_stats not available when explain_options.analyze=False", + ): + explain_metrics.execution_stats + + @pytest.mark.parametrize("database", [None, FIRESTORE_OTHER_DB], indirect=True) def test_create_document_w_subcollection(client, cleanup, database): collection_id = "doc-create-sub" + UNIQUE_RESOURCE_ID @@ -1056,6 +1324,131 @@ def test_query_stream_w_offset(query_docs, database): assert value["b"] == 2 +@pytest.mark.skipif( + FIRESTORE_EMULATOR, reason="Query profile not supported in emulator." 
+) +@pytest.mark.parametrize("method", ["stream", "get"]) +@pytest.mark.parametrize("database", [None, FIRESTORE_OTHER_DB], indirect=True) +def test_query_stream_or_get_w_no_explain_options(query_docs, database, method): + from google.cloud.firestore_v1.query_profile import QueryExplainError + + collection, _, allowed_vals = query_docs + num_vals = len(allowed_vals) + query = collection.where(filter=FieldFilter("a", "in", [1, num_vals + 100])) + + # Tests either `stream()` or `get()`. + method_under_test = getattr(query, method) + results = method_under_test() + + # If no explain_option is passed, raise an exception if explain_metrics + # is called + with pytest.raises(QueryExplainError, match="explain_options not set on query"): + results.get_explain_metrics() + + +@pytest.mark.skipif( + FIRESTORE_EMULATOR, reason="Query profile not supported in emulator." +) +@pytest.mark.parametrize("method", ["stream", "get"]) +@pytest.mark.parametrize("database", [None, FIRESTORE_OTHER_DB], indirect=True) +def test_query_stream_or_get_w_explain_options_analyze_true( + query_docs, database, method +): + from google.cloud.firestore_v1.query_profile import ( + ExecutionStats, + ExplainMetrics, + ExplainOptions, + PlanSummary, + QueryExplainError, + ) + + collection, _, allowed_vals = query_docs + num_vals = len(allowed_vals) + query = collection.where(filter=FieldFilter("a", "in", [1, num_vals + 100])) + + # Tests either `stream()` or `get()`. + method_under_test = getattr(query, method) + results = method_under_test(explain_options=ExplainOptions(analyze=True)) + + # With `stream()`, an exception should be raised when accessing + # explain_metrics before query finishes. + if method == "stream": + with pytest.raises( + QueryExplainError, + match="explain_metrics not available until query is complete", + ): + results.get_explain_metrics() + + # Finish iterating results, and explain_metrics should be available. 
+ num_results = len(list(results)) + + # Verify explain_metrics and plan_summary. + explain_metrics = results.get_explain_metrics() + assert isinstance(explain_metrics, ExplainMetrics) + plan_summary = explain_metrics.plan_summary + assert isinstance(plan_summary, PlanSummary) + assert len(plan_summary.indexes_used) > 0 + assert plan_summary.indexes_used[0]["properties"] == "(a ASC, __name__ ASC)" + assert plan_summary.indexes_used[0]["query_scope"] == "Collection" + + # Verify execution_stats. + execution_stats = explain_metrics.execution_stats + assert isinstance(execution_stats, ExecutionStats) + assert execution_stats.results_returned == num_results + assert execution_stats.read_operations == num_results + duration = execution_stats.execution_duration.total_seconds() + assert duration > 0 + assert duration < 1 # we expect a number closer to 0.05 + assert isinstance(execution_stats.debug_stats, dict) + assert "billing_details" in execution_stats.debug_stats + assert "documents_scanned" in execution_stats.debug_stats + assert "index_entries_scanned" in execution_stats.debug_stats + assert len(execution_stats.debug_stats) > 0 + + +@pytest.mark.skipif( + FIRESTORE_EMULATOR, reason="Query profile not supported in emulator." +) +@pytest.mark.parametrize("method", ["stream", "get"]) +@pytest.mark.parametrize("database", [None, FIRESTORE_OTHER_DB], indirect=True) +def test_query_stream_or_get_w_explain_options_analyze_false( + query_docs, database, method +): + from google.cloud.firestore_v1.query_profile import ( + ExplainMetrics, + ExplainOptions, + PlanSummary, + QueryExplainError, + ) + + collection, _, allowed_vals = query_docs + num_vals = len(allowed_vals) + query = collection.where(filter=FieldFilter("a", "in", [1, num_vals + 100])) + + # Tests either `stream()` or `get()`. 
+ method_under_test = getattr(query, method) + results = method_under_test(explain_options=ExplainOptions(analyze=False)) + + results_list = list(results) + assert len(results_list) == 0 + + # Verify explain_metrics and plan_summary. + explain_metrics = results.get_explain_metrics() + assert isinstance(explain_metrics, ExplainMetrics) + plan_summary = explain_metrics.plan_summary + assert isinstance(plan_summary, PlanSummary) + assert len(plan_summary.indexes_used) > 0 + assert plan_summary.indexes_used[0]["properties"] == "(a ASC, __name__ ASC)" + assert plan_summary.indexes_used[0]["query_scope"] == "Collection" + + # Verify execution_stats isn't available. + with pytest.raises( + QueryExplainError, + match="execution_stats not available when explain_options.analyze=False", + ): + explain_metrics.execution_stats + + @pytest.mark.parametrize("database", [None, FIRESTORE_OTHER_DB], indirect=True) def test_query_with_order_dot_key(client, cleanup, database): db = client @@ -2428,6 +2821,140 @@ def test_avg_query_with_start_at(query, database): assert avg_result[0].value == expected_avg +@pytest.mark.skipif( + FIRESTORE_EMULATOR, reason="Query profile not supported in emulator." +) +@pytest.mark.parametrize("method", ["stream", "get"]) +@pytest.mark.parametrize("database", [None, FIRESTORE_OTHER_DB], indirect=True) +def test_aggregation_query_stream_or_get_w_no_explain_options(query, database, method): + # Because all aggregation methods end up calling AggregationQuery.get() or + # AggregationQuery.stream(), only use count() for testing here. + from google.cloud.firestore_v1.query_profile import QueryExplainError + + result = query.get() + start_doc = result[1] + + # start new query that starts at the second result + count_query = query.start_at(start_doc).count("a") + + # Tests either `stream()` or `get()`. 
+ method_under_test = getattr(count_query, method) + results = method_under_test() + + # If no explain_option is passed, raise an exception if explain_metrics + # is called + with pytest.raises(QueryExplainError, match="explain_options not set on query"): + results.get_explain_metrics() + + +@pytest.mark.skipif( + FIRESTORE_EMULATOR, reason="Query profile not supported in emulator." +) +@pytest.mark.parametrize("method", ["stream", "get"]) +@pytest.mark.parametrize("database", [None, FIRESTORE_OTHER_DB], indirect=True) +def test_aggregation_query_stream_or_get_w_explain_options_analyze_true( + query, database, method +): + # Because all aggregation methods end up calling AggregationQuery.get() or + # AggregationQuery.stream(), only use count() for testing here. + from google.cloud.firestore_v1.query_profile import ( + ExecutionStats, + ExplainMetrics, + ExplainOptions, + PlanSummary, + QueryExplainError, + ) + + result = query.get() + start_doc = result[1] + + # start new query that starts at the second result + count_query = query.start_at(start_doc).count("a") + + # Tests either `stream()` or `get()`. + method_under_test = getattr(count_query, method) + results = method_under_test(explain_options=ExplainOptions(analyze=True)) + + # With `stream()`, an exception should be raised when accessing + # explain_metrics before query finishes. + if method == "stream": + with pytest.raises( + QueryExplainError, + match="explain_metrics not available until query is complete", + ): + results.get_explain_metrics() + + # Finish iterating results, and explain_metrics should be available. + num_results = len(list(results)) + + # Verify explain_metrics and plan_summary. 
+ explain_metrics = results.get_explain_metrics() + assert isinstance(explain_metrics, ExplainMetrics) + plan_summary = explain_metrics.plan_summary + assert isinstance(plan_summary, PlanSummary) + assert len(plan_summary.indexes_used) > 0 + assert plan_summary.indexes_used[0]["properties"] == "(a ASC, __name__ ASC)" + assert plan_summary.indexes_used[0]["query_scope"] == "Collection" + + # Verify execution_stats. + execution_stats = explain_metrics.execution_stats + assert isinstance(execution_stats, ExecutionStats) + assert execution_stats.results_returned == num_results + assert execution_stats.read_operations == num_results + duration = execution_stats.execution_duration.total_seconds() + assert duration > 0 + assert duration < 1 # we expect a number closer to 0.05 + assert isinstance(execution_stats.debug_stats, dict) + assert "billing_details" in execution_stats.debug_stats + assert "documents_scanned" in execution_stats.debug_stats + assert "index_entries_scanned" in execution_stats.debug_stats + assert len(execution_stats.debug_stats) > 0 + + +@pytest.mark.skipif( + FIRESTORE_EMULATOR, reason="Query profile not supported in emulator." +) +@pytest.mark.parametrize("method", ["stream", "get"]) +@pytest.mark.parametrize("database", [None, FIRESTORE_OTHER_DB], indirect=True) +def test_aggregation_query_stream_or_get_w_explain_options_analyze_false( + query, database, method +): + # Because all aggregation methods end up calling AggregationQuery.get() or + # AggregationQuery.stream(), only use count() for testing here. + from google.cloud.firestore_v1.query_profile import ( + ExplainMetrics, + ExplainOptions, + PlanSummary, + QueryExplainError, + ) + + result = query.get() + start_doc = result[1] + + # start new query that starts at the second result + count_query = query.start_at(start_doc).count("a") + + # Tests either `stream()` or `get()`. 
+ method_under_test = getattr(count_query, method) + results = method_under_test(explain_options=ExplainOptions(analyze=False)) + + # Verify explain_metrics and plan_summary. + explain_metrics = results.get_explain_metrics() + assert isinstance(explain_metrics, ExplainMetrics) + plan_summary = explain_metrics.plan_summary + assert isinstance(plan_summary, PlanSummary) + assert len(plan_summary.indexes_used) > 0 + assert plan_summary.indexes_used[0]["properties"] == "(a ASC, __name__ ASC)" + assert plan_summary.indexes_used[0]["query_scope"] == "Collection" + + # Verify execution_stats isn't available. + with pytest.raises( + QueryExplainError, + match="execution_stats not available when explain_options.analyze=False", + ): + explain_metrics.execution_stats + + @pytest.mark.parametrize("database", [None, FIRESTORE_OTHER_DB], indirect=True) def test_query_with_and_composite_filter(collection, database): and_filter = And( @@ -2602,6 +3129,61 @@ def in_transaction(transaction): assert inner_fn_ran is True +@pytest.mark.skipif( + FIRESTORE_EMULATOR, reason="Query profile not supported in emulator." +) +@pytest.mark.parametrize("database", [None, FIRESTORE_OTHER_DB], indirect=True) +def test_query_in_transaction_with_explain_options(client, cleanup, database): + """ + Test query profiling in transactions. 
+ """ + from google.cloud.firestore_v1.query_profile import ( + ExplainMetrics, + ExplainOptions, + QueryExplainError, + ) + + collection_id = "doc-create" + UNIQUE_RESOURCE_ID + doc_ids = [f"doc{i}" + UNIQUE_RESOURCE_ID for i in range(5)] + doc_refs = [client.document(collection_id, doc_id) for doc_id in doc_ids] + for doc_ref in doc_refs: + cleanup(doc_ref.delete) + doc_refs[0].create({"a": 1, "b": 2}) + doc_refs[1].create({"a": 1, "b": 1}) + + collection = client.collection(collection_id) + query = collection.where(filter=FieldFilter("a", "==", 1)) + + with client.transaction() as transaction: + # should work when transaction is initiated through transactional decorator + @firestore.transactional + def in_transaction(transaction): + global inner_fn_ran + + # When no explain_options value is passed, an exception should be + # raised when accessing explain_metrics. + result_1 = query.get(transaction=transaction) + with pytest.raises( + QueryExplainError, match="explain_options not set on query."
+ ): + result_1.get_explain_metrics() + + result_2 = query.get( + transaction=transaction, + explain_options=ExplainOptions(analyze=True), + ) + explain_metrics = result_2.get_explain_metrics() + assert isinstance(explain_metrics, ExplainMetrics) + assert explain_metrics.plan_summary is not None + assert explain_metrics.execution_stats is not None + + inner_fn_ran = True + + in_transaction(transaction) + # make sure we didn't skip assertions in inner function + assert inner_fn_ran is True + + @pytest.mark.parametrize("with_rollback,expected", [(True, 2), (False, 3)]) @pytest.mark.parametrize("database", [None, FIRESTORE_OTHER_DB], indirect=True) def test_transaction_rollback(client, cleanup, database, with_rollback, expected): diff --git a/tests/unit/v1/_test_helpers.py b/tests/unit/v1/_test_helpers.py index 564ec32bc3..39f27ee8c2 100644 --- a/tests/unit/v1/_test_helpers.py +++ b/tests/unit/v1/_test_helpers.py @@ -76,11 +76,20 @@ def make_async_aggregation_query(*args, **kw): return AsyncAggregationQuery(*args, **kw) -def make_aggregation_query_response(aggregations, read_time=None, transaction=None): +def make_aggregation_query_response( + aggregations, + read_time=None, + transaction=None, + explain_metrics=None, +): from google.cloud._helpers import _datetime_to_pb_timestamp from google.cloud.firestore_v1 import _helpers - from google.cloud.firestore_v1.types import aggregation_result, firestore + from google.cloud.firestore_v1.types import ( + aggregation_result, + firestore, + query_profile, + ) if read_time is None: now = datetime.datetime.now(tz=datetime.timezone.utc) @@ -99,6 +108,9 @@ def make_aggregation_query_response(aggregations, read_time=None, transaction=No if transaction is not None: kwargs["transaction"] = transaction + if explain_metrics is not None: + kwargs["explain_metrics"] = query_profile.ExplainMetrics(explain_metrics) + return firestore.RunAggregationQueryResponse(**kwargs) diff --git a/tests/unit/v1/test_aggregation.py 
b/tests/unit/v1/test_aggregation.py index 59fe5378c8..4d1eed1980 100644 --- a/tests/unit/v1/test_aggregation.py +++ b/tests/unit/v1/test_aggregation.py @@ -23,6 +23,9 @@ CountAggregation, SumAggregation, ) +from google.cloud.firestore_v1.query_profile import ExplainMetrics, QueryExplainError +from google.cloud.firestore_v1.query_results import QueryResultsList +from google.cloud.firestore_v1.stream_generator import StreamGenerator from tests.unit.v1._test_helpers import ( make_aggregation_query, make_aggregation_query_response, @@ -355,10 +358,45 @@ def test_aggregation_query_prep_stream_with_transaction(): assert kwargs == {"retry": None} -def _aggregation_query_get_helper(retry=None, timeout=None, read_time=None): +def test_aggregation_query_prep_stream_with_explain_options(): + from google.cloud.firestore_v1 import query_profile + + client = make_client() + parent = client.collection("dee") + query = make_query(parent) + aggregation_query = make_aggregation_query(query) + + aggregation_query.count(alias="all") + aggregation_query.sum("someref", alias="sumall") + aggregation_query.avg("anotherref", alias="avgall") + + explain_options = query_profile.ExplainOptions(analyze=True) + request, kwargs = aggregation_query._prep_stream(explain_options=explain_options) + + parent_path, _ = parent._parent_info() + expected_request = { + "parent": parent_path, + "structured_aggregation_query": aggregation_query._to_protobuf(), + "transaction": None, + "explain_options": explain_options._to_dict(), + } + assert request == expected_request + assert kwargs == {"retry": None} + + +def _aggregation_query_get_helper( + retry=None, + timeout=None, + read_time=None, + explain_options=None, +): from google.cloud._helpers import _datetime_to_pb_timestamp from google.cloud.firestore_v1 import _helpers + from google.cloud.firestore_v1.query_profile import ( + ExplainMetrics, + QueryExplainError, + ) # Create a minimal fake GAPIC. 
firestore_api = mock.Mock(spec=["run_aggregation_query"]) @@ -375,15 +413,21 @@ def _aggregation_query_get_helper(retry=None, timeout=None, read_time=None): aggregation_result = AggregationResult(alias="total", value=5, read_time=read_time) + if explain_options is not None: + explain_metrics = {"execution_stats": {"results_returned": 1}} + else: + explain_metrics = None response_pb = make_aggregation_query_response( - [aggregation_result], read_time=read_time + [aggregation_result], + read_time=read_time, + explain_metrics=explain_metrics, ) firestore_api.run_aggregation_query.return_value = iter([response_pb]) kwargs = _helpers.make_retry_timeout_kwargs(retry, timeout) # Execute the query and check the response. - returned = aggregation_query.get(**kwargs) - assert isinstance(returned, list) + returned = aggregation_query.get(**kwargs, explain_options=explain_options) + assert isinstance(returned, QueryResultsList) assert len(returned) == 1 for result in returned: @@ -394,14 +438,29 @@ def _aggregation_query_get_helper(retry=None, timeout=None, read_time=None): result_datetime = _datetime_to_pb_timestamp(r.read_time) assert result_datetime == read_time - # Verify the mock call. + assert returned._explain_options == explain_options + assert returned.explain_options == explain_options + + if explain_options is None: + with pytest.raises(QueryExplainError, match="explain_options not set"): + returned.get_explain_metrics() + else: + actual_explain_metrics = returned.get_explain_metrics() + assert isinstance(actual_explain_metrics, ExplainMetrics) + assert actual_explain_metrics.execution_stats.results_returned == 1 + parent_path, _ = parent._parent_info() + expected_request = { + "parent": parent_path, + "structured_aggregation_query": aggregation_query._to_protobuf(), + "transaction": None, + } + if explain_options is not None: + expected_request["explain_options"] = explain_options._to_dict() + + # Verify the mock call. 
firestore_api.run_aggregation_query.assert_called_once_with( - request={ - "parent": parent_path, - "structured_aggregation_query": aggregation_query._to_protobuf(), - "transaction": None, - }, + request=expected_request, metadata=client._rpc_metadata, **kwargs, ) @@ -482,6 +541,12 @@ def test_aggregation_query_get_transaction(): ) +def test_aggregation_query_get_w_explain_options(): + from google.cloud.firestore_v1.query_profile import ExplainOptions + + _aggregation_query_get_helper(explain_options=ExplainOptions(analyze=True)) + + _not_passed = object() @@ -604,6 +669,113 @@ def test_aggregation_query_stream_w_retriable_exc_w_transaction(): _aggregation_query_stream_w_retriable_exc_helper(transaction=txn) +def _aggregation_query_stream_helper( + retry=None, + timeout=None, + read_time=None, + explain_options=None, +): + from google.cloud._helpers import _datetime_to_pb_timestamp + + from google.cloud.firestore_v1 import _helpers + + # Create a minimal fake GAPIC. + firestore_api = mock.Mock(spec=["run_aggregation_query"]) + + # Attach the fake GAPIC to a real client. + client = make_client() + client._firestore_api_internal = firestore_api + + # Make a **real** collection reference as parent. 
+ parent = client.collection("dee") + query = make_query(parent) + aggregation_query = make_aggregation_query(query) + aggregation_query.count(alias="all") + + if explain_options is not None and explain_options.analyze is False: + results_list = [] + else: + aggregation_result = AggregationResult( + alias="total", value=5, read_time=read_time + ) + results_list = [aggregation_result] + + if explain_options is not None: + explain_metrics = {"execution_stats": {"results_returned": 1}} + else: + explain_metrics = None + response_pb = make_aggregation_query_response( + results_list, + read_time=read_time, + explain_metrics=explain_metrics, + ) + firestore_api.run_aggregation_query.return_value = iter([response_pb]) + kwargs = _helpers.make_retry_timeout_kwargs(retry, timeout) + + # Execute the query and check the response. + returned = aggregation_query.stream(**kwargs, explain_options=explain_options) + assert isinstance(returned, StreamGenerator) + + results = [] + for result in returned: + for r in result: + assert r.alias == aggregation_result.alias + assert r.value == aggregation_result.value + if read_time is not None: + result_datetime = _datetime_to_pb_timestamp(r.read_time) + assert result_datetime == read_time + results.append(result) + assert len(results) == len(results_list) + + if explain_options is None: + with pytest.raises(QueryExplainError, match="explain_options not set"): + returned.get_explain_metrics() + else: + explain_metrics = returned.get_explain_metrics() + assert isinstance(explain_metrics, ExplainMetrics) + assert explain_metrics.execution_stats.results_returned == 1 + + parent_path, _ = parent._parent_info() + expected_request = { + "parent": parent_path, + "structured_aggregation_query": aggregation_query._to_protobuf(), + "transaction": None, + } + if explain_options is not None: + expected_request["explain_options"] = explain_options._to_dict() + + # Verify the mock call. 
+ firestore_api.run_aggregation_query.assert_called_once_with( + request=expected_request, + metadata=client._rpc_metadata, + **kwargs, + ) + + +def test_aggregation_query_stream(): + _aggregation_query_stream_helper() + + +def test_aggregation_query_stream_with_readtime(): + from google.cloud._helpers import _datetime_to_pb_timestamp + + one_hour_ago = datetime.now(tz=timezone.utc) - timedelta(hours=1) + read_time = _datetime_to_pb_timestamp(one_hour_ago) + _aggregation_query_stream_helper(read_time=read_time) + + +def test_aggregation_query_stream_w_explain_options_analyze_true(): + from google.cloud.firestore_v1.query_profile import ExplainOptions + + _aggregation_query_stream_helper(explain_options=ExplainOptions(analyze=True)) + + +def test_aggregation_query_stream_w_explain_options_analyze_false(): + from google.cloud.firestore_v1.query_profile import ExplainOptions + + _aggregation_query_stream_helper(explain_options=ExplainOptions(analyze=False)) + + def test_aggregation_from_query(): from google.cloud.firestore_v1 import _helpers diff --git a/tests/unit/v1/test_base_document.py b/tests/unit/v1/test_base_document.py index 8098afd76a..b2dff117cd 100644 --- a/tests/unit/v1/test_base_document.py +++ b/tests/unit/v1/test_base_document.py @@ -362,6 +362,92 @@ def test_documentsnapshot_non_existent(): assert as_dict is None +def _make_query_results_list(*args, **kwargs): + from google.cloud.firestore_v1.query_results import QueryResultsList + + return QueryResultsList(*args, **kwargs) + + +def _make_explain_metrics(): + from google.cloud.firestore_v1.query_profile import ExplainMetrics, PlanSummary + + plan_summary = PlanSummary( + indexes_used=[{"properties": "(__name__ ASC)", "query_scope": "Collection"}], + ) + return ExplainMetrics(plan_summary=plan_summary) + + +def test_query_results_list_constructor(): + from google.cloud.firestore_v1.query_profile import ExplainOptions + + client = mock.sentinel.client + reference = _make_base_document_reference("hi", 
"bye", client=client) + data_1 = {"zoop": 83} + data_2 = {"zoop": 30} + snapshot_1 = _make_document_snapshot( + reference, + data_1, + True, + mock.sentinel.read_time, + mock.sentinel.create_time, + mock.sentinel.update_time, + ) + snapshot_2 = _make_document_snapshot( + reference, + data_2, + True, + mock.sentinel.read_time, + mock.sentinel.create_time, + mock.sentinel.update_time, + ) + explain_metrics = _make_explain_metrics() + explain_options = ExplainOptions(analyze=True) + snapshot_list = _make_query_results_list( + [snapshot_1, snapshot_2], + explain_options=explain_options, + explain_metrics=explain_metrics, + ) + assert len(snapshot_list) == 2 + assert snapshot_list[0] == snapshot_1 + assert snapshot_list[1] == snapshot_2 + assert snapshot_list._explain_options == explain_options + assert snapshot_list._explain_metrics == explain_metrics + + +def test_query_results_list_explain_options(): + from google.cloud.firestore_v1.query_profile import ExplainOptions + + explain_options = ExplainOptions(analyze=True) + explain_metrics = _make_explain_metrics() + snapshot_list = _make_query_results_list( + [], explain_options=explain_options, explain_metrics=explain_metrics + ) + + assert snapshot_list.explain_options == explain_options + + +def test_query_results_list_explain_metrics_w_explain_options(): + from google.cloud.firestore_v1.query_profile import ExplainOptions + + explain_metrics = _make_explain_metrics() + snapshot_list = _make_query_results_list( + [], + explain_options=ExplainOptions(analyze=True), + explain_metrics=explain_metrics, + ) + + assert snapshot_list.get_explain_metrics() == explain_metrics + + +def test_query_results_list_explain_metrics_wo_explain_options(): + from google.cloud.firestore_v1.query_profile import QueryExplainError + + snapshot_list = _make_query_results_list([]) + + with pytest.raises(QueryExplainError): + snapshot_list.get_explain_metrics() + + def test__get_document_path(): from google.cloud.firestore_v1.base_document 
import _get_document_path diff --git a/tests/unit/v1/test_base_query.py b/tests/unit/v1/test_base_query.py index 227b46933f..24caa5e40c 100644 --- a/tests/unit/v1/test_base_query.py +++ b/tests/unit/v1/test_base_query.py @@ -1962,11 +1962,12 @@ def _make_order_pb(field_path, direction): def _make_query_response(**kwargs): - # kwargs supported are ``skipped_results``, ``name`` and ``data`` + # kwargs supported are ``skipped_results``, ``name``, ``data`` + # and ``explain_metrics`` from google.cloud._helpers import _datetime_to_pb_timestamp from google.cloud.firestore_v1 import _helpers - from google.cloud.firestore_v1.types import document, firestore + from google.cloud.firestore_v1.types import document, firestore, query_profile now = datetime.datetime.now(tz=datetime.timezone.utc) read_time = _datetime_to_pb_timestamp(now) @@ -1984,6 +1985,10 @@ def _make_query_response(**kwargs): kwargs["document"] = document_pb + explain_metrics = kwargs.pop("explain_metrics", None) + if explain_metrics is not None: + kwargs["explain_metrics"] = query_profile.ExplainMetrics(explain_metrics) + return firestore.RunQueryResponse(**kwargs) diff --git a/tests/unit/v1/test_collection.py b/tests/unit/v1/test_collection.py index 98c83664e1..29f76108d1 100644 --- a/tests/unit/v1/test_collection.py +++ b/tests/unit/v1/test_collection.py @@ -385,6 +385,24 @@ def test_get_with_transaction(query_class): query_instance.get.assert_called_once_with(transaction=transaction) +@mock.patch("google.cloud.firestore_v1.query.Query", autospec=True) +def test_get_w_explain_options(query_class): + from google.cloud.firestore_v1.query_profile import ExplainOptions + + explain_options = ExplainOptions(analyze=True) + collection = _make_collection_reference("collection") + get_response = collection.get(explain_options=explain_options) + + query_class.assert_called_once_with(collection) + query_instance = query_class.return_value + + assert get_response is query_instance.get.return_value + 
query_instance.get.assert_called_once_with( + transaction=None, + explain_options=explain_options, + ) + + @mock.patch("google.cloud.firestore_v1.query.Query", autospec=True) def test_stream(query_class): collection = _make_collection_reference("collection") @@ -427,6 +445,24 @@ def test_stream_with_transaction(query_class): query_instance.stream.assert_called_once_with(transaction=transaction) +@mock.patch("google.cloud.firestore_v1.query.Query", autospec=True) +def test_stream_w_explain_options(query_class): + from google.cloud.firestore_v1.query_profile import ExplainOptions + + explain_options = ExplainOptions(analyze=True) + collection = _make_collection_reference("collection") + get_response = collection.stream(explain_options=explain_options) + + query_class.assert_called_once_with(collection) + query_instance = query_class.return_value + + assert get_response is query_instance.stream.return_value + query_instance.stream.assert_called_once_with( + transaction=None, + explain_options=explain_options, + ) + + @mock.patch("google.cloud.firestore_v1.collection.Watch", autospec=True) def test_on_snapshot(watch): collection = _make_collection_reference("collection") diff --git a/tests/unit/v1/test_query.py b/tests/unit/v1/test_query.py index b7add63f36..1774879022 100644 --- a/tests/unit/v1/test_query.py +++ b/tests/unit/v1/test_query.py @@ -18,6 +18,8 @@ import pytest from google.cloud.firestore_v1.base_client import DEFAULT_DATABASE +from google.cloud.firestore_v1.query_profile import ExplainMetrics, QueryExplainError +from google.cloud.firestore_v1.query_results import QueryResultsList from tests.unit.v1._test_helpers import DEFAULT_TEST_PROJECT, make_client, make_query from tests.unit.v1.test_base_query import _make_cursor_pb, _make_query_response @@ -35,7 +37,12 @@ def test_query_constructor(): assert not query._all_descendants -def _query_get_helper(retry=None, timeout=None, database=None): +def _query_get_helper( + retry=None, + timeout=None, + 
database=None, + explain_options=None, +): from google.cloud.firestore_v1 import _helpers # Create a minimal fake GAPIC. @@ -52,30 +59,48 @@ def _query_get_helper(retry=None, timeout=None, database=None): _, expected_prefix = parent._parent_info() name = "{}/sleep".format(expected_prefix) data = {"snooze": 10} + explain_metrics = {"execution_stats": {"results_returned": 1}} - response_pb = _make_query_response(name=name, data=data) + response_pb = _make_query_response( + name=name, + data=data, + explain_metrics=explain_metrics, + ) firestore_api.run_query.return_value = iter([response_pb]) kwargs = _helpers.make_retry_timeout_kwargs(retry, timeout) # Execute the query and check the response. query = make_query(parent) - returned = query.get(**kwargs) + returned = query.get(**kwargs, explain_options=explain_options) - assert isinstance(returned, list) + assert isinstance(returned, QueryResultsList) assert len(returned) == 1 snapshot = returned[0] assert snapshot.reference._path, "dee" == "sleep" assert snapshot.to_dict() == data - # Verify the mock call. + if explain_options is None: + with pytest.raises(QueryExplainError, match="explain_options not set"): + returned.get_explain_metrics() + else: + actual_explain_metrics = returned.get_explain_metrics() + assert isinstance(actual_explain_metrics, ExplainMetrics) + assert actual_explain_metrics.execution_stats.results_returned == 1 + + # Create expected request body. parent_path, _ = parent._parent_info() + request = { + "parent": parent_path, + "structured_query": query._to_protobuf(), + "transaction": None, + } + if explain_options: + request["explain_options"] = explain_options._to_dict() + + # Verify the mock call. 
firestore_api.run_query.assert_called_once_with( - request={ - "parent": parent_path, - "structured_query": query._to_protobuf(), - "transaction": None, - }, + request=request, metadata=client._rpc_metadata, **kwargs, ) @@ -149,6 +174,13 @@ def test_query_get_limit_to_last(database): ) +def test_query_get_w_explain_options(): + from google.cloud.firestore_v1.query_profile import ExplainOptions + + explain_options = ExplainOptions(analyze=True) + _query_get_helper(explain_options=explain_options) + + @pytest.mark.parametrize("database", [None, "somedb"]) def test_query_sum(database): from google.cloud.firestore_v1.base_aggregation import SumAggregation @@ -301,7 +333,12 @@ def test_query_chunkify_w_chunksize_gt_limit(database, expected): assert chunk_ids == expected_ids -def _query_stream_helper(retry=None, timeout=None, database=None): +def _query_stream_helper( + retry=None, + timeout=None, + database=None, + explain_options=None, +): from google.cloud.firestore_v1 import _helpers from google.cloud.firestore_v1.stream_generator import StreamGenerator @@ -319,14 +356,20 @@ def _query_stream_helper(retry=None, timeout=None, database=None): _, expected_prefix = parent._parent_info() name = "{}/sleep".format(expected_prefix) data = {"snooze": 10} - response_pb = _make_query_response(name=name, data=data) + if explain_options is not None: + explain_metrics = {"execution_stats": {"results_returned": 1}} + else: + explain_metrics = None + response_pb = _make_query_response( + name=name, data=data, explain_metrics=explain_metrics + ) firestore_api.run_query.return_value = iter([response_pb]) kwargs = _helpers.make_retry_timeout_kwargs(retry, timeout) # Execute the query and check the response. 
query = make_query(parent) - get_response = query.stream(**kwargs) + get_response = query.stream(**kwargs, explain_options=explain_options) assert isinstance(get_response, StreamGenerator) returned = list(get_response) @@ -335,14 +378,27 @@ def _query_stream_helper(retry=None, timeout=None, database=None): assert snapshot.reference._path == ("dee", "sleep") assert snapshot.to_dict() == data - # Verify the mock call. + if explain_options is None: + with pytest.raises(QueryExplainError, match="explain_options not set"): + get_response.get_explain_metrics() + else: + explain_metrics = get_response.get_explain_metrics() + assert isinstance(explain_metrics, ExplainMetrics) + assert explain_metrics.execution_stats.results_returned == 1 + + # Create expected request body. parent_path, _ = parent._parent_info() + request = { + "parent": parent_path, + "structured_query": query._to_protobuf(), + "transaction": None, + } + if explain_options is not None: + request["explain_options"] = explain_options._to_dict() + + # Verify the mock call. 
firestore_api.run_query.assert_called_once_with( - request={ - "parent": parent_path, - "structured_query": query._to_protobuf(), - "transaction": None, - }, + request=request, metadata=client._rpc_metadata, **kwargs, ) @@ -747,6 +803,13 @@ def test_query_stream_w_retriable_exc_w_transaction(): _query_stream_w_retriable_exc_helper(transaction=txn) +def test_query_stream_w_explain_options(): + from google.cloud.firestore_v1.query_profile import ExplainOptions + + explain_options = ExplainOptions(analyze=True) + _query_stream_helper(explain_options=explain_options) + + @mock.patch("google.cloud.firestore_v1.query.Watch", autospec=True) def test_query_on_snapshot(watch): query = make_query(mock.sentinel.parent) diff --git a/tests/unit/v1/test_query_profile.py b/tests/unit/v1/test_query_profile.py new file mode 100644 index 0000000000..a3b0390c61 --- /dev/null +++ b/tests/unit/v1/test_query_profile.py @@ -0,0 +1,126 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + + +def test_explain_metrics__from_pb(): + """ + Test creating an instance of ExplainMetrics from a protobuf. 
+ """ + from google.cloud.firestore_v1.query_profile import ( + ExplainMetrics, + _ExplainAnalyzeMetrics, + QueryExplainError, + PlanSummary, + ) + from google.cloud.firestore_v1.types import query_profile as query_profile_pb2 + from google.protobuf import struct_pb2, duration_pb2 + + # test without execution_stats field + expected_metrics = query_profile_pb2.ExplainMetrics( + plan_summary=query_profile_pb2.PlanSummary( + indexes_used=struct_pb2.ListValue(values=[]) + ) + ) + metrics = ExplainMetrics._from_pb(expected_metrics) + assert isinstance(metrics, ExplainMetrics) + assert isinstance(metrics.plan_summary, PlanSummary) + assert metrics.plan_summary.indexes_used == [] + with pytest.raises(QueryExplainError) as exc: + metrics.execution_stats + assert "execution_stats not available when explain_options.analyze=False" in str( + exc.value + ) + # test with execution_stats field + expected_metrics.execution_stats = query_profile_pb2.ExecutionStats( + results_returned=1, + execution_duration=duration_pb2.Duration(seconds=2), + read_operations=3, + debug_stats=struct_pb2.Struct( + fields={"foo": struct_pb2.Value(string_value="bar")} + ), + ) + metrics = ExplainMetrics._from_pb(expected_metrics) + assert isinstance(metrics, ExplainMetrics) + assert isinstance(metrics, _ExplainAnalyzeMetrics) + assert metrics.execution_stats.results_returned == 1 + assert metrics.execution_stats.execution_duration.total_seconds() == 2 + assert metrics.execution_stats.read_operations == 3 + assert metrics.execution_stats.debug_stats == {"foo": "bar"} + + +def test_explain_metrics__from_pb_empty(): + """ + Test with empty ExplainMetrics protobuf. 
+ """ + from google.cloud.firestore_v1.query_profile import ( + ExplainMetrics, + ExecutionStats, + _ExplainAnalyzeMetrics, + PlanSummary, + ) + from google.cloud.firestore_v1.types import query_profile as query_profile_pb2 + from google.protobuf import struct_pb2 + + expected_metrics = query_profile_pb2.ExplainMetrics( + plan_summary=query_profile_pb2.PlanSummary( + indexes_used=struct_pb2.ListValue(values=[]) + ), + execution_stats=query_profile_pb2.ExecutionStats(), + ) + metrics = ExplainMetrics._from_pb(expected_metrics) + assert isinstance(metrics, ExplainMetrics) + assert isinstance(metrics, _ExplainAnalyzeMetrics) + assert isinstance(metrics.plan_summary, PlanSummary) + assert isinstance(metrics.execution_stats, ExecutionStats) + assert metrics.plan_summary.indexes_used == [] + assert metrics.execution_stats.results_returned == 0 + assert metrics.execution_stats.execution_duration.total_seconds() == 0 + assert metrics.execution_stats.read_operations == 0 + assert metrics.execution_stats.debug_stats == {} + + +def test_explain_metrics_execution_stats(): + """ + Standard ExplainMetrics class should raise exception when execution_stats is accessed. 
+ _ExplainAnalyzeMetrics should include the field + """ + from google.cloud.firestore_v1.query_profile import ( + ExplainMetrics, + QueryExplainError, + _ExplainAnalyzeMetrics, + ) + + metrics = ExplainMetrics(plan_summary=object()) + with pytest.raises(QueryExplainError) as exc: + metrics.execution_stats + assert "execution_stats not available when explain_options.analyze=False" in str( + exc.value + ) + expected_stats = object() + metrics = _ExplainAnalyzeMetrics( + plan_summary=object(), _execution_stats=expected_stats + ) + assert metrics.execution_stats is expected_stats + + +def test_explain_options__to_dict(): + """ + Should be able to create a dict representation of ExplainOptions + """ + from google.cloud.firestore_v1.query_profile import ExplainOptions + + assert ExplainOptions(analyze=True)._to_dict() == {"analyze": True} + assert ExplainOptions(analyze=False)._to_dict() == {"analyze": False} diff --git a/tests/unit/v1/test_query_results.py b/tests/unit/v1/test_query_results.py new file mode 100644 index 0000000000..59e7878de7 --- /dev/null +++ b/tests/unit/v1/test_query_results.py @@ -0,0 +1,158 @@ +# Copyright 2020 Google LLC All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +import mock +import pytest + +from google.cloud.firestore_v1.query_profile import QueryExplainError + + +def _make_base_document_reference(*args, **kwargs): + from google.cloud.firestore_v1.base_document import BaseDocumentReference + + return BaseDocumentReference(*args, **kwargs) + + +def _make_document_snapshot(*args, **kwargs): + from google.cloud.firestore_v1.document import DocumentSnapshot + + return DocumentSnapshot(*args, **kwargs) + + +def _make_query_results_list(*args, **kwargs): + from google.cloud.firestore_v1.query_results import QueryResultsList + + return QueryResultsList(*args, **kwargs) + + +def _make_explain_metrics(): + from google.cloud.firestore_v1.query_profile import ExplainMetrics, PlanSummary + + plan_summary = PlanSummary( + indexes_used=[{"properties": "(__name__ ASC)", "query_scope": "Collection"}], + ) + return ExplainMetrics(plan_summary=plan_summary) + + +def test_query_results_list_constructor(): + from google.cloud.firestore_v1.query_profile import ExplainOptions + + client = mock.sentinel.client + reference = _make_base_document_reference("hi", "bye", client=client) + data_1 = {"zoop": 83} + data_2 = {"zoop": 30} + snapshot_1 = _make_document_snapshot( + reference, + data_1, + True, + mock.sentinel.read_time, + mock.sentinel.create_time, + mock.sentinel.update_time, + ) + snapshot_2 = _make_document_snapshot( + reference, + data_2, + True, + mock.sentinel.read_time, + mock.sentinel.create_time, + mock.sentinel.update_time, + ) + explain_metrics = _make_explain_metrics() + explain_options = ExplainOptions(analyze=True) + snapshot_list = _make_query_results_list( + [snapshot_1, snapshot_2], + explain_options=explain_options, + explain_metrics=explain_metrics, + ) + assert len(snapshot_list) == 2 + assert snapshot_list[0] == snapshot_1 + assert snapshot_list[1] == snapshot_2 + assert snapshot_list._explain_options == explain_options + assert snapshot_list._explain_metrics == explain_metrics + + +def 
test_query_results_list_constructor_w_explain_options_and_wo_explain_metrics(): + from google.cloud.firestore_v1.query_profile import ExplainOptions + + with pytest.raises( + ValueError, + match="If explain_options is set, explain_metrics must be non-empty.", + ): + _make_query_results_list( + [], + explain_options=ExplainOptions(analyze=True), + explain_metrics=None, + ) + + +def test_query_results_list_constructor_wo_explain_options_and_w_explain_metrics(): + with pytest.raises( + ValueError, match="If explain_options is empty, explain_metrics must be empty." + ): + _make_query_results_list( + [], + explain_options=None, + explain_metrics=_make_explain_metrics(), + ) + + +def test_query_results_list_explain_options(): + from google.cloud.firestore_v1.query_profile import ExplainOptions + + explain_options = ExplainOptions(analyze=True) + explain_metrics = _make_explain_metrics() + snapshot_list = _make_query_results_list( + [], explain_options=explain_options, explain_metrics=explain_metrics + ) + + assert snapshot_list.explain_options == explain_options + + +def test_query_results_list_explain_metrics_w_explain_options(): + from google.cloud.firestore_v1.query_profile import ExplainOptions + + explain_metrics = _make_explain_metrics() + snapshot_list = _make_query_results_list( + [], + explain_options=ExplainOptions(analyze=True), + explain_metrics=explain_metrics, + ) + + assert snapshot_list.get_explain_metrics() == explain_metrics + + +def test_query_results_list_explain_metrics_wo_explain_options(): + snapshot_list = _make_query_results_list([]) + + with pytest.raises(QueryExplainError, match="explain_options not set on query."): + snapshot_list.get_explain_metrics() + + +def test_query_results_list_explain_metrics_empty(): + from google.cloud.firestore_v1.query_profile import ExplainOptions + + explain_metrics = _make_explain_metrics() + snapshot_list = _make_query_results_list( + [], + explain_options=ExplainOptions(analyze=True), + 
explain_metrics=explain_metrics, + ) + snapshot_list._explain_metrics = None + + with pytest.raises( + QueryExplainError, + match="explain_metrics is empty despite explain_options is set.", + ): + snapshot_list.get_explain_metrics() diff --git a/tests/unit/v1/test_stream_generator.py b/tests/unit/v1/test_stream_generator.py index bfc11cf6f6..0e8a552607 100644 --- a/tests/unit/v1/test_stream_generator.py +++ b/tests/unit/v1/test_stream_generator.py @@ -14,8 +14,10 @@ import pytest +from google.protobuf import struct_pb2 -def _make_stream_generator(iterable): + +def _make_stream_generator(iterable, explain_options=None, explain_metrics=None): from google.cloud.firestore_v1.stream_generator import StreamGenerator def _inner_generator(): @@ -23,14 +25,27 @@ def _inner_generator(): X = yield i if X: yield X + return explain_metrics + + return StreamGenerator(_inner_generator(), explain_options) + + +def test_stream_generator_constructor(): + from google.cloud.firestore_v1.query_profile import ExplainOptions + from google.cloud.firestore_v1.stream_generator import StreamGenerator + + explain_options = ExplainOptions(analyze=True) + inner_generator = object() + inst = StreamGenerator(inner_generator, explain_options) - return StreamGenerator(_inner_generator()) + assert inst._generator == inner_generator + assert inst._explain_options == explain_options + assert inst._explain_metrics is None def test_stream_generator_iter(): expected_results = [0, 1, 2] inst = _make_stream_generator(expected_results) - actual_results = [] for result in inst: actual_results.append(result) @@ -82,3 +97,159 @@ def test_stream_generator_close(): # Verifies that generator is closed. 
with pytest.raises(StopIteration): next(inst) + + +def test_stream_generator_explain_options(): + from google.cloud.firestore_v1.query_profile import ExplainOptions + + explain_options = ExplainOptions(analyze=True) + inst = _make_stream_generator([], explain_options) + assert inst.explain_options == explain_options + + +def test_stream_generator_explain_metrics_explain_options_analyze_true(): + from google.protobuf import duration_pb2 + from google.protobuf import struct_pb2 + + import google.cloud.firestore_v1.query_profile as query_profile + import google.cloud.firestore_v1.types.query_profile as query_profile_pb2 + + iterator = [1, 2] + + indexes_used_dict = { + "indexes_used": struct_pb2.Value( + struct_value=struct_pb2.Struct( + fields={ + "query_scope": struct_pb2.Value(string_value="Collection"), + "properties": struct_pb2.Value( + string_value="(foo ASC, **name** ASC)" + ), + } + ) + ) + } + plan_summary = query_profile_pb2.PlanSummary() + plan_summary.indexes_used.append(indexes_used_dict) + execution_stats = query_profile_pb2.ExecutionStats( + { + "results_returned": 1, + "execution_duration": duration_pb2.Duration(seconds=2), + "read_operations": 3, + "debug_stats": struct_pb2.Struct( + fields={ + "billing_details": struct_pb2.Value( + string_value="billing_details_results" + ), + "documents_scanned": struct_pb2.Value( + string_value="documents_scanned_results" + ), + "index_entries_scanned": struct_pb2.Value( + string_value="index_entries_scanned" + ), + } + ), + } + ) + + explain_options = query_profile.ExplainOptions(analyze=True) + expected_explain_metrics = query_profile_pb2.ExplainMetrics( + plan_summary=plan_summary, + execution_stats=execution_stats, + ) + + inst = _make_stream_generator(iterator, explain_options, expected_explain_metrics) + + # Raise an exception if query isn't complete when explain_metrics is called. 
+ with pytest.raises( + query_profile.QueryExplainError, + match="explain_metrics not available until query is complete.", + ): + inst.get_explain_metrics() + + list(inst) + + actual_explain_metrics = inst.get_explain_metrics() + assert isinstance(actual_explain_metrics, query_profile._ExplainAnalyzeMetrics) + assert actual_explain_metrics == query_profile.ExplainMetrics._from_pb( + expected_explain_metrics + ) + assert actual_explain_metrics.plan_summary.indexes_used == [ + { + "indexes_used": { + "query_scope": "Collection", + "properties": "(foo ASC, **name** ASC)", + } + } + ] + assert actual_explain_metrics.execution_stats.results_returned == 1 + duration = actual_explain_metrics.execution_stats.execution_duration.total_seconds() + assert duration == 2 + assert actual_explain_metrics.execution_stats.read_operations == 3 + + expected_debug_stats = { + "billing_details": "billing_details_results", + "documents_scanned": "documents_scanned_results", + "index_entries_scanned": "index_entries_scanned", + } + assert actual_explain_metrics.execution_stats.debug_stats == expected_debug_stats + + +def test_stream_generator_explain_metrics_explain_options_analyze_false(): + import google.cloud.firestore_v1.query_profile as query_profile + import google.cloud.firestore_v1.types.query_profile as query_profile_pb2 + + iterator = [] + + explain_options = query_profile.ExplainOptions(analyze=False) + indexes_used_dict = { + "indexes_used": struct_pb2.Value( + struct_value=struct_pb2.Struct( + fields={ + "query_scope": struct_pb2.Value(string_value="Collection"), + "properties": struct_pb2.Value( + string_value="(foo ASC, **name** ASC)" + ), + } + ) + ) + } + plan_summary = query_profile_pb2.PlanSummary() + plan_summary.indexes_used.append(indexes_used_dict) + expected_explain_metrics = query_profile_pb2.ExplainMetrics( + plan_summary=plan_summary + ) + + inst = _make_stream_generator(iterator, explain_options, expected_explain_metrics) + actual_explain_metrics = 
inst.get_explain_metrics() + assert isinstance(actual_explain_metrics, query_profile.ExplainMetrics) + assert actual_explain_metrics.plan_summary.indexes_used == [ + { + "indexes_used": { + "query_scope": "Collection", + "properties": "(foo ASC, **name** ASC)", + } + } + ] + + +def test_stream_generator_explain_metrics_missing_explain_options_analyze_false(): + import google.cloud.firestore_v1.query_profile as query_profile + + explain_options = query_profile.ExplainOptions(analyze=False) + inst = _make_stream_generator([("1", None)], explain_options) + with pytest.raises( + query_profile.QueryExplainError, match="Did not receive explain_metrics" + ): + inst.get_explain_metrics() + + +def test_stream_generator_explain_metrics_no_explain_options(): + from google.cloud.firestore_v1.query_profile import QueryExplainError + + inst = _make_stream_generator([]) + + with pytest.raises( + QueryExplainError, + match="explain_options not set on query.", + ): + inst.get_explain_metrics() diff --git a/tests/unit/v1/test_transaction.py b/tests/unit/v1/test_transaction.py index d37be34ea0..c1be7fbcf4 100644 --- a/tests/unit/v1/test_transaction.py +++ b/tests/unit/v1/test_transaction.py @@ -328,7 +328,11 @@ def test_transaction_get_all_w_retry_timeout(): _transaction_get_all_helper(retry=retry, timeout=timeout) -def _transaction_get_w_document_ref_helper(retry=None, timeout=None): +def _transaction_get_w_document_ref_helper( + retry=None, + timeout=None, + explain_options=None, +): from google.cloud.firestore_v1 import _helpers from google.cloud.firestore_v1.document import DocumentReference @@ -337,8 +341,14 @@ def _transaction_get_w_document_ref_helper(retry=None, timeout=None): ref = DocumentReference("documents", "doc-id") kwargs = _helpers.make_retry_timeout_kwargs(retry, timeout) + if explain_options is not None: + kwargs["explain_options"] = explain_options + result = transaction.get(ref, **kwargs) + # explain_options should not be in the request even if it's provided. 
+ kwargs.pop("explain_options", None) + assert result is client.get_all.return_value client.get_all.assert_called_once_with([ref], transaction=transaction, **kwargs) @@ -355,7 +365,22 @@ def test_transaction_get_w_document_ref_w_retry_timeout(): _transaction_get_w_document_ref_helper(retry=retry, timeout=timeout) -def _transaction_get_w_query_helper(retry=None, timeout=None): +def test_transaction_get_w_document_ref_w_explain_options(): + from google.cloud.firestore_v1.query_profile import ExplainOptions + + with pytest.warns(UserWarning) as warned: + _transaction_get_w_document_ref_helper( + explain_options=ExplainOptions(analyze=True), + ) + assert len(warned) == 1 + assert "not supported in transanction with document" in str(warned[0]) + + +def _transaction_get_w_query_helper( + retry=None, + timeout=None, + explain_options=None, +): from google.cloud.firestore_v1 import _helpers from google.cloud.firestore_v1.query import Query @@ -364,6 +389,8 @@ def _transaction_get_w_query_helper(retry=None, timeout=None): query = Query(parent=mock.Mock(spec=[])) query.stream = mock.MagicMock() kwargs = _helpers.make_retry_timeout_kwargs(retry, timeout) + if explain_options is not None: + kwargs["explain_options"] = explain_options result = transaction.get(query, **kwargs) @@ -383,6 +410,12 @@ def test_transaction_get_w_query_w_retry_timeout(): _transaction_get_w_query_helper(retry=retry, timeout=timeout) +def test_transaction_get_w_query_w_explain_options(): + from google.cloud.firestore_v1.query_profile import ExplainOptions + + _transaction_get_w_query_helper(explain_options=ExplainOptions(analyze=True)) + + @pytest.mark.parametrize("database", [None, "somedb"]) def test_transaction_get_failure(database): client = _make_client(database=database) diff --git a/tests/unit/v1/test_vector_query.py b/tests/unit/v1/test_vector_query.py index a5b1d342bd..61ae866e8a 100644 --- a/tests/unit/v1/test_vector_query.py +++ b/tests/unit/v1/test_vector_query.py @@ -17,6 +17,12 @@ from 
google.cloud.firestore_v1._helpers import encode_value, make_retry_timeout_kwargs from google.cloud.firestore_v1.base_vector_query import DistanceMeasure +from google.cloud.firestore_v1.query_profile import ( + ExplainMetrics, + ExplainOptions, + QueryExplainError, +) +from google.cloud.firestore_v1.query_results import QueryResultsList from google.cloud.firestore_v1.types.query import StructuredQuery from google.cloud.firestore_v1.vector import Vector from tests.unit.v1._test_helpers import make_client, make_query, make_vector_query @@ -146,21 +152,7 @@ def _expected_pb( return expected_pb -@pytest.mark.parametrize( - "distance_measure, expected_distance", - [ - ( - DistanceMeasure.EUCLIDEAN, - StructuredQuery.FindNearest.DistanceMeasure.EUCLIDEAN, - ), - (DistanceMeasure.COSINE, StructuredQuery.FindNearest.DistanceMeasure.COSINE), - ( - DistanceMeasure.DOT_PRODUCT, - StructuredQuery.FindNearest.DistanceMeasure.DOT_PRODUCT, - ), - ], -) -def test_vector_query(distance_measure, expected_distance): +def _vector_query_get_helper(distance_measure, expected_distance, explain_options=None): # Create a minimal fake GAPIC. 
firestore_api = mock.Mock(spec=["run_query"]) client = make_client() @@ -171,8 +163,14 @@ def test_vector_query(distance_measure, expected_distance): parent_path, expected_prefix = parent._parent_info() data = {"snooze": 10, "embedding": Vector([1.0, 2.0, 3.0])} + if explain_options is not None: + explain_metrics = {"execution_stats": {"results_returned": 1}} + else: + explain_metrics = None response_pb = _make_query_response( - name="{}/test_doc".format(expected_prefix), data=data + name="{}/test_doc".format(expected_prefix), + data=data, + explain_metrics=explain_metrics, ) kwargs = make_retry_timeout_kwargs(retry=None, timeout=None) @@ -187,11 +185,21 @@ def test_vector_query(distance_measure, expected_distance): limit=5, ) - returned = vector_query.get(transaction=_transaction(client), **kwargs) - assert isinstance(returned, list) + returned = vector_query.get( + transaction=_transaction(client), **kwargs, explain_options=explain_options + ) + assert isinstance(returned, QueryResultsList) assert len(returned) == 1 assert returned[0].to_dict() == data + if explain_options is None: + with pytest.raises(QueryExplainError, match="explain_options not set"): + returned.get_explain_metrics() + else: + actual_explain_metrics = returned.get_explain_metrics() + assert isinstance(actual_explain_metrics, ExplainMetrics) + assert actual_explain_metrics.execution_stats.results_returned == 1 + expected_pb = _expected_pb( parent=parent, vector_field="embedding", @@ -199,17 +207,104 @@ def test_vector_query(distance_measure, expected_distance): distance_type=expected_distance, limit=5, ) + expected_request = { + "parent": parent_path, + "structured_query": expected_pb, + "transaction": _TXN_ID, + } + if explain_options is not None: + expected_request["explain_options"] = explain_options._to_dict() firestore_api.run_query.assert_called_once_with( - request={ - "parent": parent_path, - "structured_query": expected_pb, - "transaction": _TXN_ID, - }, + request=expected_request, 
metadata=client._rpc_metadata, **kwargs, ) +@pytest.mark.parametrize( + "distance_measure, expected_distance", + [ + ( + DistanceMeasure.EUCLIDEAN, + StructuredQuery.FindNearest.DistanceMeasure.EUCLIDEAN, + ), + (DistanceMeasure.COSINE, StructuredQuery.FindNearest.DistanceMeasure.COSINE), + ( + DistanceMeasure.DOT_PRODUCT, + StructuredQuery.FindNearest.DistanceMeasure.DOT_PRODUCT, + ), + ], +) +def test_vector_query(distance_measure, expected_distance): + _vector_query_get_helper( + distance_measure=distance_measure, expected_distance=expected_distance + ) + + +def test_vector_query_w_explain_options(): + explain_options = ExplainOptions(analyze=True) + _vector_query_get_helper( + distance_measure=DistanceMeasure.EUCLIDEAN, + expected_distance=StructuredQuery.FindNearest.DistanceMeasure.EUCLIDEAN, + explain_options=explain_options, + ) + # # Create a minimal fake GAPIC. + # firestore_api = mock.Mock(spec=["run_query"]) + # client = make_client() + # client._firestore_api_internal = firestore_api + + # # Make a **real** collection reference as parent. + # parent = client.collection("dee") + # parent_path, expected_prefix = parent._parent_info() + + # data = {"snooze": 10, "embedding": Vector([1.0, 2.0, 3.0])} + # response_pb = _make_query_response( + # name="{}/test_doc".format(expected_prefix), + # data=data, + # explain_metrics={"execution_stats": {"results_returned": 1}}, + # ) + + # kwargs = make_retry_timeout_kwargs(retry=None, timeout=None) + + # # Execute the vector query and check the response. 
+ # firestore_api.run_query.return_value = iter([response_pb]) + # vector_query = parent.find_nearest( + # vector_field="embedding", + # query_vector=Vector([1.0, 2.0, 3.0]), + # distance_measure=DistanceMeasure.EUCLIDEAN, + # limit=5, + # ) + + # explain_options = ExplainOptions(analyze=True) + # returned = vector_query.get( + # transaction=_transaction(client), + # **kwargs, + # explain_options=explain_options, + # ) + # assert isinstance(returned, QueryResultsList) + # assert len(returned) == 1 + # assert returned[0].to_dict() == data + # assert returned.explain_metrics is not None + + # expected_pb = _expected_pb( + # parent=parent, + # vector_field="embedding", + # vector=Vector([1.0, 2.0, 3.0]), + # distance_type=StructuredQuery.FindNearest.DistanceMeasure.EUCLIDEAN, + # limit=5, + # ) + # firestore_api.run_query.assert_called_once_with( + # request={ + # "parent": parent_path, + # "structured_query": expected_pb, + # "transaction": _TXN_ID, + # "explain_options": explain_options._to_dict(), + # }, + # metadata=client._rpc_metadata, + # **kwargs, + # ) + + @pytest.mark.parametrize( "distance_measure, expected_distance", [