Skip to content

Commit

Permalink
Extend layout placeholder params for filesystem destinations (#1182)
Browse files Browse the repository at this point in the history
* Introduce new config fields to filesystem destination configuration

* Merge layout.py with path_utils.py

* Adjust tests

* Fix linting issues and extract common types

* Use pendulum.now if current_datetime is not defined

* Add more layout tests

* Fix failing tests

* Cleanup tests

* Adjust tests

* Enable auto_mkdir for local filesystem

* Format code

* Extract re-usable functions and fix test

* Add invalid layout

* Accept load_package timestamp in create_path

* Collect all files and directories and then delete files first then directories

* Recursively descend and collect files and directories to truncate for filesystem destination

* Mock load_package timestamp and add more test layouts

* Fix linting issues

* Use better variable name to avoid shadowing builtin name

* Fix dummy tests

* Revert changes to filesystem impl

* Use timestamp if it is specified

* Cleanup path_utils and remove redundant code

* Revert factory

* Refactor path_utils and simplify things

* Remove custom datetime format parameter

* Pass load_package_timestamp

* Remove custom datetime format and current_datetime parameter parameter

* Cleanup imports

* Fix path_utils tests

* Make load_package_timestamp optional

* Revert some changes in tests

* Uncomment layout test item

* Revert fs client argument changes

* Fix mypy issue

* Fix linting issues

* Use all aggregated placeholder when checking layout

* Enable auto_mkdir for filesystem config tests

* Enable auto_mkdir for filesystem config tests

* Enable auto_mkdir only for local filesystem

* Add more placeholders

* Remove extra layout options

* Accepts current_datetime

* Adjust type names

* Pass current datetime

* Resolve current_datetime if it is callable

* Fix linting issues

* Parametrize layout check test

* Fix mypy issues

* Fix mypy issues

* Add more tests

* Add more test checks for create_path flow

* Add test flow comment

* Add more tests

* Add test to check callable extra placeholders

* Test if unused layout placeholders are printed

* Adjust timestamp selection logic

* Fix linting issues

* Extend default placeholders and remove redundant ones

* Add quarter placeholder

* Add test case for layout with quarter of the year

* Adjust type alias for placeholder callback

* Simplify code

* Adjust tests

* Validate placeholders in on_resolve

* Avoid assigning current_datetime callback value during validation

* Fix mypy warnings

* Fix linting issues

* Log warning message with yellow foreground

* Remove useless test

* Adjust error message for placeholder callback functions

* Lowercase formatted datetime values

* Adjust comments

* Re-import load_storage

* Adjust logic around timestamp and current datetime

* Fix mypy warnings

* Add test configuraion override when values passed via filesystem factory

* Better logic to handle current timestamp and current datetime

* Add more test checks

* Introduce new InvalidPlaceholderCallback exception

* Raise InvalidPlaceholderCallback instead of plain TypeError

* Fix import ban error

* Add more test cases for path utils layout check

* Adjust text color

* Small cleanup

* Verify path parts and layout parts are equal

* Remove unnecessary log

* Add test with actual pipeline run and checks for callback calls

* Revert conftest changes

* Cleanup and move current_datetime calling inside path_utils

* Adjust test

* Add clarification comment

* Use logger instead of printing out

* Make InvalidPlaceholderCallback of transient kind

* Move tests to new place

* Cleanup

* Add load_package_timestamp placeholder

* Fix mypy warning

* Add pytest-mock

* Adjust tests

* Adjust logic

* Fix mypy issue

* Use spy on logger for a test

* Add test layout example with load_package_timestamp

* Add test layout example with load_package_timestamp in pipeline tests

* Check created paths and assert validity of placeholders

* Rename variable to better fit the context

* Assert arguments in extra placeholder callbacks

* Make invalid placeholders exception more useful

* Assert created path with spy values

* Make error messages better for InvalidFilesystemLayout exception

* Fix mypy errors

* Also check created path

* Run pipeline then create path and check if files exist

* Fix mypy errors

* Check all load packages

* Add more layout samples using custom placeholders

* Add more layout samples with callable extra placeholder

* Add more layout samples with callable extra placeholder

* Remove redundant import

* Check expected paths created by create_path

* Fix mypy issues

* Add explanation comment to ALL_LAYOUTS

* Re-use frozen datetime

* Use dlt.common.pendulum

* Use ensure_pendulum_datetime instead of pendulum.parse

* Fix mypy issues

* Add invalid layout with extra placeholder before table_name

* Adjust exception message from invalid to missing placeholders
  • Loading branch information
sultaniman authored Apr 16, 2024
1 parent 11b0c68 commit 2ce906c
Show file tree
Hide file tree
Showing 13 changed files with 1,279 additions and 107 deletions.
37 changes: 35 additions & 2 deletions dlt/destinations/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,42 @@ def __init__(


class InvalidFilesystemLayout(DestinationTerminalException):
def __init__(self, invalid_placeholders: Sequence[str]) -> None:
def __init__(
self,
layout: str,
expected_placeholders: Sequence[str],
extra_placeholders: Sequence[str],
invalid_placeholders: Sequence[str],
unused_placeholders: Sequence[str],
) -> None:
self.invalid_placeholders = invalid_placeholders
super().__init__(f"Invalid placeholders found in filesystem layout: {invalid_placeholders}")
self.extra_placeholders = extra_placeholders
self.expected_placeholders = expected_placeholders
self.unused_placeholders = unused_placeholders
self.layout = layout

message = (
f"Layout '{layout}' expected {', '.join(expected_placeholders)} placeholders."
f"Missing placeholders: {', '.join(invalid_placeholders)}."
)

if extra_placeholders:
message += f"Extra placeholders specified: {', '.join(extra_placeholders)}."

if unused_placeholders:
message += f"Unused placeholders: {', '.join(unused_placeholders)}."

super().__init__(message)


class InvalidPlaceholderCallback(DestinationTransientException):
def __init__(self, callback_name: str) -> None:
self.callback_name = callback_name
super().__init__(
f"Invalid placeholder callback: {callback_name}, please make sure it can"
" accept parameters the following `schema name`, `table name`,"
" `load_id`, `file_id` and an `extension`",
)


class CantExtractTablePrefix(DestinationTerminalException):
Expand Down
3 changes: 1 addition & 2 deletions dlt/destinations/impl/dummy/dummy.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
from contextlib import contextmanager
import random
from contextlib import contextmanager
from copy import copy
from types import TracebackType
from typing import (
ClassVar,
ContextManager,
Dict,
Iterator,
Optional,
Expand Down
28 changes: 25 additions & 3 deletions dlt/destinations/impl/filesystem/configuration.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,41 @@
import dataclasses
from typing import Final, Type, Optional

from typing import Final, List, Optional, Type

from dlt.common import logger
from dlt.common.configuration import configspec, resolve_type
from dlt.common.destination.reference import (
CredentialsConfiguration,
DestinationClientStagingConfiguration,
)

from dlt.common.storages import FilesystemConfiguration
from dlt.destinations.impl.filesystem.typing import TCurrentDateTime, TExtraPlaceholders

from dlt.destinations.path_utils import check_layout, get_unused_placeholders


@configspec
class FilesystemDestinationClientConfiguration(FilesystemConfiguration, DestinationClientStagingConfiguration): # type: ignore[misc]
destination_type: Final[str] = dataclasses.field(default="filesystem", init=False, repr=False, compare=False) # type: ignore
destination_type: Final[str] = dataclasses.field( # type: ignore[misc]
default="filesystem", init=False, repr=False, compare=False
)
current_datetime: Optional[TCurrentDateTime] = None
extra_placeholders: Optional[TExtraPlaceholders] = None

@resolve_type("credentials")
def resolve_credentials_type(self) -> Type[CredentialsConfiguration]:
# use known credentials or empty credentials for unknown protocol
return self.PROTOCOL_CREDENTIALS.get(self.protocol) or Optional[CredentialsConfiguration] # type: ignore[return-value]
return (
self.PROTOCOL_CREDENTIALS.get(self.protocol)
or Optional[CredentialsConfiguration] # type: ignore[return-value]
)

def on_resolved(self) -> None:
# Validate layout and show unused placeholders
_, layout_placeholders = check_layout(self.layout, self.extra_placeholders)
unused_placeholders = get_unused_placeholders(
layout_placeholders, list((self.extra_placeholders or {}).keys())
)
if unused_placeholders:
logger.info(f"Found unused layout placeholders: {', '.join(unused_placeholders)}")
37 changes: 18 additions & 19 deletions dlt/destinations/impl/filesystem/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@
from fsspec import AbstractFileSystem
from contextlib import contextmanager

import dlt
from dlt.common import logger
from dlt.common.schema import Schema, TSchemaTables, TTableSchema
from dlt.common.storages import FileStorage, ParsedLoadJobFileName, fsspec_from_config
from dlt.common.storages import FileStorage, fsspec_from_config
from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.destination.reference import (
NewLoadJob,
Expand Down Expand Up @@ -38,32 +39,30 @@ def __init__(
file_name = FileStorage.get_file_name_from_file_path(local_path)
self.config = config
self.dataset_path = dataset_path
self.destination_file_name = LoadFilesystemJob.make_destination_filename(
config.layout, file_name, schema_name, load_id
self.destination_file_name = path_utils.create_path(
config.layout,
file_name,
schema_name,
load_id,
current_datetime=config.current_datetime,
load_package_timestamp=dlt.current.load_package()["state"]["created_at"], # type: ignore
extra_placeholders=config.extra_placeholders,
)

super().__init__(file_name)
fs_client, _ = fsspec_from_config(config)
self.destination_file_name = LoadFilesystemJob.make_destination_filename(
config.layout, file_name, schema_name, load_id
self.destination_file_name = path_utils.create_path(
config.layout,
file_name,
schema_name,
load_id,
current_datetime=config.current_datetime,
load_package_timestamp=dlt.current.load_package()["state"]["created_at"], # type: ignore
extra_placeholders=config.extra_placeholders,
)
item = self.make_remote_path()
fs_client.put_file(local_path, item)

@staticmethod
def make_destination_filename(
layout: str, file_name: str, schema_name: str, load_id: str
) -> str:
job_info = ParsedLoadJobFileName.parse(file_name)
return path_utils.create_path(
layout,
schema_name=schema_name,
table_name=job_info.table_name,
load_id=load_id,
file_id=job_info.file_id,
ext=job_info.file_format,
)

def make_remote_path(self) -> str:
return (
f"{self.config.protocol}://{posixpath.join(self.dataset_path, self.destination_file_name)}"
Expand Down
19 changes: 19 additions & 0 deletions dlt/destinations/impl/filesystem/typing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from typing import Callable, Dict, Union

from pendulum.datetime import DateTime
from typing_extensions import TypeAlias


TCurrentDateTimeCallback: TypeAlias = Callable[[], DateTime]
"""A callback function to which should return pendulum.DateTime instance"""

TCurrentDateTime: TypeAlias = Union[DateTime, TCurrentDateTimeCallback]
"""pendulum.DateTime instance or a callable which should return pendulum.DateTime"""

TLayoutPlaceholderCallback: TypeAlias = Callable[[str, str, str, str, str], str]
"""A callback which should return prepared string value the following arguments passed
`schema name`, `table name`, `load_id`, `file_id` and an `extension`
"""

TExtraPlaceholders: TypeAlias = Dict[str, Union[str, TLayoutPlaceholderCallback]]
"""Extra placeholders for filesystem layout"""
Loading

0 comments on commit 2ce906c

Please sign in to comment.