From 4d84a027197e1f486fd53644699f81dd7c4671c4 Mon Sep 17 00:00:00 2001 From: Sultan Iman Date: Wed, 7 Feb 2024 09:55:38 +0100 Subject: [PATCH] Move tests to pipeline extra --- tests/load/pipeline/test_pydantic_models.py | 161 -------------------- tests/pipeline/test_pipeline_extra.py | 154 +++++++++++++++++++ 2 files changed, 154 insertions(+), 161 deletions(-) delete mode 100644 tests/load/pipeline/test_pydantic_models.py diff --git a/tests/load/pipeline/test_pydantic_models.py b/tests/load/pipeline/test_pydantic_models.py deleted file mode 100644 index 854ca71b4d..0000000000 --- a/tests/load/pipeline/test_pydantic_models.py +++ /dev/null @@ -1,161 +0,0 @@ -import typing as t - -import dlt -import pytest - -from pydantic import BaseModel -from dlt.common.libs.pydantic import DltConfig - -from tests.load.pipeline.utils import destinations_configs, DestinationTestConfiguration - - -class Child(BaseModel): - child_attribute: str - optional_child_attribute: t.Optional[str] = None - - -@pytest.mark.parametrize( - "destination_config", - destinations_configs(default_sql_configs=True, subset=["duckdb"]), - ids=lambda x: x.name, -) -def test_flattens_model_when_skip_complex_types_is_set( - destination_config: DestinationTestConfiguration, -) -> None: - class Parent(BaseModel): - child: Child - optional_parent_attribute: t.Optional[str] = None - dlt_config: t.ClassVar[DltConfig] = {"skip_complex_types": True} - - example_data = { - "optional_parent_attribute": None, - "child": { - "child_attribute": "any string", - "optional_child_attribute": None, - }, - } - - @dlt.resource - def res(): - yield [example_data] - - @dlt.source(max_table_nesting=1) - def src(): - yield res() - - p = destination_config.setup_pipeline("example", full_refresh=True) - p.run(src(), table_name="items", columns=Parent) - - with p.sql_client() as client: - with client.execute_query("SELECT * FROM items") as cursor: - loaded_values = { - col[0]: val - for val, col in zip(cursor.fetchall()[0], cursor.description) - if col[0] not in ("_dlt_id", "_dlt_load_id") - } - assert loaded_values == { - "child__child_attribute": "any string", - "child__optional_child_attribute": None, - "optional_parent_attribute": None, - } - - keys = p.default_schema.tables["items"]["columns"].keys() - columns = p.default_schema.tables["items"]["columns"] - - assert keys == { - "child__child_attribute", - "child__optional_child_attribute", - "optional_parent_attribute", - "_dlt_load_id", - "_dlt_id", - } - - assert columns["child__child_attribute"] == { - "name": "child__child_attribute", - "data_type": "text", - "nullable": False, - } - - assert columns["child__optional_child_attribute"] == { - "name": "child__optional_child_attribute", - "data_type": "text", - "nullable": True, - } - - assert columns["optional_parent_attribute"] == { - "name": "optional_parent_attribute", - "data_type": "text", - "nullable": True, - } - - -@pytest.mark.parametrize( - "destination_config", - destinations_configs(default_sql_configs=True, subset=["duckdb"]), - ids=lambda x: x.name, -) -def test_flattens_model_when_skip_complex_types_is_not_set(destination_config: DestinationTestConfiguration,): - class Parent(BaseModel): - child: Child - optional_parent_attribute: t.Optional[str] = None - data_dictionary: t.Dict[str, t.Any] = None - dlt_config: t.ClassVar[DltConfig] = {"skip_complex_types": False} - - example_data = { - "optional_parent_attribute": None, - "data_dictionary": { - "child_attribute": "any string", - }, - "child": { - "child_attribute": "any string", - "optional_child_attribute": None, - }, - } - - @dlt.resource - def res(): - yield [example_data] - - @dlt.source(max_table_nesting=1) - def src(): - yield res() - - p = destination_config.setup_pipeline("example", full_refresh=True) - p.run(src(), table_name="items", columns=Parent) - - with p.sql_client() as client: - with client.execute_query("SELECT * FROM items") as cursor: - loaded_values = { - col[0]: val - for val, col in zip(cursor.fetchall()[0], cursor.description) - if col[0] not in ("_dlt_id", "_dlt_load_id") - } - - assert loaded_values == { - "child": '{"child_attribute":"any string","optional_child_attribute":null}', - "optional_parent_attribute": None, - "data_dictionary": '{"child_attribute":"any string"}', - } - - keys = p.default_schema.tables["items"]["columns"].keys() - assert keys == { - "child", - "optional_parent_attribute", - "data_dictionary", - "_dlt_load_id", - "_dlt_id", - } - - columns = p.default_schema.tables["items"]["columns"] - - assert columns["optional_parent_attribute"] == { - "name": "optional_parent_attribute", - "data_type": "text", - "nullable": True, - } - - assert columns["data_dictionary"] == { - "name": "data_dictionary", - "data_type": "complex", - "nullable": False, - } diff --git a/tests/pipeline/test_pipeline_extra.py b/tests/pipeline/test_pipeline_extra.py index dd60002e6c..3dc4a66505 100644 --- a/tests/pipeline/test_pipeline_extra.py +++ b/tests/pipeline/test_pipeline_extra.py @@ -200,3 +200,157 @@ def generic(start=8): pipeline = dlt.pipeline(destination="duckdb") pipeline.run(generic(), loader_file_format=file_format) + + +class Child(BaseModel): + child_attribute: str + optional_child_attribute: Optional[str] = None + + +@pytest.mark.parametrize( + "destination_config", + destinations_configs(default_sql_configs=True, subset=["duckdb"]), + ids=lambda x: x.name, +) +def test_flattens_model_when_skip_complex_types_is_set( + destination_config: DestinationTestConfiguration, +) -> None: + class Parent(BaseModel): + child: Child + optional_parent_attribute: Optional[str] = None + dlt_config: ClassVar[DltConfig] = {"skip_complex_types": True} + + example_data = { + "optional_parent_attribute": None, + "child": { + "child_attribute": "any string", + "optional_child_attribute": None, + }, + } + + @dlt.resource + def res(): + yield [example_data] + + @dlt.source(max_table_nesting=1) + def src(): + yield res() + + p = destination_config.setup_pipeline("example", full_refresh=True) + p.run(src(), table_name="items", columns=Parent) + + with p.sql_client() as client: + with client.execute_query("SELECT * FROM items") as cursor: + loaded_values = { + col[0]: val + for val, col in zip(cursor.fetchall()[0], cursor.description) + if col[0] not in ("_dlt_id", "_dlt_load_id") + } + assert loaded_values == { + "child__child_attribute": "any string", + "child__optional_child_attribute": None, + "optional_parent_attribute": None, + } + + keys = p.default_schema.tables["items"]["columns"].keys() + columns = p.default_schema.tables["items"]["columns"] + + assert keys == { + "child__child_attribute", + "child__optional_child_attribute", + "optional_parent_attribute", + "_dlt_load_id", + "_dlt_id", + } + + assert columns["child__child_attribute"] == { + "name": "child__child_attribute", + "data_type": "text", + "nullable": False, + } + + assert columns["child__optional_child_attribute"] == { + "name": "child__optional_child_attribute", + "data_type": "text", + "nullable": True, + } + + assert columns["optional_parent_attribute"] == { + "name": "optional_parent_attribute", + "data_type": "text", + "nullable": True, + } + + +@pytest.mark.parametrize( + "destination_config", + destinations_configs(default_sql_configs=True, subset=["duckdb"]), + ids=lambda x: x.name, +) +def test_flattens_model_when_skip_complex_types_is_not_set( + destination_config: DestinationTestConfiguration, +): + class Parent(BaseModel): + child: Child + optional_parent_attribute: Optional[str] = None + data_dictionary: Dict[str, Any] = None + dlt_config: ClassVar[DltConfig] = {"skip_complex_types": False} + + example_data = { + "optional_parent_attribute": None, + "data_dictionary": { + "child_attribute": "any string", + }, + "child": { + "child_attribute": "any string", + "optional_child_attribute": None, + }, + } + + @dlt.resource + def res(): + yield [example_data] + + @dlt.source(max_table_nesting=1) + def src(): + yield res() + + p = destination_config.setup_pipeline("example", full_refresh=True) + p.run(src(), table_name="items", columns=Parent) + + with p.sql_client() as client: + with client.execute_query("SELECT * FROM items") as cursor: + loaded_values = { + col[0]: val + for val, col in zip(cursor.fetchall()[0], cursor.description) + if col[0] not in ("_dlt_id", "_dlt_load_id") + } + + assert loaded_values == { + "child": '{"child_attribute":"any string","optional_child_attribute":null}', + "optional_parent_attribute": None, + "data_dictionary": '{"child_attribute":"any string"}', + } + + keys = p.default_schema.tables["items"]["columns"].keys() + assert keys == { + "child", + "optional_parent_attribute", + "data_dictionary", + "_dlt_load_id", + "_dlt_id", + } + + columns = p.default_schema.tables["items"]["columns"] + + assert columns["optional_parent_attribute"] == { + "name": "optional_parent_attribute", + "data_type": "text", + "nullable": True, + } + + assert columns["data_dictionary"] == { + "name": "data_dictionary", + "data_type": "complex", + "nullable": False, + }