BigQuery Partitioning Improvements #887

Merged
Commits (42)
f8402ca
Add tests (#852)
Pipboyguy Jan 6, 2024
b47feca
Merge remote-tracking branch 'origin/devel' into 852-cannot-create-pa…
Pipboyguy Jan 8, 2024
0822d32
Optimise imports (#852)
Pipboyguy Jan 8, 2024
17971ce
Minor changes (#852)
Pipboyguy Jan 8, 2024
0a95700
Amend for clarity, spelling etc (#852)
Pipboyguy Jan 8, 2024
fb32ada
Update field type coercion with more specific examples
sultaniman Jan 9, 2024
7e039cc
Fix typo
sultaniman Jan 9, 2024
c9f9335
Adjust explanation
sultaniman Jan 9, 2024
e3bf10f
Add encouragement for experimentation
sultaniman Jan 9, 2024
9004f9c
Update docs/website/docs/general-usage/schema.md
sultaniman Jan 9, 2024
1d5d4d2
Update table column title naming
sultaniman Jan 9, 2024
1b751b2
Adjust data to avoid naming confusion
sultaniman Jan 9, 2024
4612906
Adjust explanation phrasing
sultaniman Jan 9, 2024
618659d
Revert column name
sultaniman Jan 9, 2024
5ec87a6
Refactor and improve documentation grammar (#814)
Pipboyguy Jan 9, 2024
393f631
Refactor BigQuery and tests for clarity and correctness (#852)
Pipboyguy Jan 9, 2024
41f0bb1
Add tests (#852)
Pipboyguy Jan 6, 2024
8b927c3
Optimise imports (#852)
Pipboyguy Jan 8, 2024
5cf9436
Minor changes (#852)
Pipboyguy Jan 8, 2024
c028b6b
Amend for clarity, spelling etc (#852)
Pipboyguy Jan 8, 2024
da2bd46
Refactor and improve documentation grammar (#814)
Pipboyguy Jan 9, 2024
c1e312e
Refactor BigQuery and tests for clarity and correctness (#852)
Pipboyguy Jan 9, 2024
1109410
IDE inspection disable on function (#852)
Pipboyguy Jan 10, 2024
b2ad1da
Merge (#852
Pipboyguy Jan 10, 2024
4577176
Merge branch 'devel' of github.com:dlt-hub/dlt into 852-cannot-create…
Pipboyguy Jan 11, 2024
74a831f
Add bigquery tests for time, date and integer partitioning (#852)
Pipboyguy Jan 11, 2024
2f1791e
Add control flow to check typing and partition accordingly #852
Pipboyguy Jan 11, 2024
ce082c0
Merge branch 'devel' of github.com:dlt-hub/dlt into 852-cannot-create…
Pipboyguy Jan 11, 2024
8c5d825
Remove partitioning on support from BQ docs #852
Pipboyguy Jan 11, 2024
07bb5f6
Format and fix grammar/spelling in docs #852
Pipboyguy Jan 12, 2024
1f6578f
Merge branch 'devel' of github.com:dlt-hub/dlt into 852-cannot-create…
Pipboyguy Jan 12, 2024
a1fd3df
Merge branches '852-cannot-create-partitioned-table-in-bigquery-desti…
Pipboyguy Jan 12, 2024
c9b8c17
Relocate BigQuery tests to the appropriate folder (#852)
Pipboyguy Jan 15, 2024
3be6413
Relocate partitioning tests to the table builder tests (#852)
Pipboyguy Jan 15, 2024
b24b435
Merge branch 'devel' of github.com:dlt-hub/dlt into 852-cannot-create…
Pipboyguy Jan 15, 2024
f3bd0d5
Merge
Pipboyguy Jan 15, 2024
1130b9a
Remove
Pipboyguy Jan 15, 2024
358806f
Update BigQuery documentation and comments #852
Pipboyguy Jan 15, 2024
92221a6
Amend bq docs #852
Pipboyguy Jan 15, 2024
a1ad4b4
Merge branch 'devel' of github.com:dlt-hub/dlt into 852-cannot-create…
Pipboyguy Jan 16, 2024
2847d27
Add negative partition tests #852
Pipboyguy Jan 17, 2024
be60ecf
Add init file #852
Pipboyguy Jan 17, 2024
4 changes: 2 additions & 2 deletions dlt/destinations/impl/bigquery/README.md
@@ -1,6 +1,6 @@
# Loader account setup

1. Create new services account, add private key to it and download the `services.json` file
1. Create a new services account, add private key to it and download the `services.json` file
2. Make sure that this newly created account has access to BigQuery API
3. You must add following roles to the account above: `BigQuery Data Editor`, `BigQuey Job User` and `BigQuery Read Session User` (storage API)
3. You must add the following roles to the account above: `BigQuery Data Editor`, `BigQuey Job User` and `BigQuery Read Session User` (storage API)
4. IAM to add roles is here https://console.cloud.google.com/iam-admin/iam?project=chat-analytics-rasa-ci
147 changes: 76 additions & 71 deletions dlt/destinations/impl/bigquery/bigquery.py
@@ -1,9 +1,10 @@
import os
from pathlib import Path
from typing import ClassVar, Dict, Optional, Sequence, Tuple, List, cast, Type, Any
from typing import ClassVar, Optional, Sequence, Tuple, List, cast, Any

import google.cloud.bigquery as bigquery # noqa: I250
from google.cloud import exceptions as gcp_exceptions
from google.api_core import exceptions as api_core_exceptions
from google.cloud import exceptions as gcp_exceptions

from dlt.common import json, logger
from dlt.common.destination import DestinationCapabilitiesContext
@@ -14,30 +15,26 @@
LoadJob,
SupportsStagingDestination,
)
from dlt.common.data_types import TDataType
from dlt.common.storages.file_storage import FileStorage
from dlt.common.schema import TColumnSchema, Schema, TTableSchemaColumns
from dlt.common.schema.typing import TTableSchema, TColumnType, TTableFormat
from dlt.common.schema.exceptions import UnknownTableException

from dlt.destinations.job_client_impl import SqlJobClientWithStaging
from dlt.common.schema.typing import TTableSchema, TColumnType, TTableFormat
from dlt.common.schema.utils import table_schema_has_type
from dlt.common.storages.file_storage import FileStorage
from dlt.destinations.exceptions import (
DestinationSchemaWillNotUpdate,
DestinationTransientException,
LoadJobNotExistsException,
LoadJobTerminalException,
)

from dlt.destinations.impl.bigquery import capabilities
from dlt.destinations.impl.bigquery.configuration import BigQueryClientConfiguration
from dlt.destinations.impl.bigquery.sql_client import BigQuerySqlClient, BQ_TERMINAL_REASONS
from dlt.destinations.sql_jobs import SqlMergeJob, SqlStagingCopyJob, SqlJobParams
from dlt.destinations.job_client_impl import SqlJobClientWithStaging
from dlt.destinations.job_impl import NewReferenceJob
from dlt.destinations.sql_client import SqlClientBase
from dlt.destinations.sql_jobs import SqlMergeJob, SqlStagingCopyJob, SqlJobParams
from dlt.destinations.type_mapping import TypeMapper

from dlt.common.schema.utils import table_schema_has_type


class BigQueryTypeMapper(TypeMapper):
sct_to_unbound_dbt = {
@@ -49,7 +46,7 @@ class BigQueryTypeMapper(TypeMapper):
"timestamp": "TIMESTAMP",
"bigint": "INTEGER",
"binary": "BYTES",
"wei": "BIGNUMERIC", # non parametrized should hold wei values
"wei": "BIGNUMERIC", # non parametrised should hold wei values
"time": "TIME",
}

@@ -73,12 +70,12 @@ class BigQueryTypeMapper(TypeMapper):
"TIME": "time",
}

# noinspection PyTypeChecker,PydanticTypeChecker
def from_db_type(
self, db_type: str, precision: Optional[int], scale: Optional[int]
) -> TColumnType:
if db_type == "BIGNUMERIC":
if precision is None: # biggest numeric possible
return dict(data_type="wei")
if db_type == "BIGNUMERIC" and precision is None:
return dict(data_type="wei")
return super().from_db_type(db_type, precision, scale)


@@ -96,29 +93,24 @@ def __init__(
super().__init__(file_name)

def state(self) -> TLoadJobState:
# check server if done
done = self.bq_load_job.done(retry=self.default_retry, timeout=self.http_timeout)
if done:
# rows processed
if self.bq_load_job.output_rows is not None and self.bq_load_job.error_result is None:
return "completed"
else:
reason = self.bq_load_job.error_result.get("reason")
if reason in BQ_TERMINAL_REASONS:
# the job permanently failed for the reason above
return "failed"
elif reason in ["internalError"]:
logger.warning(
f"Got reason {reason} for job {self.file_name}, job considered still"
f" running. ({self.bq_load_job.error_result})"
)
# status of the job could not be obtained, job still running
return "running"
else:
# retry on all other reasons, including `backendError` which requires retry when the job is done
return "retry"
else:
if not self.bq_load_job.done(retry=self.default_retry, timeout=self.http_timeout):
return "running"
if self.bq_load_job.output_rows is not None and self.bq_load_job.error_result is None:
return "completed"
reason = self.bq_load_job.error_result.get("reason")
if reason in BQ_TERMINAL_REASONS:
# the job permanently failed for the reason above
return "failed"
elif reason in ["internalError"]:
logger.warning(
f"Got reason {reason} for job {self.file_name}, job considered still"
f" running. ({self.bq_load_job.error_result})"
)
# the status of the job could not be obtained, job still running
return "running"
else:
# retry on all other reasons, including `backendError` which requires retry when the job is done
return "retry"

def bigquery_job_id(self) -> str:
return BigQueryLoadJob.get_job_id_from_file_path(super().file_name())
@@ -149,13 +141,11 @@ def gen_key_table_clauses(
key_clauses: Sequence[str],
for_delete: bool,
) -> List[str]:
# generate several clauses: BigQuery does not support OR nor unions
sql: List[str] = []
for clause in key_clauses:
sql.append(
f"FROM {root_table_name} AS d WHERE EXISTS (SELECT 1 FROM"
f" {staging_root_table_name} AS s WHERE {clause.format(d='d', s='s')})"
)
sql: List[str] = [
f"FROM {root_table_name} AS d WHERE EXISTS (SELECT 1 FROM {staging_root_table_name} AS"
f" s WHERE {clause.format(d='d', s='s')})"
for clause in key_clauses
]
return sql
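
The list comprehension above replaces the removed loop; as the dropped comment noted, the generated merge SQL avoids OR and unions, so one EXISTS clause is emitted per key clause. A minimal sketch of the output shape, using hypothetical table names and a single `_dlt_id` key clause:

# Illustrative sketch only: the table names and key clause below are hypothetical.
root_table_name = "`project.dataset.events`"
staging_root_table_name = "`project.dataset_staging.events`"
key_clauses = ["{d}.`_dlt_id` = {s}.`_dlt_id`"]

sql = [
    f"FROM {root_table_name} AS d WHERE EXISTS (SELECT 1 FROM {staging_root_table_name} AS"
    f" s WHERE {clause.format(d='d', s='s')})"
    for clause in key_clauses
]
print(sql[0])
# FROM `project.dataset.events` AS d WHERE EXISTS (SELECT 1 FROM
# `project.dataset_staging.events` AS s WHERE d.`_dlt_id` = s.`_dlt_id`)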


@@ -172,10 +162,12 @@ def generate_sql(
with sql_client.with_staging_dataset(staging=True):
staging_table_name = sql_client.make_qualified_table_name(table["name"])
table_name = sql_client.make_qualified_table_name(table["name"])
# drop destination table
sql.append(f"DROP TABLE IF EXISTS {table_name};")
# recreate destination table with data cloned from staging table
sql.append(f"CREATE TABLE {table_name} CLONE {staging_table_name};")
sql.extend(
(
f"DROP TABLE IF EXISTS {table_name};",
f"CREATE TABLE {table_name} CLONE {staging_table_name};",
)
)
return sql
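
The replace strategy here drops the destination table and recreates it with BigQuery's table clone. A sketch of the statement pair that the `sql.extend` call produces, with hypothetical qualified names:

# Hypothetical qualified names; shows the two statements appended above.
table_name = "`project.dataset.events`"
staging_table_name = "`project.dataset_staging.events`"

sql: list[str] = []
sql.extend(
    (
        f"DROP TABLE IF EXISTS {table_name};",
        f"CREATE TABLE {table_name} CLONE {staging_table_name};",
    )
)
print("\n".join(sql))
# DROP TABLE IF EXISTS `project.dataset.events`;
# CREATE TABLE `project.dataset.events` CLONE `project.dataset_staging.events`;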


@@ -208,7 +200,8 @@ def _create_replace_followup_jobs(
def restore_file_load(self, file_path: str) -> LoadJob:
"""Returns a completed SqlLoadJob or restored BigQueryLoadJob

See base class for details on SqlLoadJob. BigQueryLoadJob is restored with job id derived from `file_path`
See base class for details on SqlLoadJob.
BigQueryLoadJob is restored with a job id derived from `file_path`

Args:
file_path (str): a path to a job file
@@ -228,11 +221,13 @@ def restore_file_load(self, file_path: str) -> LoadJob:
except api_core_exceptions.GoogleAPICallError as gace:
reason = BigQuerySqlClient._get_reason_from_errors(gace)
if reason == "notFound":
raise LoadJobNotExistsException(file_path)
raise LoadJobNotExistsException(file_path) from gace
elif reason in BQ_TERMINAL_REASONS:
raise LoadJobTerminalException(file_path, f"The server reason was: {reason}")
raise LoadJobTerminalException(
file_path, f"The server reason was: {reason}"
) from gace
else:
raise DestinationTransientException(gace)
raise DestinationTransientException(gace) from gace
return job

def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob:
@@ -250,15 +245,17 @@ def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) ->
reason = BigQuerySqlClient._get_reason_from_errors(gace)
if reason == "notFound":
# google.api_core.exceptions.NotFound: 404 - table not found
raise UnknownTableException(table["name"])
raise UnknownTableException(table["name"]) from gace
elif reason == "duplicate":
# google.api_core.exceptions.Conflict: 409 PUT - already exists
return self.restore_file_load(file_path)
elif reason in BQ_TERMINAL_REASONS:
# google.api_core.exceptions.BadRequest - will not be processed ie bad job name
raise LoadJobTerminalException(file_path, f"The server reason was: {reason}")
raise LoadJobTerminalException(
file_path, f"The server reason was: {reason}"
) from gace
else:
raise DestinationTransientException(gace)
raise DestinationTransientException(gace) from gace
return job

def _get_table_update_sql(
@@ -274,23 +271,31 @@ def _get_table_update_sql(
cluster_list = [
self.capabilities.escape_identifier(c["name"]) for c in new_columns if c.get("cluster")
]
partition_list = [
self.capabilities.escape_identifier(c["name"])
for c in new_columns
if c.get("partition")
]

# partition by must be added first
if len(partition_list) > 0:
if partition_list := [c for c in new_columns if c.get("partition")]:
if len(partition_list) > 1:
col_names = [self.capabilities.escape_identifier(c["name"]) for c in partition_list]
raise DestinationSchemaWillNotUpdate(
canonical_name, partition_list, "Partition requested for more than one column"
canonical_name, col_names, "Partition requested for more than one column"
)
else:
sql[0] = sql[0] + f"\nPARTITION BY DATE({partition_list[0]})"
if len(cluster_list) > 0:
elif (c := partition_list[0])["data_type"] == "date":
sql[0] = f"{sql[0]}\nPARTITION BY {self.capabilities.escape_identifier(c['name'])}"
elif (c := partition_list[0])["data_type"] == "timestamp":
sql[0] = (
f"{sql[0]}\nPARTITION BY DATE({self.capabilities.escape_identifier(c['name'])})"
)
# BigQuery supports partitioning only when bigint represents a UNIX timestamp.
# This is due to the bounds requirement of GENERATE_ARRAY function for partitioning.
# The 10,000 partitions limit makes it infeasible to cover the entire `bigint` range.
# The array bounds, with daily partitions (86400 seconds in a day), are somewhat arbitrarily chosen.
# See: https://dlthub.com/devel/dlt-ecosystem/destinations/bigquery#supported-column-hints
elif (c := partition_list[0])["data_type"] == "bigint":
sql[0] = (
f"{sql[0]}\nPARTITION BY"
f" RANGE_BUCKET({self.capabilities.escape_identifier(c['name'])},"
" GENERATE_ARRAY(-172800000, 691200000, 86400))"
)
if cluster_list:
sql[0] = sql[0] + "\nCLUSTER BY " + ",".join(cluster_list)

return sql
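
The three type-specific branches above append different PARTITION BY clauses to the generated CREATE TABLE statement. A rough sketch of the resulting DDL suffixes (column names are hypothetical), plus the arithmetic behind the bigint bounds:

# Sketch only; column names are hypothetical, clause shapes mirror the branches above.
#   date column      -> PARTITION BY `created_date`
#   timestamp column -> PARTITION BY DATE(`created_at`)
#   bigint column    -> PARTITION BY RANGE_BUCKET(`event_ts`,
#                         GENERATE_ARRAY(-172800000, 691200000, 86400))
#
# The GENERATE_ARRAY bounds span exactly 10,000 daily buckets, which lines up
# with the partition limit cited in the comment above:
low, high, step = -172_800_000, 691_200_000, 86_400  # step = seconds in one day
assert (high - low) // step == 10_000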

def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = None) -> str:
@@ -329,14 +334,14 @@ def _create_load_job(self, table: TTableSchema, file_path: str) -> bigquery.Load
# append to table for merge loads (append to stage) and regular appends
table_name = table["name"]

# determine wether we load from local or uri
# determine whether we load from local or uri
bucket_path = None
ext: str = os.path.splitext(file_path)[1][1:]
if NewReferenceJob.is_reference_job(file_path):
bucket_path = NewReferenceJob.resolve_reference(file_path)
ext = os.path.splitext(bucket_path)[1][1:]

# choose correct source format
# choose a correct source format
source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
decimal_target_types: List[str] = None
if ext == "parquet":
@@ -347,7 +352,7 @@ def _create_load_job(self, table: TTableSchema, file_path: str) -> bigquery.Load
"Bigquery cannot load into JSON data type from parquet. Use jsonl instead.",
)
source_format = bigquery.SourceFormat.PARQUET
# parquet needs NUMERIC type autodetection
# parquet needs NUMERIC type auto-detection
decimal_target_types = ["NUMERIC", "BIGNUMERIC"]

job_id = BigQueryLoadJob.get_job_id_from_file_path(file_path)
16 changes: 14 additions & 2 deletions dlt/destinations/impl/bigquery/configuration.py
@@ -52,5 +52,17 @@ def __init__(
file_upload_timeout: float = 30 * 60.0,
retry_deadline: float = 60.0,
destination_name: str = None,
environment: str = None,
) -> None: ...
environment: str = None
) -> None:
super().__init__(
credentials=credentials,
dataset_name=dataset_name,
default_schema_name=default_schema_name,
destination_name=destination_name,
environment=environment,
)
self.retry_deadline = retry_deadline
self.file_upload_timeout = file_upload_timeout
self.http_timeout = http_timeout
self.location = location
...
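
The explicit `__init__` body above forwards the shared arguments to the base class and assigns the BigQuery-specific fields. A minimal sketch of constructing the spec directly, assuming the keyword arguments shown in the signature (the values below are hypothetical):

# Hypothetical values; assumes direct construction with the keyword arguments above.
from dlt.destinations.impl.bigquery.configuration import BigQueryClientConfiguration

config = BigQueryClientConfiguration(
    dataset_name="event_data",
    location="EU",
    http_timeout=15.0,
    file_upload_timeout=30 * 60.0,
    retry_deadline=60.0,
)
assert config.location == "EU" and config.http_timeout == 15.0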
1 change: 1 addition & 0 deletions dlt/destinations/impl/bigquery/factory.py
@@ -9,6 +9,7 @@
from dlt.destinations.impl.bigquery.bigquery import BigQueryClient


# noinspection PyPep8Naming
class bigquery(Destination[BigQueryClientConfiguration, "BigQueryClient"]):
spec = BigQueryClientConfiguration
