Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement async client for LROs #707

Merged
merged 14 commits into from
Oct 7, 2024
2 changes: 1 addition & 1 deletion google/api_core/operations_v1/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,5 @@
except ImportError:
# This import requires the `async_rest` extra.
# Don't raise an exception if `OperationsRestAsyncTransport` cannot be imported
# as other transports are still available
# as other transports are still available.
pass
121 changes: 66 additions & 55 deletions google/api_core/operations_v1/abstract_operations_async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,16 @@
from google.api_core.operations_v1.abstract_operations_base_client import (
AbstractOperationsBaseClient,
)
from google.longrunning import operations_pb2

try:
from google.auth.aio import credentials as ga_credentials # type: ignore
except ImportError as e: # pragma: NO COVER
ohmayr marked this conversation as resolved.
Show resolved Hide resolved
raise ImportError(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name of the file and the name of the class do not make it clear that this is REST-specific. So async should work if at least one of { (REST dependencies), (gRPC dependencies) } are available. So it seems to me we should check for both sets of dependencies being absent before we error.

That, at least, when we get to the steady state of having everything implemented and cleaned up. If that's the reason we're not doing that now, let's add a TODO with a link to a tracking issue.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right. The name doesn't specify it but it is indeed REST specific. I agree that it's confusing. The name used here is consistent to what we have for the sync REST client / transport. We can have a discussion on what we want to name our async client / transport.

Updating the sync client name would be a breaking change.

"`google-api-core[async_rest]` is required to use asynchronous rest streaming. "
"Install the `async_rest` extra of `google-api-core` using "
"The `async_rest` extra of `google-api-core` is required to use long-running operations. Install it by running "
"`pip install google-api-core[async_rest]`."
) from e

from google.longrunning import operations_pb2


class AbstractOperationsAsyncClient(AbstractOperationsBaseClient):
"""Manages long-running operations with an API service for the asynchronous client.
Expand Down Expand Up @@ -68,8 +66,7 @@ def __init__(
are specified, the client will attempt to ascertain the
credentials from the environment.
transport (Union[str, OperationsTransport]): The
transport to use. If set to None, a transport is chosen
automatically.
transport to use. If set to None, this defaults to 'rest_asyncio'.
client_options (google.api_core.client_options.ClientOptions): Custom options for the
client. It won't take effect if a ``transport`` instance is provided.
(1) The ``api_endpoint`` property can be used to override the
Expand Down Expand Up @@ -99,16 +96,70 @@ def __init__(
super().__init__(
credentials=credentials, # type: ignore
# NOTE: If a transport is not provided, we force the client to use the async
# REST transport, as it should.
# REST transport.
transport=transport or "rest_asyncio",
client_options=client_options,
client_info=client_info,
)

async def get_operation(
self,
name: str,
# TODO(https://github.com/googleapis/python-api-core/issues/722): Leverage `retry`
# to allow configuring retryable error codes.
retry=gapic_v1.method_async.DEFAULT,
*,
timeout: Optional[float] = None,
metadata: Sequence[Tuple[str, str]] = (),
) -> operations_pb2.Operation:
r"""Gets the latest state of a long-running operation.
Clients can use this method to poll the operation result
at intervals as recommended by the API service.

Args:
name (str):
The name of the operation resource.
timeout (float): The timeout for this request.
metadata (Sequence[Tuple[str, str]]): Strings which should be
sent along with the request as metadata.

Returns:
google.longrunning.operations_pb2.Operation:
This resource represents a long-
running operation that is the result of a
network API call.

"""

request = operations_pb2.GetOperationRequest(name=name)

# Wrap the RPC method; this adds retry and timeout information,
# and friendly error handling.
rpc = self._transport._wrapped_methods[self._transport.get_operation]

# Certain fields should be provided within the metadata header;
# add these here.
metadata = tuple(metadata or ()) + (
gapic_v1.routing_header.to_grpc_metadata((("name", request.name),)),
)

# Send the request.
response = await rpc(
request,
timeout=timeout,
metadata=metadata,
)

# Done; return the response.
return response

async def list_operations(
self,
name: str,
filter_: Optional[str] = None,
# TODO(https://github.com/googleapis/python-api-core/issues/722): Leverage `retry`
# to allow configuring retryable error codes.
retry=gapic_v1.method_async.DEFAULT,
*,
page_size: Optional[int] = None,
page_token: Optional[str] = None,
Expand Down Expand Up @@ -186,57 +237,12 @@ async def list_operations(
# Done; return the response.
return response

async def get_operation(
self,
name: str,
*,
timeout: Optional[float] = None,
metadata: Sequence[Tuple[str, str]] = (),
) -> operations_pb2.Operation:
r"""Gets the latest state of a long-running operation.
Clients can use this method to poll the operation result
at intervals as recommended by the API service.

