Skip to content

Commit

Permalink
stores per table hints in resources, allows to compute them via item …
Browse files Browse the repository at this point in the history
…metas + tests
  • Loading branch information
rudolfix committed Mar 21, 2024
1 parent a6571a7 commit e6686db
Show file tree
Hide file tree
Showing 9 changed files with 397 additions and 71 deletions.
36 changes: 22 additions & 14 deletions dlt/extract/extractors.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,16 @@ def write_items(self, resource: DltResource, items: TDataItems, meta: Any) -> No
if isinstance(meta, HintsMeta):
# update the resource with new hints, remove all caches so schema is recomputed
# and contracts re-applied
resource.merge_hints(meta.hints)
resource.merge_hints(meta.hints, meta.create_table_variant)
# convert to table meta if created table variant so item is assigned to this table
if meta.create_table_variant:
# name in hints meta must be a string, otherwise merge_hints would fail
meta = TableNameMeta(meta.hints["name"]) # type: ignore[arg-type]
self._reset_contracts_cache()

if table_name := self._get_static_table_name(resource, meta):
# write item belonging to table with static name
self._write_to_static_table(resource, table_name, items)
self._write_to_static_table(resource, table_name, items, meta)
else:
# table has name or other hints depending on data items
self._write_to_dynamic_table(resource, items)
Expand Down Expand Up @@ -157,30 +161,32 @@ def _write_to_dynamic_table(self, resource: DltResource, items: TDataItems) -> N
if table_name in self._filtered_tables:
continue
if table_name not in self._table_contracts or resource._table_has_other_dynamic_hints:
item = self._compute_and_update_table(resource, table_name, item)
item = self._compute_and_update_table(
resource, table_name, item, TableNameMeta(table_name)
)
# write to storage with inferred table name
if table_name not in self._filtered_tables:
self._write_item(table_name, resource.name, item)

def _write_to_static_table(
self, resource: DltResource, table_name: str, items: TDataItems
self, resource: DltResource, table_name: str, items: TDataItems, meta: Any
) -> None:
if table_name not in self._table_contracts:
items = self._compute_and_update_table(resource, table_name, items)
items = self._compute_and_update_table(resource, table_name, items, meta)
if table_name not in self._filtered_tables:
self._write_item(table_name, resource.name, items)

def _compute_table(self, resource: DltResource, items: TDataItems) -> TTableSchema:
def _compute_table(self, resource: DltResource, items: TDataItems, meta: Any) -> TTableSchema:
"""Computes a schema for a new or dynamic table and normalizes identifiers"""
return self.schema.normalize_table_identifiers(resource.compute_table_schema(items))
return self.schema.normalize_table_identifiers(resource.compute_table_schema(items, meta))

def _compute_and_update_table(
self, resource: DltResource, table_name: str, items: TDataItems
self, resource: DltResource, table_name: str, items: TDataItems, meta: Any
) -> TDataItems:
"""
Computes new table and does contract checks, if false is returned, the table may not be created and no items should be written
"""
computed_table = self._compute_table(resource, items)
computed_table = self._compute_table(resource, items, meta)
# overwrite table name (if coming from meta)
computed_table["name"] = table_name
# get or compute contract
Expand All @@ -193,7 +199,7 @@ def _compute_and_update_table(
computed_table["x-normalizer"] = {"evolve-columns-once": True} # type: ignore[typeddict-unknown-key]
existing_table = self.schema._schema_tables.get(table_name, None)
if existing_table:
diff_table = utils.diff_tables(existing_table, computed_table)
diff_table = utils.diff_table(existing_table, computed_table)
else:
diff_table = computed_table

Expand Down Expand Up @@ -300,9 +306,11 @@ def _write_item(
]
super()._write_item(table_name, resource_name, items, columns)

def _compute_table(self, resource: DltResource, items: TDataItems) -> TPartialTableSchema:
def _compute_table(
self, resource: DltResource, items: TDataItems, meta: Any
) -> TPartialTableSchema:
items = items[0]
computed_table = super()._compute_table(resource, items)
computed_table = super()._compute_table(resource, items, Any)

# Merge the columns to include primary_key and other hints that may be set on the resource
arrow_table = copy(computed_table)
Expand All @@ -329,9 +337,9 @@ def _compute_table(self, resource: DltResource, items: TDataItems) -> TPartialTa
return arrow_table

def _compute_and_update_table(
self, resource: DltResource, table_name: str, items: TDataItems
self, resource: DltResource, table_name: str, items: TDataItems, meta: Any
) -> TDataItems:
items = super()._compute_and_update_table(resource, table_name, items)
items = super()._compute_and_update_table(resource, table_name, items, meta)
# filter data item as filters could be updated in compute table
items = [self._apply_contract_filters(item, resource, table_name) for item in items]
return items
121 changes: 92 additions & 29 deletions dlt/extract/hints.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
TTableFormat,
TSchemaContract,
)
from dlt.common.schema.utils import DEFAULT_WRITE_DISPOSITION, merge_columns, new_column, new_table
from dlt.common import logger
from dlt.common.schema.utils import DEFAULT_WRITE_DISPOSITION, merge_column, new_column, new_table
from dlt.common.typing import TDataItem, DictStrAny, DictStrStr
from dlt.common.utils import update_dict_nested
from dlt.common.validation import validate_dict_ignoring_xkeys
Expand All @@ -21,7 +22,7 @@
InconsistentTableTemplate,
)
from dlt.extract.incremental import Incremental
from dlt.extract.items import TFunHintTemplate, TTableHintTemplate, ValidateItem
from dlt.extract.items import TFunHintTemplate, TTableHintTemplate, TableNameMeta, ValidateItem
from dlt.extract.utils import ensure_table_schema_columns, ensure_table_schema_columns_hint
from dlt.extract.validation import create_item_validator

