Skip to content

Commit

Permalink
Add support for directly passing through the naming convention to the sink
Browse files: browse the repository at this point in the history
  • Loading branch information
sh-rp committed Mar 4, 2024
1 parent 3229745 commit dbbbe7c
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 3 deletions.
11 changes: 9 additions & 2 deletions dlt/destinations/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,21 @@


def sink(
    loader_file_format: TLoaderFileFormat = None,
    batch_size: int = 10,
    name: str = None,
    naming_convention: str = "direct",
) -> Any:
    """Build a decorator that turns a callable into a sink destination.

    Args:
        loader_file_format: Forwarded unchanged to ``_sink``.
        batch_size: Forwarded unchanged to ``_sink``.
        name: Destination name. When ``None``, the name is derived from the
            decorated callable with ``get_callable_name``.
        naming_convention: Forwarded unchanged to ``_sink``; defaults to
            ``"direct"``.

    Returns:
        A decorator that takes the sink callable and returns the result of
        ``_sink(...)``, with the callable passed as ``credentials``.
    """

    def decorator(f: TSinkCallable) -> TDestinationReferenceArg:
        # Resolve the name per decorated callable instead of rebinding the
        # enclosing ``name``: otherwise reusing one decorator object on a
        # second function would silently keep the first function's name.
        destination_name = get_callable_name(f) if name is None else name
        return _sink(
            credentials=f,
            loader_file_format=loader_file_format,
            batch_size=batch_size,
            name=destination_name,
            naming_convention=naming_convention,
        )

    return decorator
7 changes: 6 additions & 1 deletion dlt/destinations/impl/sink/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ class sink(Destination[SinkClientConfiguration, "SinkClient"]):
spec = SinkClientConfiguration

def capabilities(self) -> DestinationCapabilitiesContext:
    """Return the capabilities context for this sink destination.

    Reads ``loader_file_format`` (default ``"puae-jsonl"``) and
    ``naming_convention`` (default ``"direct"``) from ``self.config_params``
    and passes both to the module-level ``capabilities`` function.
    """
    # NOTE: inside a method body the bare name ``capabilities`` resolves to
    # the module-level function, not to this method.
    config = self.config_params
    return capabilities(
        config.get("loader_file_format", "puae-jsonl"),
        config.get("naming_convention", "direct"),
    )

@property
def client_class(self) -> t.Type["SinkClient"]:
Expand All @@ -33,6 +36,7 @@ def __init__(
environment: t.Optional[str] = None,
loader_file_format: TLoaderFileFormat = None,
batch_size: int = 10,
naming_convention: str = "direct",
**kwargs: t.Any,
) -> None:
super().__init__(
Expand All @@ -41,5 +45,6 @@ def __init__(
environment=environment,
loader_file_format=loader_file_format,
batch_size=batch_size,
naming_convention=naming_convention,
**kwargs,
)
30 changes: 30 additions & 0 deletions tests/load/sink/test_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,3 +288,33 @@ def assert_items_in_range(c: List[TDataItems], start: int, end: int) -> None:
# both calls combined should have every item called just once
assert_items_in_range(calls["items"] + first_calls["items"], 0, 100)
assert_items_in_range(calls["items2"] + first_calls["items2"], 0, 100)


def test_naming_convention() -> None:
    """Check that ``naming_convention`` given to ``@dlt.destination`` reaches the sink.

    The same resource is loaded twice: once into a sink declared with
    ``naming_convention="snake_case"`` and once into a sink declared without
    the argument. Each sink asserts on the table and column names it receives.
    """
    # Data-table names each sink was actually called with. The name checks
    # live inside the sink callbacks, so without this record the test would
    # pass vacuously if a sink were never invoked for the data table.
    snake_tables: List[str] = []
    direct_tables: List[str] = []

    @dlt.resource(table_name="PErson")
    def resource():
        yield [{"UpperCase": 1, "snake_case": 1, "camelCase": 1}]

    # check snake case
    @dlt.destination(naming_convention="snake_case")
    def snake_sink(items, table):
        # tables whose name starts with "_dlt" are not part of this check
        if table["name"].startswith("_dlt"):
            return
        snake_tables.append(table["name"])
        assert table["name"] == "p_erson"
        assert table["columns"]["upper_case"]["name"] == "upper_case"
        assert table["columns"]["snake_case"]["name"] == "snake_case"
        assert table["columns"]["camel_case"]["name"] == "camel_case"

    dlt.pipeline("sink_test", destination=snake_sink, full_refresh=True).run(resource())
    assert snake_tables, "snake_sink was never called for the data table"
    assert set(snake_tables) == {"p_erson"}

    # check default (which is direct)
    @dlt.destination()
    def direct_sink(items, table):
        if table["name"].startswith("_dlt"):
            return
        direct_tables.append(table["name"])
        assert table["name"] == "PErson"
        assert table["columns"]["UpperCase"]["name"] == "UpperCase"
        assert table["columns"]["snake_case"]["name"] == "snake_case"
        assert table["columns"]["camelCase"]["name"] == "camelCase"

    dlt.pipeline("sink_test", destination=direct_sink, full_refresh=True).run(resource())
    assert direct_tables, "direct_sink was never called for the data table"
    assert set(direct_tables) == {"PErson"}

0 comments on commit dbbbe7c

Please sign in to comment.