Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend layout placeholder params for filesystem destinations #1149

Closed
wants to merge 44 commits into from
Closed
Show file tree
Hide file tree
Changes from 37 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
d64dfc0
Add current_datetime, datetime_format and layout_params to filesystem
sultaniman Mar 26, 2024
a21ea39
Define layout parameter callback types and formatter types
sultaniman Mar 26, 2024
21dda29
Specify precise types
sultaniman Mar 26, 2024
f017465
Move new parameters to fs destination client config
sultaniman Mar 27, 2024
929b0e5
Validate current_datetime
sultaniman Mar 27, 2024
e7b4636
Fix validation logic
sultaniman Mar 27, 2024
6117a20
Validate layout and layout params
sultaniman Mar 27, 2024
43f0634
Define suffix parameter callback
sultaniman Mar 27, 2024
1713e4e
Adjust suffix param type
sultaniman Mar 27, 2024
27604b9
Implement filesystem layout helpers
sultaniman Mar 28, 2024
b34da5d
Revert path_utils
sultaniman Mar 28, 2024
9d7d763
Add FileStorage.get_dir_name_from_file_path
sultaniman Mar 28, 2024
9613ce3
Create missing directories before copying the file
sultaniman Mar 28, 2024
a787c2a
Remove suffix_fn from factory
sultaniman Apr 2, 2024
0c5d58f
Remove old helper function to make destination file names
sultaniman Apr 2, 2024
834b802
Remove unused imports
sultaniman Apr 2, 2024
ccb5d82
Format date variables and move get_table_prefix_layout to layout helpers
sultaniman Apr 2, 2024
f5479e4
Pass constructor arguments via kwargs and override values from config…
sultaniman Apr 2, 2024
e85345f
Add a placeholder for the day of week
sultaniman Apr 2, 2024
7cc7954
Lowercase day of week placeholder
sultaniman Apr 2, 2024
f9c8955
Use now.date() instead of now.today()
sultaniman Apr 2, 2024
247cbca
Small cleanup for extra_params constructor
sultaniman Apr 2, 2024
c1e6644
Use file format from extras
sultaniman Apr 2, 2024
e4609bd
Pass file extension to param callback
sultaniman Apr 2, 2024
d23f51f
Fix mypy errors
sultaniman Apr 2, 2024
ab5504d
Update docstrings
sultaniman Apr 2, 2024
c773c65
Fix current datetime resolution
sultaniman Apr 2, 2024
96f83e1
Cleanup code flow
sultaniman Apr 2, 2024
14beaef
Adjust tests
sultaniman Apr 2, 2024
3bf1e42
Simplify code
sultaniman Apr 3, 2024
39c2ad7
Log a message if any extra placeholder is not used
sultaniman Apr 3, 2024
ac1a7d2
Adjust docstring
sultaniman Apr 3, 2024
ae2f826
Do not print the message about unused placeholders
sultaniman Apr 3, 2024
bb792a8
Add more layout test samples
sultaniman Apr 3, 2024
9afbdcd
Fix mypy issues
sultaniman Apr 3, 2024
f3af8f5
Pass config to assert_file_matches
sultaniman Apr 3, 2024
bac2e6b
flake: Use pendulum datetime
sultaniman Apr 3, 2024
72c3f5a
Revert changes to factory
sultaniman Apr 3, 2024
6b7a008
Cleanup redundant code and rename extra_params to extra_placeholders
sultaniman Apr 3, 2024
cf37c66
Let fsspec create the destination directory
sultaniman Apr 3, 2024
be5ca1d
Revert filesystem/factory docstrings
sultaniman Apr 3, 2024
33671b4
Set default current_datetime in filesystem factory
sultaniman Apr 3, 2024
fa295c6
Catch TypeError if extra placeholder callable has a signature mismatch
sultaniman Apr 3, 2024
a0cad82
Do not pass config to ExtraParams
sultaniman Apr 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions dlt/common/storages/configuration.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import os
from typing import TYPE_CHECKING, Any, Literal, Optional, Type, get_args, ClassVar, Dict, Union
from typing import Any, ClassVar, Dict, Literal, Optional, Type, get_args, Union
from urllib.parse import urlparse

from dlt.common.configuration import configspec, resolve_type
Expand All @@ -13,6 +13,7 @@
AzureCredentialsWithoutDefaults,
BaseConfiguration,
)

from dlt.common.typing import DictStrAny
from dlt.common.utils import digest128

