Introduce hard_delete and dedup_sort columns hint for merge #960

Merged
32 commits, merged Feb 24, 2024
Commits
82c3634
black formatting
Feb 12, 2024
97c5512
remove unused exception
Feb 12, 2024
400d84b
add initial support for replicate write disposition
Feb 12, 2024
24f362e
add hard_delete hint and sorted deduplication for merge
Feb 14, 2024
f3a4878
undo config change
Feb 14, 2024
deb816f
undo unintentional changes
Feb 14, 2024
4a38d56
refactor hard_delete handling and introduce dedup_sort hint
Feb 15, 2024
0d1c977
update docstring
Feb 15, 2024
474d8bc
replace dialect-specific SQL
Feb 16, 2024
568ef26
add parentheses to ensure proper clause evaluation order
Feb 16, 2024
81ea426
add escape defaults and temp tables for non-primary key case
Feb 16, 2024
a04a238
exclude destinations that don't support merge from test
Feb 17, 2024
8ac0f9c
correct typo
Feb 20, 2024
ec115e9
extend docstring
Feb 20, 2024
a1afeb8
remove redundant copies for immutable strings
Feb 20, 2024
f07205d
simplify boolean logic
Feb 20, 2024
a64580d
add more test cases for hard_delete and dedup_sort hints
Feb 20, 2024
3308549
refactor table chain resolution
Feb 21, 2024
189c2fb
marks tables that seen data in normalizer, skips empty jobs if never …
rudolfix Feb 22, 2024
a649b0e
ignores tables that didn't seen data when loading, tests edge cases
rudolfix Feb 22, 2024
9778f0e
Merge branch 'devel' into 947-core-extensions-to-support-database-rep…
rudolfix Feb 22, 2024
4b3c59b
add sort order configuration option
Feb 22, 2024
c984c4e
bumps schema engine to v9, adds migrations
rudolfix Feb 22, 2024
935748a
filters tables without data properly in load
rudolfix Feb 22, 2024
d125556
converts seen-data to boolean, fixes tests
rudolfix Feb 22, 2024
ecaf6ef
Merge branch '947-core-extensions-to-support-database-replication' of…
rudolfix Feb 22, 2024
af0b344
disables filesystem tests config due to merge present
rudolfix Feb 22, 2024
262018b
add docs for hard_delete and dedup_sort column hints
Feb 22, 2024
0814bb0
Merge branch '947-core-extensions-to-support-database-replication' of…
Feb 22, 2024
44a9ff2
fixes extending table chains in load
rudolfix Feb 23, 2024
9384148
Merge branch '947-core-extensions-to-support-database-replication' of…
rudolfix Feb 23, 2024
9921b89
refactors load and adds unit tests with dummy
rudolfix Feb 24, 2024
Files changed
Makefile (2 changes: 1 addition & 1 deletion)
@@ -74,7 +74,7 @@ test-load-local:
DESTINATION__POSTGRES__CREDENTIALS=postgresql://loader:loader@localhost:5432/dlt_data DESTINATION__DUCKDB__CREDENTIALS=duckdb:///_storage/test_quack.duckdb poetry run pytest tests -k '(postgres or duckdb)'

test-common:
- poetry run pytest tests/common tests/normalize tests/extract tests/pipeline tests/reflection tests/sources tests/cli/common
+ poetry run pytest tests/common tests/normalize tests/extract tests/pipeline tests/reflection tests/sources tests/cli/common tests/load/test_dummy_client.py tests/libs tests/destinations

reset-test-storage:
-rm -r _storage
dlt/common/destination/reference.py (67 changes: 57 additions & 10 deletions)
@@ -20,7 +20,6 @@
Generic,
Final,
)
- from contextlib import contextmanager
import datetime # noqa: 251
from copy import deepcopy
import inspect
@@ -32,18 +31,22 @@
UnknownDestinationModule,
)
from dlt.common.schema import Schema, TTableSchema, TSchemaTables
from dlt.common.schema.typing import TWriteDisposition
from dlt.common.schema.exceptions import InvalidDatasetName
from dlt.common.schema.utils import get_write_disposition, get_table_format
from dlt.common.configuration import configspec, with_config, resolve_configuration, known_sections
from dlt.common.schema.exceptions import SchemaException
from dlt.common.schema.utils import (
get_write_disposition,
get_table_format,
get_columns_names_with_prop,
has_column_with_prop,
get_first_column_name_with_prop,
)
from dlt.common.configuration import configspec, resolve_configuration, known_sections
from dlt.common.configuration.specs import BaseConfiguration, CredentialsConfiguration
from dlt.common.configuration.accessors import config
from dlt.common.destination.capabilities import DestinationCapabilitiesContext
from dlt.common.schema.utils import is_complete_column
from dlt.common.schema.exceptions import UnknownTableException
from dlt.common.storages import FileStorage
from dlt.common.storages.load_storage import ParsedLoadJobFileName
from dlt.common.utils import get_module_name
from dlt.common.configuration.specs import GcpCredentials, AwsCredentialsWithoutDefaults


@@ -252,7 +255,8 @@ def new_file_path(self) -> str:
class FollowupJob:
"""Adds a trait that allows to create a followup job"""

- def create_followup_jobs(self, next_state: str) -> List[NewLoadJob]:
+ def create_followup_jobs(self, final_state: TLoadJobState) -> List[NewLoadJob]:
+     """Return list of new jobs. `final_state` is state to which this job transits"""
return []


@@ -345,6 +349,49 @@ def _verify_schema(self) -> None:
table_name,
self.capabilities.max_identifier_length,
)
if has_column_with_prop(table, "hard_delete"):
if len(get_columns_names_with_prop(table, "hard_delete")) > 1:
raise SchemaException(
f'Found multiple "hard_delete" column hints for table "{table_name}" in'
f' schema "{self.schema.name}" while only one is allowed:'
f' {", ".join(get_columns_names_with_prop(table, "hard_delete"))}.'
)
if table.get("write_disposition") in ("replace", "append"):
logger.warning(
f"""The "hard_delete" column hint for column "{get_first_column_name_with_prop(table, 'hard_delete')}" """
f'in table "{table_name}" with write disposition'
f' "{table.get("write_disposition")}"'
f' in schema "{self.schema.name}" will be ignored.'
' The "hard_delete" column hint is only applied when using'
' the "merge" write disposition.'
)
if has_column_with_prop(table, "dedup_sort"):
if len(get_columns_names_with_prop(table, "dedup_sort")) > 1:
raise SchemaException(
f'Found multiple "dedup_sort" column hints for table "{table_name}" in'
f' schema "{self.schema.name}" while only one is allowed:'
f' {", ".join(get_columns_names_with_prop(table, "dedup_sort"))}.'
)
if table.get("write_disposition") in ("replace", "append"):
logger.warning(
f"""The "dedup_sort" column hint for column "{get_first_column_name_with_prop(table, 'dedup_sort')}" """
f'in table "{table_name}" with write disposition'
f' "{table.get("write_disposition")}"'
f' in schema "{self.schema.name}" will be ignored.'
' The "dedup_sort" column hint is only applied when using'
' the "merge" write disposition.'
)
if table.get("write_disposition") == "merge" and not has_column_with_prop(
table, "primary_key"
):
logger.warning(
f"""The "dedup_sort" column hint for column "{get_first_column_name_with_prop(table, 'dedup_sort')}" """
f'in table "{table_name}" with write disposition'
f' "{table.get("write_disposition")}"'
f' in schema "{self.schema.name}" will be ignored.'
' The "dedup_sort" column hint is only applied when a'
" primary key has been specified."
)
for column_name, column in dict(table["columns"]).items():
if len(column_name) > self.capabilities.max_column_identifier_length:
raise IdentifierTooLongException(
@@ -361,9 +408,9 @@ def _verify_schema(self) -> None:
" column manually in code ie. as a merge key?"
)

- def get_load_table(self, table_name: str, prepare_for_staging: bool = False) -> TTableSchema:
-     if table_name not in self.schema.tables:
-         return None
+ def prepare_load_table(
+     self, table_name: str, prepare_for_staging: bool = False
+ ) -> TTableSchema:
try:
# make a copy of the schema so modifications do not affect the original document
table = deepcopy(self.schema.tables[table_name])
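The `_verify_schema` additions above only validate and warn; the hints themselves are declared on resource columns. A minimal sketch of a pipeline that exercises both hints under the `merge` write disposition (resource name, column names, and data are hypothetical):

```python
import dlt

# Hypothetical change feed: a non-null "deleted_ts" marks rows to hard-delete,
# and "lsn" orders competing versions of the same primary key, highest first.
@dlt.resource(
    primary_key="id",
    write_disposition="merge",  # per the warnings above, the hints are ignored for "append"/"replace"
    columns={
        "deleted_ts": {"hard_delete": True},
        "lsn": {"dedup_sort": "desc"},
    },
)
def change_feed():
    yield {"id": 1, "lsn": 2, "deleted_ts": None, "value": "fresh"}
    yield {"id": 1, "lsn": 1, "deleted_ts": None, "value": "stale"}


pipeline = dlt.pipeline(pipeline_name="replication_demo", destination="duckdb")
pipeline.run(change_feed())  # only the lsn=2 row survives deduplication
```

Note that declaring either hint on more than one column of the same table raises `SchemaException`, so a table carries at most one hard-delete marker and one deduplication sort column.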
dlt/common/schema/migrations.py (128 changes: 128 additions & 0 deletions)
@@ -0,0 +1,128 @@
from typing import Dict, List, cast

from dlt.common.data_types import TDataType
from dlt.common.normalizers import explicit_normalizers
from dlt.common.typing import DictStrAny
from dlt.common.schema.typing import (
LOADS_TABLE_NAME,
VERSION_TABLE_NAME,
TSimpleRegex,
TStoredSchema,
TTableSchemaColumns,
TColumnHint,
)
from dlt.common.schema.exceptions import SchemaEngineNoUpgradePathException

from dlt.common.normalizers.utils import import_normalizers
from dlt.common.schema.utils import new_table, version_table, load_table


def migrate_schema(schema_dict: DictStrAny, from_engine: int, to_engine: int) -> TStoredSchema:
    if from_engine == to_engine:
        return cast(TStoredSchema, schema_dict)

    if from_engine == 1 and to_engine > 1:
        schema_dict["includes"] = []
        schema_dict["excludes"] = []
        from_engine = 2
    if from_engine == 2 and to_engine > 2:
        # current version of the schema
        current = cast(TStoredSchema, schema_dict)
        # add default normalizers and root hash propagation
        current["normalizers"], _, _ = import_normalizers(explicit_normalizers())
        current["normalizers"]["json"]["config"] = {
            "propagation": {"root": {"_dlt_id": "_dlt_root_id"}}
        }
        # move settings, convert strings to simple regexes
        d_h: Dict[TColumnHint, List[TSimpleRegex]] = schema_dict.pop("hints", {})
        for h_k, h_l in d_h.items():
            d_h[h_k] = list(map(lambda r: TSimpleRegex("re:" + r), h_l))
        p_t: Dict[TSimpleRegex, TDataType] = schema_dict.pop("preferred_types", {})
        p_t = {TSimpleRegex("re:" + k): v for k, v in p_t.items()}

        current["settings"] = {
            "default_hints": d_h,
            "preferred_types": p_t,
        }
        # repackage tables
        old_tables: Dict[str, TTableSchemaColumns] = schema_dict.pop("tables")
        current["tables"] = {}
        for name, columns in old_tables.items():
            # find last path separator
            parent = name
            # go back in a loop to find existing parent
            while True:
                idx = parent.rfind("__")
                if idx > 0:
                    parent = parent[:idx]
                    if parent not in old_tables:
                        continue
                else:
                    parent = None
                break
            nt = new_table(name, parent)
            nt["columns"] = columns
            current["tables"][name] = nt
        # assign exclude and include to tables

        def migrate_filters(group: str, filters: List[str]) -> None:
            # existing filter were always defined at the root table. find this table and move filters
            for f in filters:
                # skip initial ^
                root = f[1 : f.find("__")]
                path = f[f.find("__") + 2 :]
                t = current["tables"].get(root)
                if t is None:
                    # must add new table to hold filters
                    t = new_table(root)
                    current["tables"][root] = t
                t.setdefault("filters", {}).setdefault(group, []).append("re:^" + path)  # type: ignore

        excludes = schema_dict.pop("excludes", [])
        migrate_filters("excludes", excludes)
        includes = schema_dict.pop("includes", [])
        migrate_filters("includes", includes)

        # upgraded
        from_engine = 3
    if from_engine == 3 and to_engine > 3:
        # set empty version hash to pass validation, in engine 4 this hash is mandatory
        schema_dict.setdefault("version_hash", "")
        from_engine = 4
    if from_engine == 4 and to_engine > 4:
        # replace schema versions table
        schema_dict["tables"][VERSION_TABLE_NAME] = version_table()
        schema_dict["tables"][LOADS_TABLE_NAME] = load_table()
        from_engine = 5
    if from_engine == 5 and to_engine > 5:
        # replace loads table
        schema_dict["tables"][LOADS_TABLE_NAME] = load_table()
        from_engine = 6
    if from_engine == 6 and to_engine > 6:
        # migrate from sealed properties to schema evolution settings
        schema_dict["settings"].pop("schema_sealed", None)
        schema_dict["settings"]["schema_contract"] = {}
        for table in schema_dict["tables"].values():
            table.pop("table_sealed", None)
            if not table.get("parent"):
                table["schema_contract"] = {}
        from_engine = 7
    if from_engine == 7 and to_engine > 7:
        schema_dict["previous_hashes"] = []
        from_engine = 8
    if from_engine == 8 and to_engine > 8:
        # add "seen-data" to all tables with _dlt_id, this will handle packages
        # that are being loaded
        for table in schema_dict["tables"].values():
            if "_dlt_id" in table["columns"]:
                x_normalizer = table.setdefault("x-normalizer", {})
                x_normalizer["seen-data"] = True
        from_engine = 9

    schema_dict["engine_version"] = from_engine
    if from_engine != to_engine:
        raise SchemaEngineNoUpgradePathException(
            schema_dict["name"], schema_dict["engine_version"], from_engine, to_engine
        )

    return cast(TStoredSchema, schema_dict)
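A short sketch of exercising the new module directly, with a hypothetical, heavily trimmed engine-8 document (real stored schemas also carry normalizers, settings, and hashes; only the fields the 8-to-9 step touches are shown):

```python
from dlt.common.schema.migrations import migrate_schema

# hypothetical minimal engine-8 schema document
stored = {
    "engine_version": 8,
    "name": "demo",
    "tables": {
        "users": {"name": "users", "columns": {"_dlt_id": {"name": "_dlt_id"}}},
        "stub": {"name": "stub", "columns": {}},
    },
}

migrated = migrate_schema(stored, stored["engine_version"], 9)

# the 8 -> 9 step marks every table that has a _dlt_id column as having seen data
assert migrated["engine_version"] == 9
assert migrated["tables"]["users"]["x-normalizer"]["seen-data"] is True
assert "x-normalizer" not in migrated["tables"]["stub"]
```

In normal use this path is not called directly: `Schema.from_dict` (see `dlt/common/schema/schema.py` below) upgrades any stored document to the current engine version before validating it.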
dlt/common/schema/schema.py (6 changes: 4 additions & 2 deletions)
@@ -2,6 +2,7 @@
from copy import copy, deepcopy
from typing import ClassVar, Dict, List, Mapping, Optional, Sequence, Tuple, Any, cast, Literal
from dlt.common import json
+ from dlt.common.schema.migrations import migrate_schema

from dlt.common.utils import extend_list_deduplicated
from dlt.common.typing import (
@@ -103,7 +104,7 @@ def __init__(self, name: str, normalizers: TNormalizersConfig = None) -> None:
@classmethod
def from_dict(cls, d: DictStrAny, bump_version: bool = True) -> "Schema":
# upgrade engine if needed
- stored_schema = utils.migrate_schema(d, d["engine_version"], cls.ENGINE_VERSION)
+ stored_schema = migrate_schema(d, d["engine_version"], cls.ENGINE_VERSION)
# verify schema
utils.validate_stored_schema(stored_schema)
# add defaults
@@ -390,6 +391,7 @@ def resolve_contract_settings_for_table(
return Schema.expand_schema_contract_settings(settings)

def update_table(self, partial_table: TPartialTableSchema) -> TPartialTableSchema:
"""Adds or merges `partial_table` into the schema. Identifiers are not normalized"""
table_name = partial_table["name"]
parent_table_name = partial_table.get("parent")
# check if parent table present
@@ -414,7 +416,7 @@ def update_table(self, partial_table: TPartialTableSchema) -> TPartialTableSchema:
return partial_table

def update_schema(self, schema: "Schema") -> None:
"""Updates this schema from an incoming schema"""
"""Updates this schema from an incoming schema. Normalizes identifiers after updating normalizers."""
# update all tables
for table in schema.tables.values():
self.update_table(table)
dlt/common/schema/typing.py (7 changes: 6 additions & 1 deletion)
@@ -26,7 +26,7 @@


# current version of schema engine
- SCHEMA_ENGINE_VERSION = 8
+ SCHEMA_ENGINE_VERSION = 9

# dlt tables
VERSION_TABLE_NAME = "_dlt_version"
@@ -46,6 +46,7 @@
"unique",
"merge_key",
"root_key",
"dedup_sort",
]
"""Known properties and hints of the column"""
# TODO: merge TColumnHint with TColumnProp
@@ -59,6 +60,7 @@
"unique",
"root_key",
"merge_key",
"dedup_sort",
]
"""Known hints of a column used to declare hint regexes."""
TWriteDisposition = Literal["skip", "append", "replace", "merge"]
@@ -69,6 +71,7 @@
TTypeDetectionFunc = Callable[[Type[Any], Any], Optional[TDataType]]
TColumnNames = Union[str, Sequence[str]]
"""A string representing a column name or a list of"""
+ TSortOrder = Literal["asc", "desc"]

COLUMN_PROPS: Set[TColumnProp] = set(get_args(TColumnProp))
COLUMN_HINTS: Set[TColumnHint] = set(
@@ -112,6 +115,8 @@ class TColumnSchema(TColumnSchemaBase, total=False):
root_key: Optional[bool]
merge_key: Optional[bool]
variant: Optional[bool]
+ hard_delete: Optional[bool]
+ dedup_sort: Optional[TSortOrder]


TTableSchemaColumns = Dict[str, TColumnSchema]
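For reference, a quick sketch of the column definitions the extended `TColumnSchema` admits after this change (column names are hypothetical):

```python
from dlt.common.schema.typing import TColumnSchema

# a boolean hard-delete marker and a sort column for breaking deduplication ties
deleted: TColumnSchema = {"name": "deleted", "data_type": "bool", "hard_delete": True}
lsn: TColumnSchema = {"name": "lsn", "data_type": "bigint", "dedup_sort": "desc"}
```

`dedup_sort` takes the new `TSortOrder` literal (`"asc"` or `"desc"`), while `hard_delete` is a plain boolean flag like the other column hints.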