diff --git a/google/api_core/client_info.py b/google/api_core/client_info.py index 48326799..90926beb 100644 --- a/google/api_core/client_info.py +++ b/google/api_core/client_info.py @@ -57,7 +57,8 @@ class ClientInfo(object): user_agent (Optional[str]): Prefix to the user agent header. This is used to supply information such as application name or partner tool. Recommended format: ``application-or-tool-ID/major.minor.version``. - rest_version (Optional[str]): The requests library version. + rest_version (Optional[str]): A string with labeled versions of the + dependencies used for REST transport. """ def __init__( diff --git a/google/api_core/gapic_v1/client_info.py b/google/api_core/gapic_v1/client_info.py index 2de1be7f..4516f339 100644 --- a/google/api_core/gapic_v1/client_info.py +++ b/google/api_core/gapic_v1/client_info.py @@ -45,6 +45,8 @@ class ClientInfo(client_info.ClientInfo): user_agent (Optional[str]): Prefix to the user agent header. This is used to supply information such as application name or partner tool. Recommended format: ``application-or-tool-ID/major.minor.version``. + rest_version (Optional[str]): A string with labeled versions of the + dependencies used for REST transport. """ def to_grpc_metadata(self): diff --git a/google/api_core/operations_v1/__init__.py b/google/api_core/operations_v1/__init__.py index 8b75426b..52f83fcd 100644 --- a/google/api_core/operations_v1/__init__.py +++ b/google/api_core/operations_v1/__init__.py @@ -25,5 +25,14 @@ "AbstractOperationsClient", "OperationsAsyncClient", "OperationsClient", - "OperationsRestTransport", + "OperationsRestTransport" ] + +try: + from google.api_core.operations_v1.transports.rest_asyncio import OperationsRestAsyncTransport # noqa: F401 + __all__.append("OperationsRestAsyncTransport") +except ImportError: + # This import requires the `async_rest` extra. + # Don't raise an exception if `OperationsRestAsyncTransport` cannot be imported + # as other transports are still available. + pass diff --git a/google/api_core/operations_v1/transports/__init__.py b/google/api_core/operations_v1/transports/__init__.py index df53e15e..2ae79e4e 100644 --- a/google/api_core/operations_v1/transports/__init__.py +++ b/google/api_core/operations_v1/transports/__init__.py @@ -14,16 +14,26 @@ # limitations under the License. # from collections import OrderedDict +from typing import cast, Dict, Tuple from .base import OperationsTransport from .rest import OperationsRestTransport - # Compile a registry of transports. -_transport_registry = OrderedDict() -_transport_registry["rest"] = OperationsRestTransport +_transport_registry: Dict[str, OperationsTransport] = OrderedDict() +_transport_registry["rest"] = cast(OperationsTransport, OperationsRestTransport) + +__all__: Tuple[str, ...] = ("OperationsTransport", "OperationsRestTransport") + +try: + from .rest_asyncio import OperationsRestAsyncTransport -__all__ = ( - "OperationsTransport", - "OperationsRestTransport", -) + __all__ += ("OperationsRestAsyncTransport",) + _transport_registry["rest_asyncio"] = cast( + OperationsTransport, OperationsRestAsyncTransport + ) +except ImportError: + # This import requires the `async_rest` extra. + # Don't raise an exception if `OperationsRestAsyncTransport` cannot be imported + # as other transports are still available. + pass diff --git a/google/api_core/operations_v1/transports/base.py b/google/api_core/operations_v1/transports/base.py index fb1d4fc9..50e13761 100644 --- a/google/api_core/operations_v1/transports/base.py +++ b/google/api_core/operations_v1/transports/base.py @@ -14,6 +14,7 @@ # limitations under the License. # import abc +import re from typing import Awaitable, Callable, Optional, Sequence, Union import google.api_core # type: ignore @@ -25,10 +26,13 @@ from google.auth import credentials as ga_credentials # type: ignore from google.longrunning import operations_pb2 from google.oauth2 import service_account # type: ignore -from google.protobuf import empty_pb2 # type: ignore +import google.protobuf +from google.protobuf import empty_pb2, json_format # type: ignore from grpc import Compression +PROTOBUF_VERSION = google.protobuf.__version__ + DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo( gapic_version=version.__version__, ) @@ -45,12 +49,14 @@ def __init__( self, *, host: str = DEFAULT_HOST, + # TODO(https://github.com/googleapis/python-api-core/issues/709): update type hint for credentials to include `google.auth.aio.Credentials`. credentials: Optional[ga_credentials.Credentials] = None, credentials_file: Optional[str] = None, scopes: Optional[Sequence[str]] = None, quota_project_id: Optional[str] = None, client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO, always_use_jwt_access: Optional[bool] = False, + url_scheme="https", **kwargs, ) -> None: """Instantiate the transport. @@ -76,10 +82,23 @@ def __init__( your own client library. always_use_jwt_access (Optional[bool]): Whether self signed JWT should be used for service account credentials. + url_scheme: the protocol scheme for the API endpoint. Normally + "https", but for testing or local servers, + "http" can be specified. """ + maybe_url_match = re.match("^(?Phttp(?:s)?://)?(?P.*)$", host) + if maybe_url_match is None: + raise ValueError( + f"Unexpected hostname structure: {host}" + ) # pragma: NO COVER + + url_match_items = maybe_url_match.groupdict() + + host = f"{url_scheme}://{host}" if not url_match_items["scheme"] else host + # Save the hostname. Default to port 443 (HTTPS) if none is specified. if ":" not in host: - host += ":443" + host += ":443" # pragma: NO COVER self._host = host scopes_kwargs = {"scopes": scopes, "default_scopes": self.AUTH_SCOPES} @@ -189,6 +208,37 @@ def close(self): """ raise NotImplementedError() + def _convert_protobuf_message_to_dict( + self, message: google.protobuf.message.Message + ): + r"""Converts protobuf message to a dictionary. + + When the dictionary is encoded to JSON, it conforms to proto3 JSON spec. + + Args: + message(google.protobuf.message.Message): The protocol buffers message + instance to serialize. + + Returns: + A dict representation of the protocol buffer message. + """ + # TODO(https://github.com/googleapis/python-api-core/issues/643): For backwards compatibility + # with protobuf 3.x 4.x, Remove once support for protobuf 3.x and 4.x is dropped. + if PROTOBUF_VERSION[0:2] in ["3.", "4."]: + result = json_format.MessageToDict( + message, + preserving_proto_field_name=True, + including_default_value_fields=True, # type: ignore # backward compatibility + ) + else: + result = json_format.MessageToDict( + message, + preserving_proto_field_name=True, + always_print_fields_with_no_presence=True, + ) + + return result + @property def list_operations( self, diff --git a/google/api_core/operations_v1/transports/rest.py b/google/api_core/operations_v1/transports/rest.py index f37bb344..b0599baa 100644 --- a/google/api_core/operations_v1/transports/rest.py +++ b/google/api_core/operations_v1/transports/rest.py @@ -14,7 +14,6 @@ # limitations under the License. # -import re from typing import Callable, Dict, Optional, Sequence, Tuple, Union from requests import __version__ as requests_version @@ -41,7 +40,7 @@ DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo( gapic_version=BASE_DEFAULT_CLIENT_INFO.gapic_version, grpc_version=None, - rest_version=requests_version, + rest_version=f"requests@{requests_version}", ) @@ -123,16 +122,6 @@ def __init__( # TODO(yon-mg): resolve other ctor params i.e. scopes, quota, etc. # TODO: When custom host (api_endpoint) is set, `scopes` must *also* be set on the # credentials object - maybe_url_match = re.match("^(?Phttp(?:s)?://)?(?P.*)$", host) - if maybe_url_match is None: - raise ValueError( - f"Unexpected hostname structure: {host}" - ) # pragma: NO COVER - - url_match_items = maybe_url_match.groupdict() - - host = f"{url_scheme}://{host}" if not url_match_items["scheme"] else host - super().__init__( host=host, credentials=credentials, @@ -144,6 +133,7 @@ def __init__( ) if client_cert_source_for_mtls: self._session.configure_mtls_channel(client_cert_source_for_mtls) + # TODO(https://github.com/googleapis/python-api-core/issues/720): Add wrap logic directly to the property methods for callables. self._prep_wrapped_messages(client_info) self._http_options = http_options or {} self._path_prefix = path_prefix @@ -206,6 +196,7 @@ def _list_operations( # Send the request headers = dict(metadata) headers["Content-Type"] = "application/json" + # TODO(https://github.com/googleapis/python-api-core/issues/721): Update incorrect use of `uri`` variable name. response = getattr(self._session, method)( "{host}{uri}".format(host=self._host, uri=uri), timeout=timeout, @@ -282,6 +273,7 @@ def _get_operation( # Send the request headers = dict(metadata) headers["Content-Type"] = "application/json" + # TODO(https://github.com/googleapis/python-api-core/issues/721): Update incorrect use of `uri`` variable name. response = getattr(self._session, method)( "{host}{uri}".format(host=self._host, uri=uri), timeout=timeout, @@ -351,6 +343,7 @@ def _delete_operation( # Send the request headers = dict(metadata) headers["Content-Type"] = "application/json" + # TODO(https://github.com/googleapis/python-api-core/issues/721): Update incorrect use of `uri`` variable name. response = getattr(self._session, method)( "{host}{uri}".format(host=self._host, uri=uri), timeout=timeout, @@ -426,6 +419,7 @@ def _cancel_operation( # Send the request headers = dict(metadata) headers["Content-Type"] = "application/json" + # TODO(https://github.com/googleapis/python-api-core/issues/721): Update incorrect use of `uri`` variable name. response = getattr(self._session, method)( "{host}{uri}".format(host=self._host, uri=uri), timeout=timeout, @@ -441,38 +435,6 @@ def _cancel_operation( return empty_pb2.Empty() - def _convert_protobuf_message_to_dict( - self, message: google.protobuf.message.Message - ): - r"""Converts protobuf message to a dictionary. - - When the dictionary is encoded to JSON, it conforms to proto3 JSON spec. - - Args: - message(google.protobuf.message.Message): The protocol buffers message - instance to serialize. - - Returns: - A dict representation of the protocol buffer message. - """ - # For backwards compatibility with protobuf 3.x 4.x - # Remove once support for protobuf 3.x and 4.x is dropped - # https://github.com/googleapis/python-api-core/issues/643 - if PROTOBUF_VERSION[0:2] in ["3.", "4."]: - result = json_format.MessageToDict( - message, - preserving_proto_field_name=True, - including_default_value_fields=True, # type: ignore # backward compatibility - ) - else: - result = json_format.MessageToDict( - message, - preserving_proto_field_name=True, - always_print_fields_with_no_presence=True, - ) - - return result - @property def list_operations( self, diff --git a/google/api_core/operations_v1/transports/rest_asyncio.py b/google/api_core/operations_v1/transports/rest_asyncio.py new file mode 100644 index 00000000..5c903b90 --- /dev/null +++ b/google/api_core/operations_v1/transports/rest_asyncio.py @@ -0,0 +1,517 @@ +# -*- coding: utf-8 -*- +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import json +from typing import Any, Callable, Coroutine, Dict, Optional, Sequence, Tuple + +from google.auth import __version__ as auth_version + +try: + from google.auth.aio.transport.sessions import AsyncAuthorizedSession # type: ignore +except ImportError as e: # pragma: NO COVER + raise ImportError( + "The `async_rest` extra of `google-api-core` is required to use long-running operations. Install it by running " + "`pip install google-api-core[async_rest]`." + ) from e + +from google.api_core import exceptions as core_exceptions # type: ignore +from google.api_core import gapic_v1 # type: ignore +from google.api_core import path_template # type: ignore +from google.api_core import rest_helpers # type: ignore +from google.api_core import retry_async as retries_async # type: ignore +from google.auth.aio import credentials as ga_credentials_async # type: ignore +from google.longrunning import operations_pb2 # type: ignore +from google.protobuf import empty_pb2 # type: ignore +from google.protobuf import json_format # type: ignore + +from .base import DEFAULT_CLIENT_INFO as BASE_DEFAULT_CLIENT_INFO, OperationsTransport + +DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo( + gapic_version=BASE_DEFAULT_CLIENT_INFO.gapic_version, + grpc_version=None, + rest_version=f"google-auth@{auth_version}", +) + + +class OperationsRestAsyncTransport(OperationsTransport): + """Asynchronous REST backend transport for Operations. + + Manages async long-running operations with an API service. + + When an API method normally takes long time to complete, it can be + designed to return [Operation][google.api_core.operations_v1.Operation] to the + client, and the client can use this interface to receive the real + response asynchronously by polling the operation resource, or pass + the operation resource to another API (such as Google Cloud Pub/Sub + API) to receive the response. Any API service that returns + long-running operations should implement the ``Operations`` + interface so developers can have a consistent client experience. + + This class defines the same methods as the primary client, so the + primary client can load the underlying transport implementation + and call it. + + It sends JSON representations of protocol buffers over HTTP/1.1 + """ + + def __init__( + self, + *, + host: str = "longrunning.googleapis.com", + credentials: Optional[ga_credentials_async.Credentials] = None, + client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO, + always_use_jwt_access: Optional[bool] = False, + url_scheme: str = "https", + http_options: Optional[Dict] = None, + path_prefix: str = "v1", + ) -> None: + """Instantiate the transport. + + Args: + host (Optional[str]): + The hostname to connect to. + credentials (Optional[google.auth.aio.credentials.Credentials]): The + authorization credentials to attach to requests. These + credentials identify the application to the service; if none + are specified, the client will attempt to ascertain the + credentials from the environment. + client_info (google.api_core.gapic_v1.client_info.ClientInfo): + The client info used to send a user-agent string along with + API requests. If ``None``, then default info will be used. + Generally, you only need to set this if you're developing + your own client library. + always_use_jwt_access (Optional[bool]): Whether self signed JWT should + be used for service account credentials. + url_scheme: the protocol scheme for the API endpoint. Normally + "https", but for testing or local servers, + "http" can be specified. + http_options: a dictionary of http_options for transcoding, to override + the defaults from operations.proto. Each method has an entry + with the corresponding http rules as value. + path_prefix: path prefix (usually represents API version). Set to + "v1" by default. + + """ + super().__init__( + host=host, + # TODO(https://github.com/googleapis/python-api-core/issues/709): Remove `type: ignore` when the linked issue is resolved. + credentials=credentials, # type: ignore + client_info=client_info, + always_use_jwt_access=always_use_jwt_access, + ) + # TODO(https://github.com/googleapis/python-api-core/issues/708): add support for + # `default_host` in AsyncAuthorizedSession for feature parity with the synchronous + # code. + # TODO(https://github.com/googleapis/python-api-core/issues/709): Remove `type: ignore` when the linked issue is resolved. + self._session = AsyncAuthorizedSession(self._credentials) # type: ignore + # TODO(https://github.com/googleapis/python-api-core/issues/720): Add wrap logic directly to the property methods for callables. + self._prep_wrapped_messages(client_info) + self._http_options = http_options or {} + self._path_prefix = path_prefix + + def _prep_wrapped_messages(self, client_info): + # Precompute the wrapped methods. + self._wrapped_methods = { + self.list_operations: gapic_v1.method_async.wrap_method( + self.list_operations, + default_retry=retries_async.AsyncRetry( + initial=0.5, + maximum=10.0, + multiplier=2.0, + predicate=retries_async.if_exception_type( + core_exceptions.ServiceUnavailable, + ), + deadline=10.0, + ), + default_timeout=10.0, + client_info=client_info, + ), + self.get_operation: gapic_v1.method_async.wrap_method( + self.get_operation, + default_retry=retries_async.AsyncRetry( + initial=0.5, + maximum=10.0, + multiplier=2.0, + predicate=retries_async.if_exception_type( + core_exceptions.ServiceUnavailable, + ), + deadline=10.0, + ), + default_timeout=10.0, + client_info=client_info, + ), + self.delete_operation: gapic_v1.method_async.wrap_method( + self.delete_operation, + default_retry=retries_async.AsyncRetry( + initial=0.5, + maximum=10.0, + multiplier=2.0, + predicate=retries_async.if_exception_type( + core_exceptions.ServiceUnavailable, + ), + deadline=10.0, + ), + default_timeout=10.0, + client_info=client_info, + ), + self.cancel_operation: gapic_v1.method_async.wrap_method( + self.cancel_operation, + default_retry=retries_async.AsyncRetry( + initial=0.5, + maximum=10.0, + multiplier=2.0, + predicate=retries_async.if_exception_type( + core_exceptions.ServiceUnavailable, + ), + deadline=10.0, + ), + default_timeout=10.0, + client_info=client_info, + ), + } + + async def _list_operations( + self, + request: operations_pb2.ListOperationsRequest, + *, + timeout: Optional[float] = None, + metadata: Sequence[Tuple[str, str]] = (), + # TODO(https://github.com/googleapis/python-api-core/issues/710): Add coverage and remove `# pragma: NO COVER`. + ) -> operations_pb2.ListOperationsResponse: # pragma: NO COVER + r"""Asynchronously call the list operations method over HTTP. + + Args: + request (~.operations_pb2.ListOperationsRequest): + The request object. The request message for + [Operations.ListOperations][google.api_core.operations_v1.Operations.ListOperations]. + timeout (float): The timeout for this request. + metadata (Sequence[Tuple[str, str]]): Strings which should be + sent along with the request as metadata. + + Returns: + ~.operations_pb2.ListOperationsResponse: + The response message for + [Operations.ListOperations][google.api_core.operations_v1.Operations.ListOperations]. + + """ + + http_options = [ + { + "method": "get", + "uri": "/{}/{{name=**}}/operations".format(self._path_prefix), + }, + ] + if "google.longrunning.Operations.ListOperations" in self._http_options: + http_options = self._http_options[ + "google.longrunning.Operations.ListOperations" + ] + + request_kwargs = self._convert_protobuf_message_to_dict(request) + transcoded_request = path_template.transcode(http_options, **request_kwargs) + + uri = transcoded_request["uri"] + method = transcoded_request["method"] + + # Jsonify the query params + query_params_request = operations_pb2.ListOperationsRequest() + json_format.ParseDict(transcoded_request["query_params"], query_params_request) + query_params = json_format.MessageToDict( + query_params_request, + preserving_proto_field_name=False, + use_integers_for_enums=False, + ) + + # Send the request + headers = dict(metadata) + headers["Content-Type"] = "application/json" + # TODO(https://github.com/googleapis/python-api-core/issues/721): Update incorrect use of `uri`` variable name. + response = await getattr(self._session, method)( + "{host}{uri}".format(host=self._host, uri=uri), + timeout=timeout, + headers=headers, + params=rest_helpers.flatten_query_params(query_params), + ) + content = await response.read() + + # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception + # subclass. + if response.status_code >= 400: + payload = json.loads(content.decode("utf-8")) + request_url = "{host}{uri}".format(host=self._host, uri=uri) + raise core_exceptions.format_http_response_error(response, method, request_url, payload) # type: ignore + + # Return the response + api_response = operations_pb2.ListOperationsResponse() + json_format.Parse(content, api_response, ignore_unknown_fields=False) + return api_response + + async def _get_operation( + self, + request: operations_pb2.GetOperationRequest, + *, + timeout: Optional[float] = None, + metadata: Sequence[Tuple[str, str]] = (), + # TODO(https://github.com/googleapis/python-api-core/issues/710): Add coverage and remove `# pragma: NO COVER`. + ) -> operations_pb2.Operation: # pragma: NO COVER + r"""Asynchronously call the get operation method over HTTP. + + Args: + request (~.operations_pb2.GetOperationRequest): + The request object. The request message for + [Operations.GetOperation][google.api_core.operations_v1.Operations.GetOperation]. + timeout (float): The timeout for this request. + metadata (Sequence[Tuple[str, str]]): Strings which should be + sent along with the request as metadata. + + Returns: + ~.operations_pb2.Operation: + This resource represents a long- + running operation that is the result of a + network API call. + + """ + + http_options = [ + { + "method": "get", + "uri": "/{}/{{name=**/operations/*}}".format(self._path_prefix), + }, + ] + if "google.longrunning.Operations.GetOperation" in self._http_options: + http_options = self._http_options[ + "google.longrunning.Operations.GetOperation" + ] + + request_kwargs = self._convert_protobuf_message_to_dict(request) + transcoded_request = path_template.transcode(http_options, **request_kwargs) + + uri = transcoded_request["uri"] + method = transcoded_request["method"] + + # Jsonify the query params + query_params_request = operations_pb2.GetOperationRequest() + json_format.ParseDict(transcoded_request["query_params"], query_params_request) + query_params = json_format.MessageToDict( + query_params_request, + preserving_proto_field_name=False, + use_integers_for_enums=False, + ) + + # Send the request + headers = dict(metadata) + headers["Content-Type"] = "application/json" + # TODO(https://github.com/googleapis/python-api-core/issues/721): Update incorrect use of `uri`` variable name. + response = await getattr(self._session, method)( + "{host}{uri}".format(host=self._host, uri=uri), + timeout=timeout, + headers=headers, + params=rest_helpers.flatten_query_params(query_params), + ) + content = await response.read() + + # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception + # subclass. + if response.status_code >= 400: + payload = json.loads(content.decode("utf-8")) + request_url = "{host}{uri}".format(host=self._host, uri=uri) + raise core_exceptions.format_http_response_error(response, method, request_url, payload) # type: ignore + + # Return the response + api_response = operations_pb2.Operation() + json_format.Parse(content, api_response, ignore_unknown_fields=False) + return api_response + + async def _delete_operation( + self, + request: operations_pb2.DeleteOperationRequest, + *, + timeout: Optional[float] = None, + metadata: Sequence[Tuple[str, str]] = (), + # TODO(https://github.com/googleapis/python-api-core/issues/710): Add coverage and remove `# pragma: NO COVER`. + ) -> empty_pb2.Empty: # pragma: NO COVER + r"""Asynchronously call the delete operation method over HTTP. + + Args: + request (~.operations_pb2.DeleteOperationRequest): + The request object. The request message for + [Operations.DeleteOperation][google.api_core.operations_v1.Operations.DeleteOperation]. + + retry (google.api_core.retry.Retry): Designation of what errors, if any, + should be retried. + timeout (float): The timeout for this request. + metadata (Sequence[Tuple[str, str]]): Strings which should be + sent along with the request as metadata. + """ + + http_options = [ + { + "method": "delete", + "uri": "/{}/{{name=**/operations/*}}".format(self._path_prefix), + }, + ] + if "google.longrunning.Operations.DeleteOperation" in self._http_options: + http_options = self._http_options[ + "google.longrunning.Operations.DeleteOperation" + ] + + request_kwargs = self._convert_protobuf_message_to_dict(request) + transcoded_request = path_template.transcode(http_options, **request_kwargs) + + uri = transcoded_request["uri"] + method = transcoded_request["method"] + + # Jsonify the query params + query_params_request = operations_pb2.DeleteOperationRequest() + json_format.ParseDict(transcoded_request["query_params"], query_params_request) + query_params = json_format.MessageToDict( + query_params_request, + preserving_proto_field_name=False, + use_integers_for_enums=False, + ) + + # Send the request + headers = dict(metadata) + headers["Content-Type"] = "application/json" + # TODO(https://github.com/googleapis/python-api-core/issues/721): Update incorrect use of `uri`` variable name. + response = await getattr(self._session, method)( + "{host}{uri}".format(host=self._host, uri=uri), + timeout=timeout, + headers=headers, + params=rest_helpers.flatten_query_params(query_params), + ) + + # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception + # subclass. + if response.status_code >= 400: + content = await response.read() + payload = json.loads(content.decode("utf-8")) + request_url = "{host}{uri}".format(host=self._host, uri=uri) + raise core_exceptions.format_http_response_error(response, method, request_url, payload) # type: ignore + + return empty_pb2.Empty() + + async def _cancel_operation( + self, + request: operations_pb2.CancelOperationRequest, + *, + timeout: Optional[float] = None, + metadata: Sequence[Tuple[str, str]] = (), + # TODO(https://github.com/googleapis/python-api-core/issues/710): Add coverage and remove `# pragma: NO COVER`. + ) -> empty_pb2.Empty: # pragma: NO COVER + r"""Asynchronously call the cancel operation method over HTTP. + + Args: + request (~.operations_pb2.CancelOperationRequest): + The request object. The request message for + [Operations.CancelOperation][google.api_core.operations_v1.Operations.CancelOperation]. + timeout (float): The timeout for this request. + metadata (Sequence[Tuple[str, str]]): Strings which should be + sent along with the request as metadata. + """ + + http_options = [ + { + "method": "post", + "uri": "/{}/{{name=**/operations/*}}:cancel".format(self._path_prefix), + "body": "*", + }, + ] + if "google.longrunning.Operations.CancelOperation" in self._http_options: + http_options = self._http_options[ + "google.longrunning.Operations.CancelOperation" + ] + + request_kwargs = self._convert_protobuf_message_to_dict(request) + transcoded_request = path_template.transcode(http_options, **request_kwargs) + + # Jsonify the request body + body_request = operations_pb2.CancelOperationRequest() + json_format.ParseDict(transcoded_request["body"], body_request) + body = json_format.MessageToDict( + body_request, + preserving_proto_field_name=False, + use_integers_for_enums=False, + ) + uri = transcoded_request["uri"] + method = transcoded_request["method"] + + # Jsonify the query params + query_params_request = operations_pb2.CancelOperationRequest() + json_format.ParseDict(transcoded_request["query_params"], query_params_request) + query_params = json_format.MessageToDict( + query_params_request, + preserving_proto_field_name=False, + use_integers_for_enums=False, + ) + + # Send the request + headers = dict(metadata) + headers["Content-Type"] = "application/json" + # TODO(https://github.com/googleapis/python-api-core/issues/721): Update incorrect use of `uri`` variable name. + response = await getattr(self._session, method)( + "{host}{uri}".format(host=self._host, uri=uri), + timeout=timeout, + headers=headers, + params=rest_helpers.flatten_query_params(query_params), + data=body, + ) + + # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception + # subclass. + if response.status_code >= 400: + content = await response.read() + payload = json.loads(content.decode("utf-8")) + request_url = "{host}{uri}".format(host=self._host, uri=uri) + raise core_exceptions.format_http_response_error(response, method, request_url, payload) # type: ignore + + return empty_pb2.Empty() + + @property + def list_operations( + self, + ) -> Callable[ + [operations_pb2.ListOperationsRequest], + Coroutine[Any, Any, operations_pb2.ListOperationsResponse], + ]: + return self._list_operations + + @property + def get_operation( + self, + ) -> Callable[ + [operations_pb2.GetOperationRequest], + Coroutine[Any, Any, operations_pb2.Operation], + ]: + return self._get_operation + + @property + def delete_operation( + self, + ) -> Callable[ + [operations_pb2.DeleteOperationRequest], Coroutine[Any, Any, empty_pb2.Empty] + ]: + return self._delete_operation + + @property + def cancel_operation( + self, + ) -> Callable[ + [operations_pb2.CancelOperationRequest], Coroutine[Any, Any, empty_pb2.Empty] + ]: + return self._cancel_operation + + +__all__ = ("OperationsRestAsyncTransport",) diff --git a/tests/unit/operations_v1/test_operations_rest_client.py b/tests/unit/operations_v1/test_operations_rest_client.py index 4ab4f1f7..556c35c3 100644 --- a/tests/unit/operations_v1/test_operations_rest_client.py +++ b/tests/unit/operations_v1/test_operations_rest_client.py @@ -31,7 +31,6 @@ from google.api_core.operations_v1 import AbstractOperationsClient from google.api_core.operations_v1 import pagers from google.api_core.operations_v1 import transports -import google.auth from google.auth import credentials as ga_credentials from google.auth.exceptions import MutualTLSChannelError from google.longrunning import operations_pb2 @@ -39,6 +38,33 @@ from google.protobuf import json_format # type: ignore from google.rpc import status_pb2 # type: ignore +try: + import aiohttp # noqa: F401 + import google.auth.aio.transport + from google.auth.aio import credentials as ga_credentials_async + + GOOGLE_AUTH_AIO_INSTALLED = True +except ImportError: + GOOGLE_AUTH_AIO_INSTALLED = False + +if GOOGLE_AUTH_AIO_INSTALLED: + TEST_TRANSPORT_CREDS_PARAMS = [ + ( + transports.OperationsRestTransport, + ga_credentials.AnonymousCredentials(), + ), + ( + transports.OperationsRestAsyncTransport, + ga_credentials_async.AnonymousCredentials(), + ), + ] +else: + TEST_TRANSPORT_CREDS_PARAMS = [ + ( + transports.OperationsRestTransport, + ga_credentials.AnonymousCredentials(), + ) + ] HTTP_OPTIONS = { "google.longrunning.Operations.CancelOperation": [ @@ -125,11 +151,14 @@ def test_operations_client_from_service_account_info(client_class): @pytest.mark.parametrize( - "transport_class,transport_name", [(transports.OperationsRestTransport, "rest")] + "transport_class", + [ + transports.OperationsRestTransport, + # TODO(https://github.com/googleapis/python-api-core/issues/706): Add support for + # service account credentials in transports.OperationsRestAsyncTransport + ], ) -def test_operations_client_service_account_always_use_jwt( - transport_class, transport_name -): +def test_operations_client_service_account_always_use_jwt(transport_class): with mock.patch.object( service_account.Credentials, "with_always_use_jwt_access", create=True ) as use_jwt: @@ -712,11 +741,14 @@ def test_transport_instance(): assert client.transport is transport -@pytest.mark.parametrize("transport_class", [transports.OperationsRestTransport]) -def test_transport_adc(transport_class): +@pytest.mark.parametrize( + "transport_class,credentials", + TEST_TRANSPORT_CREDS_PARAMS, +) +def test_transport_adc(transport_class, credentials): # Test default credentials are used if not provided. with mock.patch.object(google.auth, "default") as adc: - adc.return_value = (ga_credentials.AnonymousCredentials(), None) + adc.return_value = (credentials, None) transport_class() adc.assert_called_once() @@ -800,12 +832,21 @@ def test_operations_auth_adc(): ) -def test_operations_http_transport_client_cert_source_for_mtls(): +# TODO(https://github.com/googleapis/python-api-core/issues/705): Add +# testing for `transports.OperationsRestAsyncTransport` once MTLS is supported +# in `google.auth.aio.transport`. +@pytest.mark.parametrize( + "transport_class", + [ + transports.OperationsRestTransport, + ], +) +def test_operations_http_transport_client_cert_source_for_mtls(transport_class): cred = ga_credentials.AnonymousCredentials() with mock.patch( "google.auth.transport.requests.AuthorizedSession.configure_mtls_channel" ) as mock_configure_mtls_channel: - transports.OperationsRestTransport( + transport_class( credentials=cred, client_cert_source_for_mtls=client_cert_source_callback ) mock_configure_mtls_channel.assert_called_once_with(client_cert_source_callback)