Merged bigquery.py from Spotify's Luigi repository
zee2theodd committed Oct 28, 2024
1 parent cf4b6fb commit 66e4732
Showing 1 changed file with 75 additions and 4 deletions.
79 changes: 75 additions & 4 deletions luigi/contrib/bigquery.py
@@ -21,14 +21,38 @@
import time
from luigi.contrib import gcp

from tenacity import retry
from tenacity import retry_if_exception
from tenacity import retry_if_exception_type
from tenacity import wait_exponential
from tenacity import stop_after_attempt

logger = logging.getLogger('luigi-interface')

RETRYABLE_ERRORS = None
try:
    import httplib2
    from googleapiclient import discovery
    from googleapiclient import errors
    from googleapiclient import http
except ImportError:
    logger.warning('BigQuery module imported, but google-api-python-client is '
                   'not installed. Any BigQuery task will fail')
else:
    RETRYABLE_ERRORS = (httplib2.HttpLib2Error, IOError, TimeoutError, BrokenPipeError)


# Retry configurations. For more details, see https://tenacity.readthedocs.io/en/latest/
def is_error_5xx(err):
    return isinstance(err, errors.HttpError) and err.resp.status >= 500


bq_retry = retry(retry=(retry_if_exception(is_error_5xx) | retry_if_exception_type(RETRYABLE_ERRORS)),
                 wait=wait_exponential(multiplier=1, min=1, max=10),
                 stop=stop_after_attempt(3),
                 reraise=True,
                 after=lambda x: x.args[0]._initialise_client()
                 )
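
To make the retry behaviour concrete, here is a minimal, self-contained sketch (not part of this diff; the class and method names are invented for illustration). It mirrors the bq_retry settings above: exponential backoff, at most three attempts, reraise on exhaustion, and an after hook that rebuilds the client, since x.args[0] is the bound self of the decorated method.

from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential

class FlakyClient:
    def __init__(self):
        self.attempts = 0
        self._initialise_client()

    def _initialise_client(self):
        # Stand-in for discovery.build(...); recreated before each retried attempt.
        self.client = object()

    @retry(retry=retry_if_exception_type(IOError),
           wait=wait_exponential(multiplier=1, min=1, max=10),
           stop=stop_after_attempt(3),
           reraise=True,
           after=lambda state: state.args[0]._initialise_client())
    def fetch(self):
        self.attempts += 1
        if self.attempts < 3:
            raise IOError('transient failure')  # retried against a fresh client
        return 'ok'

assert FlakyClient().fetch() == 'ok'  # succeeds on the third attempt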


class CreateDisposition:
@@ -52,6 +76,7 @@ class SourceFormat:
    CSV = 'CSV'
    DATASTORE_BACKUP = 'DATASTORE_BACKUP'
    NEWLINE_DELIMITED_JSON = 'NEWLINE_DELIMITED_JSON'
    PARQUET = 'PARQUET'


class FieldDelimiter:
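
With the new PARQUET constant, a load task can declare Parquet input. A minimal, hypothetical sketch (bucket, project, dataset and table names are placeholders; the upstream task is assumed to have already written the Parquet file to GCS):

import luigi
from luigi.contrib.bigquery import BigQueryLoadTask, BigQueryTarget, SourceFormat
from luigi.contrib.gcs import GCSTarget


class ParquetOnGCS(luigi.ExternalTask):
    def output(self):
        return GCSTarget('gs://my-bucket/exports/data.parquet')  # placeholder path


class LoadParquet(BigQueryLoadTask):
    @property
    def source_format(self):
        return SourceFormat.PARQUET

    def requires(self):
        return ParquetOnGCS()

    def output(self):
        return BigQueryTarget('my-project', 'my_dataset', 'my_table')  # placeholder identifiers
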
@@ -121,13 +146,23 @@ class BigQueryClient:
"""

    def __init__(self, oauth_credentials=None, descriptor='', http_=None):
        authenticate_kwargs = gcp.get_authenticate_kwargs(oauth_credentials, http_)
        # Save initialisation arguments in case we need to re-create client
        # due to connection timeout
        self.oauth_credentials = oauth_credentials
        self.descriptor = descriptor
        self.http_ = http_

        self._initialise_client()

        if descriptor:
            self.client = discovery.build_from_document(descriptor, **authenticate_kwargs)
    def _initialise_client(self):
        authenticate_kwargs = gcp.get_authenticate_kwargs(self.oauth_credentials, self.http_)

        if self.descriptor:
            self.client = discovery.build_from_document(self.descriptor, **authenticate_kwargs)
        else:
            self.client = discovery.build('bigquery', 'v2', cache_discovery=False, **authenticate_kwargs)

    @bq_retry
    def dataset_exists(self, dataset):
        """Returns whether the given dataset exists.
        If regional location is specified for the dataset, that is also checked
@@ -146,14 +181,14 @@ def dataset_exists(self, dataset):
                    raise Exception('''Dataset already exists with regional location {}. Can't use {}.'''.format(
                        fetched_location if fetched_location is not None else 'unspecified',
                        dataset.location))

        except http.HttpError as ex:
            if ex.resp.status == 404:
                return False
            raise

        return True

    @bq_retry
    def table_exists(self, table):
        """Returns whether the given table exists.
@@ -527,6 +562,16 @@ def allow_quoted_new_lines(self):
""" Indicates if BigQuery should allow quoted data sections that contain newline characters in a CSV file. The default value is false."""
return False

    def configure_job(self, configuration):
        """Set additional job configuration.
        This allows specifying job configuration parameters that are not exposed via Task properties.
        :param configuration: Current configuration.
        :return: New or updated configuration.
        """
        return configuration

    def run(self):
        output = self.output()
        assert isinstance(output, BigQueryTarget), 'Output must be a BigQueryTarget, not %s' % (output)
Expand Down Expand Up @@ -565,6 +610,8 @@ def run(self):
        else:
            job['configuration']['load']['autodetect'] = True

        job['configuration'] = self.configure_job(job['configuration'])

        bq_client.run_job(output.table.project_id, job, dataset=output.table.dataset)
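
The hook is applied just before the job is submitted, so a subclass can adjust any part of the configuration dict. A hypothetical sketch (requires/output/source_format as in the Parquet example above; maxBadRecords is merely one example of a load setting without a dedicated Task property):

class TolerantParquetLoad(LoadParquet):
    def configure_job(self, configuration):
        # Tolerate a handful of malformed rows; 10 is an arbitrary illustrative value.
        configuration['load']['maxBadRecords'] = 10
        return configuration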


@@ -610,6 +657,16 @@ def use_legacy_sql(self):
"""
return True

    def configure_job(self, configuration):
        """Set additional job configuration.
        This allows specifying job configuration parameters that are not exposed via Task properties.
        :param configuration: Current configuration.
        :return: New or updated configuration.
        """
        return configuration

    def run(self):
        output = self.output()
        assert isinstance(output, BigQueryTarget), 'Output must be a BigQueryTarget, not %s' % (output)
@@ -643,6 +700,8 @@ def run(self):
            }
        }

        job['configuration'] = self.configure_job(job['configuration'])

        bq_client.run_job(output.table.project_id, job, dataset=output.table.dataset)
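
For the query task, the same hook can be used, for instance, to cap query cost. A hypothetical sketch (project, dataset, table and query are placeholders; maximumBytesBilled is just one example of a query setting without a dedicated Task property):

import luigi
from luigi.contrib.bigquery import BigQueryRunQueryTask, BigQueryTarget


class DailySummary(BigQueryRunQueryTask):
    @property
    def use_legacy_sql(self):
        return False  # the example query below uses standard SQL

    @property
    def query(self):
        return 'SELECT COUNT(*) AS n FROM `my-project.my_dataset.events`'  # placeholder query

    def output(self):
        return BigQueryTarget('my-project', 'my_dataset', 'daily_summary')  # placeholder table

    def configure_job(self, configuration):
        configuration['query']['maximumBytesBilled'] = str(10 * 1024 ** 3)  # 10 GiB cost cap
        return configuration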


@@ -739,6 +798,16 @@ def compression(self):
"""Whether to use compression."""
return Compression.NONE

    def configure_job(self, configuration):
        """Set additional job configuration.
        This allows specifying job configuration parameters that are not exposed via Task properties.
        :param configuration: Current configuration.
        :return: New or updated configuration.
        """
        return configuration

    def run(self):
        input = luigi.task.flatten(self.input())[0]
        assert (
@@ -775,6 +844,8 @@ def run(self):
            job['configuration']['extract']['fieldDelimiter'] = \
                self.field_delimiter

        job['configuration'] = self.configure_job(job['configuration'])

        bq_client.run_job(
            input.table.project_id,
            job,
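
The extract task gets the same hook; one plausible use is attaching job labels, which have no dedicated Task property. A hypothetical sketch (the upstream table, bucket path and label values are placeholders):

import luigi
from luigi.contrib.bigquery import BigQueryExtractTask, BigQueryTarget
from luigi.contrib.gcs import GCSTarget


class EventsTable(luigi.ExternalTask):
    def output(self):
        return BigQueryTarget('my-project', 'my_dataset', 'events')  # placeholder table


class ExportEvents(BigQueryExtractTask):
    def requires(self):
        return EventsTable()

    def output(self):
        return GCSTarget('gs://my-bucket/exports/events-*.csv')  # placeholder destination

    def configure_job(self, configuration):
        configuration.setdefault('labels', {})['team'] = 'analytics'  # placeholder label
        return configuration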
