diff --git a/dlt/destinations/impl/filesystem/filesystem.py b/dlt/destinations/impl/filesystem/filesystem.py
index c53a43f728..c763885e76 100644
--- a/dlt/destinations/impl/filesystem/filesystem.py
+++ b/dlt/destinations/impl/filesystem/filesystem.py
@@ -620,6 +620,7 @@ def get_duckdb(
         # create all tables in duck instance
         for ptable in tables:
+            folder = self.get_table_dir(ptable)
             files = self.list_table_files(ptable)
             # discover tables files
             file_type = os.path.splitext(files[0])[1][1:]
@@ -632,7 +633,7 @@ def get_duckdb(
             # create table
             protocol = "" if self.is_local_filesystem else f"{self.config.protocol}://"
-            files_string = ",".join([f"'{protocol}{f}'" for f in files])
+            files_string = f"'{protocol}{folder}/**/*.{file_type}'"
             create_table_sql_base = (
                 f"CREATE {table_type} {ptable} AS SELECT * FROM {read_command}([{files_string}])"
             )
diff --git a/tests/load/test_read_interfaces.py b/tests/load/test_read_interfaces.py
index da4caecf75..ae2b6c0a43 100644
--- a/tests/load/test_read_interfaces.py
+++ b/tests/load/test_read_interfaces.py
@@ -14,6 +14,7 @@ def _run_dataset_checks(pipeline: Pipeline) -> None:
     destination_type = pipeline.destination_client().config.destination_type
 
+    skip_df_chunk_size_check = False
     if destination_type == "bigquery":
         chunk_size = 50
         total_records = 80
@@ -24,6 +25,10 @@ def _run_dataset_checks(pipeline: Pipeline) -> None:
         chunk_size = 2048
         total_records = 3000
 
+    # on filesystem one chunk is one file and not the default vector size
+    if destination_type == "filesystem":
+        skip_df_chunk_size_check = True
+
     # we always expect 2 chunks based on the above setup
     expected_chunk_counts = [chunk_size, total_records - chunk_size]
 
@@ -55,18 +60,16 @@ def items():
 
     # check dataframes
     #
-    # full frame
-    df = relationship.df()
-    assert len(df.index) == total_records
-
     # chunk
     df = relationship.df(chunk_size=chunk_size)
-    assert len(df.index) == chunk_size
+    if not skip_df_chunk_size_check:
+        assert len(df.index) == chunk_size
     assert set(df.columns.values) == {"id", "_dlt_load_id", "_dlt_id"}
 
     # iterate all dataframes
     frames = list(relationship.iter_df(chunk_size=chunk_size))
-    assert [len(df.index) for df in frames] == expected_chunk_counts
+    if not skip_df_chunk_size_check:
+        assert [len(df.index) for df in frames] == expected_chunk_counts
 
     # check all items are present
     ids = reduce(lambda a, b: a + b, [f["id"].to_list() for f in frames])
@@ -144,7 +147,7 @@ def test_read_interfaces_sql(destination_config: DestinationTestConfiguration) -
 )
 def test_read_interfaces_filesystem(destination_config: DestinationTestConfiguration) -> None:
     # we force multiple files per table, they may only hold 50 items
-    os.environ["DATA_WRITER__FILE_MAX_ITEMS"] = "30"
+    os.environ["DATA_WRITER__FILE_MAX_ITEMS"] = "700"
 
     if destination_config.file_format not in ["parquet", "jsonl"]:
         pytest.skip(
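
Reviewer note: a minimal sketch (not part of the diff) of the duckdb behavior the new files_string relies on, namely that a glob pattern passed to read_parquet expands to every matching file under a table folder, which is why the per-file list built from list_table_files can be replaced by one pattern per table. The 'items/**/*.parquet' path below is a hypothetical local example.

import duckdb

con = duckdb.connect()
# read_parquet resolves glob patterns, so a single "<table dir>/**/*.<ext>"
# entry covers all files in the table folder, matching what files_string
# now builds from folder and file_type
rel = con.sql("SELECT * FROM read_parquet(['items/**/*.parquet'])")
print(rel.fetchall())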