diff --git a/dlt/destinations/impl/filesystem/factory.py b/dlt/destinations/impl/filesystem/factory.py index bdc1713e37..3991630819 100644 --- a/dlt/destinations/impl/filesystem/factory.py +++ b/dlt/destinations/impl/filesystem/factory.py @@ -54,7 +54,6 @@ def __init__( datetime_format: strftime formatting for current_datetime extra_params: custom layout parameters, all unknown parameters will be skipped, values can be primitive types or callables which also should return a primitive type. - suffix_fn: a callback if specified will be called to generate a suffix string to final path **kwargs: Additional arguments passed to the destination config """ # TODO: validate parameters diff --git a/dlt/destinations/impl/filesystem/filesystem.py b/dlt/destinations/impl/filesystem/filesystem.py index 2396685314..326fc70309 100644 --- a/dlt/destinations/impl/filesystem/filesystem.py +++ b/dlt/destinations/impl/filesystem/filesystem.py @@ -24,8 +24,10 @@ FilesystemDestinationClientConfiguration, ) from dlt.destinations.job_impl import NewReferenceJob -from dlt.destinations import path_utils -from dlt.destinations.impl.filesystem.layout import make_filename +from dlt.destinations.impl.filesystem.layout import ( + get_table_prefix_layout, + make_filename, +) class LoadFilesystemJob(LoadJob): @@ -101,7 +103,7 @@ def __init__( self.config: FilesystemDestinationClientConfiguration = config # verify files layout. we need {table_name} and only allow {schema_name} before it, otherwise tables # cannot be replaced and we cannot initialize folders consistently - self.table_prefix_layout = path_utils.get_table_prefix_layout(config.layout) + self.table_prefix_layout = get_table_prefix_layout(config) self._dataset_path = self.config.normalize_dataset_name(self.schema) def drop_storage(self) -> None: diff --git a/dlt/destinations/impl/filesystem/layout.py b/dlt/destinations/impl/filesystem/layout.py index 84f62823e2..021b7a0ba0 100644 --- a/dlt/destinations/impl/filesystem/layout.py +++ b/dlt/destinations/impl/filesystem/layout.py @@ -1,11 +1,11 @@ import re from types import TracebackType -from typing import Any, Dict, List, Optional, Self, Set, Type +from typing import Any, Dict, List, Optional, Self, Sequence, Set, Type import pendulum from dlt.common.storages.load_package import ParsedLoadJobFileName -from dlt.destinations.exceptions import InvalidFilesystemLayout +from dlt.destinations.exceptions import CantExtractTablePrefix, InvalidFilesystemLayout from dlt.destinations.impl.filesystem.configuration import ( FilesystemDestinationClientConfiguration, ) @@ -33,9 +33,9 @@ class extra_params: def __init__( self, config: FilesystemDestinationClientConfiguration, - job_info: ParsedLoadJobFileName, - schema_name: str, - load_id: str, + job_info: Optional[ParsedLoadJobFileName] = None, + schema_name: Optional[str] = None, + load_id: Optional[str] = None, ) -> None: self.config = config self.job_info = job_info @@ -43,6 +43,12 @@ def __init__( self.schema_name = schema_name self._params = {} + self.table_name = None + self.file_id = None + if job_info: + self.table_name = job_info.table_name + self.file_id = job_info.file_id + def __enter__(self) -> Self: return self @@ -65,40 +71,48 @@ def params(self) -> Optional[Dict[str, Any]]: * file id, * current datetime """ - if self._params: - return self._params - - self._params = { - "load_id": self.load_id, - "file_id": self.job_info.file_id, - "ext": self.job_info.file_format, - "table_name": self.job_info.table_name, - } + if self.load_id: + self._params["load_id"] = self.load_id + + if self.job_info: + self._params.update( + { + "file_id": self.job_info.file_id, + "ext": self.job_info.file_format, + "table_name": self.job_info.table_name, + } + ) + now = self.config.current_datetime or pendulum.now() for key, value in self.config.extra_params.items(): if callable(value): self._params[key] = value( self.schema_name, - self.job_info.table_name, + self.table_name, self.load_id, - self.job_info.file_id, + self.file_id, now, ) else: self._params[key] = value self._params["year"] = now.year - self._params["month"] = now.month - self._params["day"] = now.day - self._params["hour"] = now.hour - self._params["minute"] = now.minute + # month, day, hour and minute padded with 0 + self._params["month"] = now.format("MM") + # Days in format Mon, Tue, Wed + self._params["day"] = now.format("DD") + # Hour in 24h format + self._params["hour"] = now.format("HH") + self._params["minute"] = now.format("mm") + # Day of week + self._params["dow"] = now.format("ddd") self._params["timestamp"] = int(now.timestamp()) # Format curr_date datetime according to given format if self.config.datetime_format: self._params["curr_date"] = now.format(self.config.datetime_format) else: - self._params["curr_date"] = now.isoformat() + self._params["curr_date"] = str(now.today()) return self._params @@ -169,11 +183,52 @@ def make_filename( if "ext" not in placeholders: path += f".{extras.params['ext']}" - if config.suffix: - suffix = config.suffix - if callable(suffix): - suffix = suffix(extras.params) - path += suffix - # import ipdb;ipdb.set_trace() - return path + + +def get_table_prefix_layout( + config: FilesystemDestinationClientConfiguration, + supported_prefix_placeholders: Sequence[str] = SUPPORTED_TABLE_NAME_PREFIX_PLACEHOLDERS, +) -> str: + """get layout fragment that defines positions of the table, cutting other placeholders + + allowed `supported_prefix_placeholders` that may appear before table. + """ + with extra_params(config) as extras, layout_helper( + config.layout, + extras.params, + ) as layout: + # fail if table name is not defined + if "table_name" not in layout.placeholders: + raise CantExtractTablePrefix(layout, "{table_name} placeholder not found. ") + + table_name_index = layout.layout_placeholders.index("table_name") + + # fail if any other prefix is defined before table_name + if [ + p + for p in layout.layout_placeholders[:table_name_index] + if p not in supported_prefix_placeholders + ]: + if len(supported_prefix_placeholders) == 0: + details = ( + "No other placeholders are allowed before {table_name} but you have %s present. " + % layout.layout_placeholders[:table_name_index] + ) + else: + details = "Only %s are allowed before {table_name} but you have %s present. " % ( + supported_prefix_placeholders, + layout.layout_placeholders[:table_name_index], + ) + raise CantExtractTablePrefix(config.layout, details) + + # we include the char after the table_name here, this should be a separator not a new placeholder + # this is to prevent selecting tables that have the same starting name -> {table_name}/ + prefix = config.layout[: config.layout.index("{table_name}") + 13] + + if prefix[-1] == "{": + raise CantExtractTablePrefix( + config.layout, "A separator is required after a {table_name}. " + ) + + return prefix