Skip to content

Commit

Permalink
Merge pull request #660 from dlt-hub/rfix/duck-case
Browse files Browse the repository at this point in the history
Rfix/duck case
  • Loading branch information
rudolfix authored Sep 29, 2023
2 parents 2c5043c + c11e9db commit 35fcd00
Show file tree
Hide file tree
Showing 19 changed files with 166 additions and 63 deletions.
16 changes: 7 additions & 9 deletions dlt/common/normalizers/naming/duck_case.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,24 @@
import re
from functools import lru_cache

from dlt.common.normalizers.naming.snake_case import NamingConvention as BaseNamingConvention
from dlt.common.normalizers.naming.snake_case import NamingConvention as SnakeCaseNamingConvention


class NamingConvention(BaseNamingConvention):
class NamingConvention(SnakeCaseNamingConvention):

_RE_NON_ALPHANUMERIC = re.compile(r"[^a-zA-Z\d_+-]+")
_REDUCE_ALPHABET = ("*@|", "xal")
_TR_REDUCE_ALPHABET = str.maketrans(_REDUCE_ALPHABET[0], _REDUCE_ALPHABET[1])
_CLEANUP_TABLE = str.maketrans("\n\r\"", "___")
_RE_LEADING_DIGITS = None # do not remove leading digits

@staticmethod
@lru_cache(maxsize=None)
def _normalize_identifier(identifier: str, max_length: int) -> str:
"""Normalizes the identifier according to naming convention represented by this function"""
# all characters that are not letters digits or a few special chars are replaced with underscore
normalized_ident = identifier.translate(NamingConvention._TR_REDUCE_ALPHABET)
normalized_ident = NamingConvention._RE_NON_ALPHANUMERIC.sub("_", normalized_ident)

normalized_ident = identifier.translate(NamingConvention._CLEANUP_TABLE)

# shorten identifier
return NamingConvention.shorten_identifier(
NamingConvention._to_snake_case(normalized_ident),
NamingConvention._RE_UNDERSCORES.sub("_", normalized_ident),
identifier,
max_length
)
16 changes: 8 additions & 8 deletions dlt/common/normalizers/naming/snake_case.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,21 +46,21 @@ def _normalize_identifier(identifier: str, max_length: int) -> str:
max_length
)

@staticmethod
def _to_snake_case(identifier: str) -> str:
@classmethod
def _to_snake_case(cls, identifier: str) -> str:
# then convert to snake case
identifier = NamingConvention._SNAKE_CASE_BREAK_1.sub(r'\1_\2', identifier)
identifier = NamingConvention._SNAKE_CASE_BREAK_2.sub(r'\1_\2', identifier).lower()
identifier = cls._SNAKE_CASE_BREAK_1.sub(r'\1_\2', identifier)
identifier = cls._SNAKE_CASE_BREAK_2.sub(r'\1_\2', identifier).lower()

# leading digits will be prefixed
if NamingConvention._RE_LEADING_DIGITS.match(identifier):
# leading digits will be prefixed (if regex is defined)
if cls._RE_LEADING_DIGITS and cls._RE_LEADING_DIGITS.match(identifier):
identifier = "_" + identifier

# replace trailing _ with x
stripped_ident = identifier.rstrip("_")
strip_count = len(identifier) - len(stripped_ident)
stripped_ident += "x" * strip_count

# identifier = NamingConvention._RE_ENDING_UNDERSCORES.sub("x", identifier)
# identifier = cls._RE_ENDING_UNDERSCORES.sub("x", identifier)
# replace consecutive underscores with single one to prevent name clashes with PATH_SEPARATOR
return NamingConvention._RE_UNDERSCORES.sub("_", stripped_ident)
return cls._RE_UNDERSCORES.sub("_", stripped_ident)
1 change: 1 addition & 0 deletions dlt/common/storages/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class FileItem(TypedDict):
file_name: str
mime_type: str
modification_date: pendulum.DateTime
size_in_bytes: int
file_content: Optional[Union[str, bytes]]