Args:
name (str):
The name of the operation resource.
timeout (float): The timeout for this request.
metadata (Sequence[Tuple[str, str]]): Strings which should be
sent along with the request as metadata.

Returns:
google.longrunning.operations_pb2.Operation:
This resource represents a long-
running operation that is the result of a
network API call.

"""

request = operations_pb2.GetOperationRequest(name=name)

# Wrap the RPC method; this adds retry and timeout information,
# and friendly error handling.
rpc = self._transport._wrapped_methods[self._transport.get_operation]

# Certain fields should be provided within the metadata header;
# add these here.
metadata = tuple(metadata or ()) + (
gapic_v1.routing_header.to_grpc_metadata((("name", request.name),)),
)

# Send the request.
response = await rpc(
request,
timeout=timeout,
metadata=metadata,
)

# Done; return the response.
return response

async def delete_operation(
self,
name: str,
# TODO(https://github.com/googleapis/python-api-core/issues/722): Leverage `retry`
# to allow configuring retryable error codes.
retry=gapic_v1.method_async.DEFAULT,
*,
timeout: Optional[float] = None,
metadata: Sequence[Tuple[str, str]] = (),
Expand Down Expand Up @@ -281,9 +287,14 @@ async def delete_operation(
async def cancel_operation(
self,
name: Optional[str] = None,
# TODO(https://github.com/googleapis/python-api-core/issues/722): Leverage `retry`
# to allow configuring retryable error codes.
retry=gapic_v1.method_async.DEFAULT,
*,
timeout: Optional[float] = None,
metadata: Sequence[Tuple[str, str]] = (),
# TODO(https://github.com/googleapis/python-api-core/issues/722): Add `retry` parameter
# to allow configuring retryable error codes.
) -> None:
r"""Starts asynchronous cancellation on a long-running operation.
The server makes a best effort to cancel the operation, but
Expand Down
49 changes: 10 additions & 39 deletions google/api_core/operations_v1/abstract_operations_base_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
from typing import Dict, Optional, Type, Union

