Skip to content

Commit

Permalink
Introduce hard_delete and dedup_sort columns hint for merge (#960)
Browse files Browse the repository at this point in the history
* black formatting

* remove unused exception

* add initial support for replicate write disposition

* add hard_delete hint and sorted deduplication for merge

* undo config change

* undo unintentional changes

* refactor hard_delete handling and introduce dedup_sort hint

* update docstring

* replace dialect-specific SQL

* add parentheses to ensure proper clause evaluation order

* add escape defaults and temp tables for non-primary key case

* exclude destinations that don't support merge from test

* correct typo

* extend docstring

* remove redundant copies for immutable strings

* simplify boolean logic

* add more test cases for hard_delete and dedup_sort hints

* refactor table chain resolution

* marks tables that have seen data in normalizer, skips empty jobs if data was never seen

* ignores tables that didn't see data when loading, tests edge cases

* add sort order configuration option

* bumps schema engine to v9, adds migrations

* filters tables without data properly in load

* converts seen-data to boolean, fixes tests

* disables filesystem tests config due to merge present

* add docs for hard_delete and dedup_sort column hints

* fixes extending table chains in load

* refactors load and adds unit tests with dummy

---------

Co-authored-by: Jorrit Sandbrink <[email protected]>
Co-authored-by: Marcin Rudolf <[email protected]>
  • Loading branch information
3 people authored Feb 24, 2024
1 parent b2e07be commit 88f2722
Show file tree
Hide file tree
Showing 45 changed files with 2,345 additions and 546 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ test-load-local:
DESTINATION__POSTGRES__CREDENTIALS=postgresql://loader:loader@localhost:5432/dlt_data DESTINATION__DUCKDB__CREDENTIALS=duckdb:///_storage/test_quack.duckdb poetry run pytest tests -k '(postgres or duckdb)'

test-common:
poetry run pytest tests/common tests/normalize tests/extract tests/pipeline tests/reflection tests/sources tests/cli/common
poetry run pytest tests/common tests/normalize tests/extract tests/pipeline tests/reflection tests/sources tests/cli/common tests/load/test_dummy_client.py tests/libs tests/destinations

reset-test-storage:
-rm -r _storage
Expand Down
67 changes: 57 additions & 10 deletions dlt/common/destination/reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
Generic,
Final,
)
from contextlib import contextmanager
import datetime # noqa: 251
from copy import deepcopy
import inspect
Expand All @@ -32,18 +31,22 @@
UnknownDestinationModule,
)
from dlt.common.schema import Schema, TTableSchema, TSchemaTables
from dlt.common.schema.typing import TWriteDisposition
from dlt.common.schema.exceptions import InvalidDatasetName
from dlt.common.schema.utils import get_write_disposition, get_table_format
from dlt.common.configuration import configspec, with_config, resolve_configuration, known_sections
from dlt.common.schema.exceptions import SchemaException
from dlt.common.schema.utils import (
get_write_disposition,
get_table_format,
get_columns_names_with_prop,
has_column_with_prop,
get_first_column_name_with_prop,
)
from dlt.common.configuration import configspec, resolve_configuration, known_sections
from dlt.common.configuration.specs import BaseConfiguration, CredentialsConfiguration
from dlt.common.configuration.accessors import config
from dlt.common.destination.capabilities import DestinationCapabilitiesContext
from dlt.common.schema.utils import is_complete_column
from dlt.common.schema.exceptions import UnknownTableException
from dlt.common.storages import FileStorage
from dlt.common.storages.load_storage import ParsedLoadJobFileName
from dlt.common.utils import get_module_name
from dlt.common.configuration.specs import GcpCredentials, AwsCredentialsWithoutDefaults


Expand Down Expand Up @@ -252,7 +255,8 @@ def new_file_path(self) -> str:
class FollowupJob:
    """Adds a trait that allows to create a followup job.

    Mixin base: subclasses override `create_followup_jobs` to emit additional
    load jobs when this job transitions to its final state; the default
    implementation creates none.
    """

    def create_followup_jobs(self, final_state: TLoadJobState) -> List[NewLoadJob]:
        """Return list of new jobs. `final_state` is state to which this job transits"""
        return []


