diff --git a/dlt/destinations/impl/filesystem/filesystem.py b/dlt/destinations/impl/filesystem/filesystem.py
index a8cd3e9422..8e3f65f4de 100644
--- a/dlt/destinations/impl/filesystem/filesystem.py
+++ b/dlt/destinations/impl/filesystem/filesystem.py
@@ -368,8 +368,13 @@ def _get_stored_schema_by_hash_or_newest(
         return None
 
     def _store_current_schema(self, load_id: str) -> None:
+        # check if schema with hash exists
+        current_hash = self.schema.stored_version_hash
+        if self._get_stored_schema_by_hash_or_newest(current_hash):
+            return
+
         # get paths
-        hash_path = self._get_schema_file_name(self.schema.stored_version_hash, load_id)
+        filepath = self._get_schema_file_name(self.schema.stored_version_hash, load_id)
 
         # TODO: duplicate of weaviate implementation, should be abstracted out
         version_info = {
@@ -382,7 +387,7 @@ def _store_current_schema(self, load_id: str) -> None:
         }
 
         # we always keep tabs on what the current schema is
-        self._write_to_json_file(hash_path, version_info)
+        self._write_to_json_file(filepath, version_info)
 
     def get_stored_schema(self) -> Optional[StorageSchemaInfo]:
         """Retrieves newest schema from destination storage"""
diff --git a/tests/load/filesystem/test_filesystem_client.py b/tests/load/filesystem/test_filesystem_client.py
index 5d6dbe33ef..1cc0b61856 100644
--- a/tests/load/filesystem/test_filesystem_client.py
+++ b/tests/load/filesystem/test_filesystem_client.py
@@ -129,6 +129,9 @@ def test_replace_write_disposition(layout: str, default_buckets_env: str) -> Non
     for basedir, _dirs, files in client.fs_client.walk(
         client.dataset_path, detail=False, refresh=True
     ):
+        # remove internal paths
+        if "_dlt" in basedir:
+            continue
         for f in files:
             paths.append(posixpath.join(basedir, f))
     ls = set(paths)
@@ -166,6 +169,9 @@ def test_append_write_disposition(layout: str, default_buckets_env: str) -> None
     for basedir, _dirs, files in client.fs_client.walk(
         client.dataset_path, detail=False, refresh=True
     ):
+        # remove internal paths
+        if "_dlt" in basedir:
+            continue
         for f in files:
             paths.append(posixpath.join(basedir, f))
     assert list(sorted(paths)) == expected_files
diff --git a/tests/load/pipeline/test_filesystem_pipeline.py b/tests/load/pipeline/test_filesystem_pipeline.py
index d4e8777d28..0d859df3b0 100644
--- a/tests/load/pipeline/test_filesystem_pipeline.py
+++ b/tests/load/pipeline/test_filesystem_pipeline.py
@@ -86,11 +86,15 @@ def some_source():
     for job in pkg.jobs["completed_jobs"]:
         assert_file_matches(layout, job, pkg.load_id, client)
 
-    complete_fn = f"{client.schema.name}.{LOADS_TABLE_NAME}.%s"
+    complete_fn = f"{client.schema.name}__%s.jsonl"
 
     # Test complete_load markers are saved
-    assert client.fs_client.isfile(posixpath.join(client.dataset_path, complete_fn % load_id1))
-    assert client.fs_client.isfile(posixpath.join(client.dataset_path, complete_fn % load_id2))
+    assert client.fs_client.isfile(
+        posixpath.join(client.dataset_path, client.schema.loads_table_name, complete_fn % load_id1)
+    )
+    assert client.fs_client.isfile(
+        posixpath.join(client.dataset_path, client.schema.loads_table_name, complete_fn % load_id2)
+    )
 
     # Force replace
     pipeline.run(some_source(), write_disposition="replace")
diff --git a/tests/load/pipeline/test_replace_disposition.py b/tests/load/pipeline/test_replace_disposition.py
index 6efde6e019..09a746433f 100644
--- a/tests/load/pipeline/test_replace_disposition.py
+++ b/tests/load/pipeline/test_replace_disposition.py
@@ -41,8 +41,6 @@ def test_replace_disposition(
     # make duckdb to reuse database in working folder
os.environ["DESTINATION__DUCKDB__CREDENTIALS"] = "duckdb:///test_replace_disposition.duckdb" - # TODO: start storing _dlt_loads with right json content - increase_loads = lambda x: x if destination_config.destination == "filesystem" else x + 1 increase_state_loads = lambda info: len( [ job @@ -52,11 +50,9 @@ def test_replace_disposition( ] ) - # filesystem does not have versions and child tables + # filesystem does not have child tables, prepend defaults def norm_table_counts(counts: Dict[str, int], *child_tables: str) -> Dict[str, int]: - if destination_config.destination != "filesystem": - return counts - return {**{"_dlt_version": 0}, **{t: 0 for t in child_tables}, **counts} + return {**{t: 0 for t in child_tables}, **counts} dataset_name = "test_replace_strategies_ds" + uniq_id() pipeline = destination_config.setup_pipeline( @@ -108,8 +104,8 @@ def append_items(): assert_load_info(info) # count state records that got extracted state_records = increase_state_loads(info) - dlt_loads: int = increase_loads(0) - dlt_versions: int = increase_loads(0) + dlt_loads: int = 1 + dlt_versions: int = 1 # second run with higher offset so we can check the results offset = 1000 @@ -118,11 +114,11 @@ def append_items(): ) assert_load_info(info) state_records += increase_state_loads(info) - dlt_loads = increase_loads(dlt_loads) + dlt_loads += 1 # we should have all items loaded table_counts = load_table_counts(pipeline, *pipeline.default_schema.tables.keys()) - assert norm_table_counts(table_counts) == { + assert table_counts == { "append_items": 24, # loaded twice "items": 120, "items__sub_items": 240, @@ -166,7 +162,7 @@ def load_items_none(): ) assert_load_info(info) state_records += increase_state_loads(info) - dlt_loads = increase_loads(dlt_loads) + dlt_loads += 1 # table and child tables should be cleared table_counts = load_table_counts(pipeline, *pipeline.default_schema.tables.keys()) @@ -200,8 +196,8 @@ def load_items_none(): assert_load_info(info) new_state_records = increase_state_loads(info) assert new_state_records == 1 - dlt_loads = increase_loads(dlt_loads) - dlt_versions = increase_loads(dlt_versions) + dlt_loads += 1 + dlt_versions += 1 # check trace assert pipeline_2.last_trace.last_normalize_info.row_counts == { "items_copy": 120, @@ -214,18 +210,18 @@ def load_items_none(): assert_load_info(info) new_state_records = increase_state_loads(info) assert new_state_records == 0 - dlt_loads = increase_loads(dlt_loads) + dlt_loads += 1 # new pipeline table_counts = load_table_counts(pipeline_2, *pipeline_2.default_schema.tables.keys()) - assert norm_table_counts(table_counts) == { + assert table_counts == { "append_items": 48, "items_copy": 120, "items_copy__sub_items": 240, "items_copy__sub_items__sub_sub_items": 120, "_dlt_pipeline_state": state_records + 1, "_dlt_loads": dlt_loads, - "_dlt_version": increase_loads(dlt_versions), + "_dlt_version": dlt_versions + 1, } # check trace assert pipeline_2.last_trace.last_normalize_info.row_counts == { @@ -243,7 +239,7 @@ def load_items_none(): "items__sub_items__sub_sub_items": 0, "_dlt_pipeline_state": state_records + 1, "_dlt_loads": dlt_loads, # next load - "_dlt_version": increase_loads(dlt_versions), # new table name -> new schema + "_dlt_version": dlt_versions + 1, # new table name -> new schema } diff --git a/tests/pipeline/utils.py b/tests/pipeline/utils.py index c4e1f5314b..99312bf5dd 100644 --- a/tests/pipeline/utils.py +++ b/tests/pipeline/utils.py @@ -96,11 +96,6 @@ def load_file(path: str, file: str) -> Tuple[str, List[Dict[str, 
Any]]]: # table name will be last element of path table_name = path.split("/")[-1] - - # skip loads table - if table_name == "_dlt_loads": - return table_name, [] - full_path = posixpath.join(path, file) # load jsonl @@ -157,10 +152,6 @@ def load_files(p: dlt.Pipeline, *table_names: str) -> Dict[str, List[Dict[str, A else: result[table_name] = items - # loads file is special case - if LOADS_TABLE_NAME in table_names and file.find(".{LOADS_TABLE_NAME}."): - result[LOADS_TABLE_NAME] = [] - return result diff --git a/tests/utils.py b/tests/utils.py index 69338b2f72..6defd08f5a 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -62,7 +62,7 @@ # filter out active destinations for current tests ACTIVE_DESTINATIONS = set(dlt.config.get("ACTIVE_DESTINATIONS", list) or IMPLEMENTED_DESTINATIONS) -ACTIVE_DESTINATIONS = {"filesystem"} +ACTIVE_DESTINATIONS = {"filesystem", "duckdb"} ACTIVE_SQL_DESTINATIONS = SQL_DESTINATIONS.intersection(ACTIVE_DESTINATIONS) ACTIVE_NON_SQL_DESTINATIONS = NON_SQL_DESTINATIONS.intersection(ACTIVE_DESTINATIONS)
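
Note: the behavioral core of this patch is the early return added to _store_current_schema. The schema version file is only written when no stored schema with the current stored_version_hash exists yet, so _dlt_version ends up with one entry per schema version rather than one per load. Below is a minimal, self-contained sketch of that write-once-per-hash pattern; it uses an in-memory dict in place of dlt's fsspec-backed storage, and SchemaStore, its methods, and the file-name layout are illustrative assumptions, not part of the dlt API.

# Illustrative sketch only: an in-memory stand-in for the filesystem destination's
# schema storage, showing the "write once per version hash" check added above.
from typing import Dict, Optional


class SchemaStore:
    def __init__(self) -> None:
        # file name -> version info; stands in for JSON files under the schema/version folder
        self.files: Dict[str, Dict[str, str]] = {}

    def _get_schema_file_name(self, version_hash: str, load_id: str) -> str:
        # hypothetical naming scheme, for illustration only
        return f"schema__{version_hash}__{load_id}.json"

    def _get_stored_schema_by_hash(self, version_hash: str) -> Optional[Dict[str, str]]:
        # mirrors listing the stored version files and matching on the version hash
        for info in self.files.values():
            if info["version_hash"] == version_hash:
                return info
        return None

    def store_current_schema(self, version_hash: str, load_id: str, schema_json: str) -> None:
        # early return: this schema version is already stored, do not write a duplicate
        if self._get_stored_schema_by_hash(version_hash):
            return
        filepath = self._get_schema_file_name(version_hash, load_id)
        self.files[filepath] = {"version_hash": version_hash, "schema": schema_json}


store = SchemaStore()
store.store_current_schema("abc123", "load_1", "{}")
store.store_current_schema("abc123", "load_2", "{}")  # skipped: same hash already stored
assert len(store.files) == 1

The same behavior change is what lets the test updates above drop the filesystem-specific norm_table_counts special-casing: _dlt_version and _dlt_loads counters now advance for the filesystem destination the same way they do for SQL destinations.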