Expand Down Expand Up @@ -74,7 +75,7 @@ class FilesystemConfiguration(BaseConfiguration):
"adl": Union[AzureCredentialsWithoutDefaults, AzureCredentials],
}

bucket_url: str = None
bucket_url: Optional[str] = None
sultaniman marked this conversation as resolved.
Show resolved Hide resolved

# should be a union of all possible credentials as found in PROTOCOL_CREDENTIALS
credentials: FileSystemCredentials = None
Expand Down
4 changes: 4 additions & 0 deletions dlt/common/storages/file_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,10 @@ def from_relative_path_to_wd(self, relative_path: str) -> str:
def get_file_name_from_file_path(file_path: str) -> str:
return os.path.basename(file_path)

@staticmethod
def get_dir_name_from_file_path(file_path: str) -> str:
return os.path.dirname(file_path)

@staticmethod
def validate_file_name_component(name: str) -> None:
# Universal platform bans several characters allowed in POSIX ie. | < \ or "COM1" :)
Expand Down
60 changes: 56 additions & 4 deletions dlt/destinations/impl/filesystem/configuration.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,71 @@
import dataclasses
from typing import Final, Type, Optional

from typing import Any, Callable, Dict, Final, Type, Optional, Union

from dlt.common.configuration import configspec, resolve_type
from dlt.common.destination.reference import (
CredentialsConfiguration,
DestinationClientStagingConfiguration,
)
from dlt.common.storages import FilesystemConfiguration
from pendulum.datetime import DateTime
from typing_extensions import TypeAlias

TCurrentDatetimeCallback: TypeAlias = Callable[[], DateTime]
"""A callback which should return current datetime"""

TDatetimeFormat: TypeAlias = str
"""Datetime format or formatter callback"""

TLayoutParamCallback: TypeAlias = Callable[[str, str, str, str, str, DateTime], str]
"""A callback which should return prepared string value the following arguments passed
`schema name`, `table name`, `load_id`, `file_id`, `extension` and `current_datetime`.
"""


@configspec
class FilesystemDestinationClientConfiguration(FilesystemConfiguration, DestinationClientStagingConfiguration): # type: ignore[misc]
destination_type: Final[str] = dataclasses.field(default="filesystem", init=False, repr=False, compare=False) # type: ignore
class FilesystemDestinationClientConfiguration( # type: ignore[misc]
FilesystemConfiguration, DestinationClientStagingConfiguration
):
destination_type: Final[str] = dataclasses.field( # type: ignore
default="filesystem", init=False, repr=False, compare=False
)
current_datetime: Optional[Union[DateTime, TCurrentDatetimeCallback]] = None
datetime_format: Optional[TDatetimeFormat] = None
extra_params: Optional[Dict[str, Union[str, TLayoutParamCallback]]] = None
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extra_placeholders would be better


@resolve_type("credentials")
def resolve_credentials_type(self) -> Type[CredentialsConfiguration]:
# use known credentials or empty credentials for unknown protocol
return self.PROTOCOL_CREDENTIALS.get(self.protocol) or Optional[CredentialsConfiguration] # type: ignore[return-value]
return (
self.PROTOCOL_CREDENTIALS.get(self.protocol) or Optional[CredentialsConfiguration] # type: ignore[return-value]
)

def on_resolved(self) -> None:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you do not need this method at all. config will be populated automatically

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, you do need it to validate the layout. At this point you know all the config values: some come from providers, some from the factory.

So this is where you need to validate the layout: check whether all placeholders are known. If not, we need a really good explanation of which ones are missing, etc.

"""Resolve configuration for filesystem destination

The following three variables will override the ones from configuration
if supplied via `filesystem(...)` constructor, additionally
when provided `extra_params` will be merged with the values from configuration
however provided values will always override config.toml.

* current_datetime,
* datetime_format,
* extra_params
"""
if current_datetime := self.kwargs.get("current_datetime"):
self.current_datetime = current_datetime

datetime_format = self.kwargs.get("datetime_format", self.datetime_format)
if datetime_format is not None:
self.datetime_format = datetime_format

extra_params_arg: Dict[str, Any] = self.kwargs.get("extra_params", {})
if not self.extra_params and extra_params_arg:
self.extra_params = {}

if extra_params_arg:
for key, val in extra_params_arg.items():
self.extra_params[key] = val

super().on_resolved()
20 changes: 20 additions & 0 deletions dlt/destinations/impl/filesystem/factory.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import typing as t

