Skip to content

Commit

Permalink
splits remote path and remote uri in filesystem load job
Browse files Browse the repository at this point in the history
  • Loading branch information
rudolfix committed May 7, 2024
1 parent 3af94e4 commit 0b806f2
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 11 deletions.
1 change: 0 additions & 1 deletion dlt/common/storages/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,5 +168,4 @@ def make_file_uri(local_path: str) -> str:
"""
p_ = pathlib.Path(local_path)
p_ = p_.expanduser().resolve()
# return "file:///" + p_.as_posix().lstrip("/")
return p_.as_uri()
14 changes: 12 additions & 2 deletions dlt/destinations/impl/filesystem/filesystem.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import posixpath
import pathlib
import os
import base64
from types import TracebackType
Expand Down Expand Up @@ -59,7 +60,7 @@ def __init__(
schema_name,
load_id,
current_datetime=config.current_datetime,
load_package_timestamp=dlt.current.load_package()["state"]["created_at"], # type: ignore
load_package_timestamp=dlt.current.load_package()["state"]["created_at"],
extra_placeholders=config.extra_placeholders,
)

Expand All @@ -75,13 +76,22 @@ def __init__(
fs_client.put_file(local_path, item)

def make_remote_path(self) -> str:
    """Build the path on the remote filesystem that the file is copied to.

    The returned path carries no scheme. For the local filesystem a native
    path is used.
    """
    # Only the separators of the destination file name are normalized here:
    # path.join does not normalize separators, and the available normalization
    # functions are very invasive and may strip the trailing separator.
    normalized_file_name = path_utils.normalize_path_sep(
        self.pathlib, self.destination_file_name
    )
    return self.pathlib.join(self.dataset_path, normalized_file_name)  # type: ignore[no-any-return]

def make_remote_uri(self) -> str:
    """Build the URI on the remote filesystem that the file is copied to."""
    path = self.make_remote_path()
    # Every non-local filesystem gets the configured protocol as a scheme prefix.
    if not self.is_local_filesystem:
        return f"{self.config.protocol}://{path}"
    # Local paths are delegated to the configuration's file-URI helper.
    return self.config.make_file_uri(path)

def state(self) -> TLoadJobState:
    """Report the state of this job, which is always ``"completed"``."""
    return "completed"

Expand All @@ -94,7 +104,7 @@ def create_followup_jobs(self, final_state: TLoadJobState) -> List[NewLoadJob]:
jobs = super().create_followup_jobs(final_state)
if final_state == "completed":
ref_job = NewReferenceJob(
file_name=self.file_name(), status="running", remote_path=self.make_remote_path()
file_name=self.file_name(), status="running", remote_path=self.make_remote_uri()
)
jobs.append(ref_job)
return jobs
Expand Down
11 changes: 6 additions & 5 deletions tests/destinations/test_destination_name_and_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
import dlt
from dlt.common.configuration.exceptions import ConfigFieldMissingException
from dlt.common.typing import DictStrStr
from dlt.destinations import duckdb, dummy, filesystem
from dlt.common.utils import uniq_id
from dlt.common.storages import FilesystemConfiguration
from dlt.destinations import duckdb, dummy, filesystem

from tests.common.configuration.utils import environment
from tests.utils import TEST_STORAGE_ROOT
Expand Down Expand Up @@ -59,7 +60,7 @@ def test_set_name_and_environment() -> None:
def test_preserve_destination_instance() -> None:
dummy1 = dummy(destination_name="dummy1", environment="dev/null/1")
filesystem1 = filesystem(
posixpath.join("file://", posixpath.abspath(TEST_STORAGE_ROOT)),
FilesystemConfiguration.make_file_uri(TEST_STORAGE_ROOT),
destination_name="local_fs",
environment="devel",
)
Expand Down Expand Up @@ -209,7 +210,7 @@ def test_destination_config_in_name(environment: DictStrStr) -> None:
with pytest.raises(ConfigFieldMissingException):
p.destination_client()

environment["DESTINATION__FILESYSTEM-PROD__BUCKET_URL"] = "file://" + posixpath.abspath(
TEST_STORAGE_ROOT
environment["DESTINATION__FILESYSTEM-PROD__BUCKET_URL"] = FilesystemConfiguration.make_file_uri(
"_storage"
)
assert p.destination_client().fs_path.endswith(TEST_STORAGE_ROOT) # type: ignore[attr-defined]
assert p._fs_client().dataset_path.endswith(p.dataset_name)
8 changes: 5 additions & 3 deletions tests/load/filesystem/test_filesystem_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import pytest

from dlt.common.time import ensure_pendulum_datetime
from dlt.common.utils import digest128, uniq_id
from dlt.common.storages import FileStorage, ParsedLoadJobFileName

Expand Down Expand Up @@ -55,7 +56,7 @@ def test_successful_load(write_disposition: str, layout: str, with_gdrive_bucket
os.environ.pop("DESTINATION__FILESYSTEM__LAYOUT", None)

dataset_name = "test_" + uniq_id()
timestamp = "2024-04-05T09:16:59.942779Z"
timestamp = ensure_pendulum_datetime("2024-04-05T09:16:59.942779Z")
mocked_timestamp = {"state": {"created_at": timestamp}}
with mock.patch(
"dlt.current.load_package",
Expand Down Expand Up @@ -102,7 +103,8 @@ def test_replace_write_disposition(layout: str, default_buckets_env: str) -> Non

dataset_name = "test_" + uniq_id()
# NOTE: context manager will delete the dataset at the end so keep it open until the end
timestamp = "2024-04-05T09:16:59.942779Z"
# state is typed now
timestamp = ensure_pendulum_datetime("2024-04-05T09:16:59.942779Z")
mocked_timestamp = {"state": {"created_at": timestamp}}
with mock.patch(
"dlt.current.load_package",
Expand Down Expand Up @@ -173,7 +175,7 @@ def test_append_write_disposition(layout: str, default_buckets_env: str) -> None
dataset_name = "test_" + uniq_id()
# NOTE: context manager will delete the dataset at the end so keep it open until the end
# also we would like to have a reliable timestamp for this test, so we patch it
timestamp = "2024-04-05T09:16:59.942779Z"
timestamp = ensure_pendulum_datetime("2024-04-05T09:16:59.942779Z")
mocked_timestamp = {"state": {"created_at": timestamp}}
with mock.patch(
"dlt.current.load_package",
Expand Down

0 comments on commit 0b806f2

Please sign in to comment.