Commit dc6f1ba

tests windows paths for filesystem destination

rudolfix committed May 7, 2024
1 parent 45e83ec commit dc6f1ba
Showing 4 changed files with 67 additions and 3 deletions.
1 change: 1 addition & 0 deletions tests/common/storages/test_local_filesystem.py
@@ -22,6 +22,7 @@
 (UNC_LOCAL_PATH, pathlib.PureWindowsPath(UNC_LOCAL_PATH).as_uri()),
 (UNC_WSL_PATH, pathlib.PureWindowsPath(UNC_WSL_PATH).as_uri()),
 (r"C:\hello", "file:///C:/hello"),
+# (r"\\?\C:\hello", "file:///C:/hello"),
 (r"a\b $\b", "file:///" + pathlib.Path(r"a\\" + quote("b $") + r"\b").resolve().as_posix()),
# same paths but with POSIX separators
(
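Context, not part of the commit: these parametrized cases check that PureWindowsPath.as_uri() turns Windows drive and UNC paths into file:// URIs. A minimal sketch of that conversion; the sample paths below are illustrative, not the test's UNC_LOCAL_PATH / UNC_WSL_PATH constants.

import pathlib

# PureWindowsPath applies Windows path rules on any OS, so this runs on Linux CI too.
# A UNC host becomes the URI authority; a drive letter stays in the path component.
assert pathlib.PureWindowsPath(r"\\localhost\share\hello").as_uri() == "file://localhost/share/hello"
assert pathlib.PureWindowsPath(r"C:\hello").as_uri() == "file:///C:/hello"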
2 changes: 1 addition & 1 deletion tests/load/filesystem/test_filesystem_client.py
@@ -67,7 +67,7 @@ def test_successful_load(write_disposition: str, layout: str, with_gdrive_bucket
 ) as load_info:
     client, jobs, _, load_id = load_info
     layout = client.config.layout
-    dataset_path = posixpath.join(client.fs_path, client.config.dataset_name)
+    dataset_path = posixpath.join(client.bucket_path, client.config.dataset_name)

     # Assert dataset dir exists
     assert client.fs_client.isdir(dataset_path)
2 changes: 1 addition & 1 deletion tests/load/filesystem/utils.py
@@ -48,7 +48,7 @@ def perform_load(

 client.initialize_storage(truncate_tables=truncate_tables)
 client.update_stored_schema()
-root_path = posixpath.join(client.fs_path, client.config.dataset_name)
+root_path = posixpath.join(client.bucket_path, client.config.dataset_name)

 files = load.load_storage.list_new_jobs(load_id)
 try:
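A note on the two hunks above (a sketch, not from the diff): both call sites switch from client.fs_path to client.bucket_path, assuming bucket_path carries the URL-style location the fsspec client addresses while fs_path is the local form. posixpath.join is used because fsspec paths always use forward slashes, regardless of the host OS. The values below are hypothetical.

import posixpath

# Hypothetical values; in the tests these come from dlt's filesystem client.
bucket_path = "file://localhost/C$/project/_storage"  # URL-style location
dataset_name = "mydata"

# posixpath.join always joins with "/", so the result stays a valid
# fsspec-style path even when os.path.sep is "\\" on Windows.
dataset_path = posixpath.join(bucket_path, dataset_name)
assert dataset_path == "file://localhost/C$/project/_storage/mydata"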
65 changes: 64 additions & 1 deletion tests/pipeline/test_pipeline.py
@@ -1,4 +1,5 @@
 import asyncio
+import pathlib
 from concurrent.futures import ThreadPoolExecutor
 import itertools
 import logging
@@ -37,6 +38,7 @@
 from dlt.common.schema import Schema

 from dlt.destinations import filesystem, redshift, dummy
+from dlt.destinations.impl.filesystem.filesystem import INIT_FILE_NAME
 from dlt.extract.exceptions import InvalidResourceDataTypeBasic, PipeGenInvalid, SourceExhausted
 from dlt.extract.extract import ExtractStorage
 from dlt.extract import DltResource, DltSource
@@ -47,7 +49,7 @@

 from tests.common.utils import TEST_SENTRY_DSN
 from tests.common.configuration.utils import environment
-from tests.utils import TEST_STORAGE_ROOT
+from tests.utils import TEST_STORAGE_ROOT, skipifnotwindows
 from tests.extract.utils import expect_extracted_file
 from tests.pipeline.utils import (
     assert_data_table_counts,
@@ -2161,3 +2163,64 @@ def test_yielding_empty_list_creates_table() -> None:
 rows = list(cur.fetchall())
 assert len(rows) == 1
 assert rows[0] == (1, None)
+
+
+@skipifnotwindows
+def test_local_filesystem_destination() -> None:
+    # build a UNC path via the localhost administrative share (C:\... -> \\localhost\C$\...)
+    unc_path = "\\\\localhost\\" + os.path.abspath("_storage").replace(":", "$")
+    print(unc_path)
+
+    dataset_name = "mydata_" + uniq_id()
+
+    @dlt.resource
+    def stateful_resource():
+        dlt.current.source_state()["mark"] = 1
+        yield [1, 2, 3]
+
+    pipeline = dlt.pipeline(
+        pipeline_name="local_files",
+        destination=dlt.destinations.filesystem(unc_path),
+        dataset_name=dataset_name,
+    )
+    info = pipeline.run(stateful_resource(), table_name="numbers", write_disposition="replace")
+    assert_load_info(info)
+
+    info = pipeline.run(stateful_resource(), table_name="numbers", write_disposition="replace")
+    assert_load_info(info)
+
+    pipeline = pipeline.drop()
+
+    # must be able to restore the schema and state
+    pipeline.sync_destination()
+    assert pipeline.state["sources"]["local_files"]["mark"] == 1
+    assert pipeline.default_schema.get_table("numbers") is not None
+
+    # check all the files: paths may get messed up in many different ways
+    # and data may land anywhere, especially on Windows
+    expected_dataset = pathlib.Path("_storage").joinpath(dataset_name).resolve()
+    assert expected_dataset.exists()
+    assert expected_dataset.is_dir()
+
+    # count files in tables
+    assert expected_dataset.joinpath(INIT_FILE_NAME).is_file()
+    # one file in the numbers table (replace disposition overwrites the first load)
+    assert len(list(expected_dataset.joinpath("numbers").glob("*"))) == 1
+    # two load packages + the init marker
+    assert len(list(expected_dataset.joinpath("_dlt_loads").glob("*"))) == 3
+    # one schema (deduplicated on version hash) + the init marker
+    assert len(list(expected_dataset.joinpath("_dlt_version").glob("*"))) == 2
+    # one state file (unchanged state is not sent twice) + the init marker
+    assert len(list(expected_dataset.joinpath("_dlt_pipeline_state").glob("*"))) == 2
+
+    fs_client = pipeline._fs_client()
+    # all path formats we use must lead to "_storage" relative to tests
+    assert (
+        pathlib.Path(fs_client.dataset_path).resolve()
+        == pathlib.Path(unc_path).joinpath(dataset_name).resolve()
+    )
+    # the client's own listings must agree with the counts above (init markers excluded)
+    assert len(fs_client.list_table_files("numbers")) == 1
+    assert len(fs_client.list_table_files("_dlt_loads")) == 2
+    assert len(fs_client.list_table_files("_dlt_version")) == 1
+    assert len(fs_client.list_table_files("_dlt_pipeline_state")) == 1