import pendulum

from dlt.destinations.impl.filesystem.configuration import FilesystemDestinationClientConfiguration
from dlt.destinations.impl.filesystem import capabilities
from dlt.common.destination import Destination, DestinationCapabilitiesContext
Expand Down Expand Up @@ -27,6 +29,9 @@ def __init__(
credentials: t.Union[FileSystemCredentials, t.Dict[str, t.Any], t.Any] = None,
destination_name: t.Optional[str] = None,
environment: t.Optional[str] = None,
current_datetime: t.Optional[pendulum.DateTime] = None,
datetime_format: t.Optional[str] = None,
extra_params: t.Optional[t.Dict[str, t.Any]] = None,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here you should have the same typing as in config

**kwargs: t.Any,
) -> None:
"""Configure the filesystem destination to use in a pipeline and load data to local or remote filesystem.
Expand All @@ -46,12 +51,27 @@ def __init__(
credentials: Credentials to connect to the filesystem. The type of credentials should correspond to
the bucket protocol. For example, for AWS S3, the credentials should be an instance of `AwsCredentials`.
A dictionary with the credentials parameters can also be provided.
current_datetime: current datetime used instead of datetime generated by dlt.
datetime_format: strftime formatting for current_datetime
extra_params: custom layout parameters, all unknown parameters will be skipped,
values can be primitive types or callables which also should return a primitive type.
**kwargs: Additional arguments passed to the destination config
"""
destinatination_kwargs: t.Dict[str, t.Any] = {}
sultaniman marked this conversation as resolved.
Show resolved Hide resolved
if current_datetime:
destinatination_kwargs["current_datetime"] = current_datetime

if datetime_format:
destinatination_kwargs["datetime_format"] = datetime_format

if extra_params:
destinatination_kwargs["extra_params"] = extra_params
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am passing these three arguments via kwargs so that on resolve I can reconcile them and override the values from config.toml.
It doesn't feel right, however. Is it fine, @sh-rp @rudolfix?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, the factory is tricky: its main goal is to delay instantiation and population of the configs until runtime (i.e. the load step), while the UX lets you create a destination at any moment.

Look at the base `__init__`:

def __init__(self, **kwargs: Any) -> None:
        # Create initial unresolved destination config
        # Argument defaults are filtered out here because we only want arguments passed explicitly
        # to supersede config from the environment or pipeline args
        sig = inspect.signature(self.__class__.__init__)
        params = sig.parameters
        self.config_params = {
            k: v for k, v in kwargs.items() if k not in params or v != params[k].default
        }

It will store all unknown parameters (and any known ones passed with a non-default value) in config_params,
and then here:

def configuration(self, initial_config: TDestinationConfig) -> TDestinationConfig:
        """Get a fully resolved destination config from the initial config"""
        config = resolve_configuration(
            initial_config,
            sections=(known_sections.DESTINATION, self.destination_name),
            # Already populated values will supersede resolved env config
            explicit_value=self.config_params,
        )
        return config

Whenever the configuration is created, we use those params as explicit initial values.

Outcome: you do not need any special code to handle that. Just pass the additional arguments to super().__init__() and they will be pushed to the config during resolution.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


