diff --git a/dlt/extract/extractors.py b/dlt/extract/extractors.py
index 52ecd66920..b8e615aae4 100644
--- a/dlt/extract/extractors.py
+++ b/dlt/extract/extractors.py
@@ -105,12 +105,16 @@ def write_items(self, resource: DltResource, items: TDataItems, meta: Any) -> No
         if isinstance(meta, HintsMeta):
             # update the resource with new hints, remove all caches so schema is recomputed
             # and contracts re-applied
-            resource.merge_hints(meta.hints)
+            resource.merge_hints(meta.hints, meta.create_table_variant)
+            # convert to table meta if created table variant so item is assigned to this table
+            if meta.create_table_variant:
+                # name in hints meta must be a string, otherwise merge_hints would fail
+                meta = TableNameMeta(meta.hints["name"])  # type: ignore[arg-type]
             self._reset_contracts_cache()
 
         if table_name := self._get_static_table_name(resource, meta):
             # write item belonging to table with static name
-            self._write_to_static_table(resource, table_name, items)
+            self._write_to_static_table(resource, table_name, items, meta)
         else:
             # table has name or other hints depending on data items
             self._write_to_dynamic_table(resource, items)
@@ -157,30 +161,32 @@ def _write_to_dynamic_table(self, resource: DltResource, items: TDataItems) -> N
             if table_name in self._filtered_tables:
                 continue
             if table_name not in self._table_contracts or resource._table_has_other_dynamic_hints:
-                item = self._compute_and_update_table(resource, table_name, item)
+                item = self._compute_and_update_table(
+                    resource, table_name, item, TableNameMeta(table_name)
+                )
             # write to storage with inferred table name
             if table_name not in self._filtered_tables:
                 self._write_item(table_name, resource.name, item)
 
     def _write_to_static_table(
-        self, resource: DltResource, table_name: str, items: TDataItems
+        self, resource: DltResource, table_name: str, items: TDataItems, meta: Any
     ) -> None:
         if table_name not in self._table_contracts:
-            items = self._compute_and_update_table(resource, table_name, items)
+            items = self._compute_and_update_table(resource, table_name, items, meta)
         if table_name not in self._filtered_tables:
             self._write_item(table_name, resource.name, items)
 
-    def _compute_table(self, resource: DltResource, items: TDataItems) -> TTableSchema:
+    def _compute_table(self, resource: DltResource, items: TDataItems, meta: Any) -> TTableSchema:
         """Computes a schema for a new or dynamic table and normalizes identifiers"""
-        return self.schema.normalize_table_identifiers(resource.compute_table_schema(items))
+        return self.schema.normalize_table_identifiers(resource.compute_table_schema(items, meta))
 
     def _compute_and_update_table(
-        self, resource: DltResource, table_name: str, items: TDataItems
+        self, resource: DltResource, table_name: str, items: TDataItems, meta: Any
     ) -> TDataItems:
         """
         Computes new table and does contract checks, if false is returned, the table may not be
         created and no items should be written
         """
-        computed_table = self._compute_table(resource, items)
+        computed_table = self._compute_table(resource, items, meta)
         # overwrite table name (if coming from meta)
         computed_table["name"] = table_name
         # get or compute contract
@@ -193,7 +199,7 @@ def _compute_and_update_table(
             computed_table["x-normalizer"] = {"evolve-columns-once": True}  # type: ignore[typeddict-unknown-key]
         existing_table = self.schema._schema_tables.get(table_name, None)
         if existing_table:
-            diff_table = utils.diff_tables(existing_table, computed_table)
+            diff_table = utils.diff_table(existing_table, computed_table)
         else:
             diff_table = computed_table
@@ -300,9 +306,11 @@ def _write_item(
         ]
         super()._write_item(table_name, resource_name, items, columns)
 
-    def _compute_table(self, resource: DltResource, items: TDataItems) -> TPartialTableSchema:
+    def _compute_table(
+        self, resource: DltResource, items: TDataItems, meta: Any
+    ) -> TPartialTableSchema:
         items = items[0]
-        computed_table = super()._compute_table(resource, items)
+        computed_table = super()._compute_table(resource, items, meta)
 
         # Merge the columns to include primary_key and other hints that may be set on the resource
         arrow_table = copy(computed_table)
@@ -329,9 +337,9 @@ def _compute_table(self, resource: DltResource, items: TDataItems) -> TPartialTa
         return arrow_table
 
     def _compute_and_update_table(
-        self, resource: DltResource, table_name: str, items: TDataItems
+        self, resource: DltResource, table_name: str, items: TDataItems, meta: Any
    ) -> TDataItems:
-        items = super()._compute_and_update_table(resource, table_name, items)
+        items = super()._compute_and_update_table(resource, table_name, items, meta)
         # filter data item as filters could be updated in compute table
         items = [self._apply_contract_filters(item, resource, table_name) for item in items]
         return items
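For orientation, here is a minimal sketch of the dispatch rule the hunks above introduce, written only against classes this patch touches (the helper `resolve_dispatch_meta` is invented for the example and is not part of dlt): a `HintsMeta` flagged with `create_table_variant` is subsequently treated like a `TableNameMeta`, so the item is written to the variant table rather than the resource table.

```python
from typing import Any

from dlt.extract.hints import HintsMeta, make_hints
from dlt.extract.items import TableNameMeta


def resolve_dispatch_meta(meta: Any) -> Any:
    """Hypothetical helper mirroring the write_items conversion above."""
    if isinstance(meta, HintsMeta) and meta.create_table_variant:
        # variant hints always carry a static string table name, so dispatch by it
        return TableNameMeta(meta.hints["name"])
    return meta


variant_meta = HintsMeta(make_hints(table_name="events_a", write_disposition="replace"), True)
assert isinstance(resolve_dispatch_meta(variant_meta), TableNameMeta)
```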
diff --git a/dlt/extract/hints.py b/dlt/extract/hints.py
index 54ce00a806..e284984e4c 100644
--- a/dlt/extract/hints.py
+++ b/dlt/extract/hints.py
@@ -12,7 +12,8 @@
     TTableFormat,
     TSchemaContract,
 )
-from dlt.common.schema.utils import DEFAULT_WRITE_DISPOSITION, merge_columns, new_column, new_table
+from dlt.common import logger
+from dlt.common.schema.utils import DEFAULT_WRITE_DISPOSITION, merge_column, new_column, new_table
 from dlt.common.typing import TDataItem, DictStrAny, DictStrStr
 from dlt.common.utils import update_dict_nested
 from dlt.common.validation import validate_dict_ignoring_xkeys
@@ -21,7 +22,7 @@
     InconsistentTableTemplate,
 )
 from dlt.extract.incremental import Incremental
-from dlt.extract.items import TFunHintTemplate, TTableHintTemplate, ValidateItem
+from dlt.extract.items import TFunHintTemplate, TTableHintTemplate, TableNameMeta, ValidateItem
 from dlt.extract.utils import ensure_table_schema_columns, ensure_table_schema_columns_hint
 from dlt.extract.validation import create_item_validator
 
@@ -43,12 +44,17 @@ class TResourceHints(TypedDict, total=False):
 
 
 class HintsMeta:
-    __slots__ = "hints"
+    __slots__ = ["hints", "create_table_variant"]
 
     hints: TResourceHints
+    create_table_variant: bool
 
-    def __init__(self, hints: TResourceHints) -> None:
+    def __init__(self, hints: TResourceHints, create_table_variant: bool) -> None:
         self.hints = hints
+        self.create_table_variant = create_table_variant
+
+
+NATURAL_CALLABLES = ["incremental", "validator", "original_columns"]
 
 
 def make_hints(
@@ -105,8 +111,11 @@ def __init__(self, table_schema_template: TResourceHints = None):
         self._table_name_hint_fun: TFunHintTemplate[str] = None
         self._table_has_other_dynamic_hints: bool = False
         self._hints: TResourceHints = None
+        """Hints for the resource"""
+        self._hints_variants: Dict[str, TResourceHints] = {}
+        """Hints for tables emitted from resources"""
         if table_schema_template:
-            self.set_hints(table_schema_template)
+            self._set_hints(table_schema_template)
 
     @property
     def name(self) -> str:
@@ -143,16 +152,24 @@ def columns(self) -> TTableHintTemplate[TTableSchemaColumns]:
     def schema_contract(self) -> TTableHintTemplate[TSchemaContract]:
         return self._hints.get("schema_contract")
 
-    def compute_table_schema(self, item: TDataItem = None) -> TTableSchema:
-        """Computes the table schema based on hints and column definitions passed during resource creation.
-        `item` parameter is used to resolve table hints based on data."""
-        if not self._hints:
+    def compute_table_schema(self, item: TDataItem = None, meta: Any = None) -> TTableSchema:
+        """Computes the table schema based on hints and column definitions passed during resource creation.
+        `item` parameter is used to resolve table hints based on data.
+        `meta` parameter is taken from the Pipe and may further specify the table name if a table variant is to be used.
+        """
+        if isinstance(meta, TableNameMeta):
+            # look for variant
+            table_template = self._hints_variants.get(meta.table_name, self._hints)
+        else:
+            table_template = self._hints
+        if not table_template:
             return new_table(self.name, resource=self.name)
         # resolve a copy of a held template
-        table_template = copy(self._hints)
+        table_template = self._clone_hints(table_template)
         if "name" not in table_template:
             table_template["name"] = self.name
-        table_template["columns"] = copy(self._hints["columns"])
+        # table_template["columns"] = copy(self._hints["columns"])
 
         # if table template present and has dynamic hints, the data item must be provided.
         if self._table_name_hint_fun and item is None:
@@ -161,7 +178,7 @@ def compute_table_schema(self, item: TDataItem = None) -> TTableSchema:
         resolved_template: TResourceHints = {
             k: self._resolve_hint(item, v)
             for k, v in table_template.items()
-            if k not in ["incremental", "validator", "original_columns"]
+            if k not in NATURAL_CALLABLES
         }  # type: ignore
         table_schema = self._merge_keys(resolved_template)
         table_schema["resource"] = self.name
@@ -184,9 +201,14 @@ def apply_hints(
         schema_contract: TTableHintTemplate[TSchemaContract] = None,
         additional_table_hints: Optional[Dict[str, TTableHintTemplate[Any]]] = None,
         table_format: TTableHintTemplate[TTableFormat] = None,
+        create_table_variant: bool = False,
     ) -> None:
         """Creates or modifies existing table schema by setting provided hints. Accepts both static and dynamic hints based on data.
 
+        If `create_table_variant` is specified, the `table_name` must be a string and the hints will be used to create a separate set of hints
+        for that particular `table_name`. Such hints may be retrieved via `compute_table_schema(meta=TableNameMeta(table_name))`.
+        Table variant hints may not contain dynamic hints.
+
         This method accepts the same table hints arguments as `dlt.resource` decorator with the following additions.
         Skip the argument or pass None to leave the existing hint.
         Pass empty value (for a particular type i.e. "" for a string) to remove a hint.
@@ -197,7 +219,24 @@ def apply_hints(
         Please note that for efficient incremental loading, the resource must be aware of the Incremental by accepting it as one if its arguments and then using are to skip already loaded data.
         In non-aware resources, `dlt` will filter out the loaded values, however, the resource will yield all the values again.
         """
-        if not self._hints:
+        if create_table_variant:
+            if not isinstance(table_name, str):
+                raise ValueError(
+                    "Please provide a string table name if you want to create a table variant of"
+                    " hints"
+                )
+            # select hints variant
+            t = self._hints_variants.get(table_name, None)
+            if t is None:
+                # use resource hints as starting point
+                if self._hints:
+                    t = self._clone_hints(self._hints)
+                    # but remove callables
+                    t = {n: h for n, h in t.items() if not callable(h)}  # type: ignore[assignment]
+        else:
+            t = self._hints
+
+        if t is None:
             # if there is no template yet, create and set a new one.
             default_wd = None if parent_table_name else DEFAULT_WRITE_DISPOSITION
             t = make_hints(
@@ -211,8 +250,7 @@ def apply_hints(
                 table_format,
             )
         else:
-            # set single hints
-            t = self._clone_hints(self._hints)
+            t = self._clone_hints(t)
         if table_name is not None:
             if table_name:
                 t["name"] = table_name
@@ -279,20 +317,46 @@ def apply_hints(
         if incremental is not None:
             t["incremental"] = None if incremental is Incremental.EMPTY else incremental
 
-        self.set_hints(t)
+        self._set_hints(t, create_table_variant)
 
-    def set_hints(self, hints_template: TResourceHints) -> None:
+    def _set_hints(
+        self, hints_template: TResourceHints, create_table_variant: bool = False
+    ) -> None:
         DltResourceHints.validate_dynamic_hints(hints_template)
-        # if "name" is callable in the template, then the table schema requires data item to be inferred.
-        name_hint = hints_template.get("name")
-        self._table_name_hint_fun = name_hint if callable(name_hint) else None
-        # check if any other hints in the table template should be inferred from data.
-        self._table_has_other_dynamic_hints = any(
-            callable(v) for k, v in hints_template.items() if k != "name"
-        )
-        self._hints = hints_template
+        if create_table_variant:
+            table_name: str = hints_template["name"]  # type: ignore[assignment]
+            # incremental cannot be specified in variant
+            if hints_template.get("incremental"):
+                raise InconsistentTableTemplate(
+                    f"You can specify incremental only for the resource `{self.name}` hints, not in"
+                    f" table `{table_name}` variant."
+                )
+            if hints_template.get("validator"):
+                logger.warning(
+                    f"A data item validator was created from column schema in {self.name} for a"
+                    f" table `{table_name}` variant. Currently such a validator is ignored."
+                )
+            # dynamic hints will be ignored
+            for name, hint in hints_template.items():
+                if callable(hint) and name not in NATURAL_CALLABLES:
+                    raise InconsistentTableTemplate(
+                        f"Table `{table_name}` variant hint in resource {self.name} cannot have"
+                        f" a dynamic hint but {name} does."
+                    )
+            self._hints_variants[table_name] = hints_template
+        else:
+            # if "name" is callable in the template, then the table schema requires data item to be inferred.
+            name_hint = hints_template.get("name")
+            self._table_name_hint_fun = name_hint if callable(name_hint) else None
+            # check if any other hints in the table template should be inferred from data.
+            self._table_has_other_dynamic_hints = any(
+                callable(v) for k, v in hints_template.items() if k != "name"
+            )
+            self._hints = hints_template
 
-    def merge_hints(self, hints_template: TResourceHints) -> None:
+    def merge_hints(
+        self, hints_template: TResourceHints, create_table_variant: bool = False
+    ) -> None:
         self.apply_hints(
             table_name=hints_template.get("name"),
             parent_table_name=hints_template.get("parent"),
@@ -303,6 +367,7 @@ def merge_hints(self, hints_template: TResourceHints) -> None:
             incremental=hints_template.get("incremental"),
             schema_contract=hints_template.get("schema_contract"),
             table_format=hints_template.get("table_format"),
+            create_table_variant=create_table_variant,
         )
 
     @staticmethod
@@ -324,7 +389,7 @@ def _merge_key(hint: TColumnProp, keys: TColumnNames, partial: TPartialTableSche
             keys = [keys]
         for key in keys:
             if key in partial["columns"]:
-                merge_columns(partial["columns"][key], {hint: True})  # type: ignore
+                merge_column(partial["columns"][key], {hint: True})  # type: ignore
             else:
                 partial["columns"][key] = new_column(key, nullable=False)
                 partial["columns"][key][hint] = True
@@ -347,9 +412,7 @@ def validate_dynamic_hints(template: TResourceHints) -> None:
         table_name = template.get("name")
         # if any of the hints is a function, then name must be as well.
         if any(
-            callable(v)
-            for k, v in template.items()
-            if k not in ["name", "incremental", "validator", "original_columns"]
+            callable(v) for k, v in template.items() if k not in ["name", *NATURAL_CALLABLES]
         ) and not callable(table_name):
             raise InconsistentTableTemplate(
                 f"Table name {table_name} must be a function if any other table hint is a function"
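A short usage sketch of the variant hints added above, using only the API exercised by the tests later in this diff (the resource and table names are invented for the example):

```python
import dlt
from dlt.extract.items import TableNameMeta


@dlt.resource(primary_key="id", write_disposition="append")
def events():
    yield [{"id": 1, "kind": "a"}]


# create an "events_a" variant that reuses the resource hints but overrides the write disposition
events.apply_hints(table_name="events_a", write_disposition="replace", create_table_variant=True)

# the resource hints themselves are unchanged ...
assert events.compute_table_schema()["write_disposition"] == "append"
# ... while the variant is resolved when a TableNameMeta for that table name is passed
variant = events.compute_table_schema(meta=TableNameMeta("events_a"))
assert variant["name"] == "events_a"
assert variant["write_disposition"] == "replace"
```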
diff --git a/dlt/extract/items.py b/dlt/extract/items.py
index c6e1f0a4b8..9baf4e444f 100644
--- a/dlt/extract/items.py
+++ b/dlt/extract/items.py
@@ -81,10 +81,10 @@ class SourcePipeItem(NamedTuple):
 
 
 class DataItemWithMeta:
-    __slots__ = "meta", "data"
+    __slots__ = ["meta", "data"]
 
-    meta: Any
-    data: TDataItems
+    # meta: Any
+    # data: TDataItems
 
     def __init__(self, meta: Any, data: TDataItems) -> None:
         self.meta = meta
@@ -92,9 +92,9 @@ def __init__(self, meta: Any, data: TDataItems) -> None:
 
 
 class TableNameMeta:
-    __slots__ = "table_name"
+    __slots__ = ["table_name"]
 
-    table_name: str
+    # table_name: str
 
     def __init__(self, table_name: str) -> None:
         self.table_name = table_name
diff --git a/dlt/extract/resource.py b/dlt/extract/resource.py
index 0fef502112..4776158bbb 100644
--- a/dlt/extract/resource.py
+++ b/dlt/extract/resource.py
@@ -63,13 +63,17 @@ def with_table_name(item: TDataItems, table_name: str) -> DataItemWithMeta:
     return DataItemWithMeta(TableNameMeta(table_name), item)
 
 
-def with_hints(item: TDataItems, hints: TResourceHints) -> DataItemWithMeta:
+def with_hints(
+    item: TDataItems, hints: TResourceHints, create_table_variant: bool = False
+) -> DataItemWithMeta:
     """Marks `item` to update the resource with specified `hints`.
 
+    Will create a separate variant of hints for a table if `name` is provided in `hints` and `create_table_variant` is set.
+
     Create `TResourceHints` with `make_hints`.
     Setting `table_name` will dispatch the `item` to a specified table, like `with_table_name`
     """
-    return DataItemWithMeta(HintsMeta(hints), item)
+    return DataItemWithMeta(HintsMeta(hints, create_table_variant), item)
 
 
 class DltResource(Iterable[TDataItem], DltResourceHints):
@@ -388,25 +392,29 @@ def add_step(
         self._pipe.insert_step(item_transform, insert_at)
         return self
 
-    def set_hints(self, table_schema_template: TResourceHints) -> None:
-        super().set_hints(table_schema_template)
-        incremental = self.incremental
-        # try to late assign incremental
-        if table_schema_template.get("incremental") is not None:
-            if incremental:
-                incremental._incremental = table_schema_template["incremental"]
-            else:
-                # if there's no wrapper add incremental as a transform
-                incremental = table_schema_template["incremental"]  # type: ignore
-                self.add_step(incremental)
+    def _set_hints(
+        self, table_schema_template: TResourceHints, create_table_variant: bool = False
+    ) -> None:
+        super()._set_hints(table_schema_template, create_table_variant)
+        # validators and incremental apply only to resource hints
+        if not create_table_variant:
+            incremental = self.incremental
+            # try to late assign incremental
+            if table_schema_template.get("incremental") is not None:
+                if incremental:
+                    incremental._incremental = table_schema_template["incremental"]
+                else:
+                    # if there's no wrapper add incremental as a transform
+                    incremental = table_schema_template["incremental"]  # type: ignore
+                    self.add_step(incremental)
 
-        if incremental:
-            primary_key = table_schema_template.get("primary_key", incremental.primary_key)
-            if primary_key is not None:
-                incremental.primary_key = primary_key
+            if incremental:
+                primary_key = table_schema_template.get("primary_key", incremental.primary_key)
+                if primary_key is not None:
+                    incremental.primary_key = primary_key
 
-        if table_schema_template.get("validator") is not None:
-            self.validator = table_schema_template["validator"]
+            if table_schema_template.get("validator") is not None:
+                self.validator = table_schema_template["validator"]
 
     def bind(self, *args: Any, **kwargs: Any) -> "DltResource":
         """Binds the parametrized resource to passed arguments. Modifies resource pipe in place. Does not evaluate generators or iterators."""
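An end-to-end sketch of the `with_hints(..., create_table_variant=True)` marker added above, modeled on the pipeline tests at the end of this diff (the pipeline name and tables are invented; a working duckdb destination is assumed):

```python
import dlt


@dlt.resource(primary_key="pk")
def with_variants():
    # creates a "table_a" variant from these hints and dispatches this item to it
    yield dlt.mark.with_hints(
        {"id": 1, "pk": "A"},
        dlt.mark.make_hints(table_name="table_a", write_disposition="replace"),
        create_table_variant=True,
    )
    # a plain item stays in the resource table "with_variants"
    yield {"id": 2, "pk": "B"}


pipeline = dlt.pipeline(pipeline_name="variant_hints_demo", destination="duckdb")
pipeline.run(with_variants())
```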
diff --git a/dlt/helpers/streamlit_app/utils.py b/dlt/helpers/streamlit_app/utils.py
index 6b2dab495c..cf1728c33b 100644
--- a/dlt/helpers/streamlit_app/utils.py
+++ b/dlt/helpers/streamlit_app/utils.py
@@ -38,9 +38,7 @@ def render_with_pipeline(render_func: Callable[..., None]) -> None:
     render_func(pipeline)
 
 
-def query_using_cache(
-    pipeline: dlt.Pipeline, ttl: int
-) -> Callable[..., Optional[pd.DataFrame]]:
+def query_using_cache(pipeline: dlt.Pipeline, ttl: int) -> Callable[..., Optional[pd.DataFrame]]:
     @st.cache_data(ttl=ttl)
     def do_query(  # type: ignore[return]
         query: str,
diff --git a/docs/tools/fix_grammar_gpt.py b/docs/tools/fix_grammar_gpt.py
index 1e4cf748dd..051448a2d4 100644
--- a/docs/tools/fix_grammar_gpt.py
+++ b/docs/tools/fix_grammar_gpt.py
@@ -41,7 +41,10 @@
     parser.add_argument(
         "-f",
         "--files",
-        help="Specify the file name. Grammar Checker will filter all .md files containing this string in the filepath.",
+        help=(
+            "Specify the file name. Grammar Checker will filter all .md files containing this"
+            " string in the filepath."
+        ),
         type=str,
     )
 
diff --git a/tests/extract/test_extract.py b/tests/extract/test_extract.py
index b86e198988..1879eaa9eb 100644
--- a/tests/extract/test_extract.py
+++ b/tests/extract/test_extract.py
@@ -15,6 +15,7 @@
 
 from dlt.extract.extract import ExtractStorage, Extract
 from dlt.extract.hints import make_hints
+from dlt.extract.items import TableNameMeta
 
 from tests.utils import clean_test_storage, TEST_STORAGE_ROOT
 from tests.extract.utils import expect_extracted_file
@@ -164,6 +165,52 @@ def with_table_hints():
     assert "pk" not in table["columns"]
 
 
+def test_extract_hints_table_variant(extract_step: Extract) -> None:
+    os.environ["DATA_WRITER__DISABLE_COMPRESSION"] = "TRUE"
+
+    @dlt.resource(primary_key="pk")
+    def with_table_hints():
+        yield dlt.mark.with_hints(
+            {"id": 1, "pk": "A"},
+            make_hints(table_name="table_a", columns=[{"name": "id", "data_type": "bigint"}]),
+            create_table_variant=True,
+        )
+        # get the resource
+        resource = dlt.current.source().resources[dlt.current.resource_name()]
+        assert "table_a" in resource._hints_variants
+        # get table
+        table = resource.compute_table_schema(meta=TableNameMeta("table_a"))
+        assert "pk" in table["columns"]
+        assert "id" in table["columns"]
+        assert table["columns"]["pk"]["primary_key"] is True
+        assert table["columns"]["id"]["data_type"] == "bigint"
+
+        schema = dlt.current.source_schema()
+        # table table_a will be created
+        assert "table_a" in schema.tables
+        schema_table = schema.tables["table_a"]
+        assert table == schema_table
+
+        # dispatch to table b
+        yield dlt.mark.with_hints(
+            {"id": 2, "pk": "B"},
+            make_hints(table_name="table_b", write_disposition="replace"),
+            create_table_variant=True,
+        )
+        assert "table_b" in resource._hints_variants
+        # get table
+        table = resource.compute_table_schema(meta=TableNameMeta("table_b"))
+        assert table["write_disposition"] == "replace"
+        schema_table = schema.tables["table_b"]
+        assert table == schema_table
+
+        # item to resource
+        yield {"id": 3, "pk": "C"}
+
+    source = DltSource(dlt.Schema("hintable"), "module", [with_table_hints])
+    extract_step.extract(source, 20, 1)
+
+
 # def test_extract_pipe_from_unknown_resource():
 #     pass
 
diff --git a/tests/extract/test_sources.py b/tests/extract/test_sources.py
index d9c73dfb20..6ff1a0bf5f 100644
--- a/tests/extract/test_sources.py
+++ b/tests/extract/test_sources.py
@@ -12,6 +12,7 @@
 from dlt.common.typing import TDataItems
 
 from dlt.extract import DltResource, DltSource, Incremental
+from dlt.extract.items import TableNameMeta
 from dlt.extract.source import DltResourceDict
 from dlt.extract.exceptions import (
     DataItemRequiredForDynamicTableHints,
@@ -1362,6 +1363,57 @@ def empty_gen():
     assert table["columns"]["tags"] == {"name": "tags"}
 
 
+def test_apply_hints_table_variants() -> None:
+    def empty_gen():
+        yield [1, 2, 3]
+
+    empty = DltResource.from_data(empty_gen)
+
+    # table name must be a string
+    with pytest.raises(ValueError):
+        empty.apply_hints(write_disposition="append", create_table_variant=True)
+    with pytest.raises(ValueError):
+        empty.apply_hints(
+            table_name=lambda ev: ev["t"], write_disposition="append", create_table_variant=True
+        )
+
+    # table a with replace
+    empty.apply_hints(table_name="table_a", write_disposition="replace", create_table_variant=True)
+    table_a = empty.compute_table_schema(meta=TableNameMeta("table_a"))
+    assert table_a["name"] == "table_a"
+    assert table_a["write_disposition"] == "replace"
+
+    # unknown table (without variant) - created out of resource hints
+    table_unk = empty.compute_table_schema(meta=TableNameMeta("table_unk"))
+    assert table_unk["name"] == "empty_gen"
+    assert table_unk["write_disposition"] == "append"
+
+    # resource hints are the base for table variants
+    empty.apply_hints(
+        primary_key="id",
+        incremental=dlt.sources.incremental(cursor_path="x"),
+        columns=[{"name": "id", "data_type": "bigint"}],
+    )
+    empty.apply_hints(table_name="table_b", write_disposition="merge", create_table_variant=True)
+    table_b = empty.compute_table_schema(meta=TableNameMeta("table_b"))
+    assert table_b["name"] == "table_b"
+    assert table_b["write_disposition"] == "merge"
+    assert len(table_b["columns"]) == 1
+    assert table_b["columns"]["id"]["primary_key"] is True
+    # overwrite table_b, remove column def and primary_key
+    empty.apply_hints(table_name="table_b", columns=[], primary_key=(), create_table_variant=True)
+    table_b = empty.compute_table_schema(meta=TableNameMeta("table_b"))
+    assert table_b["name"] == "table_b"
+    assert table_b["write_disposition"] == "merge"
+    assert len(table_b["columns"]) == 0
+
+    # dyn hints not allowed
+    with pytest.raises(InconsistentTableTemplate):
+        empty.apply_hints(
+            table_name="table_b", write_disposition=lambda ev: ev["wd"], create_table_variant=True
+        )
+
+
 def test_resource_no_template() -> None:
     empty = DltResource.from_data([1, 2, 3], name="table")
     assert empty.write_disposition == "append"
diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py
index 2f221ac8a0..8d12241c6d 100644
--- a/tests/pipeline/test_pipeline.py
+++ b/tests/pipeline/test_pipeline.py
@@ -441,6 +441,86 @@ def with_mark():
     assert p.default_schema.tables["spec_table"]["resource"] == "with_mark"
 
 
+def test_mark_hints_with_variant() -> None:
+    @dlt.resource(primary_key="pk")
+    def with_table_hints():
+        # dispatch to table a
+        yield dlt.mark.with_hints(
+            {"id": 1, "pk": "A"},
+            dlt.mark.make_hints(
+                table_name="table_a", columns=[{"name": "id", "data_type": "bigint"}]
+            ),
+            create_table_variant=True,
+        )
+
+        # dispatch to table b
+        yield dlt.mark.with_hints(
+            {"id": 2, "pk": "B"},
+            dlt.mark.make_hints(table_name="table_b", write_disposition="replace"),
+            create_table_variant=True,
+        )
+
+        # item to resource
+        yield {"id": 3, "pk": "C"}
+        # table a with table_hints
+        yield dlt.mark.with_table_name({"id": 4, "pk": "D"}, "table_a")
+        # table b with table_hints
+        yield dlt.mark.with_table_name({"id": 5, "pk": "E"}, "table_b")
+
+    pipeline_name = "pipe_" + uniq_id()
+    pipeline = dlt.pipeline(pipeline_name=pipeline_name, destination="duckdb")
+    info = pipeline.run(with_table_hints)
+    assert_load_info(info)
+    assert pipeline.last_trace.last_normalize_info.row_counts == {
+        "_dlt_pipeline_state": 1,
+        "table_a": 2,
+        "table_b": 2,
+        "with_table_hints": 1,
+    }
+    # check table counts
+    assert_data_table_counts(pipeline, {"table_a": 2, "table_b": 2, "with_table_hints": 1})
+
+
+def test_mark_hints_variant_dynamic_name() -> None:
+    @dlt.resource(table_name=lambda item: "table_" + item["tag"])
+    def with_table_hints():
+        # dispatch to table a
+        yield dlt.mark.with_hints(
+            {"id": 1, "pk": "A", "tag": "a"},
+            dlt.mark.make_hints(
+                table_name="table_a",
+                primary_key="pk",
+                columns=[{"name": "id", "data_type": "bigint"}],
+            ),
+            create_table_variant=True,
+        )
+
+        # dispatch to table b
+        yield dlt.mark.with_hints(
+            {"id": 2, "pk": "B", "tag": "b"},
+            dlt.mark.make_hints(table_name="table_b", write_disposition="replace"),
+            create_table_variant=True,
+        )
+
+        # dispatch by tag
+        yield {"id": 3, "pk": "C", "tag": "c"}
+        yield {"id": 4, "pk": "D", "tag": "a"}
"pk": "D", "tag": "a"} + yield {"id": 5, "pk": "E", "tag": "b"} + + pipeline_name = "pipe_" + uniq_id() + pipeline = dlt.pipeline(pipeline_name=pipeline_name, destination="duckdb") + info = pipeline.run(with_table_hints) + assert_load_info(info) + assert pipeline.last_trace.last_normalize_info.row_counts == { + "_dlt_pipeline_state": 1, + "table_a": 2, + "table_b": 2, + "table_c": 1, + } + # check table counts + assert_data_table_counts(pipeline, {"table_a": 2, "table_b": 2, "table_c": 1}) + + def test_restore_state_on_dummy() -> None: os.environ["COMPLETED_PROB"] = "1.0" # make it complete immediately @@ -952,6 +1032,73 @@ def reverse_order(item): ] +def test_preserve_new_fields_order_on_append() -> None: + pipeline_name = "pipe_" + uniq_id() + p = dlt.pipeline(pipeline_name=pipeline_name, destination="dummy") + + item = {"c1": 1, "c2": 2, "c3": "list"} + p.extract([item], table_name="order_1") + p.normalize() + assert list(p.default_schema.get_table_columns("order_1").keys()) == [ + "c1", + "c2", + "c3", + "_dlt_load_id", + "_dlt_id", + ] + + # add columns + item = {"c1": 1, "c4": 2.0, "c3": "list", "c5": {"x": 1}} + p.extract([item], table_name="order_1") + p.normalize() + assert list(p.default_schema.get_table_columns("order_1").keys()) == [ + "c1", + "c2", + "c3", + "_dlt_load_id", + "_dlt_id", + "c4", + "c5__x", + ] + + +def test_preserve_fields_order_incomplete_columns() -> None: + p = dlt.pipeline(pipeline_name="column_order", destination="dummy") + # incomplete columns (without data type) will be added in order of fields in data + + @dlt.resource(columns={"c3": {"precision": 32}}, primary_key="c2") + def items(): + yield {"c1": 1, "c2": 1, "c3": 1} + + p.extract(items) + p.normalize() + assert list(p.default_schema.get_table_columns("items").keys()) == [ + "c1", + "c2", + "c3", + "_dlt_load_id", + "_dlt_id", + ] + + # complete columns preserve order in "columns" + p = p.drop() + + @dlt.resource(columns={"c3": {"precision": 32, "data_type": "decimal"}}, primary_key="c1") + def items2(): + yield {"c1": 1, "c2": 1, "c3": 1} + + p.extract(items2) + p.normalize() + # c3 was first so goes first + assert list(p.default_schema.get_table_columns("items").keys()) == [ + "c3", + "c1", + "c2", + "_dlt_load_id", + "_dlt_id", + ] + + def test_pipeline_log_progress() -> None: os.environ["TIMEOUT"] = "3.0"