diff --git a/dlt/destinations/impl/filesystem/filesystem.py b/dlt/destinations/impl/filesystem/filesystem.py index 33a597f915..b976dec54b 100644 --- a/dlt/destinations/impl/filesystem/filesystem.py +++ b/dlt/destinations/impl/filesystem/filesystem.py @@ -38,32 +38,30 @@ def __init__( file_name = FileStorage.get_file_name_from_file_path(local_path) self.config = config self.dataset_path = dataset_path - self.destination_file_name = LoadFilesystemJob.make_destination_filename( - config.layout, file_name, schema_name, load_id + self.destination_file_name = path_utils.create_path( + config, + file_name, + schema_name, + load_id, + current_datetime=config.current_datetime, + datetime_format=config.datetime_format, + extra_placeholders=config.extra_placeholders, ) super().__init__(file_name) fs_client, _ = fsspec_from_config(config) - self.destination_file_name = LoadFilesystemJob.make_destination_filename( - config.layout, file_name, schema_name, load_id + self.destination_file_name = path_utils.create_path( + config, + file_name, + schema_name, + load_id, + current_datetime=config.current_datetime, + datetime_format=config.datetime_format, + extra_placeholders=config.extra_placeholders, ) item = self.make_remote_path() fs_client.put_file(local_path, item) - @staticmethod - def make_destination_filename( - layout: str, file_name: str, schema_name: str, load_id: str - ) -> str: - job_info = ParsedLoadJobFileName.parse(file_name) - return path_utils.create_path( - layout, - schema_name=schema_name, - table_name=job_info.table_name, - load_id=load_id, - file_id=job_info.file_id, - ext=job_info.file_format, - ) - def make_remote_path(self) -> str: return ( f"{self.config.protocol}://{posixpath.join(self.dataset_path, self.destination_file_name)}" @@ -100,7 +98,12 @@ def __init__(self, schema: Schema, config: FilesystemDestinationClientConfigurat self.config: FilesystemDestinationClientConfiguration = config # verify files layout. we need {table_name} and only allow {schema_name} before it, otherwise tables # cannot be replaced and we cannot initialize folders consistently - self.table_prefix_layout = path_utils.get_table_prefix_layout(config.layout) + self.table_prefix_layout = path_utils.get_table_prefix_layout( + config.layout, + current_datetime=config.current_datetime, + datetime_format=config.datetime_format, + extra_placeholders=config.extra_placeholders, + ) self._dataset_path = self.config.normalize_dataset_name(self.schema) def drop_storage(self) -> None: diff --git a/dlt/destinations/path_utils.py b/dlt/destinations/path_utils.py index 047cb274e0..fc024e19f0 100644 --- a/dlt/destinations/path_utils.py +++ b/dlt/destinations/path_utils.py @@ -1,80 +1,276 @@ -# this can probably go some other place, but it is shared by destinations, so for now it is here -from typing import List, Sequence, Tuple +import re +from types import TracebackType +from typing import Any, Dict, List, Optional, Sequence, Set, Type import pendulum -import re +from dlt.cli import echo as fmt +from dlt.common.storages.load_package import ParsedLoadJobFileName +from dlt.destinations.exceptions import CantExtractTablePrefix, InvalidFilesystemLayout +from dlt.destinations.impl.filesystem.configuration import ( + FilesystemDestinationClientConfiguration, +) +from typing_extensions import Self -from dlt.destinations.exceptions import InvalidFilesystemLayout, CantExtractTablePrefix -# TODO: ensure layout only has supported placeholders -SUPPORTED_PLACEHOLDERS = {"schema_name", "table_name", "load_id", "file_id", "ext", "curr_date"} +SUPPORTED_PLACEHOLDERS = { + "schema_name", + "table_name", + "load_id", + "file_id", + "ext", + "curr_date", + "year", + "month", + "day", + "hour", + "minute", + "dow", + "timestamp", +} SUPPORTED_TABLE_NAME_PREFIX_PLACEHOLDERS = ("schema_name",) -def check_layout(layout: str) -> List[str]: - placeholders = get_placeholders(layout) - invalid_placeholders = [p for p in placeholders if p not in SUPPORTED_PLACEHOLDERS] - if invalid_placeholders: - raise InvalidFilesystemLayout(invalid_placeholders) - return placeholders - - def get_placeholders(layout: str) -> List[str]: return re.findall(r"\{(.*?)\}", layout) +class ExtraParams: + def __init__( + self, + current_datetime: Optional[pendulum.DateTime] = None, + datetime_format: Optional[str] = None, + extra_placeholders: Optional[Dict[str, Any]] = None, + job_info: Optional[ParsedLoadJobFileName] = None, + schema_name: Optional[str] = None, + load_id: Optional[str] = None, + ) -> None: + self.current_datetime = current_datetime + self.datetime_format = datetime_format + self.extra_placeholders = extra_placeholders + self.job_info = job_info + self.load_id = load_id + self.schema_name = schema_name + self._params: Dict[str, Any] = {} + + self.table_name = None + self.file_id = None + self.file_format = None + if job_info: + self.table_name = job_info.table_name + self.file_id = job_info.file_id + self.file_format = job_info.file_format + self._params.update( + { + "table_name": self.table_name, + "file_id": self.file_id, + "ext": self.file_format, + } + ) + + if self.load_id: + self._params["load_id"] = self.load_id + + @property + def params(self) -> Optional[Dict[str, Any]]: + """Process extra params for layout + If any value is a callable then we call it with the following arguments + * schema name, + * table name, + * load id, + * file id, + * current datetime + """ + + # If current_datetime is callable + # Then call it and check it's instance + # If the result id DateTime + # Then take it + # Else exit. + if callable(self.current_datetime): + result = self.current_datetime() + if isinstance(result, pendulum.DateTime): + self.current_datetime = result + else: + raise RuntimeError( + "current_datetime was passed as callable but " + "didn't return any instance of pendulum.DateTime" + ) + + self._process_extra_placeholders() + self._add_date_placeholders() + + return self._params + + def _process_extra_placeholders(self) -> None: + # For each callable extra parameter + # otherwise take it's value + if not self.extra_placeholders: + return + + for key, value in self.extra_placeholders.items(): + if callable(value): + try: + self._params[key] = value( + self.schema_name, + self.table_name, + self.load_id, + self.file_id, + self.file_format, + self.current_datetime, + ) + except TypeError: + fmt.secho( + f"Extra placeholder {key} is callableCallable placeholder should accept" + " parameters below`schema name`, `table name`, `load_id`, `file_id`," + " `extension` and `current_datetime`", + fg="red", + ) + raise + else: + self._params[key] = value + + def _add_date_placeholders(self) -> None: + # For formatting options please see + # https://github.com/sdispater/pendulum/blob/master/docs/docs/string_formatting.md + now: pendulum.DateTime = self.current_datetime # type: ignore[assignment] + # Format curr_date datetime according to given format + if self.datetime_format: + self._params["curr_date"] = now.format(self.datetime_format) + else: + self._params["curr_date"] = str(now.date()) + + self._params["year"] = now.year + # month, day, hour and minute padded with 0 + self._params["month"] = now.format("MM") + # Days in format Mon, Tue, Wed + self._params["day"] = now.format("DD") + # Hour in 24h format + self._params["hour"] = now.format("HH") + self._params["minute"] = now.format("mm") + # Day of week + self._params["dow"] = now.format("ddd").lower() + self._params["timestamp"] = int(now.timestamp()) + + +class LayoutHelper: + def __init__( + self, + layout: str, + params: Dict[str, str], + allowed_placeholders: Optional[Set[str]] = SUPPORTED_PLACEHOLDERS, + ) -> None: + self.params = params + self.allowed_placeholders = allowed_placeholders.copy() + self.layout_placeholders = get_placeholders(layout) + + def __enter__(self) -> Self: + return self + + def __exit__( + self, + exc_type: Optional[Type[BaseException]] = None, + exc_value: Optional[BaseException] = None, + traceback: Optional[TracebackType] = None, + ) -> None: + pass + + @property + def placeholders(self) -> List[str]: + self.check_layout() + return list(self.allowed_placeholders) + + def check_layout(self) -> None: + # Build out the list of placeholder names + # which we will use to validate placeholders + # in a given config.layout template + if self.params: + for placeholder, _ in self.params.items(): + self.allowed_placeholders.add(placeholder) + + # now collect all unknown placeholders from config.layout template + invalid_placeholders = [ + p for p in self.layout_placeholders if p not in self.allowed_placeholders + ] + if invalid_placeholders: + raise InvalidFilesystemLayout(invalid_placeholders) + + def create_path( - layout: str, schema_name: str, table_name: str, load_id: str, file_id: str, ext: str + layout: str, + file_name: str, + schema_name: str, + load_id: str, + current_datetime: Optional[pendulum.DateTime] = None, + datetime_format: Optional[str] = None, + extra_placeholders: Optional[Dict[str, Any]] = None, ) -> str: """create a filepath from the layout and our default params""" - placeholders = check_layout(layout) - path = layout.format( + job_info = ParsedLoadJobFileName.parse(file_name) + extras = ExtraParams( + current_datetime=current_datetime, + datetime_format=datetime_format, + extra_placeholders=extra_placeholders, + job_info=job_info, schema_name=schema_name, - table_name=table_name, load_id=load_id, - file_id=file_id, - ext=ext, - curr_date=str(pendulum.today()), ) - # if extension is not defined, we append it at the end - if "ext" not in placeholders: - path += f".{ext}" - return path + + with LayoutHelper(layout, extras.params) as layout_helper: + placeholders = layout_helper.placeholders + path = layout.format(**extras.params) + + # if extension is not defined, we append it at the end + if "ext" not in placeholders: + path += f".{job_info.file_format}" + + return path def get_table_prefix_layout( layout: str, supported_prefix_placeholders: Sequence[str] = SUPPORTED_TABLE_NAME_PREFIX_PLACEHOLDERS, + current_datetime: Optional[pendulum.DateTime] = None, + datetime_format: Optional[str] = None, + extra_placeholders: Optional[Dict[str, Any]] = None, ) -> str: """get layout fragment that defines positions of the table, cutting other placeholders - allowed `supported_prefix_placeholders` that may appear before table. """ - placeholders = get_placeholders(layout) + extras = ExtraParams( + current_datetime=current_datetime, + datetime_format=datetime_format, + extra_placeholders=extra_placeholders, + ) + with LayoutHelper(layout, extras.params) as layout: + # fail if table name is not defined + if "table_name" not in layout.placeholders: + raise CantExtractTablePrefix(layout, "{table_name} placeholder not found. ") - # fail if table name is not defined - if "table_name" not in placeholders: - raise CantExtractTablePrefix(layout, "{table_name} placeholder not found. ") - table_name_index = placeholders.index("table_name") + table_name_index = layout.layout_placeholders.index("table_name") # fail if any other prefix is defined before table_name - if [p for p in placeholders[:table_name_index] if p not in supported_prefix_placeholders]: + if [ + p + for p in layout.layout_placeholders[:table_name_index] + if p not in supported_prefix_placeholders + ]: if len(supported_prefix_placeholders) == 0: details = ( "No other placeholders are allowed before {table_name} but you have %s present. " - % placeholders[:table_name_index] + % layout.layout_placeholders[:table_name_index] ) else: details = "Only %s are allowed before {table_name} but you have %s present. " % ( supported_prefix_placeholders, - placeholders[:table_name_index], + layout.layout_placeholders[:table_name_index], ) raise CantExtractTablePrefix(layout, details) # we include the char after the table_name here, this should be a separator not a new placeholder - # this is to prevent selecting tables that have the same starting name + # this is to prevent selecting tables that have the same starting name -> {table_name}/ prefix = layout[: layout.index("{table_name}") + 13] + if prefix[-1] == "{": raise CantExtractTablePrefix(layout, "A separator is required after a {table_name}. ")