From f4d294974fbcd4ef989a649d1fccb08b788fc5eb Mon Sep 17 00:00:00 2001 From: Sultan Iman Date: Wed, 3 Apr 2024 17:51:55 +0200 Subject: [PATCH] Fix linting issues and extract common types --- .../impl/filesystem/configuration.py | 18 +++++-------- .../impl/filesystem/filesystem.py | 4 +-- dlt/destinations/impl/filesystem/typing.py | 19 ++++++++++++++ dlt/destinations/path_utils.py | 25 +++++++++---------- 4 files changed, 39 insertions(+), 27 deletions(-) create mode 100644 dlt/destinations/impl/filesystem/typing.py diff --git a/dlt/destinations/impl/filesystem/configuration.py b/dlt/destinations/impl/filesystem/configuration.py index 4a856581ae..d6cafbdf83 100644 --- a/dlt/destinations/impl/filesystem/configuration.py +++ b/dlt/destinations/impl/filesystem/configuration.py @@ -1,5 +1,5 @@ import dataclasses -from typing import Callable, Dict, Final, Optional, Type, Union +from typing import Dict, Final, Optional, Type, Union from dlt.common.configuration import configspec, resolve_type from dlt.common.destination.reference import ( @@ -9,18 +9,12 @@ from dlt.common.storages import FilesystemConfiguration from pendulum.datetime import DateTime -from typing_extensions import TypeAlias -TCurrentDatetimeCallback: TypeAlias = Callable[[], DateTime] -"""A callback which should return current datetime""" - -TDatetimeFormat: TypeAlias = str -"""Datetime format or formatter callback""" - -TLayoutParamCallback: TypeAlias = Callable[[str, str, str, str, str, DateTime], str] -"""A callback which should return prepared string value the following arguments passed -`schema name`, `table name`, `load_id`, `file_id`, `extension` and `current_datetime`. -""" +from dlt.destinations.impl.filesystem.typing import ( + TCurrentDatetimeCallback, + TDatetimeFormat, + TLayoutParamCallback, +) @configspec diff --git a/dlt/destinations/impl/filesystem/filesystem.py b/dlt/destinations/impl/filesystem/filesystem.py index b976dec54b..60a16276f6 100644 --- a/dlt/destinations/impl/filesystem/filesystem.py +++ b/dlt/destinations/impl/filesystem/filesystem.py @@ -39,7 +39,7 @@ def __init__( self.config = config self.dataset_path = dataset_path self.destination_file_name = path_utils.create_path( - config, + config.layout, file_name, schema_name, load_id, @@ -51,7 +51,7 @@ def __init__( super().__init__(file_name) fs_client, _ = fsspec_from_config(config) self.destination_file_name = path_utils.create_path( - config, + config.layout, file_name, schema_name, load_id, diff --git a/dlt/destinations/impl/filesystem/typing.py b/dlt/destinations/impl/filesystem/typing.py new file mode 100644 index 0000000000..6a273ace06 --- /dev/null +++ b/dlt/destinations/impl/filesystem/typing.py @@ -0,0 +1,19 @@ +from typing import Callable, Optional, Union + +from pendulum.datetime import DateTime +from typing_extensions import TypeAlias + + +TCurrentDatetimeCallback: TypeAlias = Callable[[], DateTime] +"""A callback which should return current datetime""" + +TCurrentDateTime: TypeAlias = Optional[Union[DateTime, TCurrentDatetimeCallback]] +"""pendulum.DateTime instance or a callable which should return pendulum.DateTime""" + +TDatetimeFormat: TypeAlias = str +"""Datetime format or formatter callback""" + +TLayoutParamCallback: TypeAlias = Callable[[str, str, str, str, str, DateTime], str] +"""A callback which should return prepared string value the following arguments passed +`schema name`, `table name`, `load_id`, `file_id`, `extension` and `current_datetime`. +""" diff --git a/dlt/destinations/path_utils.py b/dlt/destinations/path_utils.py index fc024e19f0..0dc0b76db6 100644 --- a/dlt/destinations/path_utils.py +++ b/dlt/destinations/path_utils.py @@ -3,14 +3,14 @@ from typing import Any, Dict, List, Optional, Sequence, Set, Type import pendulum + from dlt.cli import echo as fmt from dlt.common.storages.load_package import ParsedLoadJobFileName from dlt.destinations.exceptions import CantExtractTablePrefix, InvalidFilesystemLayout -from dlt.destinations.impl.filesystem.configuration import ( - FilesystemDestinationClientConfiguration, -) from typing_extensions import Self +from dlt.destinations.impl.filesystem.typing import TCurrentDateTime + SUPPORTED_PLACEHOLDERS = { "schema_name", @@ -38,7 +38,7 @@ def get_placeholders(layout: str) -> List[str]: class ExtraParams: def __init__( self, - current_datetime: Optional[pendulum.DateTime] = None, + current_datetime: TCurrentDateTime = None, datetime_format: Optional[str] = None, extra_placeholders: Optional[Dict[str, Any]] = None, job_info: Optional[ParsedLoadJobFileName] = None, @@ -201,7 +201,7 @@ def create_path( file_name: str, schema_name: str, load_id: str, - current_datetime: Optional[pendulum.DateTime] = None, + current_datetime: TCurrentDateTime = None, datetime_format: Optional[str] = None, extra_placeholders: Optional[Dict[str, Any]] = None, ) -> str: @@ -230,7 +230,7 @@ def create_path( def get_table_prefix_layout( layout: str, supported_prefix_placeholders: Sequence[str] = SUPPORTED_TABLE_NAME_PREFIX_PLACEHOLDERS, - current_datetime: Optional[pendulum.DateTime] = None, + current_datetime: TCurrentDateTime = None, datetime_format: Optional[str] = None, extra_placeholders: Optional[Dict[str, Any]] = None, ) -> str: @@ -242,35 +242,34 @@ def get_table_prefix_layout( datetime_format=datetime_format, extra_placeholders=extra_placeholders, ) - with LayoutHelper(layout, extras.params) as layout: + with LayoutHelper(layout, extras.params) as layout_helper: # fail if table name is not defined - if "table_name" not in layout.placeholders: + if "table_name" not in layout_helper.placeholders: raise CantExtractTablePrefix(layout, "{table_name} placeholder not found. ") - table_name_index = layout.layout_placeholders.index("table_name") + table_name_index = layout_helper.layout_placeholders.index("table_name") # fail if any other prefix is defined before table_name if [ p - for p in layout.layout_placeholders[:table_name_index] + for p in layout_helper.layout_placeholders[:table_name_index] if p not in supported_prefix_placeholders ]: if len(supported_prefix_placeholders) == 0: details = ( "No other placeholders are allowed before {table_name} but you have %s present. " - % layout.layout_placeholders[:table_name_index] + % layout_helper.layout_placeholders[:table_name_index] ) else: details = "Only %s are allowed before {table_name} but you have %s present. " % ( supported_prefix_placeholders, - layout.layout_placeholders[:table_name_index], + layout_helper.layout_placeholders[:table_name_index], ) raise CantExtractTablePrefix(layout, details) # we include the char after the table_name here, this should be a separator not a new placeholder # this is to prevent selecting tables that have the same starting name -> {table_name}/ prefix = layout[: layout.index("{table_name}") + 13] - if prefix[-1] == "{": raise CantExtractTablePrefix(layout, "A separator is required after a {table_name}. ")