from google.api_core import client_options as client_options_lib # type: ignore
from google.api_core import exceptions as core_exceptions # type: ignore
from google.api_core import gapic_v1 # type: ignore
from google.api_core.operations_v1.transports.base import (
DEFAULT_CLIENT_INFO,
Expand Down Expand Up @@ -359,41 +358,13 @@ def __init__(
self._transport = transport
else:
Transport = type(self).get_transport_class(transport)
# NOTE: The conditional logic below to initialize the transport can be removed
# once we have feature parity with the sync transport.
if "async" in str(Transport).lower():
unsupported_params = {
# TODO(https://github.com/googleapis/python-api-core/issues/715): Add support for `credentials_file` to async REST transport.
"google.api_core.client_options.ClientOptions.credentials_file": client_options.credentials_file,
# TODO(https://github.com/googleapis/python-api-core/issues/716): Add support for `scopes` to async REST transport.
"google.api_core.client_options.ClientOptions.scopes": client_options.scopes,
# TODO(https://github.com/googleapis/python-api-core/issues/717): Add support for `quota_project_id` to async REST transport.
"google.api_core.client_options.ClientOptions.quota_project_id": client_options.quota_project_id,
# TODO(https://github.com/googleapis/python-api-core/issues/718): Add support for `client_cert_source` to async REST transport.
"google.api_core.client_options.ClientOptions.client_cert_source": client_options.client_cert_source,
}
provided_unsupported_params = [
name
for name, value in unsupported_params.items()
if value is not None
]
if provided_unsupported_params:
raise core_exceptions.AsyncRestUnsupportedParameterError(
f"The following provided parameters are not supported for `transport=rest_asyncio`: {', '.join(provided_unsupported_params)}"
)
self._transport = Transport(
credentials=credentials,
host=api_endpoint,
client_info=client_info,
)
else:
self._transport = Transport(
credentials=credentials,
credentials_file=client_options.credentials_file,
host=api_endpoint,
scopes=client_options.scopes,
client_cert_source_for_mtls=client_cert_source_func,
quota_project_id=client_options.quota_project_id,
client_info=client_info,
always_use_jwt_access=True,
)
self._transport = Transport(
credentials=credentials,
credentials_file=client_options.credentials_file,
host=api_endpoint,
scopes=client_options.scopes,
client_cert_source_for_mtls=client_cert_source_func,
quota_project_id=client_options.quota_project_id,
client_info=client_info,
always_use_jwt_access=True,
)
43 changes: 38 additions & 5 deletions google/api_core/operations_v1/transports/rest_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,19 @@ def __init__(
*,
host: str = "longrunning.googleapis.com",
credentials: Optional[ga_credentials_async.Credentials] = None,
credentials_file: Optional[str] = None,
scopes: Optional[Sequence[str]] = None,
client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
quota_project_id: Optional[str] = None,
client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
always_use_jwt_access: Optional[bool] = False,
url_scheme: str = "https",
http_options: Optional[Dict] = None,
path_prefix: str = "v1",
# TODO(https://github.com/googleapis/python-api-core/issues/715): Add support for `credentials_file` to async REST transport.
# TODO(https://github.com/googleapis/python-api-core/issues/716): Add support for `scopes` to async REST transport.
# TODO(https://github.com/googleapis/python-api-core/issues/717): Add support for `quota_project_id` to async REST transport.
# TODO(https://github.com/googleapis/python-api-core/issues/718): Add support for `client_cert_source` to async REST transport.
# TODO(https://github.com/googleapis/python-api-core/issues/715): Add docstring for `credentials_file` to async REST transport.
# TODO(https://github.com/googleapis/python-api-core/issues/716): Add docstring for `scopes` to async REST transport.
# TODO(https://github.com/googleapis/python-api-core/issues/717): Add docstring for `quota_project_id` to async REST transport.
# TODO(https://github.com/googleapis/python-api-core/issues/718): Add docstring for `client_cert_source` to async REST transport.
) -> None:
"""Instantiate the transport.

Expand Down Expand Up @@ -109,12 +113,33 @@ def __init__(
"v1" by default.

"""
unsupported_params = {
# TODO(https://github.com/googleapis/python-api-core/issues/715): Add support for `credentials_file` to async REST transport.
"google.api_core.client_options.ClientOptions.credentials_file": credentials_file,
# TODO(https://github.com/googleapis/python-api-core/issues/716): Add support for `scopes` to async REST transport.
"google.api_core.client_options.ClientOptions.scopes": scopes,
# TODO(https://github.com/googleapis/python-api-core/issues/717): Add support for `quota_project_id` to async REST transport.
"google.api_core.client_options.ClientOptions.quota_project_id": quota_project_id,
# TODO(https://github.com/googleapis/python-api-core/issues/718): Add support for `client_cert_source` to async REST transport.
"google.api_core.client_options.ClientOptions.client_cert_source": client_cert_source_for_mtls,
# TODO(https://github.com/googleapis/python-api-core/issues/718): Add support for `client_cert_source` to async REST transport.
"google.api_core.client_options.ClientOptions.client_cert_source": client_cert_source_for_mtls,
}
provided_unsupported_params = [
name for name, value in unsupported_params.items() if value is not None
]
if provided_unsupported_params:
raise core_exceptions.AsyncRestUnsupportedParameterError(
f"The following provided parameters are not supported for `transport=rest_asyncio`: {', '.join(provided_unsupported_params)}"
)

super().__init__(
host=host,
# TODO(https://github.com/googleapis/python-api-core/issues/709): Remove `type: ignore` when the linked issue is resolved.
credentials=credentials, # type: ignore
client_info=client_info,
always_use_jwt_access=always_use_jwt_access,
# TODO(https://github.com/googleapis/python-api-core/issues/725): Set always_use_jwt_access token when supported.
always_use_jwt_access=False,
)
# TODO(https://github.com/googleapis/python-api-core/issues/708): add support for
# `default_host` in AsyncAuthorizedSession for feature parity with the synchronous
Expand Down Expand Up @@ -197,6 +222,8 @@ async def _list_operations(
*,
timeout: Optional[float] = None,
metadata: Sequence[Tuple[str, str]] = (),
# TODO(https://github.com/googleapis/python-api-core/issues/722): Add `retry` parameter
# to allow configuring retryable error codes.
) -> operations_pb2.ListOperationsResponse:
r"""Asynchronously call the list operations method over HTTP.

Expand Down Expand Up @@ -271,6 +298,8 @@ async def _get_operation(
*,
timeout: Optional[float] = None,
metadata: Sequence[Tuple[str, str]] = (),
# TODO(https://github.com/googleapis/python-api-core/issues/722): Add `retry` parameter
# to allow configuring retryable error codes.
) -> operations_pb2.Operation:
r"""Asynchronously call the get operation method over HTTP.

Expand Down Expand Up @@ -346,6 +375,8 @@ async def _delete_operation(
*,
timeout: Optional[float] = None,
metadata: Sequence[Tuple[str, str]] = (),
# TODO(https://github.com/googleapis/python-api-core/issues/722): Add `retry` parameter
# to allow configuring retryable error codes.
) -> empty_pb2.Empty:
r"""Asynchronously call the delete operation method over HTTP.

Expand Down Expand Up @@ -414,6 +445,8 @@ async def _cancel_operation(
*,
timeout: Optional[float] = None,
metadata: Sequence[Tuple[str, str]] = (),
# TODO(https://github.com/googleapis/python-api-core/issues/722): Add `retry` parameter
# to allow configuring retryable error codes.
) -> empty_pb2.Empty:
r"""Asynchronously call the cancel operation method over HTTP.

Expand Down
Loading