diff --git a/google/cloud/firestore_v1/async_transaction.py b/google/cloud/firestore_v1/async_transaction.py index 6b01fffd6c..7281a68e56 100644 --- a/google/cloud/firestore_v1/async_transaction.py +++ b/google/cloud/firestore_v1/async_transaction.py @@ -15,14 +15,12 @@ """Helpers for applying Google Cloud Firestore changes in a transaction.""" -import asyncio -import random from typing import Any, AsyncGenerator, Callable, Coroutine from google.api_core import exceptions, gapic_v1 from google.api_core import retry_async as retries -from google.cloud.firestore_v1 import _helpers, async_batch, types +from google.cloud.firestore_v1 import _helpers, async_batch from google.cloud.firestore_v1.async_document import ( AsyncDocumentReference, DocumentSnapshot, @@ -33,18 +31,12 @@ _CANT_COMMIT, _CANT_ROLLBACK, _EXCEED_ATTEMPTS_TEMPLATE, - _INITIAL_SLEEP, - _MAX_SLEEP, - _MULTIPLIER, _WRITE_READ_ONLY, MAX_ATTEMPTS, BaseTransaction, _BaseTransactional, ) -# Types needed only for Type Hints -from google.cloud.firestore_v1.client import Client - class AsyncTransaction(async_batch.AsyncWriteBatch, BaseTransaction): """Accumulate read-and-write operations to be sent in a transaction. @@ -140,8 +132,13 @@ async def _commit(self) -> list: if not self.in_progress: raise ValueError(_CANT_COMMIT) - commit_response = await _commit_with_retry( - self._client, self._write_pbs, self._id + commit_response = await self._client._firestore_api.commit( + request={ + "database": self._client._database_string, + "writes": self._write_pbs, + "transaction": self._id, + }, + metadata=self._client._rpc_metadata, ) self._clean_up() @@ -313,76 +310,3 @@ def async_transactional( the wrapped callable. """ return _AsyncTransactional(to_wrap) - - -# TODO(crwilcox): this was 'coroutine' from pytype merge-pyi... -async def _commit_with_retry( - client: Client, write_pbs: list, transaction_id: bytes -) -> types.CommitResponse: - """Call ``Commit`` on the GAPIC client with retry / sleep. - - Retries the ``Commit`` RPC on Unavailable. Usually this RPC-level - retry is handled by the underlying GAPICd client, but in this case it - doesn't because ``Commit`` is not always idempotent. But here we know it - is "idempotent"-like because it has a transaction ID. We also need to do - our own retry to special-case the ``INVALID_ARGUMENT`` error. - - Args: - client (:class:`~google.cloud.firestore_v1.client.Client`): - A client with GAPIC client and configuration details. - write_pbs (List[:class:`google.cloud.proto.firestore.v1.write.Write`, ...]): - A ``Write`` protobuf instance to be committed. - transaction_id (bytes): - ID of an existing transaction that this commit will run in. - - Returns: - :class:`google.cloud.firestore_v1.types.CommitResponse`: - The protobuf response from ``Commit``. - - Raises: - ~google.api_core.exceptions.GoogleAPICallError: If a non-retryable - exception is encountered. - """ - current_sleep = _INITIAL_SLEEP - while True: - try: - return await client._firestore_api.commit( - request={ - "database": client._database_string, - "writes": write_pbs, - "transaction": transaction_id, - }, - metadata=client._rpc_metadata, - ) - except exceptions.ServiceUnavailable: - # Retry - pass - - current_sleep = await _sleep(current_sleep) - - -async def _sleep( - current_sleep: float, max_sleep: float = _MAX_SLEEP, multiplier: float = _MULTIPLIER -) -> float: - """Sleep and produce a new sleep time. - - .. _Exponential Backoff And Jitter: https://www.awsarchitectureblog.com/\ - 2015/03/backoff.html - - Select a duration between zero and ``current_sleep``. It might seem - counterintuitive to have so much jitter, but - `Exponential Backoff And Jitter`_ argues that "full jitter" is - the best strategy. - - Args: - current_sleep (float): The current "max" for sleep interval. - max_sleep (Optional[float]): Eventual "max" sleep time - multiplier (Optional[float]): Multiplier for exponential backoff. - - Returns: - float: Newly doubled ``current_sleep`` or ``max_sleep`` (whichever - is smaller) - """ - actual_sleep = random.uniform(0.0, current_sleep) - await asyncio.sleep(actual_sleep) - return min(multiplier * current_sleep, max_sleep) diff --git a/google/cloud/firestore_v1/base_transaction.py b/google/cloud/firestore_v1/base_transaction.py index 5b6e76e1b0..09f0c1fb9a 100644 --- a/google/cloud/firestore_v1/base_transaction.py +++ b/google/cloud/firestore_v1/base_transaction.py @@ -39,12 +39,6 @@ _CANT_ROLLBACK: str = _MISSING_ID_TEMPLATE.format("rolled back") _CANT_COMMIT: str = _MISSING_ID_TEMPLATE.format("committed") _WRITE_READ_ONLY: str = "Cannot perform write operation in read-only transaction." -_INITIAL_SLEEP: float = 1.0 -"""float: Initial "max" for sleep interval. To be used in :func:`_sleep`.""" -_MAX_SLEEP: float = 30.0 -"""float: Eventual "max" sleep time. To be used in :func:`_sleep`.""" -_MULTIPLIER: float = 2.0 -"""float: Multiplier for exponential backoff. To be used in :func:`_sleep`.""" _EXCEED_ATTEMPTS_TEMPLATE: str = "Failed to commit transaction in {:d} attempts." _CANT_RETRY_READ_ONLY: str = "Only read-write transactions can be retried." diff --git a/google/cloud/firestore_v1/transaction.py b/google/cloud/firestore_v1/transaction.py index 1691b56792..8f92ddaf0d 100644 --- a/google/cloud/firestore_v1/transaction.py +++ b/google/cloud/firestore_v1/transaction.py @@ -15,8 +15,6 @@ """Helpers for applying Google Cloud Firestore changes in a transaction.""" -import random -import time from typing import Any, Callable, Generator from google.api_core import exceptions, gapic_v1 @@ -31,9 +29,6 @@ _CANT_COMMIT, _CANT_ROLLBACK, _EXCEED_ATTEMPTS_TEMPLATE, - _INITIAL_SLEEP, - _MAX_SLEEP, - _MULTIPLIER, _WRITE_READ_ONLY, MAX_ATTEMPTS, BaseTransaction, @@ -41,7 +36,6 @@ ) from google.cloud.firestore_v1.document import DocumentReference from google.cloud.firestore_v1.query import Query -from google.cloud.firestore_v1.types import CommitResponse class Transaction(batch.WriteBatch, BaseTransaction): @@ -138,7 +132,14 @@ def _commit(self) -> list: if not self.in_progress: raise ValueError(_CANT_COMMIT) - commit_response = _commit_with_retry(self._client, self._write_pbs, self._id) + commit_response = self._client._firestore_api.commit( + request={ + "database": self._client._database_string, + "writes": self._write_pbs, + "transaction": self._id, + }, + metadata=self._client._rpc_metadata, + ) self._clean_up() return list(commit_response.write_results) @@ -301,75 +302,3 @@ def transactional(to_wrap: Callable) -> _Transactional: the wrapped callable. """ return _Transactional(to_wrap) - - -def _commit_with_retry( - client, write_pbs: list, transaction_id: bytes -) -> CommitResponse: - """Call ``Commit`` on the GAPIC client with retry / sleep. - - Retries the ``Commit`` RPC on Unavailable. Usually this RPC-level - retry is handled by the underlying GAPICd client, but in this case it - doesn't because ``Commit`` is not always idempotent. But here we know it - is "idempotent"-like because it has a transaction ID. We also need to do - our own retry to special-case the ``INVALID_ARGUMENT`` error. - - Args: - client (:class:`~google.cloud.firestore_v1.client.Client`): - A client with GAPIC client and configuration details. - write_pbs (List[:class:`google.cloud.proto.firestore.v1.write.Write`, ...]): - A ``Write`` protobuf instance to be committed. - transaction_id (bytes): - ID of an existing transaction that this commit will run in. - - Returns: - :class:`google.cloud.firestore_v1.types.CommitResponse`: - The protobuf response from ``Commit``. - - Raises: - ~google.api_core.exceptions.GoogleAPICallError: If a non-retryable - exception is encountered. - """ - current_sleep = _INITIAL_SLEEP - while True: - try: - return client._firestore_api.commit( - request={ - "database": client._database_string, - "writes": write_pbs, - "transaction": transaction_id, - }, - metadata=client._rpc_metadata, - ) - except exceptions.ServiceUnavailable: - # Retry - pass - - current_sleep = _sleep(current_sleep) - - -def _sleep( - current_sleep: float, max_sleep: float = _MAX_SLEEP, multiplier: float = _MULTIPLIER -) -> float: - """Sleep and produce a new sleep time. - - .. _Exponential Backoff And Jitter: https://www.awsarchitectureblog.com/\ - 2015/03/backoff.html - - Select a duration between zero and ``current_sleep``. It might seem - counterintuitive to have so much jitter, but - `Exponential Backoff And Jitter`_ argues that "full jitter" is - the best strategy. - - Args: - current_sleep (float): The current "max" for sleep interval. - max_sleep (Optional[float]): Eventual "max" sleep time - multiplier (Optional[float]): Multiplier for exponential backoff. - - Returns: - float: Newly doubled ``current_sleep`` or ``max_sleep`` (whichever - is smaller) - """ - actual_sleep = random.uniform(0.0, current_sleep) - time.sleep(actual_sleep) - return min(multiplier * current_sleep, max_sleep) diff --git a/tests/unit/v1/test_async_transaction.py b/tests/unit/v1/test_async_transaction.py index 3c62e83d1b..85d693950e 100644 --- a/tests/unit/v1/test_async_transaction.py +++ b/tests/unit/v1/test_async_transaction.py @@ -799,208 +799,6 @@ def test_async_transactional_factory(): assert wrapped.to_wrap is mock.sentinel.callable_ -@mock.patch("google.cloud.firestore_v1.async_transaction._sleep") -@pytest.mark.asyncio -async def test__commit_with_retry_success_first_attempt(_sleep): - from google.cloud.firestore_v1.async_transaction import _commit_with_retry - - # Create a minimal fake GAPIC with a dummy result. - firestore_api = AsyncMock() - - # Attach the fake GAPIC to a real client. - client = _make_client("summer") - client._firestore_api_internal = firestore_api - - # Call function and check result. - txn_id = b"cheeeeeez" - commit_response = await _commit_with_retry(client, mock.sentinel.write_pbs, txn_id) - assert commit_response is firestore_api.commit.return_value - - # Verify mocks used. - _sleep.assert_not_called() - firestore_api.commit.assert_called_once_with( - request={ - "database": client._database_string, - "writes": mock.sentinel.write_pbs, - "transaction": txn_id, - }, - metadata=client._rpc_metadata, - ) - - -@mock.patch( - "google.cloud.firestore_v1.async_transaction._sleep", side_effect=[2.0, 4.0] -) -@pytest.mark.asyncio -async def test__commit_with_retry_success_third_attempt(_sleep): - from google.api_core import exceptions - - from google.cloud.firestore_v1.async_transaction import _commit_with_retry - - # Create a minimal fake GAPIC with a dummy result. - firestore_api = AsyncMock() - - # Make sure the first two requests fail and the third succeeds. - firestore_api.commit.side_effect = [ - exceptions.ServiceUnavailable("Server sleepy."), - exceptions.ServiceUnavailable("Server groggy."), - mock.sentinel.commit_response, - ] - - # Attach the fake GAPIC to a real client. - client = _make_client("outside") - client._firestore_api_internal = firestore_api - - # Call function and check result. - txn_id = b"the-world\x00" - commit_response = await _commit_with_retry(client, mock.sentinel.write_pbs, txn_id) - assert commit_response is mock.sentinel.commit_response - - # Verify mocks used. - # Ensure _sleep is called after commit failures, with intervals of 1 and 2 seconds - assert _sleep.call_count == 2 - _sleep.assert_any_call(1.0) - _sleep.assert_any_call(2.0) - # commit() called same way 3 times. - commit_call = mock.call( - request={ - "database": client._database_string, - "writes": mock.sentinel.write_pbs, - "transaction": txn_id, - }, - metadata=client._rpc_metadata, - ) - assert firestore_api.commit.mock_calls == [commit_call, commit_call, commit_call] - - -@mock.patch("google.cloud.firestore_v1.async_transaction._sleep") -@pytest.mark.asyncio -async def test__commit_with_retry_failure_first_attempt(_sleep): - from google.api_core import exceptions - - from google.cloud.firestore_v1.async_transaction import _commit_with_retry - - # Create a minimal fake GAPIC with a dummy result. - firestore_api = AsyncMock() - - # Make sure the first request fails with an un-retryable error. - exc = exceptions.ResourceExhausted("We ran out of fries.") - firestore_api.commit.side_effect = exc - - # Attach the fake GAPIC to a real client. - client = _make_client("peanut-butter") - client._firestore_api_internal = firestore_api - - # Call function and check result. - txn_id = b"\x08\x06\x07\x05\x03\x00\x09-jenny" - with pytest.raises(exceptions.ResourceExhausted) as exc_info: - await _commit_with_retry(client, mock.sentinel.write_pbs, txn_id) - - assert exc_info.value is exc - - # Verify mocks used. - _sleep.assert_not_called() - firestore_api.commit.assert_called_once_with( - request={ - "database": client._database_string, - "writes": mock.sentinel.write_pbs, - "transaction": txn_id, - }, - metadata=client._rpc_metadata, - ) - - -@mock.patch("google.cloud.firestore_v1.async_transaction._sleep", return_value=2.0) -@pytest.mark.asyncio -async def test__commit_with_retry_failure_second_attempt(_sleep): - from google.api_core import exceptions - - from google.cloud.firestore_v1.async_transaction import _commit_with_retry - - # Create a minimal fake GAPIC with a dummy result. - firestore_api = AsyncMock() - - # Make sure the first request fails retry-able and second - # fails non-retryable. - exc1 = exceptions.ServiceUnavailable("Come back next time.") - exc2 = exceptions.InternalServerError("Server on fritz.") - firestore_api.commit.side_effect = [exc1, exc2] - - # Attach the fake GAPIC to a real client. - client = _make_client("peanut-butter") - client._firestore_api_internal = firestore_api - - # Call function and check result. - txn_id = b"the-journey-when-and-where-well-go" - with pytest.raises(exceptions.InternalServerError) as exc_info: - await _commit_with_retry(client, mock.sentinel.write_pbs, txn_id) - - assert exc_info.value is exc2 - - # Verify mocks used. - _sleep.assert_called_once_with(1.0) - # commit() called same way 2 times. - commit_call = mock.call( - request={ - "database": client._database_string, - "writes": mock.sentinel.write_pbs, - "transaction": txn_id, - }, - metadata=client._rpc_metadata, - ) - assert firestore_api.commit.mock_calls == [commit_call, commit_call] - - -@mock.patch("random.uniform", return_value=5.5) -@mock.patch("asyncio.sleep", return_value=None) -@pytest.mark.asyncio -async def test_sleep_defaults(sleep, uniform): - from google.cloud.firestore_v1.async_transaction import _sleep - - curr_sleep = 10.0 - assert uniform.return_value <= curr_sleep - - new_sleep = await _sleep(curr_sleep) - assert new_sleep == 2.0 * curr_sleep - - uniform.assert_called_once_with(0.0, curr_sleep) - sleep.assert_called_once_with(uniform.return_value) - - -@mock.patch("random.uniform", return_value=10.5) -@mock.patch("asyncio.sleep", return_value=None) -@pytest.mark.asyncio -async def test_sleep_explicit(sleep, uniform): - from google.cloud.firestore_v1.async_transaction import _sleep - - curr_sleep = 12.25 - assert uniform.return_value <= curr_sleep - - multiplier = 1.5 - new_sleep = await _sleep(curr_sleep, max_sleep=100.0, multiplier=multiplier) - assert new_sleep == multiplier * curr_sleep - - uniform.assert_called_once_with(0.0, curr_sleep) - sleep.assert_called_once_with(uniform.return_value) - - -@mock.patch("random.uniform", return_value=6.75) -@mock.patch("asyncio.sleep", return_value=None) -@pytest.mark.asyncio -async def test_sleep_exceeds_max(sleep, uniform): - from google.cloud.firestore_v1.async_transaction import _sleep - - curr_sleep = 20.0 - assert uniform.return_value <= curr_sleep - - max_sleep = 38.5 - new_sleep = await _sleep(curr_sleep, max_sleep=max_sleep, multiplier=2.0) - assert new_sleep == max_sleep - - uniform.assert_called_once_with(0.0, curr_sleep) - sleep.assert_called_once_with(uniform.return_value) - - def _make_credentials(): import google.auth.credentials diff --git a/tests/unit/v1/test_transaction.py b/tests/unit/v1/test_transaction.py index fc56d2f9b0..d37be34ea0 100644 --- a/tests/unit/v1/test_transaction.py +++ b/tests/unit/v1/test_transaction.py @@ -810,212 +810,6 @@ def test_transactional_factory(): assert wrapped.to_wrap is mock.sentinel.callable_ -@mock.patch("google.cloud.firestore_v1.transaction._sleep") -@pytest.mark.parametrize("database", [None, "somedb"]) -def test__commit_with_retry_success_first_attempt(_sleep, database): - from google.cloud.firestore_v1.services.firestore import client as firestore_client - from google.cloud.firestore_v1.transaction import _commit_with_retry - - # Create a minimal fake GAPIC with a dummy result. - firestore_api = mock.create_autospec( - firestore_client.FirestoreClient, instance=True - ) - - # Attach the fake GAPIC to a real client. - client = _make_client("summer", database=database) - client._firestore_api_internal = firestore_api - - # Call function and check result. - txn_id = b"cheeeeeez" - commit_response = _commit_with_retry(client, mock.sentinel.write_pbs, txn_id) - assert commit_response is firestore_api.commit.return_value - - # Verify mocks used. - _sleep.assert_not_called() - firestore_api.commit.assert_called_once_with( - request={ - "database": client._database_string, - "writes": mock.sentinel.write_pbs, - "transaction": txn_id, - }, - metadata=client._rpc_metadata, - ) - - -@mock.patch("google.cloud.firestore_v1.transaction._sleep", side_effect=[2.0, 4.0]) -@pytest.mark.parametrize("database", [None, "somedb"]) -def test__commit_with_retry_success_third_attempt(_sleep, database): - from google.api_core import exceptions - - from google.cloud.firestore_v1.services.firestore import client as firestore_client - from google.cloud.firestore_v1.transaction import _commit_with_retry - - # Create a minimal fake GAPIC with a dummy result. - firestore_api = mock.create_autospec( - firestore_client.FirestoreClient, instance=True - ) - # Make sure the first two requests fail and the third succeeds. - firestore_api.commit.side_effect = [ - exceptions.ServiceUnavailable("Server sleepy."), - exceptions.ServiceUnavailable("Server groggy."), - mock.sentinel.commit_response, - ] - - # Attach the fake GAPIC to a real client. - client = _make_client("outside", database=database) - client._firestore_api_internal = firestore_api - - # Call function and check result. - txn_id = b"the-world\x00" - commit_response = _commit_with_retry(client, mock.sentinel.write_pbs, txn_id) - assert commit_response is mock.sentinel.commit_response - - # Verify mocks used. - # Ensure _sleep is called after commit failures, with intervals of 1 and 2 seconds - assert _sleep.call_count == 2 - _sleep.assert_any_call(1.0) - _sleep.assert_any_call(2.0) - # commit() called same way 3 times. - commit_call = mock.call( - request={ - "database": client._database_string, - "writes": mock.sentinel.write_pbs, - "transaction": txn_id, - }, - metadata=client._rpc_metadata, - ) - assert firestore_api.commit.mock_calls == [commit_call, commit_call, commit_call] - - -@mock.patch("google.cloud.firestore_v1.transaction._sleep") -@pytest.mark.parametrize("database", [None, "somedb"]) -def test__commit_with_retry_failure_first_attempt(_sleep, database): - from google.api_core import exceptions - - from google.cloud.firestore_v1.services.firestore import client as firestore_client - from google.cloud.firestore_v1.transaction import _commit_with_retry - - # Create a minimal fake GAPIC with a dummy result. - firestore_api = mock.create_autospec( - firestore_client.FirestoreClient, instance=True - ) - # Make sure the first request fails with an un-retryable error. - exc = exceptions.ResourceExhausted("We ran out of fries.") - firestore_api.commit.side_effect = exc - - # Attach the fake GAPIC to a real client. - client = _make_client("peanut-butter", database=database) - client._firestore_api_internal = firestore_api - - # Call function and check result. - txn_id = b"\x08\x06\x07\x05\x03\x00\x09-jenny" - with pytest.raises(exceptions.ResourceExhausted) as exc_info: - _commit_with_retry(client, mock.sentinel.write_pbs, txn_id) - - assert exc_info.value is exc - - # Verify mocks used. - _sleep.assert_not_called() - firestore_api.commit.assert_called_once_with( - request={ - "database": client._database_string, - "writes": mock.sentinel.write_pbs, - "transaction": txn_id, - }, - metadata=client._rpc_metadata, - ) - - -@mock.patch("google.cloud.firestore_v1.transaction._sleep", return_value=2.0) -@pytest.mark.parametrize("database", [None, "somedb"]) -def test__commit_with_retry_failure_second_attempt(_sleep, database): - from google.api_core import exceptions - - from google.cloud.firestore_v1.services.firestore import client as firestore_client - from google.cloud.firestore_v1.transaction import _commit_with_retry - - # Create a minimal fake GAPIC with a dummy result. - firestore_api = mock.create_autospec( - firestore_client.FirestoreClient, instance=True - ) - # Make sure the first request fails retry-able and second - # fails non-retryable. - exc1 = exceptions.ServiceUnavailable("Come back next time.") - exc2 = exceptions.InternalServerError("Server on fritz.") - firestore_api.commit.side_effect = [exc1, exc2] - - # Attach the fake GAPIC to a real client. - client = _make_client("peanut-butter", database=database) - client._firestore_api_internal = firestore_api - - # Call function and check result. - txn_id = b"the-journey-when-and-where-well-go" - with pytest.raises(exceptions.InternalServerError) as exc_info: - _commit_with_retry(client, mock.sentinel.write_pbs, txn_id) - - assert exc_info.value is exc2 - - # Verify mocks used. - _sleep.assert_called_once_with(1.0) - # commit() called same way 2 times. - commit_call = mock.call( - request={ - "database": client._database_string, - "writes": mock.sentinel.write_pbs, - "transaction": txn_id, - }, - metadata=client._rpc_metadata, - ) - assert firestore_api.commit.mock_calls == [commit_call, commit_call] - - -@mock.patch("random.uniform", return_value=5.5) -@mock.patch("time.sleep", return_value=None) -def test_defaults(sleep, uniform): - from google.cloud.firestore_v1.transaction import _sleep - - curr_sleep = 10.0 - assert uniform.return_value <= curr_sleep - - new_sleep = _sleep(curr_sleep) - assert new_sleep == 2.0 * curr_sleep - - uniform.assert_called_once_with(0.0, curr_sleep) - sleep.assert_called_once_with(uniform.return_value) - - -@mock.patch("random.uniform", return_value=10.5) -@mock.patch("time.sleep", return_value=None) -def test_explicit(sleep, uniform): - from google.cloud.firestore_v1.transaction import _sleep - - curr_sleep = 12.25 - assert uniform.return_value <= curr_sleep - - multiplier = 1.5 - new_sleep = _sleep(curr_sleep, max_sleep=100.0, multiplier=multiplier) - assert new_sleep == multiplier * curr_sleep - - uniform.assert_called_once_with(0.0, curr_sleep) - sleep.assert_called_once_with(uniform.return_value) - - -@mock.patch("random.uniform", return_value=6.75) -@mock.patch("time.sleep", return_value=None) -def test_exceeds_max(sleep, uniform): - from google.cloud.firestore_v1.transaction import _sleep - - curr_sleep = 20.0 - assert uniform.return_value <= curr_sleep - - max_sleep = 38.5 - new_sleep = _sleep(curr_sleep, max_sleep=max_sleep, multiplier=2.0) - assert new_sleep == max_sleep - - uniform.assert_called_once_with(0.0, curr_sleep) - sleep.assert_called_once_with(uniform.return_value) - - def _make_credentials(): import google.auth.credentials