From d81f73aac08b2006b594477dc7b9f150f4d5e43e Mon Sep 17 00:00:00 2001
From: Marcin Rudolf
Date: Thu, 28 Sep 2023 23:57:05 +0200
Subject: [PATCH 1/4] reimplements and documents duck_case naming convention usage

---
 dlt/common/normalizers/naming/duck_case.py    | 16 ++++---
 dlt/common/normalizers/naming/snake_case.py   | 16 +++----
 dlt/destinations/duckdb/__init__.py           |  1 -
 dlt/destinations/duckdb/sql_client.py         |  5 ++-
 dlt/destinations/motherduck/__init__.py       |  1 -
 .../docs/dlt-ecosystem/destinations/duckdb.md | 21 ++++++++++
 .../dlt-ecosystem/destinations/weaviate.md    |  2 +-
 .../normalizers/test_naming_duck_case.py      | 16 ++++---
 .../normalizers/test_naming_snake_case.py     | 26 +++++++-----
 tests/load/pipeline/test_duckdb.py            | 42 +++++++++++++++++++
 tests/normalize/test_normalize.py             | 15 +++----
 tests/pipeline/test_pipeline.py               |  2 -
 12 files changed, 110 insertions(+), 53 deletions(-)
 create mode 100644 tests/load/pipeline/test_duckdb.py

diff --git a/dlt/common/normalizers/naming/duck_case.py b/dlt/common/normalizers/naming/duck_case.py
index 7c59b4daa4..200c0bbdad 100644
--- a/dlt/common/normalizers/naming/duck_case.py
+++ b/dlt/common/normalizers/naming/duck_case.py
@@ -1,26 +1,24 @@
 import re
 from functools import lru_cache
 
-from dlt.common.normalizers.naming.snake_case import NamingConvention as BaseNamingConvention
+from dlt.common.normalizers.naming.snake_case import NamingConvention as SnakeCaseNamingConvention
 
 
-class NamingConvention(BaseNamingConvention):
+class NamingConvention(SnakeCaseNamingConvention):
 
-    _RE_NON_ALPHANUMERIC = re.compile(r"[^a-zA-Z\d_+-]+")
-    _REDUCE_ALPHABET = ("*@|", "xal")
-    _TR_REDUCE_ALPHABET = str.maketrans(_REDUCE_ALPHABET[0], _REDUCE_ALPHABET[1])
+    _CLEANUP_TABLE = str.maketrans("\n\r\"", "___")
+    _RE_LEADING_DIGITS = None  # do not remove leading digits
 
     @staticmethod
     @lru_cache(maxsize=None)
     def _normalize_identifier(identifier: str, max_length: int) -> str:
         """Normalizes the identifier according to naming convention represented by this function"""
-        # all characters that are not letters digits or a few special chars are replaced with underscore
-        normalized_ident = identifier.translate(NamingConvention._TR_REDUCE_ALPHABET)
-        normalized_ident = NamingConvention._RE_NON_ALPHANUMERIC.sub("_", normalized_ident)
+
+        normalized_ident = identifier.translate(NamingConvention._CLEANUP_TABLE)
 
         # shorten identifier
         return NamingConvention.shorten_identifier(
-            NamingConvention._to_snake_case(normalized_ident),
+            NamingConvention._RE_UNDERSCORES.sub("_", normalized_ident),
             identifier,
             max_length
         )
diff --git a/dlt/common/normalizers/naming/snake_case.py b/dlt/common/normalizers/naming/snake_case.py
index 67c9fdd30e..12aa887d6e 100644
--- a/dlt/common/normalizers/naming/snake_case.py
+++ b/dlt/common/normalizers/naming/snake_case.py
@@ -46,14 +46,14 @@ def _normalize_identifier(identifier: str, max_length: int) -> str:
             max_length
         )
 
-    @staticmethod
-    def _to_snake_case(identifier: str) -> str:
+    @classmethod
+    def _to_snake_case(cls, identifier: str) -> str:
         # then convert to snake case
-        identifier = NamingConvention._SNAKE_CASE_BREAK_1.sub(r'\1_\2', identifier)
-        identifier = NamingConvention._SNAKE_CASE_BREAK_2.sub(r'\1_\2', identifier).lower()
+        identifier = cls._SNAKE_CASE_BREAK_1.sub(r'\1_\2', identifier)
+        identifier = cls._SNAKE_CASE_BREAK_2.sub(r'\1_\2', identifier).lower()
 
-        # leading digits will be prefixed
-        if NamingConvention._RE_LEADING_DIGITS.match(identifier):
+        # leading digits will be prefixed (if regex is defined)
+        if cls._RE_LEADING_DIGITS and cls._RE_LEADING_DIGITS.match(identifier):
             identifier = "_" + identifier
 
         # replace trailing _ with x
@@ -61,6 +61,6 @@ def _to_snake_case(identifier: str) -> str:
         strip_count = len(identifier) - len(stripped_ident)
         stripped_ident += "x" * strip_count
 
-        # identifier = NamingConvention._RE_ENDING_UNDERSCORES.sub("x", identifier)
+        # identifier = cls._RE_ENDING_UNDERSCORES.sub("x", identifier)
         # replace consecutive underscores with single one to prevent name clashes with PATH_SEPARATOR
-        return NamingConvention._RE_UNDERSCORES.sub("_", stripped_ident)
\ No newline at end of file
+        return cls._RE_UNDERSCORES.sub("_", stripped_ident)
\ No newline at end of file
diff --git a/dlt/destinations/duckdb/__init__.py b/dlt/destinations/duckdb/__init__.py
index c3dfd02db7..d9882cc0eb 100644
--- a/dlt/destinations/duckdb/__init__.py
+++ b/dlt/destinations/duckdb/__init__.py
@@ -28,7 +28,6 @@ def capabilities() -> DestinationCapabilitiesContext:
     caps.wei_precision = (DEFAULT_NUMERIC_PRECISION, 0)
     caps.max_identifier_length = 65536
     caps.max_column_identifier_length = 65536
-    caps.naming_convention = "duck_case"
     caps.max_query_length = 32 * 1024 * 1024
     caps.is_max_query_length_in_bytes = True
     caps.max_text_data_type_length = 1024 * 1024 * 1024
diff --git a/dlt/destinations/duckdb/sql_client.py b/dlt/destinations/duckdb/sql_client.py
index 94f9cb38d2..cd2160f676 100644
--- a/dlt/destinations/duckdb/sql_client.py
+++ b/dlt/destinations/duckdb/sql_client.py
@@ -135,7 +135,10 @@ def fully_qualified_dataset_name(self, escape: bool = True) -> str:
     @classmethod
     def _make_database_exception(cls, ex: Exception) -> Exception:
         if isinstance(ex, (duckdb.CatalogException)):
-            raise DatabaseUndefinedRelation(ex)
+            if "already exists" in str(ex):
+                raise DatabaseTerminalException(ex)
+            else:
+                raise DatabaseUndefinedRelation(ex)
         elif isinstance(ex, duckdb.InvalidInputException):
             if "Catalog Error" in str(ex):
                 raise DatabaseUndefinedRelation(ex)
diff --git a/dlt/destinations/motherduck/__init__.py b/dlt/destinations/motherduck/__init__.py
index 493cd9834b..eae67eaa74 100644
--- a/dlt/destinations/motherduck/__init__.py
+++ b/dlt/destinations/motherduck/__init__.py
@@ -26,7 +26,6 @@ def capabilities() -> DestinationCapabilitiesContext:
     caps.wei_precision = (DEFAULT_NUMERIC_PRECISION, 0)
     caps.max_identifier_length = 65536
     caps.max_column_identifier_length = 65536
-    caps.naming_convention = "duck_case"
    caps.max_query_length = 512 * 1024
     caps.is_max_query_length_in_bytes = True
     caps.max_text_data_type_length = 1024 * 1024 * 1024
diff --git a/docs/website/docs/dlt-ecosystem/destinations/duckdb.md b/docs/website/docs/dlt-ecosystem/destinations/duckdb.md
index 38fd38d8e8..c4213feda1 100644
--- a/docs/website/docs/dlt-ecosystem/destinations/duckdb.md
+++ b/docs/website/docs/dlt-ecosystem/destinations/duckdb.md
@@ -29,6 +29,27 @@ All write dispositions are supported
 ## Data loading
 `dlt` will load data using large INSERT VALUES statements by default. Loading is multithreaded (20 threads by default). If you are ok with installing `pyarrow` we suggest to switch to `parquet` as file format. Loading is faster (and also multithreaded).
 
+### Name normalization
+`dlt` uses the standard **snake_case** naming convention to keep table and column identifiers identical across all destinations. If you want to use **duckdb**'s wide range of allowed characters (i.e. emojis) for table and column names, you can switch to the **duck_case** naming convention, which accepts almost any string as an identifier:
+* `\n`, `\r` and `"` are translated to `_`
+* multiple `_` are translated to a single `_`
+
+Switch the naming convention using `config.toml`:
+```toml
+[schema]
+naming="duck_case"
+```
+
+or via the env variable `SCHEMA__NAMING`, or directly in code:
+```python
+dlt.config["schema.naming"] = "duck_case"
+```
+:::caution
+**duckdb** identifiers are **case insensitive** but display names preserve case. This may create name clashes: for example, loading JSON with
+`{"Column": 1, "column": 2}` will map the data to a single column.
+:::
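+
+For example, this minimal sketch (pipeline and table names are illustrative) creates a table with emoji identifiers in a local duckdb database:
+```python
+import dlt
+
+dlt.config["schema.naming"] = "duck_case"
+pipeline = dlt.pipeline(pipeline_name="duck_pipeline", destination="duckdb")
+# the table "šŸ¦šPeacocksšŸ¦š" and the columns "šŸ¾Feet" and "1+1" are created as-is
+pipeline.run([{"šŸ¾Feet": 2, "1+1": "two"}], table_name="šŸ¦šPeacocksšŸ¦š")
+```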
+
 ## Supported file formats
 You can configure the following file formats to load data to duckdb
 * [insert-values](../file-formats/insert-format.md) is used by default
diff --git a/docs/website/docs/dlt-ecosystem/destinations/weaviate.md b/docs/website/docs/dlt-ecosystem/destinations/weaviate.md
index 4cac747038..8c626266a4 100644
--- a/docs/website/docs/dlt-ecosystem/destinations/weaviate.md
+++ b/docs/website/docs/dlt-ecosystem/destinations/weaviate.md
@@ -252,7 +252,7 @@ it will be normalized to:
 so your best course of action is to clean up the data yourself before loading and use default naming convention. Nevertheless you can configure the alternative in `config.toml`:
 ```toml
 [schema]
-naming="dlt.destinations.weaviate.naming"
+naming="dlt.destinations.weaviate.ci_naming"
 ```
 
 ## Additional destination options
diff --git a/tests/common/normalizers/test_naming_duck_case.py b/tests/common/normalizers/test_naming_duck_case.py
index b50ee64581..ed63800ca9 100644
--- a/tests/common/normalizers/test_naming_duck_case.py
+++ b/tests/common/normalizers/test_naming_duck_case.py
@@ -12,15 +12,13 @@ def naming_unlimited() -> NamingConvention:
 def test_normalize_identifier(naming_unlimited: NamingConvention) -> None:
     assert naming_unlimited.normalize_identifier("+1") == "+1"
     assert naming_unlimited.normalize_identifier("-1") == "-1"
+    assert naming_unlimited.normalize_identifier("1-1") == "1-1"
+    assert naming_unlimited.normalize_identifier("šŸ¦šPeacock") == "šŸ¦šPeacock"
+    assert naming_unlimited.normalize_identifier("šŸ¦ššŸ¦šPeacocks") == "šŸ¦ššŸ¦šPeacocks"
+    assert naming_unlimited.normalize_identifier("šŸ¦ššŸ¦špeacocks") == "šŸ¦ššŸ¦špeacocks"
+    # non latin alphabets
+    assert naming_unlimited.normalize_identifier("ƖlĆ¼beĀµrsą¤ˆą¤‰ą¤Šą¤‹ą¤Œą¤ą¤Žą¤cā‡ØusĒsā›”lƄnder") == "ƖlĆ¼beĀµrsą¤ˆą¤‰ą¤Šą¤‹ą¤Œą¤ą¤Žą¤cā‡ØusĒsā›”lƄnder"
 
 
 def test_alphabet_reduction(naming_unlimited: NamingConvention) -> None:
-    assert naming_unlimited.normalize_identifier(NamingConvention._REDUCE_ALPHABET[0]) == NamingConvention._REDUCE_ALPHABET[1]
-
-
-def test_duck_snake_case_compat(naming_unlimited: NamingConvention) -> None:
-    snake_unlimited = SnakeNamingConvention()
-    # same reduction duck -> snake
-    assert snake_unlimited.normalize_identifier(NamingConvention._REDUCE_ALPHABET[0]) == NamingConvention._REDUCE_ALPHABET[1]
-    # but there are differences in the reduction
-    assert naming_unlimited.normalize_identifier(SnakeNamingConvention._REDUCE_ALPHABET[0]) != snake_unlimited.normalize_identifier(SnakeNamingConvention._REDUCE_ALPHABET[0])
+    assert naming_unlimited.normalize_identifier("A\nB\"C\rD") == "A_B_C_D"
diff --git a/tests/common/normalizers/test_naming_snake_case.py b/tests/common/normalizers/test_naming_snake_case.py
index 976c242930..b51801b6c4 100644
--- a/tests/common/normalizers/test_naming_snake_case.py
+++ b/tests/common/normalizers/test_naming_snake_case.py
@@ -1,11 +1,14 @@
+from typing import Type
 import pytest
 
-from dlt.common.normalizers.naming.snake_case import NamingConvention
+from dlt.common.normalizers.naming import NamingConvention
+from dlt.common.normalizers.naming.snake_case import NamingConvention as SnakeCaseNamingConvention
+from dlt.common.normalizers.naming.duck_case import NamingConvention as DuckCaseNamingConvention
 
 
 @pytest.fixture
 def naming_unlimited() -> NamingConvention:
-    return NamingConvention()
+    return SnakeCaseNamingConvention()
 
 
 def test_normalize_identifier(naming_unlimited: NamingConvention) -> None:
@@ -33,12 +36,9 @@ def test_normalize_identifier(naming_unlimited: NamingConvention) -> None:
     assert naming_unlimited.normalize_identifier("+1") == "x1"
     assert naming_unlimited.normalize_identifier("-1") == "_1"
 
-    # non latin alphabets
-    # assert naming_unlimited.normalize_identifier("ƖlĆ¼beĀµrsą¤ˆą¤‰ą¤Šą¤‹ą¤Œą¤ą¤Žą¤cā‡ØusĒsā›”lƄnder") == "ƶlĆ¼berschusslƤnder"
-
 
 def test_alphabet_reduction(naming_unlimited: NamingConvention) -> None:
-    assert naming_unlimited.normalize_identifier(NamingConvention._REDUCE_ALPHABET[0]) == NamingConvention._REDUCE_ALPHABET[1]
+    assert naming_unlimited.normalize_identifier(SnakeCaseNamingConvention._REDUCE_ALPHABET[0]) == SnakeCaseNamingConvention._REDUCE_ALPHABET[1]
 
 
 def test_normalize_path(naming_unlimited: NamingConvention) -> None:
@@ -51,12 +51,14 @@ def test_normalize_path(naming_unlimited: NamingConvention) -> None:
 
 
 def test_normalize_non_alpha_single_underscore() -> None:
-    assert NamingConvention._RE_NON_ALPHANUMERIC.sub("_", "-=!*") == "_"
-    assert NamingConvention._RE_NON_ALPHANUMERIC.sub("_", "1-=!0*-") == "1_0_"
-    assert NamingConvention._RE_NON_ALPHANUMERIC.sub("_", "1-=!_0*-") == "1__0_"
+    assert SnakeCaseNamingConvention._RE_NON_ALPHANUMERIC.sub("_", "-=!*") == "_"
+    assert SnakeCaseNamingConvention._RE_NON_ALPHANUMERIC.sub("_", "1-=!0*-") == "1_0_"
+    assert SnakeCaseNamingConvention._RE_NON_ALPHANUMERIC.sub("_", "1-=!_0*-") == "1__0_"
 
 
-def test_normalize_break_path(naming_unlimited: NamingConvention) -> None:
+@pytest.mark.parametrize("convention", (SnakeCaseNamingConvention, DuckCaseNamingConvention))
+def test_normalize_break_path(convention: Type[NamingConvention]) -> None:
+    naming_unlimited = convention()
     assert naming_unlimited.break_path("A__B__C") == ["A", "B", "C"]
     # what if path has _a and _b which are valid normalized idents
     assert naming_unlimited.break_path("_a___b__C___D") == ["_a", "_b", "C", "_D"]
@@ -66,7 +68,9 @@ def test_normalize_break_path(naming_unlimited: NamingConvention) -> None:
     assert naming_unlimited.break_path("_a__ \t\r__b") == ["_a", "b"]
 
 
-def test_normalize_make_path(naming_unlimited: NamingConvention) -> None:
+@pytest.mark.parametrize("convention", (SnakeCaseNamingConvention, DuckCaseNamingConvention))
+def test_normalize_make_path(convention: Type[NamingConvention]) -> None:
+    naming_unlimited = convention()
     assert naming_unlimited.make_path("A", "B") == "A__B"
     assert naming_unlimited.make_path("_A", "_B") == "_A___B"
     assert naming_unlimited.make_path("_A", "", "_B") == "_A___B"
diff --git a/tests/load/pipeline/test_duckdb.py b/tests/load/pipeline/test_duckdb.py
new file mode 100644
index 0000000000..c71ac37a81
--- /dev/null
+++ b/tests/load/pipeline/test_duckdb.py
@@ -0,0 +1,42 @@
+import pytest
+import os
+
+import dlt
+from dlt.destinations.exceptions import DatabaseTerminalException
+from dlt.pipeline.exceptions import PipelineStepFailed
+
+from tests.pipeline.utils import airtable_emojis
+from tests.load.pipeline.utils import destinations_configs, DestinationTestConfiguration, load_table_counts
+
+
+@pytest.mark.parametrize("destination_config", destinations_configs(default_sql_configs=True, subset=["duckdb"]), ids=lambda x: x.name)
+def test_duck_case_names(destination_config: DestinationTestConfiguration) -> None:
+    # we want to have nice tables
+    # dlt.config["schema.naming"] = "duck_case"
+    os.environ["SCHEMA__NAMING"] = "duck_case"
+    pipeline = destination_config.setup_pipeline("test_duck_case_names")
+    # create tables and columns with emojis and other special characters
+    pipeline.run(airtable_emojis().with_resources("šŸ“† Schedule", "šŸ¦šPeacock", "šŸ¦šWidePeacock"))
+    pipeline.run([{"šŸ¾Feet": 2, "1+1": "two", "\nhey": "value"}], table_name="šŸ¦šPeacocksšŸ¦š")
+    table_counts = load_table_counts(pipeline, *[t["name"] for t in pipeline.default_schema.data_tables()])
+    assert table_counts == {
+        "šŸ“† Schedule": 3,
+        "šŸ¦šPeacock": 1,
+        'šŸ¦šPeacock__peacock': 3,
+        'šŸ¦šPeacocksšŸ¦š': 1,
+        'šŸ¦šWidePeacock': 1,
+        'šŸ¦šWidePeacock__peacock': 3
+    }
+
+    # this will fail - duckdb preserves case but is case insensitive when comparing identifiers
+    with pytest.raises(PipelineStepFailed) as pip_ex:
+        pipeline.run([{"šŸ¾Feet": 2, "1+1": "two", "šŸ¾feet": "value"}], table_name="šŸ¦špeacocksšŸ¦š")
+    assert isinstance(pip_ex.value.__context__, DatabaseTerminalException)
+
+    # show tables and columns
+    with pipeline.sql_client() as client:
+        with client.execute_query("DESCRIBE šŸ¦špeacocksšŸ¦š;") as q:
+            tables = q.df()
+    assert tables["column_name"].tolist() == ["šŸ¾Feet", "1+1", "hey", "_dlt_load_id", "_dlt_id"]
+
+
diff --git a/tests/normalize/test_normalize.py b/tests/normalize/test_normalize.py
index 33315e041c..75831d24b1 100644
--- a/tests/normalize/test_normalize.py
+++ b/tests/normalize/test_normalize.py
@@ -312,16 +312,11 @@ def test_normalize_twice_with_flatten(caps: DestinationCapabilitiesContext, raw_
 
     # check if schema contains a few crucial tables
     def assert_schema(_schema: Schema):
-        convention = _schema._normalizers_config["names"]
-        if convention == "snake_case":
-            assert "reactions___1" in _schema.tables["issues"]["columns"]
-            assert "reactions__x1" in _schema.tables["issues"]["columns"]
-            assert "reactions__1" not in _schema.tables["issues"]["columns"]
-        elif convention == "duck_case":
-            assert "reactions__+1" in _schema.tables["issues"]["columns"]
-            assert "reactions__-1" in _schema.tables["issues"]["columns"]
-        else:
-            raise ValueError(f"convention {convention} cannot be checked")
+        # convention = _schema._normalizers_config["names"]
+        assert "reactions___1" in _schema.tables["issues"]["columns"]
+        assert "reactions__x1" in _schema.tables["issues"]["columns"]
+        assert "reactions__1" not in _schema.tables["issues"]["columns"]
+
 
     schema = raw_normalize.load_or_create_schema(raw_normalize.schema_storage, "github")
     assert_schema(schema)
diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py
index c668d81073..f9acf20437 100644
--- a/tests/pipeline/test_pipeline.py
+++ b/tests/pipeline/test_pipeline.py
@@ -146,7 +146,6 @@ def test_pipeline_context() -> None:
     assert ctx.pipeline() is p2
     assert p.is_active is False
     assert p2.is_active is True
-    assert Container()[DestinationCapabilitiesContext].naming_convention == "duck_case"
 
     p3 = dlt.pipeline(pipeline_name="more pipelines", destination="dummy")
     assert ctx.pipeline() is p3
@@ -159,7 +158,6 @@ def test_pipeline_context() -> None:
     assert ctx.pipeline() is p2
     assert p3.is_active is False
     assert p2.is_active is True
-    assert Container()[DestinationCapabilitiesContext].naming_convention == "duck_case"
 
 
 def test_import_unknown_destination() -> None:
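
The behavioral difference between the two conventions after this patch can be summarized in a short sketch (assertions taken from the tests above; the class aliases are illustrative):

```python
from dlt.common.normalizers.naming.duck_case import NamingConvention as DuckCase
from dlt.common.normalizers.naming.snake_case import NamingConvention as SnakeCase

duck, snake = DuckCase(), SnakeCase()

# duck_case keeps special characters, emojis and leading digits as-is
assert duck.normalize_identifier("+1") == "+1"
assert duck.normalize_identifier("šŸ¦šPeacock") == "šŸ¦šPeacock"
# only \n, \r and " are replaced with _, and runs of _ are collapsed
assert duck.normalize_identifier("A\nB\"C\rD") == "A_B_C_D"

# snake_case reduces the alphabet and lowercases identifiers
assert snake.normalize_identifier("+1") == "x1"
assert snake.normalize_identifier("-1") == "_1"
```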

From 04714d53f94d85c600022fd181a05620da4c113a Mon Sep 17 00:00:00 2001
From: Marcin Rudolf
Date: Fri, 29 Sep 2023 00:00:54 +0200
Subject: [PATCH 2/4] adds clone resource with new name to separate state

---
 dlt/extract/pipe.py           |  7 ++++---
 dlt/extract/source.py         | 22 ++++++++++++++++------
 tests/extract/test_sources.py | 32 ++++++++++++++++++++++++++++++++
 3 files changed, 52 insertions(+), 9 deletions(-)

diff --git a/dlt/extract/pipe.py b/dlt/extract/pipe.py
index 2c1cebe177..7759db4c6c 100644
--- a/dlt/extract/pipe.py
+++ b/dlt/extract/pipe.py
@@ -401,9 +401,10 @@ def _ensure_transform_step(self, step_no: int, step: TPipeStep) -> None:
         else:
             raise InvalidStepFunctionArguments(self.name, callable_name, sig, str(ty_ex))
 
-    def _clone(self, keep_pipe_id: bool = True) -> "Pipe":
-        """Clones the pipe steps, optionally keeping the pipe id. Used internally to clone a list of connected pipes."""
-        p = Pipe(self.name, [], self.parent)
+    def _clone(self, keep_pipe_id: bool = True, new_name: str = None) -> "Pipe":
+        """Clones the pipe steps, optionally keeping the pipe id or renaming the pipe. Used internally to clone a list of connected pipes."""
+        assert not (new_name and keep_pipe_id), "Cannot keep pipe id when renaming the pipe"
+        p = Pipe(new_name or self.name, [], self.parent)
         p._steps = self._steps.copy()
         # clone shares the id with the original
         if keep_pipe_id:
diff --git a/dlt/extract/source.py b/dlt/extract/source.py
index f28a54023d..ecdb0f1993 100644
--- a/dlt/extract/source.py
+++ b/dlt/extract/source.py
@@ -1,6 +1,6 @@
 import warnings
 import contextlib
-from copy import copy
+from copy import copy, deepcopy
 import makefun
 import inspect
 from typing import AsyncIterable, AsyncIterator, ClassVar, Callable, ContextManager, Dict, Iterable, Iterator, List, Sequence, Tuple, Union, Any, Optional
@@ -112,6 +112,10 @@ def name(self) -> str:
         """Resource name inherited from the pipe"""
         return self._name
 
+    def with_name(self, new_name: str) -> "DltResource":
+        """Clones the resource with a new name. Such a resource keeps separate state and loads data to the `new_name` table by default."""
+        return self.clone(new_name=new_name)
+
     @property
     def is_transformer(self) -> bool:
         """Checks if the resource is a transformer that takes data from another resource"""
@@ -324,19 +328,25 @@ def state(self) -> StrAny:
         with inject_section(self._get_config_section_context()):
             return resource_state(self.name)
 
-    def clone(self, clone_pipe: bool = True, keep_pipe_id: bool = True) -> "DltResource":
-        """Creates a deep copy of a current resource, optionally cloning also pipe. Note that name of a containing source will not be cloned."""
+    def clone(self, clone_pipe: bool = True, new_name: str = None) -> "DltResource":
+        """Creates a deep copy of the current resource, optionally renaming the resource (and cloning the pipe). Note that the name of the containing source will not be cloned."""
+        assert not (new_name and not clone_pipe), "Must clone pipe when changing name"
         pipe = self._pipe
         if self._pipe and not self._pipe.is_empty and clone_pipe:
-            pipe = pipe._clone(keep_pipe_id=keep_pipe_id)
+            pipe = pipe._clone(keep_pipe_id=False, new_name=new_name)
         # incremental and parent are already in the pipe (if any)
-        return DltResource(pipe, self._table_schema_template, selected=self.selected, section=self.section)
+        return DltResource(
+            pipe,
+            deepcopy(self._table_schema_template),
+            selected=self.selected,
+            section=self.section
+        )
 
     def __call__(self, *args: Any, **kwargs: Any) -> "DltResource":
         """Binds the parametrized resources to passed arguments. Creates and returns a bound resource. Generators and iterators are not evaluated."""
         if self._bound:
             raise TypeError("Bound DltResource object is not callable")
-        r = self.clone(clone_pipe=True, keep_pipe_id=False)
+        r = self.clone(clone_pipe=True)
         return r.bind(*args, **kwargs)
 
     def __or__(self, transform: Union["DltResource", AnyFun]) -> "DltResource":
diff --git a/tests/extract/test_sources.py b/tests/extract/test_sources.py
index 6e72a9a014..5d6f5aa482 100644
--- a/tests/extract/test_sources.py
+++ b/tests/extract/test_sources.py
@@ -890,3 +890,35 @@ def mysource():
     assert s.exhausted is False
     assert next(iter(s)) == 2  # transformer is returned before resource
     assert s.exhausted is True
+
+
+def test_clone_resource_with_name() -> None:
+    @dlt.resource(selected=False)
+    def _r1():
+        yield ["a", "b", "c"]
+
+    @dlt.transformer(selected=True)
+    def _t1(items, suffix):
+        yield list(map(lambda i: i + "_" + suffix, items))
+
+
+    r1 = _r1()
+    r1_clone = r1.with_name("r1_clone")
+    # new name of resource and pipe
+    assert r1_clone.name == "r1_clone"
+    assert r1_clone._pipe.name == "r1_clone"
+    # original keeps old name and pipe
+    assert r1._pipe != r1_clone._pipe
+    assert r1.name == "_r1"
+
+    # clone transformer
+    bound_t1_clone = r1_clone | _t1.with_name("t1_clone")("ax")
+    bound_t1_clone_2 = r1_clone | _t1("ax_2").with_name("t1_clone_2")
+    assert bound_t1_clone.name == "t1_clone"
+    assert bound_t1_clone_2.name == "t1_clone_2"
+    # but parent is the same
+    assert bound_t1_clone_2._pipe.parent == bound_t1_clone._pipe.parent
+
+    # evaluate transformers
+    assert list(bound_t1_clone) == ['a_ax', 'b_ax', 'c_ax']
+    assert list(bound_t1_clone_2) == ['a_ax_2', 'b_ax_2', 'c_ax_2']
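
The `with_name` API added in this patch clones a resource under a new name; a minimal usage sketch (resource and pipeline names are illustrative):

```python
import dlt

@dlt.resource
def numbers():
    yield [1, 2, 3]

pipeline = dlt.pipeline(pipeline_name="clone_demo", destination="duckdb")
# loads to the "numbers" table with the resource's own state
pipeline.run(numbers())
# the clone keeps separate state and loads to the "numbers_copy" table by default
pipeline.run(numbers().with_name("numbers_copy"))
```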

From 032ae5ccfd530f5b4de1face5cb190c0f20be5d9 Mon Sep 17 00:00:00 2001
From: Marcin Rudolf
Date: Fri, 29 Sep 2023 00:01:16 +0200
Subject: [PATCH 3/4] adds size_in_bytes to FileItem

---
 dlt/common/storages/filesystem.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/dlt/common/storages/filesystem.py b/dlt/common/storages/filesystem.py
index ef670c784a..55711fa4fb 100644
--- a/dlt/common/storages/filesystem.py
+++ b/dlt/common/storages/filesystem.py
@@ -19,6 +19,7 @@ class FileItem(TypedDict):
     file_name: str
     mime_type: str
     modification_date: pendulum.DateTime
+    size_in_bytes: int
     file_content: Optional[Union[str, bytes]]

From c11e9dbca26f60de100f51da2d4ed591e70341b4 Mon Sep 17 00:00:00 2001
From: Marcin Rudolf
Date: Fri, 29 Sep 2023 00:01:36 +0200
Subject: [PATCH 4/4] skips mssql tests if odbc driver not installed

---
 tests/load/mssql/test_mssql_table_builder.py       | 2 ++
 tests/load/pipeline/test_athena.py                 | 1 +
 tests/load/redshift/test_redshift_table_builder.py | 1 -
 3 files changed, 3 insertions(+), 1 deletion(-)

diff --git a/tests/load/mssql/test_mssql_table_builder.py b/tests/load/mssql/test_mssql_table_builder.py
index a858ce57fd..4f5a6637d6 100644
--- a/tests/load/mssql/test_mssql_table_builder.py
+++ b/tests/load/mssql/test_mssql_table_builder.py
@@ -5,6 +5,8 @@
 from dlt.common.utils import uniq_id
 from dlt.common.schema import Schema
 
+pytest.importorskip("dlt.destinations.mssql.mssql", reason="MSSQL ODBC driver not installed")
+
 from dlt.destinations.mssql.mssql import MsSqlClient
 from dlt.destinations.mssql.configuration import MsSqlClientConfiguration, MsSqlCredentials
diff --git a/tests/load/pipeline/test_athena.py b/tests/load/pipeline/test_athena.py
index 6aa4a3c7b0..dd5baae73b 100644
--- a/tests/load/pipeline/test_athena.py
+++ b/tests/load/pipeline/test_athena.py
@@ -11,6 +11,7 @@
 from tests.load.pipeline.utils import destinations_configs, DestinationTestConfiguration
 
 
+
 @pytest.mark.parametrize("destination_config", destinations_configs(default_sql_configs=True, subset=["athena"]), ids=lambda x: x.name)
 def test_athena_destinations(destination_config: DestinationTestConfiguration) -> None:
 
diff --git a/tests/load/redshift/test_redshift_table_builder.py b/tests/load/redshift/test_redshift_table_builder.py
index e6b9fc5a2c..8c61ccc1f2 100644
--- a/tests/load/redshift/test_redshift_table_builder.py
+++ b/tests/load/redshift/test_redshift_table_builder.py
@@ -6,7 +6,6 @@
 from dlt.common.schema import Schema
 from dlt.common.configuration import resolve_configuration
 
-from dlt.destinations.exceptions import DestinationSchemaWillNotUpdate
 from dlt.destinations.redshift.redshift import RedshiftClient
 from dlt.destinations.redshift.configuration import RedshiftClientConfiguration, RedshiftCredentials
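
Note on patch 4: `pytest.importorskip` runs at module import time, so placing it before the `dlt.destinations.mssql` imports skips the whole test module when the ODBC driver is missing instead of failing collection. The pattern, in short (module path taken from the diff above):

```python
import pytest

# skip the entire module if the driver-backed module cannot be imported
pytest.importorskip("dlt.destinations.mssql.mssql", reason="MSSQL ODBC driver not installed")

# safe: only runs when the import above succeeded
from dlt.destinations.mssql.mssql import MsSqlClient
```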