feat(bigquery): add streaming inserts support #1123

Merged · 22 commits · Apr 4, 2024 · Changes shown from 1 commit
80 changes: 50 additions & 30 deletions dlt/destinations/impl/bigquery/bigquery.py
@@ -1,13 +1,16 @@
import functools
import os
from pathlib import Path
from typing import ClassVar, Optional, Sequence, Tuple, List, cast, Dict

import google.cloud.bigquery as bigquery # noqa: I250
from google.api_core import exceptions as api_core_exceptions
from google.cloud import exceptions as gcp_exceptions
from google.cloud.bigquery.table import _table_arg_to_table_ref

from dlt.common import json, logger
from dlt.common.destination import DestinationCapabilitiesContext
from dlt.destinations.impl.destination.destination import DestinationJsonlLoadJob
from dlt.common.destination.reference import (
FollowupJob,
NewLoadJob,
@@ -43,6 +46,7 @@
from dlt.destinations.job_impl import NewReferenceJob
from dlt.destinations.sql_jobs import SqlMergeJob
from dlt.destinations.type_mapping import TypeMapper
from dlt.pipeline.current import destination_state


class BigQueryTypeMapper(TypeMapper):
@@ -217,33 +221,44 @@ def restore_file_load(self, file_path: str) -> LoadJob:
return job

def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob:
job = super().start_file_load(table, file_path, load_id)

if not job:
try:
job = BigQueryLoadJob(
FileStorage.get_file_name_from_file_path(file_path),
self._create_load_job(table, file_path),
self.config.http_timeout,
self.config.retry_deadline,
)
except api_core_exceptions.GoogleAPICallError as gace:
reason = BigQuerySqlClient._get_reason_from_errors(gace)
if reason == "notFound":
# google.api_core.exceptions.NotFound: 404 – table not found
raise UnknownTableException(table["name"]) from gace
elif (
reason == "duplicate"
): # google.api_core.exceptions.Conflict: 409 PUT – already exists
return self.restore_file_load(file_path)
elif reason in BQ_TERMINAL_REASONS:
# google.api_core.exceptions.BadRequest - will not be processed ie bad job name
raise LoadJobTerminalException(
file_path, f"The server reason was: {reason}"
) from gace
else:
raise DestinationTransientException(gace) from gace
return job
if self.config.loading_api == "streaming":
job = DestinationJsonlLoadJob(
table,
file_path,
self.config,
self.schema,
destination_state(),
functools.partial(streaming_load, self.sql_client),
[],
)
else:
job = super().start_file_load(table, file_path, load_id)

if not job:
try:
job = BigQueryLoadJob(
FileStorage.get_file_name_from_file_path(file_path),
self._create_load_job(table, file_path),
self.config.http_timeout,
self.config.retry_deadline,
)
except api_core_exceptions.GoogleAPICallError as gace:
reason = BigQuerySqlClient._get_reason_from_errors(gace)
if reason == "notFound":
# google.api_core.exceptions.NotFound: 404 – table not found
raise UnknownTableException(table["name"]) from gace
elif (
reason == "duplicate"
): # google.api_core.exceptions.Conflict: 409 PUT – already exists
return self.restore_file_load(file_path)
elif reason in BQ_TERMINAL_REASONS:
# google.api_core.exceptions.BadRequest - will not be processed ie bad job name
raise LoadJobTerminalException(
file_path, f"The server reason was: {reason}"
) from gace
else:
raise DestinationTransientException(gace) from gace
return job

def _get_table_update_sql(
self, table_name: str, new_columns: Sequence[TColumnSchema], generate_alter: bool
@@ -328,9 +343,7 @@ def prepare_load_table(

def _get_column_def_sql(self, column: TColumnSchema, table_format: TTableFormat = None) -> str:
name = self.capabilities.escape_identifier(column["name"])
column_def_sql = (
f"{name} {self.type_mapper.to_db_type(column, table_format)} {self._gen_not_null(column.get('nullable', True))}"
)
column_def_sql = f"{name} {self.type_mapper.to_db_type(column, table_format)} {self._gen_not_null(column.get('nullable', True))}"
if column.get(ROUND_HALF_EVEN_HINT, False):
column_def_sql += " OPTIONS (rounding_mode='ROUND_HALF_EVEN')"
if column.get(ROUND_HALF_AWAY_FROM_ZERO_HINT, False):
@@ -425,3 +438,10 @@ def _from_db_type(
self, bq_t: str, precision: Optional[int], scale: Optional[int]
) -> TColumnType:
return self.type_mapper.from_db_type(bq_t, precision, scale)


def streaming_load(sql_client, items, table):
full_name = sql_client.make_qualified_table_name(table["name"], escape=False)

bq_client = sql_client._client
bq_client.insert_rows_json(full_name, items)
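
Note on the helper above: google-cloud-bigquery's Client.insert_rows_json does not raise on per-row failures; it returns a list with one error mapping per rejected row (empty when every row was accepted), so the call as written silently drops rows BigQuery rejects. A minimal sketch of how the result could be checked (the wrapper name and the choice of RuntimeError are illustrative, not part of this PR):

def streaming_load_checked(sql_client, items, table):
    # same qualified-name resolution as streaming_load above
    full_name = sql_client.make_qualified_table_name(table["name"], escape=False)

    bq_client = sql_client._client
    # insert_rows_json returns one error mapping per rejected row;
    # an empty list means every row reached the streaming buffer
    insert_errors = bq_client.insert_rows_json(full_name, items)
    if insert_errors:
        raise RuntimeError(f"BigQuery rejected {len(insert_errors)} streamed rows: {insert_errors}")
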
2 changes: 2 additions & 0 deletions dlt/destinations/impl/bigquery/configuration.py
@@ -19,6 +19,8 @@ class BigQueryClientConfiguration(DestinationClientDwhWithStagingConfiguration):
retry_deadline: float = (
60.0 # how long to retry the operation in case of error, the backoff 60 s.
)
loading_api: str = "default"
batch_size: int = 0

__config_gen_annotations__: ClassVar[List[str]] = ["location"]

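
Assuming these two new fields resolve through dlt's standard configuration machinery, they could presumably be set like any other destination option, for example via environment variables; the exact wiring to the load path is what this PR adds, so the snippet below is only a sketch:

import os

# dlt resolves destination config from DESTINATION__<NAME>__<FIELD> env vars
os.environ["DESTINATION__BIGQUERY__LOADING_API"] = "streaming"
os.environ["DESTINATION__BIGQUERY__BATCH_SIZE"] = "500"  # illustrative value; 0 keeps the default
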
2 changes: 2 additions & 0 deletions dlt/destinations/impl/destination/destination.py
@@ -119,6 +119,8 @@ def run(self, start_index: int) -> Iterable[TDataItems]:
# stream items
with FileStorage.open_zipsafe_ro(self._file_path) as f:
encoded_json = json.typed_loads(f.read())
if isinstance(encoded_json, dict):
encoded_json = [encoded_json]

for item in encoded_json:
# find correct start position
13 changes: 13 additions & 0 deletions tests/load/pipeline/test_bigquery.py
@@ -1,6 +1,12 @@
import pytest

import dlt
from dlt.common import Decimal
from dlt.common.destination import Destination
from dlt.common.schema import Schema
from dlt.common.schema.typing import TTableSchema
from dlt.common.typing import TDataItems
from google.cloud import bigquery

from tests.pipeline.utils import assert_load_info
from tests.load.pipeline.utils import destinations_configs, DestinationTestConfiguration
@@ -36,3 +42,10 @@ def test_bigquery_numeric_types(destination_config: DestinationTestConfiguration
row = q.fetchone()
assert row[0] == data[0]["col_big_numeric"]
assert row[1] == data[0]["col_numeric"]


def test_bigquery_streaming_insert():
pipe = dlt.pipeline(destination="bigquery")
pack = pipe.run([{"field": 1}, {"field": 2}], table_name="test_streaming_items")

assert_load_info(pack)
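
The new test only checks that the load completed. A rough follow-up sketch that also verifies the streamed rows landed, assuming loading_api has been switched to "streaming" via configuration and that dlt's sql_client resolves the dataset for the unqualified table name (both assumptions, not part of this PR):

def test_bigquery_streaming_insert_roundtrip():
    pipe = dlt.pipeline(destination="bigquery", dataset_name="streaming_test")
    pack = pipe.run([{"field": 1}, {"field": 2}], table_name="test_streaming_items")
    assert_load_info(pack)

    with pipe.sql_client() as client:
        # rows written via insert_rows_json are queryable within seconds
        rows = client.execute_sql("SELECT field FROM test_streaming_items ORDER BY field")
    assert [row[0] for row in rows] == [1, 2]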