Skip to content

Commit

Permalink
feat: implement async client for LROs (#707)
Browse files Browse the repository at this point in the history
* feat: implement `AbstractOperationsAsyncClient` to support long running operations

* remove coverage guards

* address presubmit failures

* fix coverage for cancel operation

* tests cleanup

* fix incorrect tests

* file bugs

* add auth import

* address PR comments

* address PR comments

* fix unit tests and address more comments

* disable retry parameter

* add retry parameter

* address PR comments

---------

Co-authored-by: ohmayr <[email protected]>
Co-authored-by: ohmayr <[email protected]>
  • Loading branch information
3 people authored Oct 7, 2024
1 parent 3c7e43e commit 043785e
Show file tree
Hide file tree
Showing 11 changed files with 1,593 additions and 528 deletions.
14 changes: 8 additions & 6 deletions google/api_core/operations_v1/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@

"""Package for interacting with the google.longrunning.operations meta-API."""

from google.api_core.operations_v1.abstract_operations_client import (
AbstractOperationsClient,
)
from google.api_core.operations_v1.abstract_operations_client import AbstractOperationsClient
from google.api_core.operations_v1.operations_async_client import OperationsAsyncClient
from google.api_core.operations_v1.operations_client import OperationsClient
from google.api_core.operations_v1.transports.rest import OperationsRestTransport
Expand All @@ -29,10 +27,14 @@
]

try:
from google.api_core.operations_v1.transports.rest_asyncio import OperationsRestAsyncTransport # noqa: F401
__all__.append("OperationsRestAsyncTransport")
from google.api_core.operations_v1.transports.rest_asyncio import (
AsyncOperationsRestTransport,
)
from google.api_core.operations_v1.operations_rest_client_async import AsyncOperationsRestClient

__all__ += ["AsyncOperationsRestClient", "AsyncOperationsRestTransport"]
except ImportError:
# This import requires the `async_rest` extra.
# Don't raise an exception if `OperationsRestAsyncTransport` cannot be imported
# Don't raise an exception if `AsyncOperationsRestTransport` cannot be imported
# as other transports are still available.
pass
370 changes: 370 additions & 0 deletions google/api_core/operations_v1/abstract_operations_base_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,370 @@
# -*- coding: utf-8 -*-
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from collections import OrderedDict
import os
import re
from typing import Dict, Optional, Type, Union

from google.api_core import client_options as client_options_lib # type: ignore
from google.api_core import gapic_v1 # type: ignore
from google.api_core.operations_v1.transports.base import (
DEFAULT_CLIENT_INFO,
OperationsTransport,
)
from google.api_core.operations_v1.transports.rest import OperationsRestTransport

try:
from google.api_core.operations_v1.transports.rest_asyncio import (
AsyncOperationsRestTransport,
)

HAS_ASYNC_REST_DEPENDENCIES = True
except ImportError as e:
HAS_ASYNC_REST_DEPENDENCIES = False
ASYNC_REST_EXCEPTION = e

from google.auth import credentials as ga_credentials # type: ignore
from google.auth.exceptions import MutualTLSChannelError # type: ignore
from google.auth.transport import mtls # type: ignore


class AbstractOperationsBaseClientMeta(type):
"""Metaclass for the Operations Base client.
This provides base class-level methods for building and retrieving
support objects (e.g. transport) without polluting the client instance
objects.
"""

_transport_registry = OrderedDict() # type: Dict[str, Type[OperationsTransport]]
_transport_registry["rest"] = OperationsRestTransport
if HAS_ASYNC_REST_DEPENDENCIES:
_transport_registry["rest_asyncio"] = AsyncOperationsRestTransport

def get_transport_class(
cls,
label: Optional[str] = None,
) -> Type[OperationsTransport]:
"""Returns an appropriate transport class.
Args:
label: The name of the desired transport. If none is
provided, then the first transport in the registry is used.
Returns:
The transport class to use.
"""
# If a specific transport is requested, return that one.
if (
label == "rest_asyncio" and not HAS_ASYNC_REST_DEPENDENCIES
): # pragma: NO COVER
raise ASYNC_REST_EXCEPTION

if label:
return cls._transport_registry[label]

# No transport is requested; return the default (that is, the first one
# in the dictionary).
return next(iter(cls._transport_registry.values()))


class AbstractOperationsBaseClient(metaclass=AbstractOperationsBaseClientMeta):
"""Manages long-running operations with an API service.
When an API method normally takes long time to complete, it can be
designed to return [Operation][google.api_core.operations_v1.Operation] to the
client, and the client can use this interface to receive the real
response asynchronously by polling the operation resource, or pass
the operation resource to another API (such as Google Cloud Pub/Sub
API) to receive the response. Any API service that returns
long-running operations should implement the ``Operations``
interface so developers can have a consistent client experience.
"""

