Skip to content

Commit

Permalink
Define suffix parameter callback
Browse files Browse the repository at this point in the history
  • Loading branch information
sultaniman committed Mar 27, 2024
1 parent e0c8e09 commit ba320dd
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 15 deletions.
25 changes: 12 additions & 13 deletions dlt/destinations/impl/filesystem/configuration.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import inspect
import dataclasses

from typing import Callable, Dict, Final, Type, Optional, TypeAlias, Union, get_args
from typing import Callable, Dict, Final, Type, Optional, TypeAlias, Union

import pendulum

from pendulum.datetime import DateTime
from dlt.common.configuration import configspec, resolve_type
Expand All @@ -13,7 +11,7 @@
)
from dlt.common.schema.schema import Schema
from dlt.common.storages import FilesystemConfiguration
from dlt.destinations.path_utils import check_layout
from dlt.destinations.path_utils import PathParams, check_layout

TCurrentDatetimeCallback: TypeAlias = Callable[[], DateTime]
"""A callback which should return current datetime"""
Expand All @@ -38,6 +36,7 @@ class FilesystemDestinationClientConfiguration(
current_datetime: Optional[Union[DateTime, TCurrentDatetimeCallback]] = None
datetime_format: Optional[TDatetimeFormat] = None
layout_params: Optional[Dict[str, Union[str, TLayoutParamCallback]]] = None
suffix: Optional[Callable[[PathParams], str]] = None

@resolve_type("credentials")
def resolve_credentials_type(self) -> Type[CredentialsConfiguration]:
Expand All @@ -54,16 +53,16 @@ def on_resolved(self) -> None:
# if it is not DateTime then exit.
if self.current_datetime is not None:
if callable(self.current_datetime):
signature = inspect.signature(self.current_datetime)
if signature.return_annotation is inspect._empty:
result = self.current_datetime()
if not isinstance(result, DateTime):
raise RuntimeError(
"current_datetime was passed as callable but "
"didn't return any instance of pendulum.DateTime"
)
result = self.current_datetime()
if isinstance(result, DateTime):
self.current_datetime = result
else:
raise RuntimeError(
"current_datetime was passed as callable but "
"didn't return any instance of pendulum.DateTime"
)


# Validate layout and layout params
check_layout(self.layout, self.layout_params)

super().on_resolved()
18 changes: 16 additions & 2 deletions dlt/destinations/path_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,19 @@
from dlt.destinations.exceptions import InvalidFilesystemLayout, CantExtractTablePrefix

# TODO: ensure layout only has supported placeholders
SUPPORTED_PLACEHOLDERS = {"schema_name", "table_name", "load_id", "file_id", "ext", "curr_date"}
SUPPORTED_PLACEHOLDERS = {
"schema_name",
"table_name",
"load_id",
"file_id",
"ext",
"curr_date",
"timestamp",
}

SUPPORTED_TABLE_NAME_PREFIX_PLACEHOLDERS = ("schema_name",)


@dataclass
class PathParams:
layout: str
Expand All @@ -27,6 +36,7 @@ class PathParams:
# called and resulting data provided here
date_format: Optional[str] = None
layout_params: Optional[Dict[str, str]] = None
suffix: Optional[str] = None


def check_layout(layout: str, layout_params: Optional[Dict[str, str]] = None) -> List[str]:
Expand Down Expand Up @@ -56,11 +66,15 @@ def create_path(params: PathParams) -> str:
load_id=params.load_id,
file_id=params.file_id,
ext=params.ext,
curr_date=str(pendulum.today()),
curr_date=params.curr_date.today(),
)
# if extension is not defined, we append it at the end
if "ext" not in placeholders:
path += f".{params.ext}"

if params.suffix:
path += params.suffix

return path


Expand Down

0 comments on commit ba320dd

Please sign in to comment.