diff --git a/tests/load/bigquery/test_bigquery_streaming_insert.py b/tests/load/bigquery/test_bigquery_streaming_insert.py new file mode 100644 index 0000000000..ef3bef9b34 --- /dev/null +++ b/tests/load/bigquery/test_bigquery_streaming_insert.py @@ -0,0 +1,33 @@ +import dlt +from dlt.destinations.impl.bigquery.bigquery_adapter import bigquery_adapter +from tests.pipeline.utils import assert_load_info + + +def test_bigquery_streaming_insert(): + pipe = dlt.pipeline(destination="bigquery") + pack = pipe.run([{"field1": 1, "field2": 2}], table_name="test_streaming_items") + + assert_load_info(pack) + + with pipe.sql_client() as client: + with client.execute_query("SELECT * FROM test_streaming_items;") as cursor: + res = cursor.fetchall() + assert tuple(res[0])[:2] == (1, 2) + + +def test_bigquery_adapter_streaming_insert(): + @dlt.resource + def test_resource(): + yield {"field1": 1, "field2": 2} + + bigquery_adapter(test_resource, insert_api="streaming") + + pipe = dlt.pipeline(destination="bigquery") + pack = pipe.run(test_resource, table_name="test_streaming_items") + + assert_load_info(pack) + + with pipe.sql_client() as client: + with client.execute_query("SELECT * FROM test_streaming_items;") as cursor: + res = cursor.fetchall() + assert tuple(res[0])[:2] == (1, 2) diff --git a/tests/load/pipeline/test_bigquery.py b/tests/load/pipeline/test_bigquery.py index d2ccc7ef2b..711d45fb1f 100644 --- a/tests/load/pipeline/test_bigquery.py +++ b/tests/load/pipeline/test_bigquery.py @@ -1,8 +1,6 @@ import pytest -import dlt from dlt.common import Decimal -from dlt.destinations.impl.bigquery.bigquery_adapter import bigquery_adapter from tests.pipeline.utils import assert_load_info from tests.load.pipeline.utils import destinations_configs, DestinationTestConfiguration @@ -38,33 +36,3 @@ def test_bigquery_numeric_types(destination_config: DestinationTestConfiguration row = q.fetchone() assert row[0] == data[0]["col_big_numeric"] assert row[1] == data[0]["col_numeric"] - - -def test_bigquery_streaming_insert(): - pipe = dlt.pipeline(destination="bigquery") - pack = pipe.run([{"field1": 1, "field2": 2}], table_name="test_streaming_items") - - assert_load_info(pack) - - with pipe.sql_client() as client: - with client.execute_query("SELECT * FROM test_streaming_items;") as cursor: - res = cursor.fetchall() - assert tuple(res[0])[:2] == (1, 2) - - -def test_bigquery_adapter_streaming_insert(): - @dlt.resource - def test_resource(): - yield {"field1": 1, "field2": 2} - - bigquery_adapter(test_resource, insert_api="streaming") - - pipe = dlt.pipeline(destination="bigquery") - pack = pipe.run(test_resource, table_name="test_streaming_items") - - assert_load_info(pack) - - with pipe.sql_client() as client: - with client.execute_query("SELECT * FROM test_streaming_items;") as cursor: - res = cursor.fetchall() - assert tuple(res[0])[:2] == (1, 2)