update schema handling
sh-rp committed Oct 11, 2023
1 parent aae4313 commit f50565e
Showing 17 changed files with 119 additions and 100 deletions.
dlt/common/normalizers/json/relational.py (14 additions, 3 deletions)

@@ -157,9 +157,6 @@ def _add_row_id(self, table: str, row: TDataItemRow, parent_row_id: str, pos: in
 
     def _get_propagated_values(self, table: str, row: TDataItemRow, _r_lvl: int) -> StrAny:
         extend: DictStrAny = {}
-        # propagate root id for merge tables
-        if _r_lvl == 0 and self.schema.tables.get(table, {}).get("write_disposition") == "merge":
-            extend["_dlt_root_id"] = row["_dlt_id"]
 
         config = self.propagation_config
         if config:
@@ -261,6 +258,20 @@ def extend_schema(self) -> None:
             }
         )
 
+        # for every table with the write disposition merge, we propagate the root_key
+        for table_name, table in self.schema.tables.items():
+            if not table.get("parent") and table["write_disposition"] == "merge":
+                prop_config: RelationalNormalizerConfigPropagation = {
+                    "root": {
+                    },
+                    "tables": {
+                        table_name: {
+                            "_dlt_id": TColumnName("_dlt_root_id")
+                        }
+                    }
+                }
+                DataItemNormalizer.update_normalizer_config(self.schema, {"propagation": prop_config})
+
     def normalize_data_item(self, item: TDataItem, load_id: str, table_name: str) -> TNormalizedRowIterator:
         # wrap items that are not dictionaries in dictionary, otherwise they cannot be processed by the JSON normalizer
         if not isinstance(item, dict):
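Note: the net effect of this change is that per-row special-casing of merge tables is replaced by a one-time registration in extend_schema: each root merge table gets a propagation rule that copies the root row's _dlt_id into the _dlt_root_id column of its child rows. A minimal sketch of the config shape this registers, assuming a hypothetical merge table named "users" (TColumnName is, as far as we can tell, a typed alias over str, so a plain dict is shown):

    prop_config = {
        "propagation": {
            "root": {},  # nothing is propagated from the root section by default
            "tables": {
                # on table "users", copy the root row's _dlt_id into
                # the _dlt_root_id column of every descendant row
                "users": {"_dlt_id": "_dlt_root_id"},
            },
        }
    }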
dlt/common/schema/schema.py (10 additions, 1 deletion)

@@ -187,7 +187,8 @@ def coerce_row(self, table_name: str, parent_table: str, row: StrAny) -> Tuple[D
 
         return new_row, updated_table_partial
 
-    def update_schema(self, partial_table: TPartialTableSchema) -> TPartialTableSchema:
+    def update_table(self, partial_table: TPartialTableSchema) -> TPartialTableSchema:
+        """Update table in this schema"""
         table_name = partial_table["name"]
         parent_table_name = partial_table.get("parent")
         # check if parent table present
@@ -206,6 +207,14 @@ def update_schema(self, partial_table: TPartialTableSchema) -> TPartialTableSche
             partial_table = utils.merge_tables(table, partial_table)
         return partial_table
 
+
+    def update_schema(self, schema: "Schema") -> None:
+        for table in schema.tables.values():
+            self.update_table(table)
+
+        self.data_item_normalizer.update_normalizer_config(self, self.data_item_normalizer.get_normalizer_config(schema))
+
+
     def bump_version(self) -> Tuple[int, str]:
         """Computes schema hash in order to check if schema content was modified. In such case the schema ``stored_version`` and ``stored_version_hash`` are updated.
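Note: after the rename, update_table merges a single (partial) table definition, while the new update_schema merges a whole schema: every table plus the data-item-normalizer config. A hedged usage sketch (schema names are hypothetical; Schema is imported as in the modules touched by this diff):

    from dlt.common.schema import Schema

    source_schema = Schema("github_events")    # discovered during extraction
    pipeline_schema = Schema("github_events")  # persisted by the pipeline

    # merge every table and the normalizer config from the source schema
    pipeline_schema.update_schema(source_schema)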
dlt/extract/extract.py (1 addition, 1 deletion)

@@ -191,7 +191,7 @@ def extract_with_schema(
     # iterate over all items in the pipeline and update the schema if dynamic table hints were present
     for _, partials in extractor.items():
         for partial in partials:
-            schema.update_schema(schema.normalize_table_identifiers(partial))
+            schema.update_table(schema.normalize_table_identifiers(partial))
 
     return extract_id

dlt/extract/source.py (1 addition, 1 deletion)

@@ -737,7 +737,7 @@ def discover_schema(self, item: TDataItem = None) -> Schema:
                 partial_table = self._schema.normalize_table_identifiers(
                     r.compute_table_schema(item)
                 )
-                schema.update_schema(partial_table)
+                schema.update_table(partial_table)
         return schema
 
     def with_resources(self, *resource_names: str) -> "DltSource":
dlt/normalize/normalize.py (5 additions, 5 deletions)

@@ -148,7 +148,7 @@ def _w_normalize_chunk(load_storage: LoadStorage, schema: Schema, load_id: str,
                 # theres a new table or new columns in existing table
                 if partial_table:
                     # update schema and save the change
-                    schema.update_schema(partial_table)
+                    schema.update_table(partial_table)
                     table_updates = schema_update.setdefault(table_name, [])
                     table_updates.append(partial_table)
                     # update our columns
@@ -167,13 +167,13 @@ def _w_normalize_chunk(load_storage: LoadStorage, schema: Schema, load_id: str,
             signals.raise_if_signalled()
         return schema_update, items_count, row_counts
 
-    def update_schema(self, schema: Schema, schema_updates: List[TSchemaUpdate]) -> None:
+    def update_table(self, schema: Schema, schema_updates: List[TSchemaUpdate]) -> None:
         for schema_update in schema_updates:
             for table_name, table_updates in schema_update.items():
                 logger.info(f"Updating schema for table {table_name} with {len(table_updates)} deltas")
                 for partial_table in table_updates:
                     # merge columns
-                    schema.update_schema(partial_table)
+                    schema.update_table(partial_table)
 
     @staticmethod
     def group_worker_files(files: Sequence[str], no_groups: int) -> List[Sequence[str]]:
@@ -219,7 +219,7 @@ def map_parallel(self, schema: Schema, load_id: str, files: Sequence[str]) -> TM
                 result: TWorkerRV = pending.get()
                 try:
                     # gather schema from all manifests, validate consistency and combine
-                    self.update_schema(schema, result[0])
+                    self.update_table(schema, result[0])
                     schema_updates.extend(result[0])
                     # update metrics
                     self.collector.update("Files", len(result[2]))
@@ -256,7 +256,7 @@ def map_single(self, schema: Schema, load_id: str, files: Sequence[str]) -> TMap
             load_id,
             files,
         )
-        self.update_schema(schema, result[0])
+        self.update_table(schema, result[0])
        self.collector.update("Files", len(result[2]))
         self.collector.update("Items", result[1])
         return result[0], result[3]
dlt/pipeline/pipeline.py (12 additions, 16 deletions)

@@ -856,29 +856,25 @@ def _extract_source(self, storage: ExtractorStorage, source: DltSource, max_para
         source_schema = source.schema
         source_schema.update_normalizers()
 
-        extract_id = extract_with_schema(storage, source, source_schema, self.collector, max_parallel_items, workers)
-
         # if source schema does not exist in the pipeline
         if source_schema.name not in self._schema_storage:
-            # save schema into the pipeline
-            self._schema_storage.save_schema(source_schema)
+            # create a new schema with the right name
+            self._schema_storage.save_schema(Schema(source_schema.name))
 
+        # get the schema from pipeline storage and merge stuff from the new schema
+        pipeline_schema = self._schema_storage[source_schema.name]
+        pipeline_schema.update_schema(source_schema)
+
+        # extract into pipeline schema
+        extract_id = extract_with_schema(storage, source, pipeline_schema, self.collector, max_parallel_items, workers)
+
         # and set as default if this is first schema in pipeline
         if not self.default_schema_name:
             # this performs additional validations as schema contains the naming module
-            self._set_default_schema_name(source_schema)
+            self._set_default_schema_name(pipeline_schema)
 
-        pipeline_schema = self._schema_storage[source_schema.name]
-
-        # initialize import with fully discovered schema
-        self._schema_storage.save_import_schema_if_not_exists(source_schema)
-
-        # get the current schema and merge tables from source_schema
-        # note we are not merging props like max nesting or column propagation
-        for table in source_schema.data_tables(include_incomplete=True):
-            pipeline_schema.update_schema(
-                pipeline_schema.normalize_table_identifiers(table)
-            )
+        # save import with fully discovered schema
+        self._schema_storage.save_import_schema_if_not_exists(pipeline_schema)
 
         return extract_id
 
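Note: this hunk inverts the previous order of operations: the pipeline-side schema is created and merged before extraction, and extraction then runs against the merged pipeline schema rather than the raw source schema. A condensed sketch of the method after the change (assembled from the added lines above; signatures simplified and surrounding class machinery elided, so this is not the verbatim file):

    def _extract_source(self, storage, source, max_parallel_items, workers):
        source_schema = source.schema
        source_schema.update_normalizers()

        # make sure an (empty) pipeline-side schema with the same name exists
        if source_schema.name not in self._schema_storage:
            self._schema_storage.save_schema(Schema(source_schema.name))

        # merge tables and normalizer config from the source schema first ...
        pipeline_schema = self._schema_storage[source_schema.name]
        pipeline_schema.update_schema(source_schema)

        # ... then extract against the merged pipeline schema
        extract_id = extract_with_schema(
            storage, source, pipeline_schema, self.collector, max_parallel_items, workers
        )

        if not self.default_schema_name:
            self._set_default_schema_name(pipeline_schema)

        self._schema_storage.save_import_schema_if_not_exists(pipeline_schema)
        return extract_id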
tests/common/normalizers/test_json_relational.py (4 additions, 4 deletions)

@@ -47,7 +47,7 @@ def test_flatten_fix_field_name(norm: RelationalNormalizer) -> None:
 
 def test_preserve_complex_value(norm: RelationalNormalizer) -> None:
     # add table with complex column
-    norm.schema.update_schema(
+    norm.schema.update_table(
         new_table("with_complex",
             columns = [{
                 "name": "value",
@@ -372,10 +372,10 @@ def test_list_in_list() -> None:
 
     # test the same setting webpath__list to complex
     zen_table = new_table("zen")
-    schema.update_schema(zen_table)
+    schema.update_table(zen_table)
 
     path_table = new_table("zen__webpath", parent_table_name="zen", columns=[{"name": "list", "data_type": "complex"}])
-    schema.update_schema(path_table)
+    schema.update_table(path_table)
     rows = list(schema.normalize_data_item(chats, "1762162.1212", "zen"))
     # both lists are complex types now
     assert len(rows) == 3
@@ -544,7 +544,7 @@ def test_preserves_complex_types_list(norm: RelationalNormalizer) -> None:
     # the exception to test_removes_normalized_list
     # complex types should be left as they are
     # add table with complex column
-    norm.schema.update_schema(new_table("event_slot",
+    norm.schema.update_table(new_table("event_slot",
         columns = [{
             "name": "value",
             "data_type": "complex",
tests/common/schema/test_filtering.py (4 additions, 4 deletions)

@@ -85,10 +85,10 @@ def test_filter_parent_table_schema_update(schema: Schema) -> None:
     # try to apply updates
     assert len(updates) == 2
     # event bot table
-    schema.update_schema(updates[0])
+    schema.update_table(updates[0])
     # event_bot__metadata__elvl1__elvl2
     with pytest.raises(ParentTableNotFoundException) as e:
-        schema.update_schema(updates[1])
+        schema.update_table(updates[1])
     assert e.value.table_name == "event_bot__metadata__elvl1__elvl2"
     assert e.value.parent_table_name == "event_bot__metadata__elvl1"
@@ -107,7 +107,7 @@ def test_filter_parent_table_schema_update(schema: Schema) -> None:
         assert set(row.keys()).issuperset(["_dlt_id", "_dlt_parent_id", "_dlt_list_idx"])
         row, partial_table = schema.coerce_row(t, p, row)
         updates.append(partial_table)
-        schema.update_schema(partial_table)
+        schema.update_table(partial_table)
 
     assert len(updates) == 4
     # we must have leaf table
Expand All @@ -120,5 +120,5 @@ def _add_excludes(schema: Schema) -> None:
bot_table["filters"]["includes"] = [
TSimpleRegex("re:^data__custom$"), TSimpleRegex("re:^custom_data__included_object__"), TSimpleRegex("re:^metadata__elvl1__elvl2__")
]
schema.update_schema(bot_table)
schema.update_table(bot_table)
schema._compile_settings()
(The remaining 9 of the 17 changed files are not shown.)