allows data type diff and ensures valid migration separately
rudolfix committed Dec 15, 2024
1 parent b8bac75 commit d81a35b
Showing 10 changed files with 326 additions and 253 deletions.
70 changes: 48 additions & 22 deletions dlt/common/schema/utils.py
@@ -457,35 +457,15 @@ def diff_table(
* when columns with the same name have different data types
* when table links to different parent tables
"""
if tab_a["name"] != tab_b["name"]:
raise TablePropertiesConflictException(
schema_name, tab_a["name"], "name", tab_a["name"], tab_b["name"]
)
table_name = tab_a["name"]
# check if table properties can be merged
if tab_a.get("parent") != tab_b.get("parent"):
raise TablePropertiesConflictException(
schema_name, table_name, "parent", tab_a.get("parent"), tab_b.get("parent")
)
# allow for columns to differ
ensure_compatible_tables(schema_name, tab_a, tab_b, ensure_columns=False)

# get new columns, changes in the column data type or other properties are not allowed
tab_a_columns = tab_a["columns"]
new_columns: List[TColumnSchema] = []
for col_b_name, col_b in tab_b["columns"].items():
if col_b_name in tab_a_columns:
col_a = tab_a_columns[col_b_name]
# we do not support changing data types of columns
if is_complete_column(col_a) and is_complete_column(col_b):
if not compare_complete_columns(tab_a_columns[col_b_name], col_b):
# attempt to update to incompatible columns
raise CannotCoerceColumnException(
schema_name,
table_name,
col_b_name,
col_b["data_type"],
tab_a_columns[col_b_name]["data_type"],
None,
)
# all other properties can change
merged_column = merge_column(copy(col_a), col_b)
if merged_column != col_a:
@@ -494,6 +474,8 @@ def diff_table(
new_columns.append(col_b)

# return partial table containing only name and properties that differ (column, filters etc.)
table_name = tab_a["name"]

partial_table: TPartialTableSchema = {
"name": table_name,
"columns": {} if new_columns is None else {c["name"]: c for c in new_columns},
@@ -519,6 +501,50 @@ def diff_table(
return partial_table


def ensure_compatible_tables(
schema_name: str, tab_a: TTableSchema, tab_b: TPartialTableSchema, ensure_columns: bool = True
) -> None:
"""Ensures that `tab_a` and `tab_b` can be merged without conflicts. Conflicts are detected when
- tables have different names
- nested tables have different parents
- tables have any column with incompatible types
Note: all the identifiers must be already normalized
"""
if tab_a["name"] != tab_b["name"]:
raise TablePropertiesConflictException(
schema_name, tab_a["name"], "name", tab_a["name"], tab_b["name"]
)
table_name = tab_a["name"]
# check if table properties can be merged
if tab_a.get("parent") != tab_b.get("parent"):
raise TablePropertiesConflictException(
schema_name, table_name, "parent", tab_a.get("parent"), tab_b.get("parent")
)

if not ensure_columns:
return

tab_a_columns = tab_a["columns"]
for col_b_name, col_b in tab_b["columns"].items():
if col_b_name in tab_a_columns:
col_a = tab_a_columns[col_b_name]
# we do not support changing data types of columns
if is_complete_column(col_a) and is_complete_column(col_b):
if not compare_complete_columns(tab_a_columns[col_b_name], col_b):
# attempt to update to incompatible columns
raise CannotCoerceColumnException(
schema_name,
table_name,
col_b_name,
col_b["data_type"],
tab_a_columns[col_b_name]["data_type"],
None,
)


# def compare_tables(tab_a: TTableSchema, tab_b: TTableSchema) -> bool:
# try:
# table_name = tab_a["name"]
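
A short usage sketch of the split introduced above (the "events" table is made up; new_table, diff_table, and ensure_compatible_tables are the helpers from this file, called the same way as in the tests further down). diff_table now reports a changed data type as part of the returned partial table, while ensure_compatible_tables is the separate guard that still raises:

from copy import deepcopy

from dlt.common.schema.exceptions import CannotCoerceColumnException
from dlt.common.schema.utils import diff_table, ensure_compatible_tables, new_table

# hypothetical table with a single complete column
table = new_table("events", columns=[{"name": "value", "data_type": "text", "nullable": False}])
changed = deepcopy(table)
changed["columns"]["value"]["data_type"] = "bigint"

# diff_table no longer raises on the type change - the new type comes back as a delta
partial = diff_table("schema", table, changed)
assert partial["columns"]["value"]["data_type"] == "bigint"

# the compatibility check is now an explicit, separate call
try:
    ensure_compatible_tables("schema", table, changed)
except CannotCoerceColumnException:
    pass  # incompatible data types are still rejected here
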
40 changes: 9 additions & 31 deletions dlt/normalize/normalize.py
@@ -20,7 +20,7 @@
LoadStorage,
ParsedLoadJobFileName,
)
from dlt.common.schema import TSchemaUpdate, Schema
from dlt.common.schema import Schema
from dlt.common.schema.exceptions import CannotCoerceColumnException
from dlt.common.pipeline import (
NormalizeInfo,
@@ -34,7 +34,7 @@
from dlt.normalize.configuration import NormalizeConfiguration
from dlt.normalize.exceptions import NormalizeJobFailed
from dlt.normalize.worker import w_normalize_files, group_worker_files, TWorkerRV
from dlt.normalize.validate import verify_normalized_table
from dlt.normalize.validate import validate_and_update_schema, verify_normalized_table


# normalize worker wrapping function signature
@@ -80,16 +80,6 @@ def create_storages(self) -> None:
config=self.config._load_storage_config,
)

def update_schema(self, schema: Schema, schema_updates: List[TSchemaUpdate]) -> None:
for schema_update in schema_updates:
for table_name, table_updates in schema_update.items():
logger.info(
f"Updating schema for table {table_name} with {len(table_updates)} deltas"
)
for partial_table in table_updates:
# merge columns where we expect identifiers to be normalized
schema.update_table(partial_table, normalize_identifiers=False)

def map_parallel(self, schema: Schema, load_id: str, files: Sequence[str]) -> TWorkerRV:
workers: int = getattr(self.pool, "_max_workers", 1)
chunk_files = group_worker_files(files, workers)
@@ -123,7 +113,7 @@ def map_parallel(self, schema: Schema, load_id: str, files: Sequence[str]) -> TWorkerRV:
result: TWorkerRV = pending.result()
try:
# gather schema from all manifests, validate consistency and combine
self.update_schema(schema, result[0])
validate_and_update_schema(schema, result[0])
summary.schema_updates.extend(result.schema_updates)
summary.file_metrics.extend(result.file_metrics)
# update metrics
@@ -162,7 +152,7 @@ def map_single(self, schema: Schema, load_id: str, files: Sequence[str]) -> TWorkerRV:
load_id,
files,
)
self.update_schema(schema, result.schema_updates)
validate_and_update_schema(schema, result.schema_updates)
self.collector.update("Files", len(result.file_metrics))
self.collector.update(
"Items", sum(result.file_metrics, EMPTY_DATA_WRITER_METRICS).items_count
@@ -237,23 +227,11 @@ def spool_schema_files(self, load_id: str, schema: Schema, files: Sequence[str])
self.load_storage.import_extracted_package(
load_id, self.normalize_storage.extracted_packages
)
logger.info(f"Created new load package {load_id} on loading volume")
try:
# process parallel
self.spool_files(
load_id, schema.clone(update_normalizers=True), self.map_parallel, files
)
except CannotCoerceColumnException as exc:
# schema conflicts resulting from parallel executing
logger.warning(
f"Parallel schema update conflict, switching to single thread ({str(exc)}"
)
# start from scratch
self.load_storage.new_packages.delete_package(load_id)
self.load_storage.import_extracted_package(
load_id, self.normalize_storage.extracted_packages
)
self.spool_files(load_id, schema.clone(update_normalizers=True), self.map_single, files)
logger.info(f"Created new load package {load_id} on loading volume with ")
# get number of workers with default == 1 if not set (ie. NullExecutor)
workers: int = getattr(self.pool, "_max_workers", 1)
map_f: TMapFuncType = self.map_parallel if workers > 1 else self.map_single
self.spool_files(load_id, schema.clone(update_normalizers=True), map_f, files)

return load_id

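
The try/except fallback removed above (delete the package, re-import, re-run single-threaded) is replaced by picking the map function up front from the pool's worker count. A standalone sketch of that probe pattern (illustrative names, not from the codebase):

from concurrent.futures import ThreadPoolExecutor

def pick_map_strategy(pool) -> str:
    # executors that do not expose _max_workers (e.g. the sequential
    # NullExecutor named in the comment above) fall back to a single worker
    workers: int = getattr(pool, "_max_workers", 1)
    return "parallel" if workers > 1 else "single"

assert pick_map_strategy(ThreadPoolExecutor(max_workers=4)) == "parallel"
assert pick_map_strategy(object()) == "single"

The retry is presumably safe to drop because diff_table now tolerates data type differences between parallel workers, while validate_and_update_schema (next file) checks the remaining conflicts before mutating the schema.
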
20 changes: 19 additions & 1 deletion dlt/normalize/validate.py
@@ -1,7 +1,10 @@
from typing import List

from dlt.common.destination.capabilities import DestinationCapabilitiesContext
from dlt.common.schema import Schema
from dlt.common.schema.typing import TTableSchema
from dlt.common.schema.typing import TTableSchema, TSchemaUpdate
from dlt.common.schema.utils import (
ensure_compatible_tables,
find_incomplete_columns,
get_first_column_name_with_prop,
is_nested_table,
@@ -10,6 +13,21 @@
from dlt.common import logger


def validate_and_update_schema(schema: Schema, schema_updates: List[TSchemaUpdate]) -> None:
"""Updates `schema` tables with partial tables in `schema_updates`"""
for schema_update in schema_updates:
for table_name, table_updates in schema_update.items():
logger.info(f"Updating schema for table {table_name} with {len(table_updates)} deltas")
for partial_table in table_updates:
# ensure updates will pass
if existing_table := schema.tables.get(partial_table["name"]):
ensure_compatible_tables(schema.name, existing_table, partial_table)

for partial_table in table_updates:
# merge columns where we expect identifiers to be normalized
schema.update_table(partial_table, normalize_identifiers=False)


def verify_normalized_table(
schema: Schema, table: TTableSchema, capabilities: DestinationCapabilitiesContext
) -> None:
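
The new helper validates all deltas for a table against the live schema before merging any of them, so an incompatible delta raises before the table is touched. A usage sketch (schema and table names are made up; Schema(name) construction follows the library's tests, and a TSchemaUpdate maps a table name to a list of partial tables):

from dlt.common.schema import Schema
from dlt.common.schema.utils import new_table
from dlt.normalize.validate import validate_and_update_schema

schema = Schema("example")
schema.update_table(new_table("items", columns=[{"name": "id", "data_type": "bigint"}]))

# one update that adds a column; identifiers are assumed to be normalized already
update = {"items": [new_table("items", columns=[{"name": "name", "data_type": "text"}])]}
validate_and_update_schema(schema, [update])
assert "name" in schema.tables["items"]["columns"]
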
21 changes: 0 additions & 21 deletions tests/common/schema/test_inference.py
@@ -441,27 +441,6 @@ def test_update_schema_table_prop_conflict(schema: Schema) -> None:
assert exc_val.value.val2 == "tab_parent"


def test_update_schema_column_conflict(schema: Schema) -> None:
tab1 = utils.new_table(
"tab1",
write_disposition="append",
columns=[
{"name": "col1", "data_type": "text", "nullable": False},
],
)
schema.update_table(tab1)
tab1_u1 = deepcopy(tab1)
# simulate column that had other datatype inferred
tab1_u1["columns"]["col1"]["data_type"] = "bool"
with pytest.raises(CannotCoerceColumnException) as exc_val:
schema.update_table(tab1_u1)
assert exc_val.value.column_name == "col1"
assert exc_val.value.from_type == "bool"
assert exc_val.value.to_type == "text"
# whole column mismatch
assert exc_val.value.coerced_value is None


def _add_preferred_types(schema: Schema) -> None:
schema._settings["preferred_types"] = {}
schema._settings["preferred_types"][TSimpleRegex("timestamp")] = "timestamp"
11 changes: 9 additions & 2 deletions tests/common/schema/test_merges.py
@@ -353,7 +353,7 @@ def test_diff_tables() -> None:
assert "test" in partial["columns"]


def test_diff_tables_conflicts() -> None:
def test_tables_conflicts() -> None:
# conflict on parents
table: TTableSchema = { # type: ignore[typeddict-unknown-key]
"name": "table",
@@ -366,21 +366,28 @@ def test_diff_tables_conflicts() -> None:
other = utils.new_table("table")
with pytest.raises(TablePropertiesConflictException) as cf_ex:
utils.diff_table("schema", table, other)
with pytest.raises(TablePropertiesConflictException) as cf_ex:
utils.ensure_compatible_tables("schema", table, other)
assert cf_ex.value.table_name == "table"
assert cf_ex.value.prop_name == "parent"

# conflict on name
other = utils.new_table("other_name")
with pytest.raises(TablePropertiesConflictException) as cf_ex:
utils.diff_table("schema", table, other)
with pytest.raises(TablePropertiesConflictException) as cf_ex:
utils.ensure_compatible_tables("schema", table, other)
assert cf_ex.value.table_name == "table"
assert cf_ex.value.prop_name == "name"

# conflict on data types in columns
changed = deepcopy(table)
changed["columns"]["test"]["data_type"] = "bigint"
with pytest.raises(CannotCoerceColumnException):
utils.diff_table("schema", table, changed)
utils.ensure_compatible_tables("schema", table, changed)
# but diff now accepts different data types
merged_table = utils.diff_table("schema", table, changed)
assert merged_table["columns"]["test"]["data_type"] == "bigint"


def test_merge_tables() -> None:
Expand Down
1 change: 0 additions & 1 deletion tests/load/pipeline/test_pipelines.py
@@ -10,7 +10,6 @@
from dlt.common.pipeline import SupportsPipeline
from dlt.common.destination import Destination
from dlt.common.destination.reference import WithStagingDataset
from dlt.common.schema.exceptions import CannotCoerceColumnException
from dlt.common.schema.schema import Schema
from dlt.common.schema.typing import VERSION_TABLE_NAME
from dlt.common.schema.utils import new_table