
Commit

pr fixes
sh-rp committed Oct 11, 2023
1 parent e0de413 commit 8c08f00
Showing 8 changed files with 247 additions and 21 deletions.
4 changes: 4 additions & 0 deletions dlt/common/normalizers/json/__init__.py
@@ -29,6 +29,10 @@ def normalize_data_item(self, item: TDataItem, load_id: str, table_name: str) -> TNormalizedRowIterator:
def extend_schema(self) -> None:
pass

@abc.abstractmethod
def extend_table(self, table_name: str) -> None:
pass

@classmethod
@abc.abstractmethod
def update_normalizer_config(cls, schema: Schema, config: TNormalizerConfig) -> None:
19 changes: 10 additions & 9 deletions dlt/common/normalizers/json/relational.py
@@ -258,15 +258,16 @@ def extend_schema(self) -> None:
}
)

# for every table with the write disposition merge, we propagate the root_key
for table_name, table in self.schema.tables.items():
if not table.get("parent") and table["write_disposition"] == "merge":
DataItemNormalizer.update_normalizer_config(self.schema, {"propagation": {
"tables": {
table_name: {
"_dlt_id": TColumnName("_dlt_root_id")
}
}}})
def extend_table(self, table_name: str) -> None:
# if the table has a merge write disposition, add propagation info to the normalizer
table = self.schema.tables.get(table_name)
if not table.get("parent") and table["write_disposition"] == "merge":
DataItemNormalizer.update_normalizer_config(self.schema, {"propagation": {
"tables": {
table_name: {
"_dlt_id": TColumnName("_dlt_root_id")
}
}}})

def normalize_data_item(self, item: TDataItem, load_id: str, table_name: str) -> TNormalizedRowIterator:
# wrap items that are not dictionaries in dictionary, otherwise they cannot be processed by the JSON normalizer
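
For orientation (not part of this commit): after extend_table runs for a root table with a merge write disposition, the "json" normalizer section of the schema carries a per-table propagation entry, alongside the "root" entry that extend_schema writes when a source sets root_key. The shape below is taken from the tests added in this commit; "items" is an illustrative table name.

# sketch: shape of schema._normalizers_config["json"]["config"] after both hooks have run
propagation_config = {
    "propagation": {
        "tables": {
            "items": {"_dlt_id": "_dlt_root_id"},  # written per merge table by extend_table
        },
        "root": {"_dlt_id": "_dlt_root_id"},  # written by extend_schema when source.root_key is set
    }
}
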
8 changes: 6 additions & 2 deletions dlt/common/schema/schema.py
@@ -205,6 +205,8 @@ def update_table(self, partial_table: TPartialTableSchema) -> TPartialTableSchema:
else:
# merge tables performing additional checks
partial_table = utils.merge_tables(table, partial_table)

self.data_item_normalizer.extend_table(table_name)
return partial_table


@@ -213,9 +215,11 @@ def update_schema(self, schema: "Schema") -> None:
# update all tables
for table in schema.tables.values():
self.update_table(table)
# update other settings
# update normalizer config nondestructively
self.data_item_normalizer.update_normalizer_config(self, self.data_item_normalizer.get_normalizer_config(schema))
self.merge_hints(schema.settings.get("default_hints", {}))
# update and compile settings
self._settings = deepcopy(schema.settings)
self._compile_settings()


def bump_version(self) -> Tuple[int, str]:
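
A minimal usage sketch of the new hook (not part of this commit; it mirrors test_propagation_update_on_table_change added below, and the schema and table names are illustrative). update_table now calls the data item normalizer's extend_table for every updated table, so switching a root table to the merge write disposition registers root-key propagation:

from dlt.common.schema import Schema
from dlt.common.schema.utils import new_table

schema = Schema("example")
# update_table merges the partial table, then invokes data_item_normalizer.extend_table(table_name)
schema.update_table(new_table("merge_table", write_disposition="merge"))
assert schema._normalizers_config["json"]["config"]["propagation"]["tables"]["merge_table"] == {
    "_dlt_id": "_dlt_root_id"
}
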
18 changes: 9 additions & 9 deletions dlt/pipeline/pipeline.py
@@ -856,26 +856,26 @@ def _extract_source(self, storage: ExtractorStorage, source: DltSource, max_parallel_items: int, workers: int) -> str:
source_schema = source.schema
source_schema.update_normalizers()

# extract into pipeline schema
extract_id = extract_with_schema(storage, source, source_schema, self.collector, max_parallel_items, workers)

# save import with fully discovered schema
self._schema_storage.save_import_schema_if_not_exists(source_schema)

# if source schema does not exist in the pipeline
if source_schema.name not in self._schema_storage:
# create a new schema with the right name
# create new schema
self._schema_storage.save_schema(Schema(source_schema.name))

# get the schema from pipeline storage and merge stuff from the new schema
# update pipeline schema (do contract checks here)
pipeline_schema = self._schema_storage[source_schema.name]
pipeline_schema.update_schema(source_schema)

# extract into pipeline schema
extract_id = extract_with_schema(storage, source, pipeline_schema, self.collector, max_parallel_items, workers)

# and set as default if this is first schema in pipeline
# set as default if this is first schema in pipeline
if not self.default_schema_name:
# this performs additional validations as schema contains the naming module
self._set_default_schema_name(pipeline_schema)

# save import with fully discovered schema
self._schema_storage.save_import_schema_if_not_exists(pipeline_schema)

return extract_id

def _get_destination_client_initial_config(self, destination: DestinationReference = None, credentials: Any = None, as_staging: bool = False) -> DestinationClientConfiguration:
31 changes: 30 additions & 1 deletion tests/common/normalizers/test_json_relational.py
@@ -4,7 +4,7 @@
from dlt.common.normalizers.naming import NamingConvention
from dlt.common.schema.typing import TSimpleRegex
from dlt.common.utils import digest128, uniq_id
from dlt.common.schema import Schema
from dlt.common.schema import Schema, TTableSchema
from dlt.common.schema.utils import new_table

from dlt.common.normalizers.json.relational import RelationalNormalizerConfigPropagation, DataItemNormalizer as RelationalNormalizer, DLT_ID_LENGTH_BYTES, TDataItemRow
@@ -774,6 +774,35 @@ def test_normalize_empty_keys() -> None:
assert rows[1][1]["_empty"] == 2


# could also be in schema tests
def test_propagation_update_on_table_change(norm: RelationalNormalizer):

# append does not have propagated columns
table_1 = new_table("table_1", write_disposition="append")
norm.schema.update_table(table_1)
assert "config" not in norm.schema._normalizers_config["json"]

# change table to merge
table_1["write_disposition"] = "merge"
norm.schema.update_table(table_1)
assert norm.schema._normalizers_config["json"]["config"]["propagation"]["tables"][table_1["name"]] == {'_dlt_id': '_dlt_root_id'}

# add subtable
table_2 = new_table("table_2", parent_table_name="table_1")
norm.schema.update_table(table_2)
assert "table_2" not in norm.schema._normalizers_config["json"]["config"]["propagation"]["tables"]

# test merging into existing propagation
norm.schema._normalizers_config["json"]["config"]["propagation"]["tables"]["table_3"] = {'prop1': 'prop2'}
table_3 = new_table("table_3", write_disposition="merge")
norm.schema.update_table(table_3)
assert norm.schema._normalizers_config["json"]["config"]["propagation"]["tables"]["table_3"] == {
'_dlt_id': '_dlt_root_id',
'prop1': 'prop2'
}



def set_max_nesting(norm: RelationalNormalizer, max_nesting: int) -> None:
RelationalNormalizer.update_normalizer_config(norm.schema,
{
95 changes: 95 additions & 0 deletions tests/load/pipeline/test_write_disposition_changes.py
@@ -0,0 +1,95 @@
import pytest
import dlt
from typing import Any
from tests.load.pipeline.utils import destinations_configs, DestinationTestConfiguration, assert_data_table_counts
from tests.pipeline.utils import assert_load_info
from dlt.pipeline.exceptions import PipelineStepFailed

def data_with_subtables(offset: int) -> Any:
for index in range(offset, offset + 100):
yield {
"id": index,
"name": f"item {index}",
"sub_items": [{
"id": index + 1000,
"name": f"sub item {index + 1000}"
}]
}

@pytest.mark.parametrize("destination_config", destinations_configs(default_sql_configs=True), ids=lambda x: x.name)
def test_switch_from_merge(destination_config: DestinationTestConfiguration):
pipeline = destination_config.setup_pipeline(pipeline_name='test_switch_from_merge', full_refresh=True)

info = (pipeline.run(data_with_subtables(10), table_name="items", write_disposition="merge"))
assert_data_table_counts(pipeline, {
"items": 100,
"items__sub_items": 100
})
assert pipeline.default_schema._normalizers_config["json"]["config"]["propagation"]["tables"]["items"] == {'_dlt_id': '_dlt_root_id'}

info = (pipeline.run(data_with_subtables(10), table_name="items", write_disposition="merge"))
assert_load_info(info)
assert_data_table_counts(pipeline, {
"items": 100,
"items__sub_items": 100
})
assert pipeline.default_schema._normalizers_config["json"]["config"]["propagation"]["tables"]["items"] == {'_dlt_id': '_dlt_root_id'}

info = (pipeline.run(data_with_subtables(10), table_name="items", write_disposition="append"))
assert_load_info(info)
assert_data_table_counts(pipeline, {
"items": 200,
"items__sub_items": 200
})
assert pipeline.default_schema._normalizers_config["json"]["config"]["propagation"]["tables"]["items"] == {'_dlt_id': '_dlt_root_id'}

info = (pipeline.run(data_with_subtables(10), table_name="items", write_disposition="replace"))
assert_load_info(info)
assert_data_table_counts(pipeline, {
"items": 100,
"items__sub_items": 100
})
assert pipeline.default_schema._normalizers_config["json"]["config"]["propagation"]["tables"]["items"] == {'_dlt_id': '_dlt_root_id'}


@pytest.mark.parametrize("destination_config", destinations_configs(default_sql_configs=True), ids=lambda x: x.name)
@pytest.mark.parametrize("with_root_key", [True, False])
def test_switch_to_merge(destination_config: DestinationTestConfiguration, with_root_key: bool):
pipeline = destination_config.setup_pipeline(pipeline_name='test_switch_to_merge', full_refresh=True)

@dlt.resource()
def resource():
yield data_with_subtables(10)

@dlt.source()
def source():
return resource()

s = source()
s.root_key = with_root_key

info = (pipeline.run(s, table_name="items", write_disposition="append"))
assert_data_table_counts(pipeline, {
"items": 100,
"items__sub_items": 100
})

if with_root_key:
assert pipeline.default_schema._normalizers_config["json"]["config"]["propagation"]["root"] == {'_dlt_id': '_dlt_root_id'}
else:
assert "propagation" not in pipeline.default_schema._normalizers_config["json"]["config"]

# without a root key this run is expected to fail
if not with_root_key:
with pytest.raises(PipelineStepFailed):
pipeline.run(s, table_name="items", write_disposition="merge")
return

info = (pipeline.run(s, table_name="items", write_disposition="merge"))
assert_load_info(info)
assert_data_table_counts(pipeline, {
"items": 100,
"items__sub_items": 100
})
assert pipeline.default_schema._normalizers_config["json"]["config"]["propagation"]["tables"]["items"] == {'_dlt_id': '_dlt_root_id'}

9 changes: 9 additions & 0 deletions tests/load/pipeline/utils.py
@@ -222,6 +222,15 @@ def load_table_counts(p: dlt.Pipeline, *table_names: str) -> DictStrAny:
result[table_name] = len(items)
return result

def load_data_table_counts(p: dlt.Pipeline) -> DictStrAny:
tables = [table["name"] for table in p.default_schema.data_tables()]
return load_table_counts(p, *tables)


def assert_data_table_counts(p: dlt.Pipeline, expected_counts: DictStrAny) -> None:
table_counts = load_data_table_counts(p)
assert table_counts == expected_counts, f"Table counts do not match, expected {expected_counts}, got {table_counts}"


def load_tables_to_dicts(p: dlt.Pipeline, *table_names: str) -> Dict[str, List[Dict[str, Any]]]:

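
The new helpers back the write-disposition tests above; a typical call (taken from those tests, where pipeline is a configured dlt.Pipeline) compares the row counts of all data tables in the default schema against expected values:

# loads counts for every data table in pipeline.default_schema and asserts they match
assert_data_table_counts(pipeline, {
    "items": 100,
    "items__sub_items": 100,
})
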
84 changes: 84 additions & 0 deletions tests/pipeline/test_schema_updates.py
@@ -0,0 +1,84 @@

import dlt


def test_schema_updates() -> None:
p = dlt.pipeline(pipeline_name="test_schema_updates", full_refresh=True, destination="dummy")

@dlt.source()
def source():
@dlt.resource()
def resource():
yield [1,2,3]
return resource

# test without normalizer attributes
s = source()
p.run(s, table_name="items", write_disposition="append")
assert p.default_schema._normalizers_config["json"]["config"] == {}

# add table propagation
s = source()
p.run(s, table_name="items", write_disposition="merge")
assert p.default_schema._normalizers_config["json"]["config"] == {
"propagation": {
"tables": {
"items": {'_dlt_id': '_dlt_root_id'}
}
}
}

# set root key
s = source()
s.root_key = True
p.run(s, table_name="items", write_disposition="merge")
assert p.default_schema._normalizers_config["json"]["config"] == {
"propagation": {
"tables": {
"items": {'_dlt_id': '_dlt_root_id'}
},
"root": {'_dlt_id': '_dlt_root_id'}
}
}

# root key prevails even if not set
s = source()
s.root_key = False
p.run(s, table_name="items", write_disposition="merge")
assert p.default_schema._normalizers_config["json"]["config"] == {
"propagation": {
"tables": {
"items": {'_dlt_id': '_dlt_root_id'}
},
"root": {'_dlt_id': '_dlt_root_id'}
}
}

# set max nesting
s = source()
s.max_table_nesting = 5
p.run(s, table_name="items", write_disposition="merge")
assert p.default_schema._normalizers_config["json"]["config"] == {
"propagation": {
"tables": {
"items": {'_dlt_id': '_dlt_root_id'}
},
"root": {'_dlt_id': '_dlt_root_id'}
},
"max_nesting": 5
}

# update max nesting and new table
s = source()
s.max_table_nesting = 50
p.run(s, table_name="items2", write_disposition="merge")
assert p.default_schema._normalizers_config["json"]["config"] == {
"propagation": {
"tables": {
"items": {'_dlt_id': '_dlt_root_id'},
"items2": {'_dlt_id': '_dlt_root_id'},
},
"root": {'_dlt_id': '_dlt_root_id'}
},
"max_nesting": 50
}
