Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: remove custom retry loop #948

Merged
merged 4 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 8 additions & 84 deletions google/cloud/firestore_v1/async_transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,12 @@
"""Helpers for applying Google Cloud Firestore changes in a transaction."""


import asyncio
import random
from typing import Any, AsyncGenerator, Callable, Coroutine

from google.api_core import exceptions, gapic_v1
from google.api_core import retry_async as retries

from google.cloud.firestore_v1 import _helpers, async_batch, types
from google.cloud.firestore_v1 import _helpers, async_batch
from google.cloud.firestore_v1.async_document import (
AsyncDocumentReference,
DocumentSnapshot,
Expand All @@ -33,18 +31,12 @@
_CANT_COMMIT,
_CANT_ROLLBACK,
_EXCEED_ATTEMPTS_TEMPLATE,
_INITIAL_SLEEP,
_MAX_SLEEP,
_MULTIPLIER,
_WRITE_READ_ONLY,
MAX_ATTEMPTS,
BaseTransaction,
_BaseTransactional,
)

# Types needed only for Type Hints
from google.cloud.firestore_v1.client import Client


class AsyncTransaction(async_batch.AsyncWriteBatch, BaseTransaction):
"""Accumulate read-and-write operations to be sent in a transaction.
Expand Down Expand Up @@ -140,8 +132,13 @@ async def _commit(self) -> list:
if not self.in_progress:
raise ValueError(_CANT_COMMIT)

commit_response = await _commit_with_retry(
self._client, self._write_pbs, self._id
commit_response = await self._client._firestore_api.commit(
request={
"database": self._client._database_string,
"writes": self._write_pbs,
"transaction": self._id,
},
metadata=self._client._rpc_metadata,
)

self._clean_up()
Expand Down Expand Up @@ -313,76 +310,3 @@ def async_transactional(
the wrapped callable.
"""
return _AsyncTransactional(to_wrap)


# TODO(crwilcox): this was 'coroutine' from pytype merge-pyi...
async def _commit_with_retry(
client: Client, write_pbs: list, transaction_id: bytes
) -> types.CommitResponse:
"""Call ``Commit`` on the GAPIC client with retry / sleep.

Retries the ``Commit`` RPC on Unavailable. Usually this RPC-level
retry is handled by the underlying GAPICd client, but in this case it
doesn't because ``Commit`` is not always idempotent. But here we know it
is "idempotent"-like because it has a transaction ID. We also need to do
our own retry to special-case the ``INVALID_ARGUMENT`` error.

Args:
client (:class:`~google.cloud.firestore_v1.client.Client`):
A client with GAPIC client and configuration details.
write_pbs (List[:class:`google.cloud.proto.firestore.v1.write.Write`, ...]):
A ``Write`` protobuf instance to be committed.
transaction_id (bytes):
ID of an existing transaction that this commit will run in.

Returns:
:class:`google.cloud.firestore_v1.types.CommitResponse`:
The protobuf response from ``Commit``.

Raises:
~google.api_core.exceptions.GoogleAPICallError: If a non-retryable
exception is encountered.
"""
current_sleep = _INITIAL_SLEEP
while True:
try:
return await client._firestore_api.commit(
request={
"database": client._database_string,
"writes": write_pbs,
"transaction": transaction_id,
},
metadata=client._rpc_metadata,
)
except exceptions.ServiceUnavailable:
# Retry
pass

current_sleep = await _sleep(current_sleep)


async def _sleep(
current_sleep: float, max_sleep: float = _MAX_SLEEP, multiplier: float = _MULTIPLIER
) -> float:
"""Sleep and produce a new sleep time.

.. _Exponential Backoff And Jitter: https://www.awsarchitectureblog.com/\
2015/03/backoff.html

Select a duration between zero and ``current_sleep``. It might seem
counterintuitive to have so much jitter, but
`Exponential Backoff And Jitter`_ argues that "full jitter" is
the best strategy.

Args:
current_sleep (float): The current "max" for sleep interval.
max_sleep (Optional[float]): Eventual "max" sleep time
multiplier (Optional[float]): Multiplier for exponential backoff.

Returns:
float: Newly doubled ``current_sleep`` or ``max_sleep`` (whichever
is smaller)
"""
actual_sleep = random.uniform(0.0, current_sleep)
await asyncio.sleep(actual_sleep)
return min(multiplier * current_sleep, max_sleep)
6 changes: 0 additions & 6 deletions google/cloud/firestore_v1/base_transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,6 @@
_CANT_ROLLBACK: str = _MISSING_ID_TEMPLATE.format("rolled back")
_CANT_COMMIT: str = _MISSING_ID_TEMPLATE.format("committed")
_WRITE_READ_ONLY: str = "Cannot perform write operation in read-only transaction."
_INITIAL_SLEEP: float = 1.0
"""float: Initial "max" for sleep interval. To be used in :func:`_sleep`."""
_MAX_SLEEP: float = 30.0
"""float: Eventual "max" sleep time. To be used in :func:`_sleep`."""
_MULTIPLIER: float = 2.0
"""float: Multiplier for exponential backoff. To be used in :func:`_sleep`."""
_EXCEED_ATTEMPTS_TEMPLATE: str = "Failed to commit transaction in {:d} attempts."
_CANT_RETRY_READ_ONLY: str = "Only read-write transactions can be retried."

