Commit

Merge pull request #887 from dlt-hub/852-cannot-create-partitioned-table-in-bigquery-destination

BigQuery Partitioning Improvements
sh-rp authored Jan 18, 2024
2 parents 77b070d + be60ecf commit 3921d61
Showing 9 changed files with 485 additions and 212 deletions.
4 changes: 2 additions & 2 deletions dlt/destinations/impl/bigquery/README.md
@@ -1,6 +1,6 @@
# Loader account setup

1. Create new services account, add private key to it and download the `services.json` file
1. Create a new service account, add a private key to it and download the `services.json` file
2. Make sure that this newly created account has access to BigQuery API
3. You must add following roles to the account above: `BigQuery Data Editor`, `BigQuery Job User` and `BigQuery Read Session User` (storage API)
3. You must add the following roles to the account above: `BigQuery Data Editor`, `BigQuery Job User` and `BigQuery Read Session User` (storage API)
4. Roles can be added in the IAM console: https://console.cloud.google.com/iam-admin/iam?project=chat-analytics-rasa-ci
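
With the account above in place, here is a minimal sketch of loading into BigQuery with dlt. The pipeline, dataset and table names are hypothetical, and the `services.json` credentials are assumed to be supplied through dlt's usual secrets resolution (e.g. `.dlt/secrets.toml`) rather than passed explicitly.

```python
import dlt

# Hypothetical names; credentials for the service account created above are assumed
# to be resolved by dlt (e.g. from .dlt/secrets.toml).
pipeline = dlt.pipeline(
    pipeline_name="bigquery_demo",
    destination="bigquery",
    dataset_name="demo_dataset",
)

# Load a couple of rows into a table in the configured dataset.
load_info = pipeline.run(
    [{"id": 1, "name": "alice"}, {"id": 2, "name": "bob"}],
    table_name="users",
)
print(load_info)
```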
147 changes: 76 additions & 71 deletions dlt/destinations/impl/bigquery/bigquery.py
@@ -1,9 +1,10 @@
import os
from pathlib import Path
from typing import ClassVar, Dict, Optional, Sequence, Tuple, List, cast, Type, Any
from typing import ClassVar, Optional, Sequence, Tuple, List, cast, Any

import google.cloud.bigquery as bigquery # noqa: I250
from google.cloud import exceptions as gcp_exceptions
from google.api_core import exceptions as api_core_exceptions
from google.cloud import exceptions as gcp_exceptions

from dlt.common import json, logger
from dlt.common.destination import DestinationCapabilitiesContext
@@ -14,30 +15,26 @@
LoadJob,
SupportsStagingDestination,
)
from dlt.common.data_types import TDataType
from dlt.common.storages.file_storage import FileStorage
from dlt.common.schema import TColumnSchema, Schema, TTableSchemaColumns
from dlt.common.schema.typing import TTableSchema, TColumnType, TTableFormat
from dlt.common.schema.exceptions import UnknownTableException

from dlt.destinations.job_client_impl import SqlJobClientWithStaging
from dlt.common.schema.typing import TTableSchema, TColumnType, TTableFormat
from dlt.common.schema.utils import table_schema_has_type
from dlt.common.storages.file_storage import FileStorage
from dlt.destinations.exceptions import (
DestinationSchemaWillNotUpdate,
DestinationTransientException,
LoadJobNotExistsException,
LoadJobTerminalException,
)

from dlt.destinations.impl.bigquery import capabilities
from dlt.destinations.impl.bigquery.configuration import BigQueryClientConfiguration
from dlt.destinations.impl.bigquery.sql_client import BigQuerySqlClient, BQ_TERMINAL_REASONS
from dlt.destinations.sql_jobs import SqlMergeJob, SqlStagingCopyJob, SqlJobParams
from dlt.destinations.job_client_impl import SqlJobClientWithStaging
from dlt.destinations.job_impl import NewReferenceJob
from dlt.destinations.sql_client import SqlClientBase
from dlt.destinations.sql_jobs import SqlMergeJob, SqlStagingCopyJob, SqlJobParams
from dlt.destinations.type_mapping import TypeMapper

from dlt.common.schema.utils import table_schema_has_type


class BigQueryTypeMapper(TypeMapper):
sct_to_unbound_dbt = {
@@ -49,7 +46,7 @@ class BigQueryTypeMapper(TypeMapper):
"timestamp": "TIMESTAMP",
"bigint": "INTEGER",
"binary": "BYTES",
"wei": "BIGNUMERIC", # non parametrized should hold wei values
"wei": "BIGNUMERIC", # non-parametrized should hold wei values
"time": "TIME",
}

@@ -73,12 +70,12 @@ class BigQueryTypeMapper(TypeMapper):
"TIME": "time",
}

# noinspection PyTypeChecker,PydanticTypeChecker
def from_db_type(
self, db_type: str, precision: Optional[int], scale: Optional[int]
) -> TColumnType:
if db_type == "BIGNUMERIC":
if precision is None: # biggest numeric possible
return dict(data_type="wei")
if db_type == "BIGNUMERIC" and precision is None:
return dict(data_type="wei")
return super().from_db_type(db_type, precision, scale)


@@ -96,29 +93,24 @@ def __init__(
super().__init__(file_name)

def state(self) -> TLoadJobState:
# check server if done
done = self.bq_load_job.done(retry=self.default_retry, timeout=self.http_timeout)
if done:
# rows processed
if self.bq_load_job.output_rows is not None and self.bq_load_job.error_result is None:
return "completed"
else:
reason = self.bq_load_job.error_result.get("reason")
if reason in BQ_TERMINAL_REASONS:
# the job permanently failed for the reason above
return "failed"
elif reason in ["internalError"]:
logger.warning(
f"Got reason {reason} for job {self.file_name}, job considered still"
f" running. ({self.bq_load_job.error_result})"
)
# status of the job could not be obtained, job still running
return "running"
else:
# retry on all other reasons, including `backendError` which requires retry when the job is done
return "retry"
else:
if not self.bq_load_job.done(retry=self.default_retry, timeout=self.http_timeout):
return "running"
if self.bq_load_job.output_rows is not None and self.bq_load_job.error_result is None:
return "completed"
reason = self.bq_load_job.error_result.get("reason")
if reason in BQ_TERMINAL_REASONS:
# the job permanently failed for the reason above
return "failed"
elif reason in ["internalError"]:
logger.warning(
f"Got reason {reason} for job {self.file_name}, job considered still"
f" running. ({self.bq_load_job.error_result})"
)
# the status of the job couldn't be obtained, job still running
return "running"
else:
# retry on all other reasons, including `backendError` which requires retry when the job is done
return "retry"

def bigquery_job_id(self) -> str:
return BigQueryLoadJob.get_job_id_from_file_path(super().file_name())
@@ -149,13 +141,11 @@ def gen_key_table_clauses(
key_clauses: Sequence[str],
for_delete: bool,
) -> List[str]:
# generate several clauses: BigQuery does not support OR nor unions
sql: List[str] = []
for clause in key_clauses:
sql.append(
f"FROM {root_table_name} AS d WHERE EXISTS (SELECT 1 FROM"
f" {staging_root_table_name} AS s WHERE {clause.format(d='d', s='s')})"
)
sql: List[str] = [
f"FROM {root_table_name} AS d WHERE EXISTS (SELECT 1 FROM {staging_root_table_name} AS"
f" s WHERE {clause.format(d='d', s='s')})"
for clause in key_clauses
]
return sql
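
A minimal sketch of what the comprehension above produces, with hypothetical table and key names. One `EXISTS` scaffold is emitted per key clause because, as the removed comment noted, BigQuery merge SQL here avoids OR-ing the clauses together.

```python
# Hypothetical qualified names and key clauses, mirroring the comprehension above.
root_table_name = "`project.dataset.events`"
staging_root_table_name = "`project.dataset_staging.events`"
key_clauses = ["{d}.`id` = {s}.`id`", "{d}.`_dlt_id` = {s}.`_dlt_id`"]

sql = [
    f"FROM {root_table_name} AS d WHERE EXISTS (SELECT 1 FROM {staging_root_table_name} AS"
    f" s WHERE {clause.format(d='d', s='s')})"
    for clause in key_clauses
]
# sql[0]:
# FROM `project.dataset.events` AS d WHERE EXISTS (SELECT 1 FROM
#  `project.dataset_staging.events` AS s WHERE d.`id` = s.`id`)
```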


@@ -172,10 +162,12 @@ def generate_sql(
with sql_client.with_staging_dataset(staging=True):
staging_table_name = sql_client.make_qualified_table_name(table["name"])
table_name = sql_client.make_qualified_table_name(table["name"])
# drop destination table
sql.append(f"DROP TABLE IF EXISTS {table_name};")
# recreate destination table with data cloned from staging table
sql.append(f"CREATE TABLE {table_name} CLONE {staging_table_name};")
sql.extend(
(
f"DROP TABLE IF EXISTS {table_name};",
f"CREATE TABLE {table_name} CLONE {staging_table_name};",
)
)
return sql
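
For a hypothetical `events` table, the replace job above boils down to two statements: drop the destination table, then recreate it as a clone of the staging table.

```python
# Hypothetical qualified names as make_qualified_table_name() would return them.
table_name = "`project.dataset.events`"
staging_table_name = "`project.dataset_staging.events`"

sql = [
    f"DROP TABLE IF EXISTS {table_name};",
    f"CREATE TABLE {table_name} CLONE {staging_table_name};",
]
```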


@@ -208,7 +200,8 @@ def _create_replace_followup_jobs(
def restore_file_load(self, file_path: str) -> LoadJob:
"""Returns a completed SqlLoadJob or restored BigQueryLoadJob
See base class for details on SqlLoadJob. BigQueryLoadJob is restored with job id derived from `file_path`
See base class for details on SqlLoadJob.
BigQueryLoadJob is restored with a job id derived from `file_path`
Args:
file_path (str): a path to a job file
@@ -228,11 +221,13 @@ def restore_file_load(self, file_path: str) -> LoadJob:
except api_core_exceptions.GoogleAPICallError as gace:
reason = BigQuerySqlClient._get_reason_from_errors(gace)
if reason == "notFound":
raise LoadJobNotExistsException(file_path)
raise LoadJobNotExistsException(file_path) from gace
elif reason in BQ_TERMINAL_REASONS:
raise LoadJobTerminalException(file_path, f"The server reason was: {reason}")
raise LoadJobTerminalException(
file_path, f"The server reason was: {reason}"
) from gace
else:
raise DestinationTransientException(gace)
raise DestinationTransientException(gace) from gace
return job

def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob:
@@ -250,15 +245,17 @@ def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) ->
reason = BigQuerySqlClient._get_reason_from_errors(gace)
if reason == "notFound":
# google.api_core.exceptions.NotFound: 404 - table not found
raise UnknownTableException(table["name"])
raise UnknownTableException(table["name"]) from gace
elif reason == "duplicate":
# google.api_core.exceptions.Conflict: 409 PUT - already exists
return self.restore_file_load(file_path)
elif reason in BQ_TERMINAL_REASONS:
# google.api_core.exceptions.BadRequest - will not be processed ie bad job name
raise LoadJobTerminalException(file_path, f"The server reason was: {reason}")
raise LoadJobTerminalException(
file_path, f"The server reason was: {reason}"
) from gace
else:
raise DestinationTransientException(gace)
raise DestinationTransientException(gace) from gace
return job

def _get_table_update_sql(
@@ -274,23 +271,31 @@ def _get_table_update_sql(
cluster_list = [
self.capabilities.escape_identifier(c["name"]) for c in new_columns if c.get("cluster")
]
partition_list = [
self.capabilities.escape_identifier(c["name"])
for c in new_columns
if c.get("partition")
]

# partition by must be added first
if len(partition_list) > 0:
if partition_list := [c for c in new_columns if c.get("partition")]:
if len(partition_list) > 1:
col_names = [self.capabilities.escape_identifier(c["name"]) for c in partition_list]
raise DestinationSchemaWillNotUpdate(
canonical_name, partition_list, "Partition requested for more than one column"
canonical_name, col_names, "Partition requested for more than one column"
)
else:
sql[0] = sql[0] + f"\nPARTITION BY DATE({partition_list[0]})"
if len(cluster_list) > 0:
elif (c := partition_list[0])["data_type"] == "date":
sql[0] = f"{sql[0]}\nPARTITION BY {self.capabilities.escape_identifier(c['name'])}"
elif (c := partition_list[0])["data_type"] == "timestamp":
sql[0] = (
f"{sql[0]}\nPARTITION BY DATE({self.capabilities.escape_identifier(c['name'])})"
)
# Automatic partitioning of an INT64 type requires us to be prescriptive - we treat the column as a UNIX timestamp.
# This is due to the bounds requirement of GENERATE_ARRAY function for partitioning.
# The 10,000 partitions limit makes it infeasible to cover the entire `bigint` range.
# The array bounds, with daily partitions (86400 seconds in a day), are somewhat arbitrarily chosen.
# See: https://dlthub.com/devel/dlt-ecosystem/destinations/bigquery#supported-column-hints
elif (c := partition_list[0])["data_type"] == "bigint":
sql[0] = (
f"{sql[0]}\nPARTITION BY"
f" RANGE_BUCKET({self.capabilities.escape_identifier(c['name'])},"
" GENERATE_ARRAY(-172800000, 691200000, 86400))"
)
if cluster_list:
sql[0] = sql[0] + "\nCLUSTER BY " + ",".join(cluster_list)

return sql

def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = None) -> str:
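
A sketch of the DDL suffixes the new partition branch appends for each supported column type. The base statement and column names are hypothetical; the `bigint` case follows the comment above in treating the column as a Unix timestamp bucketed into daily ranges.

```python
# Hypothetical base DDL; in the real code escape_identifier() supplies the backticks.
base = "CREATE TABLE `project.dataset.events` (...)"

# partition hint on a `date` column -> native date partitioning
print(f"{base}\nPARTITION BY `created_date`")

# partition hint on a `timestamp` column -> partition on the date part
print(f"{base}\nPARTITION BY DATE(`created_at`)")

# partition hint on a `bigint` column, interpreted as a Unix timestamp in seconds;
# GENERATE_ARRAY(-172800000, 691200000, 86400) spans 10,000 daily buckets, staying
# within BigQuery's partition limit.
print(
    f"{base}\nPARTITION BY"
    " RANGE_BUCKET(`created_ts`, GENERATE_ARRAY(-172800000, 691200000, 86400))"
)
```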
@@ -329,14 +334,14 @@ def _create_load_job(self, table: TTableSchema, file_path: str) -> bigquery.Load
# append to table for merge loads (append to stage) and regular appends
table_name = table["name"]

# determine wether we load from local or uri
# determine whether we load from local or uri
bucket_path = None
ext: str = os.path.splitext(file_path)[1][1:]
if NewReferenceJob.is_reference_job(file_path):
bucket_path = NewReferenceJob.resolve_reference(file_path)
ext = os.path.splitext(bucket_path)[1][1:]

# choose correct source format
# choose a correct source format
source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
decimal_target_types: List[str] = None
if ext == "parquet":
Expand All @@ -347,7 +352,7 @@ def _create_load_job(self, table: TTableSchema, file_path: str) -> bigquery.Load
"Bigquery cannot load into JSON data type from parquet. Use jsonl instead.",
)
source_format = bigquery.SourceFormat.PARQUET
# parquet needs NUMERIC type autodetection
# parquet needs NUMERIC type auto-detection
decimal_target_types = ["NUMERIC", "BIGNUMERIC"]

job_id = BigQueryLoadJob.get_job_id_from_file_path(file_path)
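
For the Parquet-from-bucket path, a hedged sketch of the kind of load job this configures. URIs, destination table and client setup are hypothetical; `decimal_target_types` is what lets BigQuery auto-detect NUMERIC vs BIGNUMERIC columns.

```python
from google.cloud import bigquery

client = bigquery.Client()  # assumes application default credentials

job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.PARQUET,
    decimal_target_types=["NUMERIC", "BIGNUMERIC"],
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
)

# Hypothetical staged file and destination table.
load_job = client.load_table_from_uri(
    "gs://my-bucket/load/events.parquet",
    "project.dataset.events",
    job_config=job_config,
)
load_job.result()  # block until the job finishes
```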
16 changes: 14 additions & 2 deletions dlt/destinations/impl/bigquery/configuration.py
@@ -52,5 +52,17 @@ def __init__(
file_upload_timeout: float = 30 * 60.0,
retry_deadline: float = 60.0,
destination_name: str = None,
environment: str = None,
) -> None: ...
environment: str = None
) -> None:
super().__init__(
credentials=credentials,
dataset_name=dataset_name,
default_schema_name=default_schema_name,
destination_name=destination_name,
environment=environment,
)
self.retry_deadline = retry_deadline
self.file_upload_timeout = file_upload_timeout
self.http_timeout = http_timeout
self.location = location
...
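
A minimal sketch of constructing the configuration with the timeouts that now actually get propagated, assuming direct keyword construction is supported; keyword names follow the signature above (partly truncated here), and credentials are left to dlt's normal resolution.

```python
from dlt.destinations.impl.bigquery.configuration import BigQueryClientConfiguration

# Hypothetical values; credentials are assumed to be resolved via dlt configuration.
config = BigQueryClientConfiguration(
    dataset_name="demo_dataset",
    location="EU",
    http_timeout=15.0,
    file_upload_timeout=30 * 60.0,
    retry_deadline=90.0,
)
```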
1 change: 1 addition & 0 deletions dlt/destinations/impl/bigquery/factory.py
@@ -9,6 +9,7 @@
from dlt.destinations.impl.bigquery.bigquery import BigQueryClient


# noinspection PyPep8Naming
class bigquery(Destination[BigQueryClientConfiguration, "BigQueryClient"]):
spec = BigQueryClientConfiguration
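
A hedged sketch of the factory in use; the keyword arguments accepted by `bigquery(...)` are not shown in this diff, so only a bare instantiation is assumed, with destination options resolved from dlt configuration.

```python
import dlt
from dlt.destinations import bigquery  # assumed re-export of the factory above

# Hypothetical pipeline; credentials, location, etc. are assumed to come from
# dlt configuration rather than being passed to the factory here.
pipeline = dlt.pipeline(
    pipeline_name="bigquery_demo",
    destination=bigquery(),
    dataset_name="demo_dataset",
)
```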