Expand Down
1 change: 0 additions & 1 deletion dlt/destinations/duckdb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ def capabilities() -> DestinationCapabilitiesContext:
caps.wei_precision = (DEFAULT_NUMERIC_PRECISION, 0)
caps.max_identifier_length = 65536
caps.max_column_identifier_length = 65536
caps.naming_convention = "duck_case"
caps.max_query_length = 32 * 1024 * 1024
caps.is_max_query_length_in_bytes = True
caps.max_text_data_type_length = 1024 * 1024 * 1024
Expand Down
5 changes: 4 additions & 1 deletion dlt/destinations/duckdb/sql_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,10 @@ def fully_qualified_dataset_name(self, escape: bool = True) -> str:
@classmethod
def _make_database_exception(cls, ex: Exception) -> Exception:
if isinstance(ex, (duckdb.CatalogException)):
raise DatabaseUndefinedRelation(ex)
if "already exists" in str(ex):
raise DatabaseTerminalException(ex)
else:
raise DatabaseUndefinedRelation(ex)
elif isinstance(ex, duckdb.InvalidInputException):
if "Catalog Error" in str(ex):
raise DatabaseUndefinedRelation(ex)
Expand Down
1 change: 0 additions & 1 deletion dlt/destinations/motherduck/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ def capabilities() -> DestinationCapabilitiesContext:
caps.wei_precision = (DEFAULT_NUMERIC_PRECISION, 0)
caps.max_identifier_length = 65536
caps.max_column_identifier_length = 65536
caps.naming_convention = "duck_case"
caps.max_query_length = 512 * 1024
caps.is_max_query_length_in_bytes = True
caps.max_text_data_type_length = 1024 * 1024 * 1024
Expand Down
7 changes: 4 additions & 3 deletions dlt/extract/pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,9 +401,10 @@ def _ensure_transform_step(self, step_no: int, step: TPipeStep) -> None:
else:
raise InvalidStepFunctionArguments(self.name, callable_name, sig, str(ty_ex))

def _clone(self, keep_pipe_id: bool = True) -> "Pipe":
"""Clones the pipe steps, optionally keeping the pipe id. Used internally to clone a list of connected pipes."""
p = Pipe(self.name, [], self.parent)
def _clone(self, keep_pipe_id: bool = True, new_name: str = None) -> "Pipe":
"""Clones the pipe steps, optionally keeping the pipe id or renaming the pipe. Used internally to clone a list of connected pipes."""
assert not (new_name and keep_pipe_id), "Cannot keep pipe id when renaming the pipe"
p = Pipe(new_name or self.name, [], self.parent)
p._steps = self._steps.copy()
# clone shares the id with the original
if keep_pipe_id:
Expand Down
22 changes: 16 additions & 6 deletions dlt/extract/source.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import warnings
import contextlib
from copy import copy
from copy import copy, deepcopy
import makefun
import inspect
from typing import AsyncIterable, AsyncIterator, ClassVar, Callable, ContextManager, Dict, Iterable, Iterator, List, Sequence, Tuple, Union, Any, Optional
Expand Down Expand Up @@ -112,6 +112,10 @@ def name(self) -> str:
"""Resource name inherited from the pipe"""
return self._name

def with_name(self, new_name: str) -> "DltResource":
"""Clones the resource with a new name. Such resource keeps separate state and loads data to `new_name` table by default."""
return self.clone(new_name=new_name)

@property
def is_transformer(self) -> bool:
"""Checks if the resource is a transformer that takes data from another resource"""
Expand Down Expand Up @@ -324,19 +328,25 @@ def state(self) -> StrAny:
with inject_section(self._get_config_section_context()):
return resource_state(self.name)

def clone(self, clone_pipe: bool = True, keep_pipe_id: bool = True) -> "DltResource":
"""Creates a deep copy of a current resource, optionally cloning also pipe. Note that name of a containing source will not be cloned."""
def clone(self, clone_pipe: bool = True, new_name: str = None) -> "DltResource":
"""Creates a deep copy of a current resource, optionally renaming the resource (and cloning pipe). Note that name of a containing source will not be cloned."""
assert not (new_name and not clone_pipe), "Must clone pipe when changing name"
pipe = self._pipe
if self._pipe and not self._pipe.is_empty and clone_pipe:
pipe = pipe._clone(keep_pipe_id=keep_pipe_id)
pipe = pipe._clone(keep_pipe_id=False, new_name=new_name)
# incremental and parent are already in the pipe (if any)
return DltResource(pipe, self._table_schema_template, selected=self.selected, section=self.section)
return DltResource(
pipe,
deepcopy(self._table_schema_template),
selected=self.selected,
section=self.section
)

