Allow setting gcs_tmp_bucket on GoogleBigQuery class
This more closely parallels the setup of the Redshift class. It also
allows for a simpler implementation when using multiple
instances of GoogleBigQuery in the same environment.
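The point about multiple instances is concrete: because the staging bucket can now be bound per instance, two connectors in one process no longer need to share a single GCS_TEMP_BUCKET environment variable or pass a bucket to every call. A minimal usage sketch based on the diff below (bucket names are placeholders):

    from parsons import GoogleBigQuery

    # Each instance carries its own staging bucket for bulk transfers.
    # Instantiation is safe without credentials, since the commit notes
    # the underlying client is created lazily on first use.
    bq_a = GoogleBigQuery(tmp_gcs_bucket="team-a-staging-bucket")
    bq_b = GoogleBigQuery(tmp_gcs_bucket="team-b-staging-bucket")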
austinweisgrau committed Oct 10, 2024
1 parent 1642d6e commit 764bc71
Showing 1 changed file with 21 additions and 5 deletions.
26 changes: 21 additions & 5 deletions parsons/google/google_bigquery.py
@@ -143,6 +143,10 @@ class GoogleBigQuery(DatabaseConnector):
             A dictionary containing any requested client options. Defaults to the required
             scopes for making API calls against External tables stored in Google Drive.
             Can be set to None if these permissions are not desired
+        tmp_gcs_bucket: str
+            Name of the GCS bucket that will be used for storing data during bulk transfers.
+            Required if you intend to perform bulk data transfers (e.g. the copy_from_gcs
+            method) and the ``GCS_TEMP_BUCKET`` environment variable is not populated.
     """
 
     def __init__(
@@ -157,6 +161,7 @@ def __init__(
                 "https://www.googleapis.com/auth/cloud-platform",
             ]
         },
+        tmp_gcs_bucket: Optional[str] = None,
     ):
         self.app_creds = app_creds
 
@@ -172,6 +177,7 @@ def __init__(
         self.project = project
         self.location = location
         self.client_options = client_options
+        self.tmp_gcs_bucket = tmp_gcs_bucket
 
         # We will not create the client until we need to use it, since creating the client
         # without valid GOOGLE_APPLICATION_CREDENTIALS raises an exception.
@@ -684,7 +690,8 @@ def copy_s3(
             The GoogleCloudStorage Connector to use for loading data into Google Cloud Storage.
         tmp_gcs_bucket: str
             The name of the Google Cloud Storage bucket to use to stage the data to load
-            into BigQuery. Required if `GCS_TEMP_BUCKET` is not specified.
+            into BigQuery. Required if `GCS_TEMP_BUCKET` is not specified or set on
+            the class instance.
         template_table: str
             Table name to be used as the load schema. Load operation will use the same
             columns and data types as the template table.
@@ -700,8 +707,12 @@ def copy_s3(
         """
 
         # copy from S3 to GCS
-        tmp_gcs_bucket = check_env.check("GCS_TEMP_BUCKET", tmp_gcs_bucket)
-        gcs_client = gcs_client or GoogleCloudStorage(app_creds=self.app_creds)
+        tmp_gcs_bucket = (
+            tmp_gcs_bucket
+            or self.tmp_gcs_bucket
+            or check_env.check("GCS_TEMP_BUCKET", tmp_gcs_bucket)
+        )
+        gcs_client = gcs_client or GoogleCloudStorage()
         temp_blob_uri = gcs_client.copy_s3_to_gcs(
             aws_source_bucket=bucket,
             aws_access_key_id=aws_access_key_id,
@@ -767,7 +778,8 @@ def copy(
             the job fails.
         tmp_gcs_bucket: str
             The name of the Google Cloud Storage bucket to use to stage the data to load
-            into BigQuery. Required if `GCS_TEMP_BUCKET` is not specified.
+            into BigQuery. Required if `GCS_TEMP_BUCKET` is not specified or set on
+            the class instance.
         gcs_client: object
             The GoogleCloudStorage Connector to use for loading data into Google Cloud Storage.
         job_config: object
@@ -783,7 +795,11 @@ def copy(
             client.
         """
         data_type = "csv"
-        tmp_gcs_bucket = check_env.check("GCS_TEMP_BUCKET", tmp_gcs_bucket)
+        tmp_gcs_bucket = (
+            tmp_gcs_bucket
+            or self.tmp_gcs_bucket
+            or check_env.check("GCS_TEMP_BUCKET", tmp_gcs_bucket)
+        )
         if not tmp_gcs_bucket:
             raise ValueError(
                 "Must set GCS_TEMP_BUCKET environment variable or pass in tmp_gcs_bucket parameter"