Expand Down Expand Up @@ -345,6 +349,49 @@ def _verify_schema(self) -> None:
table_name,
self.capabilities.max_identifier_length,
)
if has_column_with_prop(table, "hard_delete"):
if len(get_columns_names_with_prop(table, "hard_delete")) > 1:
raise SchemaException(
f'Found multiple "hard_delete" column hints for table "{table_name}" in'
f' schema "{self.schema.name}" while only one is allowed:'
f' {", ".join(get_columns_names_with_prop(table, "hard_delete"))}.'
)
if table.get("write_disposition") in ("replace", "append"):
logger.warning(
f"""The "hard_delete" column hint for column "{get_first_column_name_with_prop(table, 'hard_delete')}" """
f'in table "{table_name}" with write disposition'
f' "{table.get("write_disposition")}"'
f' in schema "{self.schema.name}" will be ignored.'
' The "hard_delete" column hint is only applied when using'
' the "merge" write disposition.'
)
if has_column_with_prop(table, "dedup_sort"):
if len(get_columns_names_with_prop(table, "dedup_sort")) > 1:
raise SchemaException(
f'Found multiple "dedup_sort" column hints for table "{table_name}" in'
f' schema "{self.schema.name}" while only one is allowed:'
f' {", ".join(get_columns_names_with_prop(table, "dedup_sort"))}.'
)
if table.get("write_disposition") in ("replace", "append"):
logger.warning(
f"""The "dedup_sort" column hint for column "{get_first_column_name_with_prop(table, 'dedup_sort')}" """
f'in table "{table_name}" with write disposition'
f' "{table.get("write_disposition")}"'
f' in schema "{self.schema.name}" will be ignored.'
' The "dedup_sort" column hint is only applied when using'
' the "merge" write disposition.'
)
if table.get("write_disposition") == "merge" and not has_column_with_prop(
table, "primary_key"
):
logger.warning(
f"""The "dedup_sort" column hint for column "{get_first_column_name_with_prop(table, 'dedup_sort')}" """
f'in table "{table_name}" with write disposition'
f' "{table.get("write_disposition")}"'
f' in schema "{self.schema.name}" will be ignored.'
' The "dedup_sort" column hint is only applied when a'
" primary key has been specified."
)
for column_name, column in dict(table["columns"]).items():
if len(column_name) > self.capabilities.max_column_identifier_length:
raise IdentifierTooLongException(
Expand All @@ -361,9 +408,9 @@ def _verify_schema(self) -> None:
" column manually in code ie. as a merge key?"
)

def get_load_table(self, table_name: str, prepare_for_staging: bool = False) -> TTableSchema:
if table_name not in self.schema.tables:
return None
def prepare_load_table(
self, table_name: str, prepare_for_staging: bool = False
) -> TTableSchema:
try:
# make a copy of the schema so modifications do not affect the original document
table = deepcopy(self.schema.tables[table_name])
Expand Down
128 changes: 128 additions & 0 deletions dlt/common/schema/migrations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
from typing import Dict, List, cast

from dlt.common.data_types import TDataType
from dlt.common.normalizers import explicit_normalizers
from dlt.common.typing import DictStrAny
from dlt.common.schema.typing import (
LOADS_TABLE_NAME,
VERSION_TABLE_NAME,
TSimpleRegex,
TStoredSchema,
TTableSchemaColumns,
TColumnHint,
)
from dlt.common.schema.exceptions import SchemaEngineNoUpgradePathException

from dlt.common.normalizers.utils import import_normalizers
from dlt.common.schema.utils import new_table, version_table, load_table