super().__init__(
bucket_url=bucket_url,
credentials=credentials,
destination_name=destination_name,
environment=environment,
kwargs=destinatination_kwargs,
**kwargs,
)
69 changes: 38 additions & 31 deletions dlt/destinations/impl/filesystem/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from dlt.common import logger
from dlt.common.schema import Schema, TSchemaTables, TTableSchema
from dlt.common.storages import FileStorage, ParsedLoadJobFileName, fsspec_from_config
from dlt.common.storages import FileStorage, fsspec_from_config
from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.destination.reference import (
NewLoadJob,
Expand All @@ -20,9 +20,14 @@

from dlt.destinations.job_impl import EmptyLoadJob
from dlt.destinations.impl.filesystem import capabilities
from dlt.destinations.impl.filesystem.configuration import FilesystemDestinationClientConfiguration
from dlt.destinations.impl.filesystem.configuration import (
FilesystemDestinationClientConfiguration,
)
from dlt.destinations.job_impl import NewReferenceJob
from dlt.destinations import path_utils
from dlt.destinations.impl.filesystem.layout import (
get_table_prefix_layout,
make_filename,
)


class LoadFilesystemJob(LoadJob):
Expand All @@ -38,36 +43,28 @@ def __init__(
file_name = FileStorage.get_file_name_from_file_path(local_path)
self.config = config
self.dataset_path = dataset_path
self.destination_file_name = LoadFilesystemJob.make_destination_filename(
config.layout, file_name, schema_name, load_id
)
self.destination_file_name = make_filename(config, file_name, schema_name, load_id)

super().__init__(file_name)
fs_client, _ = fsspec_from_config(config)
self.destination_file_name = LoadFilesystemJob.make_destination_filename(
config.layout, file_name, schema_name, load_id
)
self.destination_file_name = make_filename(config, file_name, schema_name, load_id)

# Make sure directory exists before moving file
dir_path = self.make_remote_path(only_dir=True)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't do that: fsspec does that in put_file, so this code is not needed. What was your reason for adding it?
Of course, this smells like race conditions; that's why we try to "create" dirs in initialize. By the way, buckets do not really have dirs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rudolfix the strange thing was that fsspec didn't create the directory, which is why I added it. I will remove this and see if the test pipeline works.

fs_client.makedirs(dir_path, exist_ok=True)

# Once directory is created we can move things
item = self.make_remote_path()
fs_client.put_file(local_path, item)

@staticmethod
def make_destination_filename(
layout: str, file_name: str, schema_name: str, load_id: str
) -> str:
job_info = ParsedLoadJobFileName.parse(file_name)
return path_utils.create_path(
layout,
schema_name=schema_name,
table_name=job_info.table_name,
load_id=load_id,
file_id=job_info.file_id,
ext=job_info.file_format,
)

def make_remote_path(self) -> str:
return (
f"{self.config.protocol}://{posixpath.join(self.dataset_path, self.destination_file_name)}"
)
def make_remote_path(self, only_dir: bool = False) -> str:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not needed

if only_dir:
dir_path = FileStorage.get_dir_name_from_file_path(self.destination_file_name)
return f"{self.config.protocol}://{posixpath.join(self.dataset_path, dir_path)}"
else:
return (
f"{self.config.protocol}://{posixpath.join(self.dataset_path, self.destination_file_name)}"
)

def state(self) -> TLoadJobState:
return "completed"
Expand All @@ -81,7 +78,9 @@ def create_followup_jobs(self, final_state: TLoadJobState) -> List[NewLoadJob]:
jobs = super().create_followup_jobs(final_state)
if final_state == "completed":
ref_job = NewReferenceJob(
file_name=self.file_name(), status="running", remote_path=self.make_remote_path()
file_name=self.file_name(),
status="running",
remote_path=self.make_remote_path(),
)
jobs.append(ref_job)
return jobs
Expand All @@ -94,13 +93,17 @@ class FilesystemClient(JobClientBase, WithStagingDataset):
fs_client: AbstractFileSystem
fs_path: str

def __init__(self, schema: Schema, config: FilesystemDestinationClientConfiguration) -> None:
def __init__(
self,
schema: Schema,
config: FilesystemDestinationClientConfiguration,
) -> None:
super().__init__(schema, config)
self.fs_client, self.fs_path = fsspec_from_config(config)
self.config: FilesystemDestinationClientConfiguration = config
# verify files layout. we need {table_name} and only allow {schema_name} before it, otherwise tables
# cannot be replaced and we cannot initialize folders consistently
self.table_prefix_layout = path_utils.get_table_prefix_layout(config.layout)
self.table_prefix_layout = get_table_prefix_layout(config)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll comment on that in the next review; this is a bigger chunk. We want to support all paths in append-only mode and require the old layout only when we replace data (we can't guarantee replacement otherwise).

self._dataset_path = self.config.normalize_dataset_name(self.schema)

def drop_storage(self) -> None:
Expand Down Expand Up @@ -176,6 +179,7 @@ def update_stored_schema(
self.fs_client.makedirs(directory, exist_ok=True)
return expected_update

# FIXME: maybe have to fixup to support all PathParams and create_path updates
def _get_table_dirs(self, table_names: Iterable[str]) -> Set[str]:
"""Gets unique directories where table data is stored."""
table_dirs: Set[str] = set()
Expand Down Expand Up @@ -214,7 +218,10 @@ def __enter__(self) -> "FilesystemClient":
return self

def __exit__(
self, exc_type: Type[BaseException], exc_val: BaseException, exc_tb: TracebackType
self,
exc_type: Type[BaseException],
exc_val: BaseException,
exc_tb: TracebackType,
) -> None:
pass

Expand Down
Loading
Loading