def __call__(self, *args: Any, **kwargs: Any) -> "DltResource":
"""Binds the parametrized resources to passed arguments. Creates and returns a bound resource. Generators and iterators are not evaluated."""
if self._bound:
raise TypeError("Bound DltResource object is not callable")
r = self.clone(clone_pipe=True, keep_pipe_id=False)
r = self.clone(clone_pipe=True)
return r.bind(*args, **kwargs)

def __or__(self, transform: Union["DltResource", AnyFun]) -> "DltResource":
Expand Down
21 changes: 21 additions & 0 deletions docs/website/docs/dlt-ecosystem/destinations/duckdb.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,27 @@ All write dispositions are supported
## Data loading
`dlt` will load data using large INSERT VALUES statements by default. Loading is multithreaded (20 threads by default). If you are ok with installing `pyarrow`, we suggest switching to `parquet` as the file format. Loading is faster (and also multithreaded).

### Names normalization
`dlt` uses the standard **snake_case** naming convention to keep identical table and column identifiers across all destinations. If you want to use **duckdb**'s wide range of characters (i.e. emojis) for table and column names, you can switch to the **duck_case** naming convention, which accepts almost any string as an identifier:
* `\n`, `\r` and `"` are translated to `_`
* multiple `_` are translated to single `_`

Switch the naming convention using `config.toml`:
```toml
[schema]
naming="duck_case"
```

or via env variable `SCHEMA__NAMING` or directly in code:
```python
dlt.config["schema.naming"] = "duck_case"
```
:::caution
**duckdb** identifiers are **case insensitive** but display names preserve case. This may create name clashes: for example, loading JSON with
`{"Column": 1, "column": 2}` will map both keys to a single column.
:::


## Supported file formats
You can configure the following file formats to load data to duckdb
* [insert-values](../file-formats/insert-format.md) is used by default
Expand Down
2 changes: 1 addition & 1 deletion docs/website/docs/dlt-ecosystem/destinations/weaviate.md
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ it will be normalized to:
so your best course of action is to clean up the data yourself before loading and use default naming convention. Nevertheless you can configure the alternative in `config.toml`:
```toml
[schema]
naming="dlt.destinations.weaviate.naming"
naming="dlt.destinations.weaviate.ci_naming"
```

## Additional destination options
Expand Down
16 changes: 7 additions & 9 deletions tests/common/normalizers/test_naming_duck_case.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,13 @@ def naming_unlimited() -> NamingConvention:
def test_normalize_identifier(naming_unlimited: NamingConvention) -> None:
assert naming_unlimited.normalize_identifier("+1") == "+1"
assert naming_unlimited.normalize_identifier("-1") == "-1"
assert naming_unlimited.normalize_identifier("1-1") == "1-1"
assert naming_unlimited.normalize_identifier("🦚Peacock") == "🦚Peacock"
assert naming_unlimited.normalize_identifier("🦚🦚Peacocks") == "🦚🦚Peacocks"
assert naming_unlimited.normalize_identifier("🦚🦚peacocks") == "🦚🦚peacocks"
# non latin alphabets
assert naming_unlimited.normalize_identifier("Ölübeµrsईउऊऋऌऍऎएc⇨usǁs⛔lÄnder") == "Ölübeµrsईउऊऋऌऍऎएc⇨usǁs⛔lÄnder"


def test_alphabet_reduction(naming_unlimited: NamingConvention) -> None:
assert naming_unlimited.normalize_identifier(NamingConvention._REDUCE_ALPHABET[0]) == NamingConvention._REDUCE_ALPHABET[1]


def test_duck_snake_case_compat(naming_unlimited: NamingConvention) -> None:
snake_unlimited = SnakeNamingConvention()
# same reduction duck -> snake
assert snake_unlimited.normalize_identifier(NamingConvention._REDUCE_ALPHABET[0]) == NamingConvention._REDUCE_ALPHABET[1]
# but there are differences in the reduction
assert naming_unlimited.normalize_identifier(SnakeNamingConvention._REDUCE_ALPHABET[0]) != snake_unlimited.normalize_identifier(SnakeNamingConvention._REDUCE_ALPHABET[0])
assert naming_unlimited.normalize_identifier("A\nB\"C\rD") == "A_B_C_D"
26 changes: 15 additions & 11 deletions tests/common/normalizers/test_naming_snake_case.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
from typing import Type
import pytest

