
change stream generator to return ExplainMetrics instead of yield
Linchin committed Aug 12, 2024
1 parent 05424bf commit 5e15e6a
Showing 2 changed files with 101 additions and 40 deletions.
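For context on the pattern this commit adopts: in Python, a value passed to return inside a generator is delivered on the value attribute of the StopIteration raised once the generator is exhausted, rather than being yielded as an item. The snippet below is a minimal, self-contained sketch of that language behavior; produce_rows and the metrics dict are illustrative stand-ins, not code from this library.

# Minimal sketch (standard Python, no Firestore dependency): a generator's
# return value rides on StopIteration.value once iteration is exhausted.
def produce_rows():
    yield 1
    yield 2
    return {"results_returned": 2}  # stands in for ExplainMetrics

gen = produce_rows()
rows = []
while True:
    try:
        rows.append(next(gen))
    except StopIteration as exc:
        metrics = exc.value  # {"results_returned": 2}
        break

assert rows == [1, 2]
assert metrics == {"results_returned": 2}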
17 changes: 7 additions & 10 deletions google/cloud/firestore_v1/stream_generator.py
@@ -55,16 +55,13 @@ def __iter__(self):
         return self
 
     def __next__(self):
-        next_value, explain_metrics = self._generator.__next__()
-
-        if explain_metrics is not None:
-            self._explain_metrics = ExplainMetrics._from_pb(explain_metrics)
-
-            # Need to run the following iteration too, to ensure the length of
-            # the iteration isn't increased due to yielding explain_metrics.
-            return next(self)
-        else:
-            return next_value
+        try:
+            return self._generator.__next__()
+        except StopIteration as e:
+            # If explain_metrics is available, it would be returned.
+            if e.value:
+                self._explain_metrics = ExplainMetrics._from_pb(e.value)
+            raise
 
     def send(self, value=None):
         return self._generator.send(value)
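To make the effect of the new __next__ concrete, here is a self-contained sketch of the same wrapping pattern. MetricsCapturingIterator and fake_query_stream are hypothetical stand-ins for StreamGenerator and an RPC response generator; ExplainMetrics._from_pb is elided. The point is that the metrics travel on the final StopIteration, so the caller sees exactly one item per query result, with no trailing (None, metrics) element to skip.

# Hypothetical stand-ins illustrating the try/except StopIteration pattern
# adopted in the diff above; not the library's actual classes.
class MetricsCapturingIterator:
    def __init__(self, generator):
        self._generator = generator
        self._metrics = None

    def __iter__(self):
        return self

    def __next__(self):
        try:
            return next(self._generator)
        except StopIteration as e:
            if e.value is not None:
                # The real code converts the protobuf via ExplainMetrics._from_pb.
                self._metrics = e.value
            raise

    @property
    def metrics(self):
        return self._metrics


def fake_query_stream():
    yield "doc-1"
    yield "doc-2"
    return {"plan_summary": "..."}  # returned, not yielded


it = MetricsCapturingIterator(fake_query_stream())
assert list(it) == ["doc-1", "doc-2"]  # length not inflated by the metrics
assert it.metrics == {"plan_summary": "..."}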
124 changes: 94 additions & 30 deletions tests/unit/v1/test_stream_generator.py
@@ -17,14 +17,15 @@
 from google.protobuf import struct_pb2
 
 
-def _make_stream_generator(iterable, explain_options=None):
+def _make_stream_generator(iterable, explain_options=None, explain_metrics=None):
     from google.cloud.firestore_v1.stream_generator import StreamGenerator
 
     def _inner_generator():
         for i in iterable:
             X = yield i
             if X:
                 yield X
+        return explain_metrics
 
     return StreamGenerator(_inner_generator(), explain_options)
 
@@ -43,9 +44,8 @@ def test_stream_generator_constructor():
 
 
 def test_stream_generator_iter():
-    iterable = [(0, None), (1, None), (2, None)]
     expected_results = [0, 1, 2]
-    inst = _make_stream_generator(iterable)
+    inst = _make_stream_generator(expected_results)
     actual_results = []
     for result in inst:
         actual_results.append(result)
@@ -54,9 +54,8 @@ def test_stream_generator_iter():
 
 
 def test_stream_generator_next():
-    iterable = [(0, None), (1, None)]
     expected_results = [0, 1]
-    inst = _make_stream_generator(iterable)
+    inst = _make_stream_generator(expected_results)
 
     actual_results = []
     actual_results.append(next(inst))
@@ -69,9 +68,8 @@ def test_stream_generator_next():
 
 
 def test_stream_generator_send():
-    iterable = [(0, None), (1, None)]
     expected_results = [0, 1]
-    inst = _make_stream_generator(iterable)
+    inst = _make_stream_generator(expected_results)
 
     actual_results = []
     actual_results.append(next(inst))
@@ -110,22 +108,56 @@ def test_stream_generator_explain_options():
 
 
 def test_stream_generator_explain_metrics_explain_options_analyze_true():
+    from google.protobuf import duration_pb2
+    from google.protobuf import struct_pb2
+
     import google.cloud.firestore_v1.query_profile as query_profile
     import google.cloud.firestore_v1.types.query_profile as query_profile_pb2
 
-    iterator = [
-        (1, None),
-        (
-            None,
-            query_profile_pb2.ExplainMetrics(
-                plan_summary=query_profile_pb2.PlanSummary()
-            ),
-        ),
-        (2, None),
-    ]
+    iterator = [1, 2]
+
+    indexes_used_dict = {
+        "indexes_used": struct_pb2.Value(
+            struct_value=struct_pb2.Struct(
+                fields={
+                    "query_scope": struct_pb2.Value(string_value="Collection"),
+                    "properties": struct_pb2.Value(
+                        string_value="(foo ASC, **name** ASC)"
+                    ),
+                }
+            )
+        )
+    }
+    plan_summary = query_profile_pb2.PlanSummary()
+    plan_summary.indexes_used.append(indexes_used_dict)
+    execution_stats = query_profile_pb2.ExecutionStats(
+        {
+            "results_returned": 1,
+            "execution_duration": duration_pb2.Duration(seconds=2),
+            "read_operations": 3,
+            "debug_stats": struct_pb2.Struct(
+                fields={
+                    "billing_details": struct_pb2.Value(
+                        string_value="billing_details_results"
+                    ),
+                    "documents_scanned": struct_pb2.Value(
+                        string_value="documents_scanned_results"
+                    ),
+                    "index_entries_scanned": struct_pb2.Value(
+                        string_value="index_entries_scanned"
+                    ),
+                }
+            ),
+        }
+    )
 
     explain_options = query_profile.ExplainOptions(analyze=True)
-    inst = _make_stream_generator(iterator, explain_options)
+    explain_metrics = query_profile_pb2.ExplainMetrics(
+        plan_summary=plan_summary,
+        execution_stats=execution_stats,
+    )
+
+    inst = _make_stream_generator(iterator, explain_options, explain_metrics)
 
     # Raise an exception if query isn't complete when explain_metrics is called.
     with pytest.raises(
@@ -136,33 +168,65 @@ def test_stream_generator_explain_metrics_explain_options_analyze_true():
 
     list(inst)
 
-    assert isinstance(inst.explain_metrics, query_profile.ExplainMetrics)
+    assert isinstance(inst.explain_metrics, query_profile._ExplainAnalyzeMetrics)
+    assert inst.explain_metrics == query_profile.ExplainMetrics._from_pb(
+        explain_metrics
+    )
+    assert inst.explain_metrics.plan_summary.indexes_used == [
+        {
+            "indexes_used": {
+                "query_scope": "Collection",
+                "properties": "(foo ASC, **name** ASC)",
+            }
+        }
+    ]
+    assert inst.explain_metrics.execution_stats.results_returned == 1
+    duration = inst.explain_metrics.execution_stats.execution_duration.total_seconds()
+    assert duration == 2
+    assert inst.explain_metrics.execution_stats.read_operations == 3
+
+    expected_debug_stats = {
+        "billing_details": "billing_details_results",
+        "documents_scanned": "documents_scanned_results",
+        "index_entries_scanned": "index_entries_scanned",
+    }
+    assert inst.explain_metrics.execution_stats.debug_stats == expected_debug_stats
 
 
 def test_stream_generator_explain_metrics_explain_options_analyze_false():
     import google.cloud.firestore_v1.query_profile as query_profile
     import google.cloud.firestore_v1.types.query_profile as query_profile_pb2
 
-    plan_summary = query_profile_pb2.PlanSummary(
-        indexes_used=struct_pb2.ListValue(values=[])
-    )
-
-    iterator = [
-        (None, query_profile_pb2.ExplainMetrics(plan_summary=plan_summary)),
-    ]
-
-    explain_options = query_profile.ExplainOptions(analyze=False)
-    inst = _make_stream_generator(iterator, explain_options)
-    assert isinstance(inst.explain_metrics, query_profile.ExplainMetrics)
+    iterator = []
+
+    explain_options = query_profile.ExplainOptions(analyze=False)
+    indexes_used_dict = {
+        "indexes_used": struct_pb2.Value(
+            struct_value=struct_pb2.Struct(
+                fields={
+                    "query_scope": struct_pb2.Value(string_value="Collection"),
+                    "properties": struct_pb2.Value(
+                        string_value="(foo ASC, **name** ASC)"
+                    ),
+                }
+            )
+        )
+    }
+    plan_summary = query_profile_pb2.PlanSummary()
+    plan_summary.indexes_used.append(indexes_used_dict)
+    explain_metrics = query_profile_pb2.ExplainMetrics(plan_summary=plan_summary)
+
+    inst = _make_stream_generator(iterator, explain_options, explain_metrics)
+    assert isinstance(inst.explain_metrics, query_profile.ExplainMetrics)
+    assert inst.explain_metrics.plan_summary.indexes_used == [
+        {
+            "indexes_used": {
+                "query_scope": "Collection",
+                "properties": "(foo ASC, **name** ASC)",
+            }
+        }
+    ]
 
 
 def test_stream_generator_explain_metrics_missing_explain_options_analyze_false():
     import google.cloud.firestore_v1.query_profile as query_profile
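A side note on the test fixtures above: PlanSummary.indexes_used entries are built from google.protobuf struct_pb2 types, yet the assertions compare them against plain Python dicts. That works because a Struct is protobuf's JSON-like mapping type. The snippet below is an illustrative round-trip using the standard json_format helper; the conversion actually performed inside ExplainMetrics._from_pb is not part of this diff and may differ.

# Illustration only: how a struct_pb2 Struct/Value corresponds to a plain dict.
# json_format is the standard protobuf helper; the Firestore library's own
# conversion path is not shown in this commit.
from google.protobuf import json_format
from google.protobuf import struct_pb2

indexes_used = struct_pb2.Value(
    struct_value=struct_pb2.Struct(
        fields={
            "query_scope": struct_pb2.Value(string_value="Collection"),
            "properties": struct_pb2.Value(string_value="(foo ASC, **name** ASC)"),
        }
    )
)

as_dict = json_format.MessageToDict(indexes_used.struct_value)
assert as_dict == {
    "query_scope": "Collection",
    "properties": "(foo ASC, **name** ASC)",
}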
