From dbbbe7c9f9432955bfaec8e00dc8f1831521a424 Mon Sep 17 00:00:00 2001 From: Dave Date: Mon, 4 Mar 2024 17:46:15 +0100 Subject: [PATCH] add support for directly passing through the naming convention to the sink --- dlt/destinations/decorators.py | 11 ++++++++-- dlt/destinations/impl/sink/factory.py | 7 ++++++- tests/load/sink/test_sink.py | 30 +++++++++++++++++++++++++++ 3 files changed, 45 insertions(+), 3 deletions(-) diff --git a/dlt/destinations/decorators.py b/dlt/destinations/decorators.py index a21e8eaca8..cbeeff8975 100644 --- a/dlt/destinations/decorators.py +++ b/dlt/destinations/decorators.py @@ -7,14 +7,21 @@ def sink( - loader_file_format: TLoaderFileFormat = None, batch_size: int = 10, name: str = None + loader_file_format: TLoaderFileFormat = None, + batch_size: int = 10, + name: str = None, + naming_convention: str = "direct", ) -> Any: def decorator(f: TSinkCallable) -> TDestinationReferenceArg: nonlocal name if name is None: name = get_callable_name(f) return _sink( - credentials=f, loader_file_format=loader_file_format, batch_size=batch_size, name=name + credentials=f, + loader_file_format=loader_file_format, + batch_size=batch_size, + name=name, + naming_convention=naming_convention, ) return decorator diff --git a/dlt/destinations/impl/sink/factory.py b/dlt/destinations/impl/sink/factory.py index e65185cb8b..6b2e98271e 100644 --- a/dlt/destinations/impl/sink/factory.py +++ b/dlt/destinations/impl/sink/factory.py @@ -18,7 +18,10 @@ class sink(Destination[SinkClientConfiguration, "SinkClient"]): spec = SinkClientConfiguration def capabilities(self) -> DestinationCapabilitiesContext: - return capabilities(self.config_params.get("loader_file_format", "puae-jsonl")) + return capabilities( + self.config_params.get("loader_file_format", "puae-jsonl"), + self.config_params.get("naming_convention", "direct"), + ) @property def client_class(self) -> t.Type["SinkClient"]: @@ -33,6 +36,7 @@ def __init__( environment: t.Optional[str] = None, loader_file_format: TLoaderFileFormat = None, batch_size: int = 10, + naming_convention: str = "direct", **kwargs: t.Any, ) -> None: super().__init__( @@ -41,5 +45,6 @@ def __init__( environment=environment, loader_file_format=loader_file_format, batch_size=batch_size, + naming_convention=naming_convention, **kwargs, ) diff --git a/tests/load/sink/test_sink.py b/tests/load/sink/test_sink.py index c1ae8153c2..72dcfd5b1e 100644 --- a/tests/load/sink/test_sink.py +++ b/tests/load/sink/test_sink.py @@ -288,3 +288,33 @@ def assert_items_in_range(c: List[TDataItems], start: int, end: int) -> None: # both calls combined should have every item called just once assert_items_in_range(calls["items"] + first_calls["items"], 0, 100) assert_items_in_range(calls["items2"] + first_calls["items2"], 0, 100) + + +def test_naming_convention() -> None: + @dlt.resource(table_name="PErson") + def resource(): + yield [{"UpperCase": 1, "snake_case": 1, "camelCase": 1}] + + # check snake case + @dlt.destination(naming_convention="snake_case") + def snake_sink(items, table): + if table["name"].startswith("_dlt"): + return + assert table["name"] == "p_erson" + assert table["columns"]["upper_case"]["name"] == "upper_case" + assert table["columns"]["snake_case"]["name"] == "snake_case" + assert table["columns"]["camel_case"]["name"] == "camel_case" + + dlt.pipeline("sink_test", destination=snake_sink, full_refresh=True).run(resource()) + + # check default (which is direct) + @dlt.destination() + def direct_sink(items, table): + if table["name"].startswith("_dlt"): + return + assert table["name"] == "PErson" + assert table["columns"]["UpperCase"]["name"] == "UpperCase" + assert table["columns"]["snake_case"]["name"] == "snake_case" + assert table["columns"]["camelCase"]["name"] == "camelCase" + + dlt.pipeline("sink_test", destination=direct_sink, full_refresh=True).run(resource())