Expand All @@ -43,12 +44,17 @@ class TResourceHints(TypedDict, total=False):


class HintsMeta:
__slots__ = "hints"
__slots__ = ["hints", "create_table_variant"]

hints: TResourceHints
create_table_variant: bool

def __init__(self, hints: TResourceHints) -> None:
def __init__(self, hints: TResourceHints, create_table_variant: bool) -> None:
self.hints = hints
self.create_table_variant = create_table_variant


NATURAL_CALLABLES = ["incremental", "validator", "original_columns"]


def make_hints(
Expand Down Expand Up @@ -105,8 +111,11 @@ def __init__(self, table_schema_template: TResourceHints = None):
self._table_name_hint_fun: TFunHintTemplate[str] = None
self._table_has_other_dynamic_hints: bool = False
self._hints: TResourceHints = None
"""Hints for the resource"""
self._hints_variants: Dict[str, TResourceHints] = {}
"""Hints for tables emitted from resources"""
if table_schema_template:
self.set_hints(table_schema_template)
self._set_hints(table_schema_template)

@property
def name(self) -> str:
Expand Down Expand Up @@ -143,16 +152,24 @@ def columns(self) -> TTableHintTemplate[TTableSchemaColumns]:
def schema_contract(self) -> TTableHintTemplate[TSchemaContract]:
return self._hints.get("schema_contract")

def compute_table_schema(self, item: TDataItem = None) -> TTableSchema:
"""Computes the table schema based on hints and column definitions passed during resource creation. `item` parameter is used to resolve table hints based on data."""
if not self._hints:
def compute_table_schema(self, item: TDataItem = None, meta: Any = None) -> TTableSchema:
"""Computes the table schema based on hints and column definitions passed during resource creation.
`item` parameter is used to resolve table hints based on data.
`meta` parameter is taken from Pipe and may further specify table name if variant is to be used
"""
if isinstance(meta, TableNameMeta):
# look for variant
table_template = self._hints_variants.get(meta.table_name, self._hints)
else:
table_template = self._hints
if not table_template:
return new_table(self.name, resource=self.name)

# resolve a copy of a held template
table_template = copy(self._hints)
table_template = self._clone_hints(table_template)
if "name" not in table_template:
table_template["name"] = self.name
table_template["columns"] = copy(self._hints["columns"])
# table_template["columns"] = copy(self._hints["columns"])

# if table template present and has dynamic hints, the data item must be provided.
if self._table_name_hint_fun and item is None:
Expand All @@ -161,7 +178,7 @@ def compute_table_schema(self, item: TDataItem = None) -> TTableSchema:
resolved_template: TResourceHints = {
k: self._resolve_hint(item, v)
for k, v in table_template.items()
if k not in ["incremental", "validator", "original_columns"]
if k not in NATURAL_CALLABLES
} # type: ignore
table_schema = self._merge_keys(resolved_template)
table_schema["resource"] = self.name
Expand All @@ -184,9 +201,14 @@ def apply_hints(
schema_contract: TTableHintTemplate[TSchemaContract] = None,
additional_table_hints: Optional[Dict[str, TTableHintTemplate[Any]]] = None,
table_format: TTableHintTemplate[TTableFormat] = None,
create_table_variant: bool = False,
) -> None:
"""Creates or modifies existing table schema by setting provided hints. Accepts both static and dynamic hints based on data.
If `create_table_variant` is specified, the `table_name` must be a string and hints will be used to create a separate set of hints
for a particular `table_name`. Such hints may be retrieved via compute_table_schema(meta=TableNameMeta(table_name)).
Table variant hints may not contain dynamic hints.
This method accepts the same table hints arguments as `dlt.resource` decorator with the following additions.
Skip the argument or pass None to leave the existing hint.
Pass empty value (for a particular type i.e. "" for a string) to remove a hint.
Expand All @@ -197,7 +219,24 @@ def apply_hints(
Please note that for efficient incremental loading, the resource must be aware of the Incremental by accepting it as one if its arguments and then using are to skip already loaded data.
In non-aware resources, `dlt` will filter out the loaded values, however, the resource will yield all the values again.
"""
if not self._hints:
if create_table_variant:
if not isinstance(table_name, str):
raise ValueError(
"Please provide string table name if you want to create a table variant of"
" hints"
)
# select hints variant
t = self._hints_variants.get(table_name, None)
if t is None:
# use resource hints as starting point
if self._hints:
t = self._clone_hints(self._hints)
# but remove callables
t = {n: h for n, h in t.items() if not callable(h)} # type: ignore[assignment]
else:
t = self._hints

if t is None:
# if there is no template yet, create and set a new one.
default_wd = None if parent_table_name else DEFAULT_WRITE_DISPOSITION
t = make_hints(
Expand All @@ -211,8 +250,7 @@ def apply_hints(
table_format,
)
else:
# set single hints
t = self._clone_hints(self._hints)
t = self._clone_hints(t)
if table_name is not None:
if table_name:
t["name"] = table_name
Expand Down Expand Up @@ -279,20 +317,46 @@ def apply_hints(
if incremental is not None:
t["incremental"] = None if incremental is Incremental.EMPTY else incremental

self.set_hints(t)
self._set_hints(t, create_table_variant)

def set_hints(self, hints_template: TResourceHints) -> None:
def _set_hints(
self, hints_template: TResourceHints, create_table_variant: bool = False
) -> None:
DltResourceHints.validate_dynamic_hints(hints_template)
# if "name" is callable in the template, then the table schema requires data item to be inferred.
name_hint = hints_template.get("name")
self._table_name_hint_fun = name_hint if callable(name_hint) else None
# check if any other hints in the table template should be inferred from data.
self._table_has_other_dynamic_hints = any(
callable(v) for k, v in hints_template.items() if k != "name"
)
self._hints = hints_template
if create_table_variant:
table_name: str = hints_template["name"] # type: ignore[assignment]
# incremental cannot be specified in variant
if hints_template.get("incremental"):
raise InconsistentTableTemplate(
f"You can specify incremental only for the resource `{self.name}` hints, not in"
f" table `{table_name}` variant-"
)
if hints_template.get("validator"):
logger.warning(
f"A data item validator was created from column schema in {self.name} for a"
f" table `{table_name}` variant. Currently such validator is ignored."
)
# dynamic hints will be ignored
for name, hint in hints_template.items():
if callable(hint) and name not in NATURAL_CALLABLES:
raise InconsistentTableTemplate(
f"Table `{table_name}` variant hint is resource {self.name} cannot have"
f" dynamic hint but {name} does."
)
self._hints_variants[table_name] = hints_template
else:
# if "name" is callable in the template, then the table schema requires data item to be inferred.
name_hint = hints_template.get("name")
self._table_name_hint_fun = name_hint if callable(name_hint) else None
# check if any other hints in the table template should be inferred from data.
self._table_has_other_dynamic_hints = any(
callable(v) for k, v in hints_template.items() if k != "name"
)
self._hints = hints_template

def merge_hints(self, hints_template: TResourceHints) -> None:
def merge_hints(
self, hints_template: TResourceHints, create_table_variant: bool = False
) -> None:
self.apply_hints(
table_name=hints_template.get("name"),
parent_table_name=hints_template.get("parent"),
Expand All @@ -303,6 +367,7 @@ def merge_hints(self, hints_template: TResourceHints) -> None:
incremental=hints_template.get("incremental"),
schema_contract=hints_template.get("schema_contract"),
table_format=hints_template.get("table_format"),
create_table_variant=create_table_variant,
)

@staticmethod
Expand All @@ -324,7 +389,7 @@ def _merge_key(hint: TColumnProp, keys: TColumnNames, partial: TPartialTableSche
keys = [keys]
for key in keys:
if key in partial["columns"]:
merge_columns(partial["columns"][key], {hint: True}) # type: ignore
merge_column(partial["columns"][key], {hint: True}) # type: ignore
else:
partial["columns"][key] = new_column(key, nullable=False)
partial["columns"][key][hint] = True
Expand All @@ -347,9 +412,7 @@ def validate_dynamic_hints(template: TResourceHints) -> None:
table_name = template.get("name")
# if any of the hints is a function, then name must be as well.
if any(
callable(v)
for k, v in template.items()
if k not in ["name", "incremental", "validator", "original_columns"]
callable(v) for k, v in template.items() if k not in ["name", *NATURAL_CALLABLES]
) and not callable(table_name):
raise InconsistentTableTemplate(
f"Table name {table_name} must be a function if any other table hint is a function"
Expand Down
10 changes: 5 additions & 5 deletions dlt/extract/items.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,20 +81,20 @@ class SourcePipeItem(NamedTuple):


class DataItemWithMeta:
__slots__ = "meta", "data"
__slots__ = ["meta", "data"]

meta: Any
data: TDataItems
# meta: Any
# data: TDataItems

def __init__(self, meta: Any, data: TDataItems) -> None:
self.meta = meta
self.data = data


class TableNameMeta:
__slots__ = "table_name"
__slots__ = ["table_name"]

table_name: str
# table_name: str

def __init__(self, table_name: str) -> None:
self.table_name = table_name
Expand Down
Loading

0 comments on commit e6686db

Please sign in to comment.