def migrate_schema(schema_dict: DictStrAny, from_engine: int, to_engine: int) -> TStoredSchema:
    """Migrates `schema_dict` in place from `from_engine` to `to_engine` schema engine version.

    Migration steps are applied cumulatively: each `if` block upgrades the schema by one
    engine version, so any `from_engine` can be upgraded to any later `to_engine` for which
    a path exists.

    Args:
        schema_dict: stored schema document to migrate (mutated in place).
        from_engine: engine version of `schema_dict`.
        to_engine: target engine version.

    Returns:
        The migrated document cast to TStoredSchema.

    Raises:
        SchemaEngineNoUpgradePathException: when migration stops before reaching `to_engine`
            (e.g. `from_engine` is newer than the highest known migration).
    """
    if from_engine == to_engine:
        return cast(TStoredSchema, schema_dict)

    if from_engine == 1 and to_engine > 1:
        schema_dict["includes"] = []
        schema_dict["excludes"] = []
        from_engine = 2
    if from_engine == 2 and to_engine > 2:
        # current version of the schema
        current = cast(TStoredSchema, schema_dict)
        # add default normalizers and root hash propagation
        current["normalizers"], _, _ = import_normalizers(explicit_normalizers())
        current["normalizers"]["json"]["config"] = {
            "propagation": {"root": {"_dlt_id": "_dlt_root_id"}}
        }
        # move settings, convert strings to simple regexes
        d_h: Dict[TColumnHint, List[TSimpleRegex]] = schema_dict.pop("hints", {})
        for h_k, h_l in d_h.items():
            d_h[h_k] = list(map(lambda r: TSimpleRegex("re:" + r), h_l))
        p_t: Dict[TSimpleRegex, TDataType] = schema_dict.pop("preferred_types", {})
        p_t = {TSimpleRegex("re:" + k): v for k, v in p_t.items()}

        current["settings"] = {
            "default_hints": d_h,
            "preferred_types": p_t,
        }
        # repackage tables: engine 2 stored flat "table__child" names; rebuild parent links
        old_tables: Dict[str, TTableSchemaColumns] = schema_dict.pop("tables")
        current["tables"] = {}
        for name, columns in old_tables.items():
            # find last path separator
            parent = name
            # go back in a loop to find existing parent
            while True:
                idx = parent.rfind("__")
                if idx > 0:
                    parent = parent[:idx]
                    if parent not in old_tables:
                        continue
                else:
                    # no existing ancestor found: table is a root table
                    parent = None
                break
            nt = new_table(name, parent)
            nt["columns"] = columns
            current["tables"][name] = nt
        # assign exclude and include to tables

        def migrate_filters(group: str, filters: List[str]) -> None:
            # existing filters were always defined at the root table. find this table and move filters
            for f in filters:
                # skip initial ^
                root = f[1 : f.find("__")]
                path = f[f.find("__") + 2 :]
                t = current["tables"].get(root)
                if t is None:
                    # must add new table to hold filters
                    t = new_table(root)
                    current["tables"][root] = t
                t.setdefault("filters", {}).setdefault(group, []).append("re:^" + path)  # type: ignore

        excludes = schema_dict.pop("excludes", [])
        migrate_filters("excludes", excludes)
        includes = schema_dict.pop("includes", [])
        migrate_filters("includes", includes)

        # upgraded
        from_engine = 3
    if from_engine == 3 and to_engine > 3:
        # set empty version hash to pass validation, in engine 4 this hash is mandatory
        schema_dict.setdefault("version_hash", "")
        from_engine = 4
    if from_engine == 4 and to_engine > 4:
        # replace schema versions table
        schema_dict["tables"][VERSION_TABLE_NAME] = version_table()
        schema_dict["tables"][LOADS_TABLE_NAME] = load_table()
        from_engine = 5
    if from_engine == 5 and to_engine > 5:
        # replace loads table
        schema_dict["tables"][LOADS_TABLE_NAME] = load_table()
        from_engine = 6
    if from_engine == 6 and to_engine > 6:
        # migrate from sealed properties to schema evolution settings
        schema_dict["settings"].pop("schema_sealed", None)
        schema_dict["settings"]["schema_contract"] = {}
        for table in schema_dict["tables"].values():
            table.pop("table_sealed", None)
            if not table.get("parent"):
                table["schema_contract"] = {}
        from_engine = 7
    if from_engine == 7 and to_engine > 7:
        schema_dict["previous_hashes"] = []
        from_engine = 8
    if from_engine == 8 and to_engine > 8:
        # add "seen-data" to all tables with _dlt_id, this will handle packages
        # that are being loaded
        for table in schema_dict["tables"].values():
            if "_dlt_id" in table["columns"]:
                x_normalizer = table.setdefault("x-normalizer", {})
                x_normalizer["seen-data"] = True
        from_engine = 9

    schema_dict["engine_version"] = from_engine
    if from_engine != to_engine:
        raise SchemaEngineNoUpgradePathException(
            schema_dict["name"], schema_dict["engine_version"], from_engine, to_engine
        )

    return cast(TStoredSchema, schema_dict)