from dlt.common.normalizers.naming.snake_case import NamingConvention
from dlt.common.normalizers.naming import NamingConvention
from dlt.common.normalizers.naming.snake_case import NamingConvention as SnakeCaseNamingConvention
from dlt.common.normalizers.naming.duck_case import NamingConvention as DuckCaseNamingConvention


@pytest.fixture
def naming_unlimited() -> NamingConvention:
return NamingConvention()
return SnakeCaseNamingConvention()


def test_normalize_identifier(naming_unlimited: NamingConvention) -> None:
Expand Down Expand Up @@ -33,12 +36,9 @@ def test_normalize_identifier(naming_unlimited: NamingConvention) -> None:
assert naming_unlimited.normalize_identifier("+1") == "x1"
assert naming_unlimited.normalize_identifier("-1") == "_1"

# non latin alphabets
# assert naming_unlimited.normalize_identifier("Ölübeµrsईउऊऋऌऍऎएc⇨usǁs⛔lÄnder") == "ölüberschussländer"


def test_alphabet_reduction(naming_unlimited: NamingConvention) -> None:
assert naming_unlimited.normalize_identifier(NamingConvention._REDUCE_ALPHABET[0]) == NamingConvention._REDUCE_ALPHABET[1]
assert naming_unlimited.normalize_identifier(SnakeCaseNamingConvention._REDUCE_ALPHABET[0]) == SnakeCaseNamingConvention._REDUCE_ALPHABET[1]


def test_normalize_path(naming_unlimited: NamingConvention) -> None:
Expand All @@ -51,12 +51,14 @@ def test_normalize_path(naming_unlimited: NamingConvention) -> None:


def test_normalize_non_alpha_single_underscore() -> None:
assert NamingConvention._RE_NON_ALPHANUMERIC.sub("_", "-=!*") == "_"
assert NamingConvention._RE_NON_ALPHANUMERIC.sub("_", "1-=!0*-") == "1_0_"
assert NamingConvention._RE_NON_ALPHANUMERIC.sub("_", "1-=!_0*-") == "1__0_"
assert SnakeCaseNamingConvention._RE_NON_ALPHANUMERIC.sub("_", "-=!*") == "_"
assert SnakeCaseNamingConvention._RE_NON_ALPHANUMERIC.sub("_", "1-=!0*-") == "1_0_"
assert SnakeCaseNamingConvention._RE_NON_ALPHANUMERIC.sub("_", "1-=!_0*-") == "1__0_"


def test_normalize_break_path(naming_unlimited: NamingConvention) -> None:
@pytest.mark.parametrize("convention", (SnakeCaseNamingConvention, DuckCaseNamingConvention))
def test_normalize_break_path(convention: Type[NamingConvention]) -> None:
naming_unlimited = convention()
assert naming_unlimited.break_path("A__B__C") == ["A", "B", "C"]
# what if path has _a and _b which valid normalized idents
assert naming_unlimited.break_path("_a___b__C___D") == ["_a", "_b", "C", "_D"]
Expand All @@ -66,7 +68,9 @@ def test_normalize_break_path(naming_unlimited: NamingConvention) -> None:
assert naming_unlimited.break_path("_a__ \t\r__b") == ["_a", "b"]


def test_normalize_make_path(naming_unlimited: NamingConvention) -> None:
@pytest.mark.parametrize("convention", (SnakeCaseNamingConvention, DuckCaseNamingConvention))
def test_normalize_make_path(convention: Type[NamingConvention]) -> None:
naming_unlimited = convention()
assert naming_unlimited.make_path("A", "B") == "A__B"
assert naming_unlimited.make_path("_A", "_B") == "_A___B"
assert naming_unlimited.make_path("_A", "", "_B") == "_A___B"
Expand Down
32 changes: 32 additions & 0 deletions tests/extract/test_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -890,3 +890,35 @@ def mysource():
assert s.exhausted is False
assert next(iter(s)) == 2  # transformer is returned before resource
assert s.exhausted is True


def test_clone_resource_with_name() -> None:
@dlt.resource(selected=False)
def _r1():
yield ["a", "b", "c"]

@dlt.transformer(selected=True)
def _t1(items, suffix):
yield list(map(lambda i: i + "_" + suffix, items))


