fixes
IlyaFaer committed Apr 2, 2024
1 parent 23e71ae commit 2d9b1c7
Showing 3 changed files with 57 additions and 9 deletions.
10 changes: 9 additions & 1 deletion dlt/destinations/impl/bigquery/bigquery.py
@@ -29,6 +29,7 @@
from dlt.destinations.sql_client import SqlClientBase
from dlt.destinations.exceptions import (
DestinationSchemaWillNotUpdate,
DestinationTerminalException,
DestinationTransientException,
LoadJobNotExistsException,
LoadJobTerminalException,
@@ -229,6 +230,13 @@ def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) ->
insert_api = table.get("x-insert-api", "default")
try:
if insert_api == "streaming":
if table["write_disposition"] != "append":
raise DestinationTerminalException(
(
"BigQuery streaming insert can only be used with `append` write_disposition, while "
f'the given resource has `{table["write_disposition"]}`.'
)
)
if file_path.endswith(".jsonl"):
job_cls = DestinationJsonlLoadJob
elif file_path.endswith(".parquet"):
@@ -479,7 +487,7 @@ def _should_retry(exc: api_core_exceptions.GoogleAPICallError) -> bool:
bool: True if the exception is retryable, False otherwise.
"""
reason = exc.errors[0]["reason"]
- return reason in (list(_RETRYABLE_REASONS) + ["notFound"])
+ return reason in _RETRYABLE_REASONS

full_name = sql_client.make_qualified_table_name(table["name"], escape=False)

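Note on the retry change above: a reason-based predicate like _should_retry is what google.api_core uses to decide whether a failed call should be re-issued. The sketch below is illustrative only — the _RETRYABLE_REASONS values and the get_table call are assumptions, not dlt's actual wiring — but it shows why dropping "notFound" makes a missing-table error surface immediately instead of being retried.

from google.api_core import exceptions as api_core_exceptions
from google.api_core import retry

# Assumed reason set for illustration; dlt defines its own _RETRYABLE_REASONS.
_RETRYABLE_REASONS = frozenset({"rateLimitExceeded", "backendError", "internalError"})

def _should_retry(exc: api_core_exceptions.GoogleAPICallError) -> bool:
    """Retry only transient server-side reasons; "notFound" is no longer retried."""
    if not exc.errors:
        return False
    return exc.errors[0]["reason"] in _RETRYABLE_REASONS

# A Retry object re-invokes the wrapped call for as long as the predicate returns True.
bq_retry = retry.Retry(predicate=_should_retry)

def read_table(client, table_ref):
    # Hypothetical usage: a 404 ("notFound") now fails fast,
    # while rate-limit and backend errors are still retried.
    return bq_retry(client.get_table)(table_ref)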
14 changes: 13 additions & 1 deletion dlt/destinations/impl/bigquery/bigquery_adapter.py
@@ -57,6 +57,11 @@ def bigquery_adapter(
table_description (str, optional): A description for the BigQuery table.
table_expiration_datetime (str, optional): String representing the datetime when the BigQuery table expires.
This is always interpreted as UTC, BigQuery's default.
insert_api (Optional[Literal["streaming", "default"]]): The API to use for inserting data into BigQuery.
If "default" is chosen, the original SQL query mechanism is used.
If "streaming" is chosen, the streaming API (https://cloud.google.com/bigquery/docs/streaming-data-into-bigquery)
is used.
NOTE: due to BigQuery features, streaming insert is only available for `append` write_disposition.
Returns:
A `DltResource` object that is ready to be loaded into BigQuery.
@@ -135,7 +140,7 @@ def bigquery_adapter(
if not isinstance(table_expiration_datetime, str):
raise ValueError(
"`table_expiration_datetime` must be string representing the datetime when the"
" BigQuery table."
" BigQuery table will be deleted."
)
try:
parsed_table_expiration_datetime = parser.parse(table_expiration_datetime).replace(
@@ -146,6 +151,13 @@
raise ValueError(f"{table_expiration_datetime} could not be parsed!") from e

if insert_api is not None:
if insert_api == "streaming" and data.write_disposition != "append":
raise ValueError(
(
"BigQuery streaming insert can only be used with `append` write_disposition, while "
f"the given resource has `{data.write_disposition}`."
)
)
additional_table_hints |= {"x-insert-api": insert_api} # type: ignore[operator]

if column_hints or additional_table_hints:
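For reference, a minimal adapter usage sketch (pipeline and table names are placeholders) consistent with the check added above and with the tests that follow: streaming insert works with the default append disposition, while merge or replace resources are rejected with a ValueError before the load starts.

import dlt
from dlt.destinations.impl.bigquery.bigquery_adapter import bigquery_adapter

@dlt.resource(write_disposition="append")  # streaming insert requires `append`
def events():
    yield {"id": 1, "payload": "ok"}

# Route the resource through BigQuery's streaming insert API.
bigquery_adapter(events, insert_api="streaming")

pipeline = dlt.pipeline(pipeline_name="streaming_demo", destination="bigquery")
load_info = pipeline.run(events, table_name="events")
print(load_info)

# The same adapter call on a resource declared with write_disposition="merge"
# now raises ValueError instead of failing later in the loader.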
42 changes: 35 additions & 7 deletions tests/load/bigquery/test_bigquery_streaming_insert.py
@@ -1,5 +1,6 @@
import pytest

import dlt
from dlt.destinations.exceptions import DatabaseTransientException
from dlt.destinations.impl.bigquery.bigquery_adapter import bigquery_adapter
from tests.pipeline.utils import assert_load_info

@@ -11,25 +12,49 @@ def test_resource():

bigquery_adapter(test_resource, insert_api="streaming")

- pipe = dlt.pipeline(pipeline_name="insert_test", destination="bigquery")
- pack = pipe.run(test_resource, table_name="test_streaming_items")
+ pipe = dlt.pipeline(pipeline_name="insert_test", destination="bigquery", full_refresh=True)
+ pack = pipe.run(test_resource, table_name="test_streaming_items44")

assert_load_info(pack)

with pipe.sql_client() as client:
- with client.execute_query("SELECT * FROM test_streaming_items;") as cursor:
+ with client.execute_query("SELECT * FROM test_streaming_items44;") as cursor:
res = cursor.fetchall()
assert tuple(res[0])[:2] == (1, 2)


def test_bigquery_adapter_streaming_wrong_disposition():
@dlt.resource(write_disposition="merge")
def test_resource():
yield {"field1": 1, "field2": 2}

with pytest.raises(ValueError):
bigquery_adapter(test_resource, insert_api="streaming")


def test_bigquery_streaming_wrong_disposition():
@dlt.resource(write_disposition="merge")
def test_resource():
yield {"field1": 1, "field2": 2}

test_resource.apply_hints(additional_table_hints={"x-insert-api": "streaming"})

pipe = dlt.pipeline(pipeline_name="insert_test", destination="bigquery")
info = pipe.run(test_resource)
assert (
"""BigQuery streaming insert can only be used with `append`"""
""" write_disposition, while the given resource has `merge`."""
) in info.asdict()["load_packages"][0]["jobs"][0]["failed_message"]


def test_bigquery_streaming_nested_data():
@dlt.resource
def test_resource():
yield {"field1": {"nested_field": 1}, "field2": {"nested_field": 2}}
yield {"field1": {"nested_field": 1}, "field2": [{"nested_field": 2}]}

bigquery_adapter(test_resource, insert_api="streaming")

- pipe = dlt.pipeline(pipeline_name="insert_test", destination="bigquery")
+ pipe = dlt.pipeline(pipeline_name="insert_test", destination="bigquery", full_refresh=True)
pack = pipe.run(test_resource, table_name="test_streaming_items")

assert_load_info(pack)
@@ -38,4 +63,7 @@ def test_resource():
with client.execute_query("SELECT * FROM test_streaming_items;") as cursor:
res = cursor.fetchall()
assert res[0]["field1__nested_field"] == 1 # type: ignore
assert res[0]["field2__nested_field"] == 2 # type: ignore

+ with client.execute_query("SELECT * FROM test_streaming_items__field2;") as cursor:
+ res = cursor.fetchall()
+ assert res[0]["nested_field"] == 2 # type: ignore
