From 2d9b1c79aa76162222ed2d5e18299e6b705c8329 Mon Sep 17 00:00:00 2001
From: IlyaFaer
Date: Tue, 2 Apr 2024 14:54:25 +0400
Subject: [PATCH] fixes

---
 dlt/destinations/impl/bigquery/bigquery.py    | 10 ++++-
 .../impl/bigquery/bigquery_adapter.py         | 14 ++++++-
 .../test_bigquery_streaming_insert.py         | 42 +++++++++++++++----
 3 files changed, 57 insertions(+), 9 deletions(-)

diff --git a/dlt/destinations/impl/bigquery/bigquery.py b/dlt/destinations/impl/bigquery/bigquery.py
index f1af10e7ac..279917d3a0 100644
--- a/dlt/destinations/impl/bigquery/bigquery.py
+++ b/dlt/destinations/impl/bigquery/bigquery.py
@@ -29,6 +29,7 @@ from dlt.destinations.sql_client import SqlClientBase
 from dlt.destinations.exceptions import (
     DestinationSchemaWillNotUpdate,
+    DestinationTerminalException,
     DestinationTransientException,
     LoadJobNotExistsException,
     LoadJobTerminalException,
@@ -229,6 +230,13 @@ def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) ->
         insert_api = table.get("x-insert-api", "default")
         try:
             if insert_api == "streaming":
+                if table["write_disposition"] != "append":
+                    raise DestinationTerminalException(
+                        (
+                            "BigQuery streaming insert can only be used with `append` write_disposition, while "
+                            f'the given resource has `{table["write_disposition"]}`.'
+                        )
+                    )
                 if file_path.endswith(".jsonl"):
                     job_cls = DestinationJsonlLoadJob
                 elif file_path.endswith(".parquet"):
@@ -479,7 +487,7 @@ def _should_retry(exc: api_core_exceptions.GoogleAPICallError) -> bool:
             bool: True if the exception is retryable, False otherwise.
         """
         reason = exc.errors[0]["reason"]
-        return reason in (list(_RETRYABLE_REASONS) + ["notFound"])
+        return reason in _RETRYABLE_REASONS
 
     full_name = sql_client.make_qualified_table_name(table["name"], escape=False)
 
diff --git a/dlt/destinations/impl/bigquery/bigquery_adapter.py b/dlt/destinations/impl/bigquery/bigquery_adapter.py
index bc7e3be5fb..8943b0da79 100644
--- a/dlt/destinations/impl/bigquery/bigquery_adapter.py
+++ b/dlt/destinations/impl/bigquery/bigquery_adapter.py
@@ -57,6 +57,11 @@ def bigquery_adapter(
         table_description (str, optional): A description for the BigQuery table.
         table_expiration_datetime (str, optional): String representing the datetime when the
             BigQuery table expires. This is always interpreted as UTC, BigQuery's default.
+        insert_api (Optional[Literal["streaming", "default"]]): The API to use for inserting data into BigQuery.
+            If "default" is chosen, the original SQL query mechanism is used.
+            If "streaming" is chosen, the streaming API (https://cloud.google.com/bigquery/docs/streaming-data-into-bigquery)
+            is used.
+            NOTE: due to BigQuery features, streaming insert is only available for `append` write_disposition.
 
     Returns:
         A `DltResource` object that is ready to be loaded into BigQuery.
@@ -135,7 +140,7 @@ def bigquery_adapter(
         if not isinstance(table_expiration_datetime, str):
             raise ValueError(
                 "`table_expiration_datetime` must be string representing the datetime when the"
-                " BigQuery table."
+                " BigQuery table will be deleted."
             )
         try:
             parsed_table_expiration_datetime = parser.parse(table_expiration_datetime).replace(
@@ -146,6 +151,13 @@ def bigquery_adapter(
             raise ValueError(f"{table_expiration_datetime} could not be parsed!") from e
 
     if insert_api is not None:
+        if insert_api == "streaming" and data.write_disposition != "append":
+            raise ValueError(
+                (
+                    "BigQuery streaming insert can only be used with `append` write_disposition, while "
+                    f"the given resource has `{data.write_disposition}`."
+                )
+            )
         additional_table_hints |= {"x-insert-api": insert_api}  # type: ignore[operator]
 
     if column_hints or additional_table_hints:
diff --git a/tests/load/bigquery/test_bigquery_streaming_insert.py b/tests/load/bigquery/test_bigquery_streaming_insert.py
index 3ec385d670..c80f6ed65a 100644
--- a/tests/load/bigquery/test_bigquery_streaming_insert.py
+++ b/tests/load/bigquery/test_bigquery_streaming_insert.py
@@ -1,5 +1,6 @@
+import pytest
+
 import dlt
-from dlt.destinations.exceptions import DatabaseTransientException
 from dlt.destinations.impl.bigquery.bigquery_adapter import bigquery_adapter
 from tests.pipeline.utils import assert_load_info
 
@@ -11,25 +12,49 @@ def test_resource():
 
     bigquery_adapter(test_resource, insert_api="streaming")
 
-    pipe = dlt.pipeline(pipeline_name="insert_test", destination="bigquery")
-    pack = pipe.run(test_resource, table_name="test_streaming_items")
+    pipe = dlt.pipeline(pipeline_name="insert_test", destination="bigquery", full_refresh=True)
+    pack = pipe.run(test_resource, table_name="test_streaming_items44")
 
     assert_load_info(pack)
 
     with pipe.sql_client() as client:
-        with client.execute_query("SELECT * FROM test_streaming_items;") as cursor:
+        with client.execute_query("SELECT * FROM test_streaming_items44;") as cursor:
             res = cursor.fetchall()
             assert tuple(res[0])[:2] == (1, 2)
 
 
+def test_bigquery_adapter_streaming_wrong_disposition():
+    @dlt.resource(write_disposition="merge")
+    def test_resource():
+        yield {"field1": 1, "field2": 2}
+
+    with pytest.raises(ValueError):
+        bigquery_adapter(test_resource, insert_api="streaming")
+
+
+def test_bigquery_streaming_wrong_disposition():
+    @dlt.resource(write_disposition="merge")
+    def test_resource():
+        yield {"field1": 1, "field2": 2}
+
+    test_resource.apply_hints(additional_table_hints={"x-insert-api": "streaming"})
+
+    pipe = dlt.pipeline(pipeline_name="insert_test", destination="bigquery")
+    info = pipe.run(test_resource)
+    assert (
+        """BigQuery streaming insert can only be used with `append`"""
+        """ write_disposition, while the given resource has `merge`."""
+    ) in info.asdict()["load_packages"][0]["jobs"][0]["failed_message"]
+
+
 def test_bigquery_streaming_nested_data():
     @dlt.resource
     def test_resource():
-        yield {"field1": {"nested_field": 1}, "field2": {"nested_field": 2}}
+        yield {"field1": {"nested_field": 1}, "field2": [{"nested_field": 2}]}
 
     bigquery_adapter(test_resource, insert_api="streaming")
 
-    pipe = dlt.pipeline(pipeline_name="insert_test", destination="bigquery")
+    pipe = dlt.pipeline(pipeline_name="insert_test", destination="bigquery", full_refresh=True)
     pack = pipe.run(test_resource, table_name="test_streaming_items")
 
     assert_load_info(pack)
@@ -38,4 +63,7 @@ def test_resource():
         with client.execute_query("SELECT * FROM test_streaming_items;") as cursor:
             res = cursor.fetchall()
             assert res[0]["field1__nested_field"] == 1  # type: ignore
-            assert res[0]["field2__nested_field"] == 2  # type: ignore
+
+        with client.execute_query("SELECT * FROM test_streaming_items__field2;") as cursor:
+            res = cursor.fetchall()
+            assert res[0]["nested_field"] == 2  # type: ignore
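Usage sketch (not part of the patch): a minimal example of how the `insert_api="streaming"` hint added by this change is intended to be used; the resource, pipeline, and table names below are hypothetical, and only `append` write_disposition is expected to pass the new checks.

import dlt
from dlt.destinations.impl.bigquery.bigquery_adapter import bigquery_adapter


@dlt.resource(write_disposition="append")  # streaming insert requires `append` after this patch
def events():  # hypothetical resource name
    yield {"field1": 1, "field2": 2}


# Route this resource through the BigQuery streaming API instead of the default mechanism.
bigquery_adapter(events, insert_api="streaming")

# With write_disposition="merge", the adapter call above would now raise ValueError up front,
# and a raw "x-insert-api" hint would fail the load job with DestinationTerminalException.
pipe = dlt.pipeline(pipeline_name="streaming_demo", destination="bigquery")
pipe.run(events, table_name="events")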