r1 = _r1()
r1_clone = r1.with_name("r1_clone")
# new name of resource and pipe
assert r1_clone.name == "r1_clone"
assert r1_clone._pipe.name == "r1_clone"
# original keeps old name and pipe
assert r1._pipe != r1_clone._pipe
assert r1.name == "_r1"

# clone transformer
bound_t1_clone = r1_clone | _t1.with_name("t1_clone")("ax")
bound_t1_clone_2 = r1_clone | _t1("ax_2").with_name("t1_clone_2")
assert bound_t1_clone.name == "t1_clone"
assert bound_t1_clone_2.name == "t1_clone_2"
# but parent is the same
assert bound_t1_clone_2._pipe.parent == bound_t1_clone._pipe.parent

# evaluate transformers
assert list(bound_t1_clone) == ['a_ax', 'b_ax', 'c_ax']
assert list(bound_t1_clone_2) == ['a_ax_2', 'b_ax_2', 'c_ax_2']
2 changes: 2 additions & 0 deletions tests/load/mssql/test_mssql_table_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from dlt.common.utils import uniq_id
from dlt.common.schema import Schema

pytest.importorskip("dlt.destinations.mssql.mssql", reason="MSSQL ODBC driver not installed")

from dlt.destinations.mssql.mssql import MsSqlClient
from dlt.destinations.mssql.configuration import MsSqlClientConfiguration, MsSqlCredentials

Expand Down
1 change: 1 addition & 0 deletions tests/load/pipeline/test_athena.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from tests.load.pipeline.utils import destinations_configs, DestinationTestConfiguration


@pytest.mark.parametrize("destination_config", destinations_configs(default_sql_configs=True, subset=["athena"]), ids=lambda x: x.name)
def test_athena_destinations(destination_config: DestinationTestConfiguration) -> None:

Expand Down
42 changes: 42 additions & 0 deletions tests/load/pipeline/test_duckdb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import pytest
import os

import dlt
from dlt.destinations.exceptions import DatabaseTerminalException
from dlt.pipeline.exceptions import PipelineStepFailed

from tests.pipeline.utils import airtable_emojis
from tests.load.pipeline.utils import destinations_configs, DestinationTestConfiguration, load_table_counts


@pytest.mark.parametrize("destination_config", destinations_configs(default_sql_configs=True, subset=["duckdb"]), ids=lambda x: x.name)
def test_duck_case_names(destination_config: DestinationTestConfiguration) -> None:
# we want to have nice tables
# dlt.config["schema.naming"] = "duck_case"
os.environ["SCHEMA__NAMING"] = "duck_case"
pipeline = destination_config.setup_pipeline("test_duck_case_names")
# create tables and columns with emojis and other special characters
pipeline.run(airtable_emojis().with_resources("📆 Schedule", "🦚Peacock", "🦚WidePeacock"))
pipeline.run([{"🐾Feet": 2, "1+1": "two", "\nhey": "value"}], table_name="🦚Peacocks🦚")
table_counts = load_table_counts(pipeline, *[t["name"] for t in pipeline.default_schema.data_tables()])
assert table_counts == {
"📆 Schedule": 3,
"🦚Peacock": 1,
'🦚Peacock__peacock': 3,
'🦚Peacocks🦚': 1,
'🦚WidePeacock': 1,
'🦚WidePeacock__peacock': 3
}

# this will fail - duckdb preserves case but is case insensitive when comparing identifiers
with pytest.raises(PipelineStepFailed) as pip_ex:
pipeline.run([{"🐾Feet": 2, "1+1": "two", "🐾feet": "value"}], table_name="🦚peacocks🦚")
assert isinstance(pip_ex.value.__context__, DatabaseTerminalException)

# show tables and columns
with pipeline.sql_client() as client:
with client.execute_query("DESCRIBE 🦚peacocks🦚;") as q:
tables = q.df()
assert tables["column_name"].tolist() == ["🐾Feet", "1+1", "hey", "_dlt_load_id", "_dlt_id"]


1 change: 0 additions & 1 deletion tests/load/redshift/test_redshift_table_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from dlt.common.schema import Schema
from dlt.common.configuration import resolve_configuration

from dlt.destinations.exceptions import DestinationSchemaWillNotUpdate
from dlt.destinations.redshift.redshift import RedshiftClient
from dlt.destinations.redshift.configuration import RedshiftClientConfiguration, RedshiftCredentials

Expand Down
Loading

0 comments on commit 35fcd00

Please sign in to comment.