6 changes: 4 additions & 2 deletions dlt/common/schema/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from copy import copy, deepcopy
from typing import ClassVar, Dict, List, Mapping, Optional, Sequence, Tuple, Any, cast, Literal
from dlt.common import json
from dlt.common.schema.migrations import migrate_schema

from dlt.common.utils import extend_list_deduplicated
from dlt.common.typing import (
Expand Down Expand Up @@ -103,7 +104,7 @@ def __init__(self, name: str, normalizers: TNormalizersConfig = None) -> None:
@classmethod
def from_dict(cls, d: DictStrAny, bump_version: bool = True) -> "Schema":
# upgrade engine if needed
stored_schema = utils.migrate_schema(d, d["engine_version"], cls.ENGINE_VERSION)
stored_schema = migrate_schema(d, d["engine_version"], cls.ENGINE_VERSION)
# verify schema
utils.validate_stored_schema(stored_schema)
# add defaults
Expand Down Expand Up @@ -390,6 +391,7 @@ def resolve_contract_settings_for_table(
return Schema.expand_schema_contract_settings(settings)

def update_table(self, partial_table: TPartialTableSchema) -> TPartialTableSchema:
"""Adds or merges `partial_table` into the schema. Identifiers are not normalized"""
table_name = partial_table["name"]
parent_table_name = partial_table.get("parent")
# check if parent table present
Expand All @@ -414,7 +416,7 @@ def update_table(self, partial_table: TPartialTableSchema) -> TPartialTableSchem
return partial_table

def update_schema(self, schema: "Schema") -> None:
"""Updates this schema from an incoming schema"""
"""Updates this schema from an incoming schema. Normalizes identifiers after updating normalizers."""
# update all tables
for table in schema.tables.values():
self.update_table(table)
Expand Down
7 changes: 6 additions & 1 deletion dlt/common/schema/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@


# current version of schema engine
SCHEMA_ENGINE_VERSION = 8
SCHEMA_ENGINE_VERSION = 9

# dlt tables
VERSION_TABLE_NAME = "_dlt_version"
Expand All @@ -46,6 +46,7 @@
"unique",
"merge_key",
"root_key",
"dedup_sort",
]
"""Known properties and hints of the column"""
# TODO: merge TColumnHint with TColumnProp
Expand All @@ -59,6 +60,7 @@
"unique",
"root_key",
"merge_key",
"dedup_sort",
]
"""Known hints of a column used to declare hint regexes."""
TWriteDisposition = Literal["skip", "append", "replace", "merge"]
Expand All @@ -69,6 +71,7 @@
TTypeDetectionFunc = Callable[[Type[Any], Any], Optional[TDataType]]
TColumnNames = Union[str, Sequence[str]]
"""A string representing a column name or a list of"""
TSortOrder = Literal["asc", "desc"]

COLUMN_PROPS: Set[TColumnProp] = set(get_args(TColumnProp))
COLUMN_HINTS: Set[TColumnHint] = set(
Expand Down Expand Up @@ -112,6 +115,8 @@ class TColumnSchema(TColumnSchemaBase, total=False):
root_key: Optional[bool]
merge_key: Optional[bool]
variant: Optional[bool]
hard_delete: Optional[bool]
dedup_sort: Optional[TSortOrder]


TTableSchemaColumns = Dict[str, TColumnSchema]
Expand Down
Loading

0 comments on commit 88f2722

Please sign in to comment.