Skip to content

Commit

Permalink
merges column props and hints, categorizes column props
Browse files Browse the repository at this point in the history
  • Loading branch information
rudolfix committed Sep 8, 2024
1 parent baa6f9b commit c2d35bf
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 49 deletions.
4 changes: 2 additions & 2 deletions dlt/common/schema/migrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
TSimpleRegex,
TStoredSchema,
TTableSchemaColumns,
TColumnHint,
TColumnDefaultHint,
)
from dlt.common.schema.exceptions import SchemaEngineNoUpgradePathException
from dlt.common.schema.utils import new_table, version_table, loads_table
Expand All @@ -34,7 +34,7 @@ def migrate_schema(schema_dict: DictStrAny, from_engine: int, to_engine: int) ->
"propagation": {"root": {"_dlt_id": "_dlt_root_id"}}
}
# move settings, convert strings to simple regexes
d_h: Dict[TColumnHint, List[TSimpleRegex]] = schema_dict.pop("hints", {})
d_h: Dict[TColumnDefaultHint, List[TSimpleRegex]] = schema_dict.pop("hints", {})
for h_k, h_l in d_h.items():
d_h[h_k] = list(map(lambda r: TSimpleRegex("re:" + r), h_l))
p_t: Dict[TSimpleRegex, TDataType] = schema_dict.pop("preferred_types", {})
Expand Down
35 changes: 21 additions & 14 deletions dlt/common/schema/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
from dlt.common.schema import utils
from dlt.common.data_types import py_type_to_sc_type, coerce_value, TDataType
from dlt.common.schema.typing import (
COLUMN_HINTS,
DLT_NAME_PREFIX,
SCHEMA_ENGINE_VERSION,
LOADS_TABLE_NAME,
Expand All @@ -45,6 +44,7 @@
TColumnSchema,
TColumnProp,
TColumnHint,
TColumnDefaultHint,
TTypeDetections,
TSchemaContractDict,
TSchemaContract,
Expand Down Expand Up @@ -99,7 +99,7 @@ class Schema:
# list of preferred types: map regex on columns into types
_compiled_preferred_types: List[Tuple[REPattern, TDataType]]
# compiled default hints
_compiled_hints: Dict[TColumnHint, Sequence[REPattern]]
_compiled_hints: Dict[TColumnDefaultHint, Sequence[REPattern]]
# compiled exclude filters per table
_compiled_excludes: Dict[str, Sequence[REPattern]]
# compiled include filters per table
Expand Down Expand Up @@ -456,7 +456,9 @@ def drop_tables(
result.append(self._schema_tables.pop(table_name))
return result

def filter_row_with_hint(self, table_name: str, hint_type: TColumnHint, row: StrAny) -> StrAny:
def filter_row_with_hint(
self, table_name: str, hint_type: TColumnDefaultHint, row: StrAny
) -> StrAny:
rv_row: DictStrAny = {}
column_prop: TColumnProp = utils.hint_to_column_prop(hint_type)
try:
Expand All @@ -468,15 +470,15 @@ def filter_row_with_hint(self, table_name: str, hint_type: TColumnHint, row: Str
rv_row[column_name] = row[column_name]
except KeyError:
for k, v in row.items():
if self._infer_hint(hint_type, v, k):
if self._infer_hint(hint_type, k):
rv_row[k] = v

# dicts are ordered and we will return the rows with hints in the same order as they appear in the columns
return rv_row

def merge_hints(
self,
new_hints: Mapping[TColumnHint, Sequence[TSimpleRegex]],
new_hints: Mapping[TColumnDefaultHint, Sequence[TSimpleRegex]],
normalize_identifiers: bool = True,
) -> None:
"""Merges existing default hints with `new_hints`. Normalizes names in column regexes if possible. Compiles setting at the end
Expand Down Expand Up @@ -775,11 +777,16 @@ def _infer_column(
column_schema = TColumnSchema(
name=k,
data_type=data_type or self._infer_column_type(v, k),
nullable=not self._infer_hint("not_null", v, k),
nullable=not self._infer_hint("not_null", k),
)
for hint in COLUMN_HINTS:
# check other preferred hints that are available
for hint in self._compiled_hints:
# already processed
if hint == "not_null":
continue
column_prop = utils.hint_to_column_prop(hint)
hint_value = self._infer_hint(hint, v, k)
hint_value = self._infer_hint(hint, k)
# set only non-default values
if not utils.has_default_column_prop_value(column_prop, hint_value):
column_schema[column_prop] = hint_value

Expand All @@ -793,7 +800,7 @@ def _coerce_null_value(
"""Raises when column is explicitly not nullable"""
if col_name in table_columns:
existing_column = table_columns[col_name]
if not existing_column.get("nullable", True):
if not utils.is_nullable_column(existing_column):
raise CannotCoerceNullException(self.name, table_name, col_name)

def _coerce_non_null_value(
Expand Down Expand Up @@ -882,15 +889,15 @@ def _infer_column_type(self, v: Any, col_name: str, skip_preferred: bool = False
preferred_type = self.get_preferred_type(col_name)
return preferred_type or mapped_type

def _infer_hint(self, hint_type: TColumnHint, _: Any, col_name: str) -> bool:
def _infer_hint(self, hint_type: TColumnDefaultHint, col_name: str) -> bool:
if hint_type in self._compiled_hints:
return any(h.search(col_name) for h in self._compiled_hints[hint_type])
else:
return False

def _merge_hints(
self,
new_hints: Mapping[TColumnHint, Sequence[TSimpleRegex]],
new_hints: Mapping[TColumnDefaultHint, Sequence[TSimpleRegex]],
normalize_identifiers: bool = True,
) -> None:
"""Used by `merge_hints method, does not compile settings at the end"""
Expand Down Expand Up @@ -978,8 +985,8 @@ def _add_standard_hints(self) -> None:
self._settings["detections"] = type_detections

def _normalize_default_hints(
self, default_hints: Mapping[TColumnHint, Sequence[TSimpleRegex]]
) -> Dict[TColumnHint, List[TSimpleRegex]]:
self, default_hints: Mapping[TColumnDefaultHint, Sequence[TSimpleRegex]]
) -> Dict[TColumnDefaultHint, List[TSimpleRegex]]:
"""Normalizes the column names in default hints. In case of column names that are regexes, normalization is skipped"""
return {
hint: [utils.normalize_simple_regex_column(self.naming, regex) for regex in regexes]
Expand Down Expand Up @@ -1145,7 +1152,7 @@ def _reset_schema(self, name: str, normalizers: TNormalizersConfig = None) -> No

self._settings: TSchemaSettings = {}
self._compiled_preferred_types: List[Tuple[REPattern, TDataType]] = []
self._compiled_hints: Dict[TColumnHint, Sequence[REPattern]] = {}
self._compiled_hints: Dict[TColumnDefaultHint, Sequence[REPattern]] = {}
self._compiled_excludes: Dict[str, Sequence[REPattern]] = {}
self._compiled_includes: Dict[str, Sequence[REPattern]] = {}
self._type_detections: Sequence[TTypeDetections] = None
Expand Down
77 changes: 58 additions & 19 deletions dlt/common/schema/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
Dict,
List,
Literal,
NamedTuple,
Optional,
Sequence,
Set,
Tuple,
Type,
TypedDict,
NewType,
Expand Down Expand Up @@ -36,8 +38,14 @@

TColumnProp = Literal[
"name",
# data type
"data_type",
"precision",
"scale",
"timezone",
"nullable",
"variant",
# hints
"partition",
"cluster",
"primary_key",
Expand All @@ -49,10 +57,11 @@
"hard_delete",
"dedup_sort",
]
"""Known properties and hints of the column"""
# TODO: merge TColumnHint with TColumnProp
"""All known properties of the column, including name, data type info and hints"""
COLUMN_PROPS: Set[TColumnProp] = set(get_args(TColumnProp))

TColumnHint = Literal[
"not_null",
"nullable",
"partition",
"cluster",
"primary_key",
Expand All @@ -64,7 +73,49 @@
"hard_delete",
"dedup_sort",
]
"""Known hints of a column used to declare hint regexes."""
"""Known hints of a column"""
COLUMN_HINTS: Set[TColumnHint] = set(get_args(TColumnHint))


class TColumnPropInfo(NamedTuple):
name: Union[TColumnProp, str]
defaults: Tuple[Any, ...] = (None,)
is_hint: bool = False


_ColumnPropInfos = [
TColumnPropInfo("name"),
TColumnPropInfo("data_type"),
TColumnPropInfo("precision"),
TColumnPropInfo("scale"),
TColumnPropInfo("timezone", (True, None)),
TColumnPropInfo("nullable", (True, None)),
TColumnPropInfo("variant", (False, None)),
TColumnPropInfo("partition", (False, None)),
TColumnPropInfo("cluster", (False, None)),
TColumnPropInfo("primary_key", (False, None)),
TColumnPropInfo("foreign_key", (False, None)),
TColumnPropInfo("sort", (False, None)),
TColumnPropInfo("unique", (False, None)),
TColumnPropInfo("merge_key", (False, None)),
TColumnPropInfo("root_key", (False, None)),
TColumnPropInfo("hard_delete", (False, None)),
TColumnPropInfo("dedup_sort", (False, None)),
# any x- hint with special settings ie. defaults
TColumnPropInfo("x-active-record-timestamp", (), is_hint=True), # no default values
]

ColumnPropInfos: Dict[Union[TColumnProp, str], TColumnPropInfo] = {
info.name: info for info in _ColumnPropInfos
}
# verify column props and column hints infos
for hint in COLUMN_HINTS:
assert hint in COLUMN_PROPS, f"Hint {hint} must be a column prop"

for prop in COLUMN_PROPS:
assert prop in ColumnPropInfos, f"Column {prop} has no info, please define"
if prop in COLUMN_HINTS:
ColumnPropInfos[prop] = ColumnPropInfos[prop]._replace(is_hint=True)

TTableFormat = Literal["iceberg", "delta", "hive"]
TFileFormat = Literal[Literal["preferred"], TLoaderFileFormat]
Expand All @@ -75,20 +126,6 @@
TColumnNames = Union[str, Sequence[str]]
"""A string representing a column name or a list of"""

# COLUMN_PROPS: Set[TColumnProp] = set(get_args(TColumnProp))
COLUMN_HINTS: Set[TColumnHint] = set(
[
"partition",
"cluster",
"primary_key",
"foreign_key",
"sort",
"unique",
"merge_key",
"root_key",
]
)


class TColumnType(TypedDict, total=False):
data_type: Optional[TDataType]
Expand Down Expand Up @@ -220,12 +257,14 @@ class TPartialTableSchema(TTableSchema):

TSchemaTables = Dict[str, TTableSchema]
TSchemaUpdate = Dict[str, List[TPartialTableSchema]]
TColumnDefaultHint = Literal["not_null", TColumnHint]
"""Allows using not_null in default hints setting section"""


class TSchemaSettings(TypedDict, total=False):
schema_contract: Optional[TSchemaContract]
detections: Optional[List[TTypeDetections]]
default_hints: Optional[Dict[TColumnHint, List[TSimpleRegex]]]
default_hints: Optional[Dict[TColumnDefaultHint, List[TSimpleRegex]]]
preferred_types: Optional[Dict[TSimpleRegex, TDataType]]


Expand Down
20 changes: 7 additions & 13 deletions dlt/common/schema/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
from dlt.common.validation import TCustomValidator, validate_dict_ignoring_xkeys
from dlt.common.schema import detections
from dlt.common.schema.typing import (
COLUMN_HINTS,
SCHEMA_ENGINE_VERSION,
LOADS_TABLE_NAME,
SIMPLE_REGEX_PREFIX,
VERSION_TABLE_NAME,
PIPELINE_STATE_TABLE_NAME,
ColumnPropInfos,
TColumnName,
TFileFormat,
TPartialTableSchema,
Expand All @@ -36,7 +36,7 @@
TColumnSchema,
TColumnProp,
TTableFormat,
TColumnHint,
TColumnDefaultHint,
TTableSchemaColumns,
TTypeDetectionFunc,
TTypeDetections,
Expand Down Expand Up @@ -129,15 +129,9 @@ def remove_defaults(stored_schema: TStoredSchema) -> TStoredSchema:
def has_default_column_prop_value(prop: str, value: Any) -> bool:
"""Checks if `value` is a default for `prop`."""
# remove all boolean hints that are False, except "nullable" which is removed when it is True
# TODO: merge column props and hints
if prop in COLUMN_HINTS:
return value in (False, None)
# TODO: type all the hints including default value so those exceptions may be removed
if prop == "nullable":
return value in (True, None)
if prop == "x-active-record-timestamp":
# None is a valid value so it is not a default
return False
if prop in ColumnPropInfos:
return value in ColumnPropInfos[prop].defaults
# for any unknown hint ie. "x-" the defaults are
return value in (None, False)


Expand Down Expand Up @@ -589,7 +583,7 @@ def get_processing_hints(tables: TSchemaTables) -> Dict[str, List[str]]:
return hints


def hint_to_column_prop(h: TColumnHint) -> TColumnProp:
def hint_to_column_prop(h: TColumnDefaultHint) -> TColumnProp:
if h == "not_null":
return "nullable"
return h
Expand Down Expand Up @@ -933,7 +927,7 @@ def new_column(
return column


def default_hints() -> Dict[TColumnHint, List[TSimpleRegex]]:
def default_hints() -> Dict[TColumnDefaultHint, List[TSimpleRegex]]:
return None


Expand Down
2 changes: 1 addition & 1 deletion tests/common/schema/test_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,7 @@ def test_compare_columns() -> None:
)
# any of the hints may differ
for hint in COLUMN_HINTS:
table["columns"]["col3"][hint] = True # type: ignore[typeddict-unknown-key]
table["columns"]["col3"][hint] = True
# name may not differ
assert (
utils.compare_complete_columns(table["columns"]["col3"], table["columns"]["col4"]) is False
Expand Down

0 comments on commit c2d35bf

Please sign in to comment.