Expand Down
87 changes: 8 additions & 79 deletions google/cloud/firestore_v1/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
"""Helpers for applying Google Cloud Firestore changes in a transaction."""


import random
import time
from typing import Any, Callable, Generator

from google.api_core import exceptions, gapic_v1
Expand All @@ -31,17 +29,13 @@
_CANT_COMMIT,
_CANT_ROLLBACK,
_EXCEED_ATTEMPTS_TEMPLATE,
_INITIAL_SLEEP,
_MAX_SLEEP,
_MULTIPLIER,
_WRITE_READ_ONLY,
MAX_ATTEMPTS,
BaseTransaction,
_BaseTransactional,
)
from google.cloud.firestore_v1.document import DocumentReference
from google.cloud.firestore_v1.query import Query
from google.cloud.firestore_v1.types import CommitResponse


class Transaction(batch.WriteBatch, BaseTransaction):
Expand Down Expand Up @@ -138,7 +132,14 @@ def _commit(self) -> list:
if not self.in_progress:
raise ValueError(_CANT_COMMIT)

commit_response = _commit_with_retry(self._client, self._write_pbs, self._id)
commit_response = self._client._firestore_api.commit(
request={
"database": self._client._database_string,
"writes": self._write_pbs,
"transaction": self._id,
},
metadata=self._client._rpc_metadata,
)

self._clean_up()
return list(commit_response.write_results)
Expand Down Expand Up @@ -301,75 +302,3 @@ def transactional(to_wrap: Callable) -> _Transactional:
the wrapped callable.
"""
return _Transactional(to_wrap)


def _commit_with_retry(
client, write_pbs: list, transaction_id: bytes
) -> CommitResponse:
"""Call ``Commit`` on the GAPIC client with retry / sleep.

Retries the ``Commit`` RPC on Unavailable. Usually this RPC-level
retry is handled by the underlying GAPICd client, but in this case it
doesn't because ``Commit`` is not always idempotent. But here we know it
is "idempotent"-like because it has a transaction ID. We also need to do
our own retry to special-case the ``INVALID_ARGUMENT`` error.

Args:
client (:class:`~google.cloud.firestore_v1.client.Client`):
A client with GAPIC client and configuration details.
write_pbs (List[:class:`google.cloud.proto.firestore.v1.write.Write`, ...]):
A ``Write`` protobuf instance to be committed.
transaction_id (bytes):
ID of an existing transaction that this commit will run in.

Returns:
:class:`google.cloud.firestore_v1.types.CommitResponse`:
The protobuf response from ``Commit``.

Raises:
~google.api_core.exceptions.GoogleAPICallError: If a non-retryable
exception is encountered.
"""
current_sleep = _INITIAL_SLEEP
while True:
try:
return client._firestore_api.commit(
request={
"database": client._database_string,
"writes": write_pbs,
"transaction": transaction_id,
},
metadata=client._rpc_metadata,
)
except exceptions.ServiceUnavailable:
# Retry
pass

current_sleep = _sleep(current_sleep)


def _sleep(
current_sleep: float, max_sleep: float = _MAX_SLEEP, multiplier: float = _MULTIPLIER
) -> float:
"""Sleep and produce a new sleep time.

.. _Exponential Backoff And Jitter: https://www.awsarchitectureblog.com/\
2015/03/backoff.html

Select a duration between zero and ``current_sleep``. It might seem
counterintuitive to have so much jitter, but
`Exponential Backoff And Jitter`_ argues that "full jitter" is
the best strategy.

Args:
current_sleep (float): The current "max" for sleep interval.
max_sleep (Optional[float]): Eventual "max" sleep time
multiplier (Optional[float]): Multiplier for exponential backoff.

Returns:
float: Newly doubled ``current_sleep`` or ``max_sleep`` (whichever
is smaller)
"""
actual_sleep = random.uniform(0.0, current_sleep)
time.sleep(actual_sleep)
return min(multiplier * current_sleep, max_sleep)
Loading
Loading