diff --git a/google/cloud/bigquery/async_client.py b/google/cloud/bigquery/async_client.py index 67550966b..08f0f4d1e 100644 --- a/google/cloud/bigquery/async_client.py +++ b/google/cloud/bigquery/async_client.py @@ -2,7 +2,6 @@ from google.cloud.bigquery import _job_helpers from google.cloud.bigquery import table import asyncio -from google.api_core import gapic_v1, retry_async class AsyncClient(Client): def __init__(self, *args, **kwargs): @@ -110,44 +109,27 @@ async def do_query(): request_body["requestId"] = _job_helpers.make_job_id() span_attributes = {"path": path} - # Wrap the RPC method; this adds retry and timeout information, - # and friendly error handling. - rpc = gapic_v1.method_async.wrap_method( - client._call_api, - default_retry=retry_async.AsyncRetry( - initial=0.1, - maximum=60.0, - multiplier=1.3, - predicate=retries.if_exception_type( - core_exceptions.ServiceUnavailable, - ), - deadline=60.0, - ), - default_timeout=60.0, - client_info=DEFAULT_CLIENT_INFO, - ) - # For easier testing, handle the retries ourselves. - # if retry is not None: - # response = retry(client._call_api)( - # retry=None, # We're calling the retry decorator ourselves. - # span_name="BigQuery.query", - # span_attributes=span_attributes, - # method="POST", - # path=path, - # data=request_body, - # timeout=api_timeout, - # ) - # else: - response = await rpc( - retry=None, - span_name="BigQuery.query", - span_attributes=span_attributes, - method="POST", - path=path, - data=request_body, - timeout=api_timeout, - ) + if retry is not None: + response = retry(client._call_api)( + retry=None, # We're calling the retry decorator ourselves. + span_name="BigQuery.query", + span_attributes=span_attributes, + method="POST", + path=path, + data=request_body, + timeout=api_timeout, + ) + else: + response = client._call_api( + retry=None, + span_name="BigQuery.query", + span_attributes=span_attributes, + method="POST", + path=path, + data=request_body, + timeout=api_timeout, + ) # Even if we run with JOB_CREATION_OPTIONAL, if there are more pages # to fetch, there will be a job ID for jobs.getQueryResults. @@ -186,12 +168,11 @@ async def do_query(): project=query_results.project, num_dml_affected_rows=query_results.num_dml_affected_rows, ) - if job_retry is not None: return job_retry(do_query)() else: - return do_query() + return await do_query() async def async_wait_or_cancel( job: job.QueryJob, @@ -215,11 +196,4 @@ async def async_wait_or_cancel( except Exception: # Don't eat the original exception if cancel fails. pass - raise - - -DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo( - "3.17.2" -) - -__all__ = ("AsyncClient",) \ No newline at end of file + raise \ No newline at end of file