Skip to content

Commit

Permalink
Nested hints tests, handle table name overrides
Browse files Browse the repository at this point in the history
  • Loading branch information
steinitzu committed Dec 20, 2024
1 parent 743816f commit d0a83b2
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 14 deletions.
11 changes: 7 additions & 4 deletions dlt/extract/extractors.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ def _compute_table(
for item in reversed(items):
computed_tables = super()._compute_table(resource, item, Any)
for computed_table in computed_tables:
arrow_table = arrow_tables.get(computed_table['name'])
arrow_table = arrow_tables.get(computed_table["name"])
# Merge the columns to include primary_key and other hints that may be set on the resource
if arrow_table:
utils.merge_table(self.schema.name, computed_table, arrow_table)
Expand All @@ -431,7 +431,10 @@ def _compute_table(

# Add load_id column if needed
dlt_load_id = self.naming.normalize_identifier(C_DLT_LOAD_ID)
if self._normalize_config.add_dlt_load_id and dlt_load_id not in arrow_table["columns"]:
if (
self._normalize_config.add_dlt_load_id
and dlt_load_id not in arrow_table["columns"]
):
# will be normalized line below
arrow_table["columns"][C_DLT_LOAD_ID] = utils.dlt_load_id_column()

Expand All @@ -446,8 +449,8 @@ def _compute_table(
if src_hint != hint:
override_warn = True
logger.info(
f"In resource: {resource.name}, when merging arrow schema on"
f" column {col_name}. The hint {hint_name} value"
f"In resource: {resource.name}, when merging arrow schema"
f" on column {col_name}. The hint {hint_name} value"
f" {src_hint} defined in resource will overwrite arrow hint"
f" with value {hint}."
)
Expand Down
24 changes: 14 additions & 10 deletions dlt/extract/hints.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,15 +259,15 @@ def parent_table_name(self) -> TTableHintTemplate[str]:
return None if self._hints is None else self._hints.get("parent")

def _walk_nested_hints(
self, path: List[str] = None
) -> Iterator[Tuple[List[str], "DltResourceHints"]]:
self, path: Tuple[str] = None
) -> Iterator[Tuple[Tuple[str, ...], "DltResourceHints"]]:
"""Walk nested hints recursively to generate a flat iterator of path and `DltResourceHints` instance pairs"""
if path is None:
path = []
path = tuple()
if path:
yield path, self
for key, nested_instance in self._nested_hints.items():
yield from nested_instance._walk_nested_hints(path + [key])
yield from nested_instance._walk_nested_hints(path + (key,))

def compute_table_schema(self, item: TDataItem = None, meta: Any = None) -> TTableSchema:
"""Computes the table schema based on hints and column definitions passed during resource creation.
Expand Down Expand Up @@ -320,12 +320,16 @@ def compute_table_chain(self, item: TDataItem = None, meta: Any = None) -> List[
else:
root_table_name = root_table["name"]
result = [root_table]
path_to_name: Dict[Tuple[str, ...], str] = {(root_table_name,): root_table_name}
for path, instance in self._walk_nested_hints():
full_path = [root_table_name] + path
table_name = "__".join(full_path) # TODO: naming convention
full_path = (root_table_name,) + path
table = instance.compute_table_schema(item, meta)
table["name"] = table_name
parent_name = "__".join(full_path[:-1])
if not table.get("name"):
table["name"] = "__".join(full_path) # TODO: naming convention
path_to_name[full_path] = table["name"]
parent_name = path_to_name[full_path[:-1]]

# parent_name = "__".join(full_path[:-1])
table["parent"] = parent_name

result.append(table)
Expand Down Expand Up @@ -369,7 +373,7 @@ def apply_hints(
hints_instance: DltResourceHints
if path:
path = tuple(path)
hints_instance = self._nested_hints.get(path)
hints_instance = self._nested_hints.get(path) # type: ignore[call-overload]
if not hints_instance:
hints_instance = DltResourceHints()
else:
Expand Down Expand Up @@ -666,7 +670,7 @@ def _create_table_schema(resource_hints: TResourceHints, resource_name: str) ->
"""Creates table schema from resource hints and resource name. Resource hints are resolved
(do not contain callables) and will be modified in place
"""
resource_hints["name"] = resource_hints.pop("table_name")
resource_hints["name"] = resource_hints.pop("table_name") # type: ignore[typeddict-unknown-key]
DltResourceHints._merge_keys(resource_hints)
if "write_disposition" in resource_hints:
if isinstance(resource_hints["write_disposition"], str):
Expand Down
98 changes: 98 additions & 0 deletions tests/pipeline/test_table_hints.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import dlt


@dlt.resource
def nested_data():
    """Yield one batch holding a single row with two separate nested branches.

    The row carries a scalar ``id`` plus two list-valued keys, ``a`` and ``c``.
    Each of those holds one dict that in turn contains a further one-element
    list (``b`` under ``a``, ``d`` under ``c``), giving two levels of nesting
    per branch.
    """
    # Branch "a": one element that itself nests a "b" list.
    branch_a = {
        "a_id": "2",
        "b": [{"b_id": "3"}],
    }
    # Branch "c": same shape as "a", nesting a "d" list.
    branch_c = {
        "c_id": "4",
        "d": [{"d_id": "5"}],
    }
    row = {
        "id": 1,
        "a": [branch_a],
        "c": [branch_c],
    }
    yield [row]


def test_apply_hints_nested_hints_column_types() -> None:
    """Check column hints on nested paths, then re-check after renaming the root table.

    First run: ``data_type`` hints applied with ``path=["a", "b"]`` and
    ``path=["c"]`` must show up on ``nested_data__a__b`` and ``nested_data__c``,
    and each of those tables must point at the expected ``parent``.

    Second run: after ``table_name="override_parent"`` is applied to the same
    resource, the nested tables must be keyed under the ``override_parent``
    prefix, keep the ``b_id`` hint, and no table key may still start with
    ``nested_data``.
    """
    resource = nested_data()

    # Hint a column two levels down (a -> b) ...
    resource.apply_hints(
        path=["a", "b"],
        columns=[{"name": "b_id", "data_type": "bigint"}],
    )
    # ... and a column one level down (c).
    resource.apply_hints(
        path=["c"],
        columns=[{"name": "c_id", "data_type": "double"}],
    )

    pipe = dlt.pipeline(
        pipeline_name="test_apply_hints_nested_hints", dev_mode=True, destination="duckdb"
    )
    pipe.run(resource)

    tables = pipe.default_schema.tables

    # Hinted data types are present on the nested tables.
    assert tables["nested_data__a__b"]["columns"]["b_id"]["data_type"] == "bigint"
    assert tables["nested_data__c"]["columns"]["c_id"]["data_type"] == "double"

    # Each nested table references the table one level above it.
    assert tables["nested_data__a__b"]["parent"] == "nested_data__a"
    assert tables["nested_data__c"]["parent"] == "nested_data"

    # Rename the root table and load the same resource through a second pipeline.
    resource.apply_hints(table_name="override_parent")

    pipe = dlt.pipeline(
        pipeline_name="test_apply_hints_nested_hints_2", dev_mode=True, destination="duckdb"
    )
    pipe.run(resource)

    tables = pipe.default_schema.tables

    assert tables["override_parent__a__b"]["parent"] == "override_parent__a"
    assert tables["override_parent__c"]["parent"] == "override_parent"
    assert tables["override_parent__a__b"]["name"] == "override_parent__a__b"
    assert tables["override_parent__a__b"]["columns"]["b_id"]["data_type"] == "bigint"

    # Nothing in the second schema may still use the original root name as a prefix.
    for table_key in tables:
        assert not table_key.startswith("nested_data")


def test_apply_hints_nested_hints_override_child_name() -> None:
    """Check that explicit ``table_name`` hints on nested paths rename the child tables.

    Both the ``a`` child and the ``a -> b`` grandchild get an explicit name.
    The grandchild's ``parent`` must then reference the overridden name of
    ``a`` rather than a name derived from the path.
    """
    nested_data_rs = nested_data()

    # Override both levels of child tables
    nested_data_rs.apply_hints(path=["a"], table_name="override_child_a")
    nested_data_rs.apply_hints(path=["a", "b"], table_name="override_child_a_b")

    pipeline = dlt.pipeline(
        pipeline_name="test_apply_hints_nested_hints_3", dev_mode=True, destination="duckdb"
    )

    pipeline.run(nested_data_rs)

    schema_tables = pipeline.default_schema.tables

    assert schema_tables["override_child_a_b"]["name"] == "override_child_a_b"
    # Parent should match the overridden parent name
    assert schema_tables["override_child_a_b"]["parent"] == "override_child_a"

    assert schema_tables["override_child_a"]["name"] == "override_child_a"

0 comments on commit d0a83b2

Please sign in to comment.