@staticmethod
def _get_default_mtls_endpoint(api_endpoint):
"""Converts api endpoint to mTLS endpoint.
Convert "*.sandbox.googleapis.com" and "*.googleapis.com" to
"*.mtls.sandbox.googleapis.com" and "*.mtls.googleapis.com" respectively.
Args:
api_endpoint (Optional[str]): the api endpoint to convert.
Returns:
str: converted mTLS api endpoint.
"""
if not api_endpoint:
return api_endpoint

mtls_endpoint_re = re.compile(
r"(?P<name>[^.]+)(?P<mtls>\.mtls)?(?P<sandbox>\.sandbox)?(?P<googledomain>\.googleapis\.com)?"
)

m = mtls_endpoint_re.match(api_endpoint)
name, mtls, sandbox, googledomain = m.groups()
if mtls or not googledomain:
return api_endpoint

if sandbox:
return api_endpoint.replace(
"sandbox.googleapis.com", "mtls.sandbox.googleapis.com"
)

return api_endpoint.replace(".googleapis.com", ".mtls.googleapis.com")

DEFAULT_ENDPOINT = "longrunning.googleapis.com"
DEFAULT_MTLS_ENDPOINT = _get_default_mtls_endpoint.__func__( # type: ignore
DEFAULT_ENDPOINT
)

@classmethod
def from_service_account_info(cls, info: dict, *args, **kwargs):
"""
This class method should be overridden by the subclasses.
Args:
info (dict): The service account private key info.
args: Additional arguments to pass to the constructor.
kwargs: Additional arguments to pass to the constructor.
Raises:
NotImplementedError: If the method is called on the base class.
"""
raise NotImplementedError("`from_service_account_info` is not implemented.")

@classmethod
def from_service_account_file(cls, filename: str, *args, **kwargs):
"""
This class method should be overridden by the subclasses.
Args:
filename (str): The path to the service account private key json
file.
args: Additional arguments to pass to the constructor.
kwargs: Additional arguments to pass to the constructor.
Raises:
NotImplementedError: If the method is called on the base class.
"""
raise NotImplementedError("`from_service_account_file` is not implemented.")

from_service_account_json = from_service_account_file

@property
def transport(self) -> OperationsTransport:
"""Returns the transport used by the client instance.
Returns:
OperationsTransport: The transport used by the client
instance.
"""
return self._transport

@staticmethod
def common_billing_account_path(
billing_account: str,
) -> str:
"""Returns a fully-qualified billing_account string."""
return "billingAccounts/{billing_account}".format(
billing_account=billing_account,
)

@staticmethod
def parse_common_billing_account_path(path: str) -> Dict[str, str]:
"""Parse a billing_account path into its component segments."""
m = re.match(r"^billingAccounts/(?P<billing_account>.+?)$", path)
return m.groupdict() if m else {}

@staticmethod
def common_folder_path(
folder: str,
) -> str:
"""Returns a fully-qualified folder string."""
return "folders/{folder}".format(
folder=folder,
)

@staticmethod
def parse_common_folder_path(path: str) -> Dict[str, str]:
"""Parse a folder path into its component segments."""
m = re.match(r"^folders/(?P<folder>.+?)$", path)
return m.groupdict() if m else {}

@staticmethod
def common_organization_path(
organization: str,
) -> str:
"""Returns a fully-qualified organization string."""
return "organizations/{organization}".format(
organization=organization,
)

@staticmethod
def parse_common_organization_path(path: str) -> Dict[str, str]:
"""Parse a organization path into its component segments."""
m = re.match(r"^organizations/(?P<organization>.+?)$", path)
return m.groupdict() if m else {}

@staticmethod
def common_project_path(
project: str,
) -> str:
"""Returns a fully-qualified project string."""
return "projects/{project}".format(
project=project,
)

@staticmethod
def parse_common_project_path(path: str) -> Dict[str, str]:
"""Parse a project path into its component segments."""
m = re.match(r"^projects/(?P<project>.+?)$", path)
return m.groupdict() if m else {}

@staticmethod
def common_location_path(
project: str,
location: str,
) -> str:
"""Returns a fully-qualified location string."""
return "projects/{project}/locations/{location}".format(
project=project,
location=location,
)

