diff --git a/dlt/common/utils.py b/dlt/common/utils.py index 37b644c0b5..f7f82133f2 100644 --- a/dlt/common/utils.py +++ b/dlt/common/utils.py @@ -566,6 +566,27 @@ def get_exception_trace_chain( return traces +def group_dict_of_lists(input_dict: Dict[str, List[Any]]) -> List[Dict[str, Any]]: + """Decomposes a dictionary with list values into a list of dictionaries with unique keys. + + This function takes an input dictionary where each key maps to a list of objects. + It returns a list of dictionaries, each containing at most one object per key. + The goal is to ensure that no two objects with the same key appear in the same dictionary. + + Parameters: + input_dict (Dict[str, List[Any]]): A dictionary with string keys and list of objects as values. + + Returns: + List[Dict[str, Any]]: A list of dictionaries, each with unique keys and single objects. + """ + max_length = max(len(v) for v in input_dict.values()) + list_of_dicts: List[Dict[str, Any]] = [{} for _ in range(max_length)] + for name, value_list in input_dict.items(): + for idx, obj in enumerate(value_list): + list_of_dicts[idx][name] = obj + return list_of_dicts + + def order_deduped(lst: List[Any]) -> List[Any]: """Returns deduplicated list preserving order of input elements. diff --git a/dlt/extract/extract.py b/dlt/extract/extract.py index 1c42dba329..e65f6cf0d0 100644 --- a/dlt/extract/extract.py +++ b/dlt/extract/extract.py @@ -35,7 +35,7 @@ TLoadPackageState, commit_load_package_state, ) -from dlt.common.utils import get_callable_name, get_full_class_name +from dlt.common.utils import get_callable_name, get_full_class_name, group_dict_of_lists from dlt.extract.decorators import SourceInjectableContext, SourceSchemaInjectableContext from dlt.extract.exceptions import DataItemRequiredForDynamicTableHints @@ -97,7 +97,8 @@ def choose_schema() -> Schema: # a list of sources or a list of resources may be passed as data sources: List[DltSource] = [] - resources: List[DltResource] = [] + resources: Dict[str, List[DltResource]] = {} + data_resources: List[DltResource] = [] def append_data(data_item: Any) -> None: if isinstance(data_item, DltSource): @@ -106,13 +107,13 @@ def append_data(data_item: Any) -> None: data_item.schema = schema sources.append(data_item) elif isinstance(data_item, DltResource): - # do not set section to prevent source that represent a standalone resource - # to overwrite other standalone resources (ie. parents) in that source - sources.append(DltSource(effective_schema, "", [data_item])) + # many resources with the same name may be present + r_ = resources.setdefault(data_item.name, []) + r_.append(data_item) else: # iterator/iterable/generator # create resource first without table template - resources.append( + data_resources.append( DltResource.from_data(data_item, name=table_name, section=pipeline.pipeline_name) ) @@ -126,9 +127,17 @@ def append_data(data_item: Any) -> None: else: append_data(data) - # add all the appended resources in one source + # add all appended resource instances in one source if resources: - sources.append(DltSource(effective_schema, pipeline.pipeline_name, resources)) + # decompose into groups so at most single resource with a given name belongs to a group + for r_ in group_dict_of_lists(resources): + # do not set section to prevent source that represent a standalone resource + # to overwrite other standalone resources (ie. parents) in that source + sources.append(DltSource(effective_schema, "", list(r_.values()))) + + # add all the appended data-like items in one source + if data_resources: + sources.append(DltSource(effective_schema, pipeline.pipeline_name, data_resources)) # apply hints and settings for source in sources: diff --git a/tests/common/test_utils.py b/tests/common/test_utils.py index e08c1cdf01..864bce5b91 100644 --- a/tests/common/test_utils.py +++ b/tests/common/test_utils.py @@ -13,6 +13,7 @@ flatten_list_of_str_or_dicts, digest128, graph_edges_to_nodes, + group_dict_of_lists, map_nested_in_place, reveal_pseudo_secret, obfuscate_pseudo_secret, @@ -367,3 +368,39 @@ def test_nested_dict_merge() -> None: mappings_update, {"_config": {"_dsn": dsn, "_dict": {"a": 3}}} ) assert mappings_update == deep_clone_dict_1_mappings + + +def test_group_dict_of_lists_one_element_each_list(): + input_dict = {"Frege": ["obj1"], "Gödel": ["obj2"], "Wittgenstein": ["obj3"]} + result = group_dict_of_lists(input_dict) + assert len(result) == 1 + assert result[0] == {"Frege": "obj1", "Gödel": "obj2", "Wittgenstein": "obj3"} + + +def test_group_dict_of_lists_equal_length_lists(): + input_dict = { + "Frege": ["obj1", "obj2"], + "Gödel": ["obj3", "obj4"], + "Wittgenstein": ["obj5", "obj6"], + } + result = group_dict_of_lists(input_dict) + assert len(result) == 2 + assert result[0] == {"Frege": "obj1", "Gödel": "obj3", "Wittgenstein": "obj5"} + assert result[1] == {"Frege": "obj2", "Gödel": "obj4", "Wittgenstein": "obj6"} + + +def test_group_dict_of_lists_various_length_lists(): + input_dict = { + "Frege": ["obj1", "obj2", "obj3"], + "Gödel": ["obj4", "obj5"], + "Wittgenstein": ["obj6"], + } + result = group_dict_of_lists(input_dict) + assert len(result) == 3 + assert result[0] == {"Frege": "obj1", "Gödel": "obj4", "Wittgenstein": "obj6"} + assert result[1] == {"Frege": "obj2", "Gödel": "obj5"} + assert result[2] == {"Frege": "obj3"} + + # Check if the sizes of the decomposed dicts are decreasing + sizes = [len(d) for d in result] + assert sizes == sorted(sizes, reverse=True), "Sizes of decomposed dicts are not decreasing" diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index 9ba933fa7f..73125cbd6c 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -1492,7 +1492,7 @@ def generic(start): assert generic(0).with_name("state2").state["start"] == 20 # NOTE: only one resource will be set in table - assert pipeline.default_schema.get_table("single_table")["resource"] == "state2" + assert pipeline.default_schema.get_table("single_table")["resource"] == "state1" # now load only state1 load_info = pipeline.run( @@ -2554,6 +2554,77 @@ def test_import_unknown_file_format() -> None: assert isinstance(inner_ex.__cause__, ValueError) +def test_resource_transformer_standalone() -> None: + # requires that standalone resources are executes in a single source + page = 1 + + @dlt.resource(name="pages") + def gen_pages(): + nonlocal page + while True: + yield {"page": page} + if page == 10: + return + page += 1 + + @dlt.transformer(name="subpages") + def get_subpages(page_item): + yield from [ + { + "page": page_item["page"], + "subpage": subpage, + } + for subpage in range(1, 11) + ] + + pipeline = dlt.pipeline("test_resource_transformer_standalone", destination="duckdb") + # here we must combine resources and transformers using the same instance + info = pipeline.run([gen_pages, gen_pages | get_subpages]) + assert_load_info(info) + # this works because we extract transformer and resource above in a single source so dlt optimizes + # dag and extracts gen_pages only once. + assert load_data_table_counts(pipeline) == {"subpages": 100, "pages": 10} + + # for two separate sources we have the following + page = 1 + schema = Schema("test") + info = pipeline.run( + [DltSource(schema, "", [gen_pages]), DltSource(schema, "", [gen_pages | get_subpages])], + dataset_name="new_dataset", + ) + assert_load_info(info, 2) + # ten subpages because only 1 page is extracted in the second source (see gen_pages exit condition) + assert load_data_table_counts(pipeline) == {"subpages": 10, "pages": 10} + + +def test_resources_same_name_in_single_source() -> None: + source_ids: List[int] = [] + + @dlt.resource(name="pages") + def gen_pages(): + page = 0 + # also store id of current source instance + source_ids.append(id(dlt.current.source())) + while True: + yield {"page": page} + if page == 10: + return + page += 1 + + pipeline = dlt.pipeline("test_resources_same_name_in_single_source", destination="duckdb") + info = pipeline.run([gen_pages(), gen_pages()]) + assert_load_info(info) + # two separate sources + assert len(set(source_ids)) == 2 + + # check against different names + source_ids.clear() + info = pipeline.run([gen_pages().with_name("page_1"), gen_pages().with_name("page_2")]) + assert_load_info(info) + # one source + assert len(set(source_ids)) == 1 + + def test_static_staging_dataset() -> None: # share database and staging dataset duckdb_ = dlt.destinations.duckdb(