@staticmethod
def parse_common_location_path(path: str) -> Dict[str, str]:
"""Parse a location path into its component segments."""
m = re.match(r"^projects/(?P<project>.+?)/locations/(?P<location>.+?)$", path)
return m.groupdict() if m else {}

def __init__(
self,
*,
credentials: Optional[ga_credentials.Credentials] = None,
transport: Union[str, OperationsTransport, None] = None,
client_options: Optional[client_options_lib.ClientOptions] = None,
client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
) -> None:
"""Instantiates the operations client.
Args:
credentials (Optional[google.auth.credentials.Credentials]): The
authorization credentials to attach to requests. These
credentials identify the application to the service; if none
are specified, the client will attempt to ascertain the
credentials from the environment.
transport (Union[str, OperationsTransport]): The
transport to use. If set to None, a transport is chosen
automatically.
client_options (google.api_core.client_options.ClientOptions): Custom options for the
client. It won't take effect if a ``transport`` instance is provided.
(1) The ``api_endpoint`` property can be used to override the
default endpoint provided by the client. GOOGLE_API_USE_MTLS_ENDPOINT
environment variable can also be used to override the endpoint:
"always" (always use the default mTLS endpoint), "never" (always
use the default regular endpoint) and "auto" (auto switch to the
default mTLS endpoint if client certificate is present, this is
the default value). However, the ``api_endpoint`` property takes
precedence if provided.
(2) If GOOGLE_API_USE_CLIENT_CERTIFICATE environment variable
is "true", then the ``client_cert_source`` property can be used
to provide client certificate for mutual TLS transport. If
not provided, the default SSL client certificate will be used if
present. If GOOGLE_API_USE_CLIENT_CERTIFICATE is "false" or not
set, no client certificate will be used.
client_info (google.api_core.gapic_v1.client_info.ClientInfo):
The client info used to send a user-agent string along with
API requests. If ``None``, then default info will be used.
Generally, you only need to set this if you're developing
your own client library.
Raises:
google.auth.exceptions.MutualTLSChannelError: If mutual TLS transport
creation failed for any reason.
"""
if isinstance(client_options, dict):
client_options = client_options_lib.from_dict(client_options)
if client_options is None:
client_options = client_options_lib.ClientOptions()

# Create SSL credentials for mutual TLS if needed.
use_client_cert = os.getenv(
"GOOGLE_API_USE_CLIENT_CERTIFICATE", "false"
).lower()
if use_client_cert not in ("true", "false"):
raise ValueError(
"Environment variable `GOOGLE_API_USE_CLIENT_CERTIFICATE` must be either `true` or `false`"
)
client_cert_source_func = None
is_mtls = False
if use_client_cert == "true":
if client_options.client_cert_source:
is_mtls = True
client_cert_source_func = client_options.client_cert_source
else:
is_mtls = mtls.has_default_client_cert_source()
if is_mtls:
client_cert_source_func = mtls.default_client_cert_source()
else:
client_cert_source_func = None

# Figure out which api endpoint to use.
if client_options.api_endpoint is not None:
api_endpoint = client_options.api_endpoint
else:
use_mtls_env = os.getenv("GOOGLE_API_USE_MTLS_ENDPOINT", "auto")
if use_mtls_env == "never":
api_endpoint = self.DEFAULT_ENDPOINT
elif use_mtls_env == "always":
api_endpoint = self.DEFAULT_MTLS_ENDPOINT
elif use_mtls_env == "auto":
if is_mtls:
api_endpoint = self.DEFAULT_MTLS_ENDPOINT
else:
api_endpoint = self.DEFAULT_ENDPOINT
else:
raise MutualTLSChannelError(
"Unsupported GOOGLE_API_USE_MTLS_ENDPOINT value. Accepted "
"values: never, auto, always"
)

# Save or instantiate the transport.
# Ordinarily, we provide the transport, but allowing a custom transport
# instance provides an extensibility point for unusual situations.
if isinstance(transport, OperationsTransport):
# transport is a OperationsTransport instance.
if credentials or client_options.credentials_file:
raise ValueError(
"When providing a transport instance, "
"provide its credentials directly."
)
if client_options.scopes:
raise ValueError(
"When providing a transport instance, provide its scopes "
"directly."
)
self._transport = transport
else:
Transport = type(self).get_transport_class(transport)
self._transport = Transport(
credentials=credentials,
credentials_file=client_options.credentials_file,
host=api_endpoint,
scopes=client_options.scopes,
client_cert_source_for_mtls=client_cert_source_func,
quota_project_id=client_options.quota_project_id,
client_info=client_info,
always_use_jwt_access=True,
)
Loading

0 comments on commit 043785e

Please sign in to comment.