Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend layout placeholder params for filesystem destinations #1149

Closed
wants to merge 44 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
d64dfc0
Add current_datetime, datetime_format and layout_params to filesystem
sultaniman Mar 26, 2024
a21ea39
Define layout parameter callback types and formatter types
sultaniman Mar 26, 2024
21dda29
Specify precise types
sultaniman Mar 26, 2024
f017465
Move new parameters to fs destination client config
sultaniman Mar 27, 2024
929b0e5
Validate current_datetime
sultaniman Mar 27, 2024
e7b4636
Fix validation logic
sultaniman Mar 27, 2024
6117a20
Validate layout and layout params
sultaniman Mar 27, 2024
43f0634
Define suffix parameter callback
sultaniman Mar 27, 2024
1713e4e
Adjust suffix param type
sultaniman Mar 27, 2024
27604b9
Implement filesystem layout helpers
sultaniman Mar 28, 2024
b34da5d
Revert path_utils
sultaniman Mar 28, 2024
9d7d763
Add FileStorage.get_dir_name_from_file_path
sultaniman Mar 28, 2024
9613ce3
Create missing directories before copying the file
sultaniman Mar 28, 2024
a787c2a
Remove suffix_fn from factory
sultaniman Apr 2, 2024
0c5d58f
Remove old helper function to make destination file names
sultaniman Apr 2, 2024
834b802
Remove unused imports
sultaniman Apr 2, 2024
ccb5d82
Format date variables and move get_table_prefix_layout to layout helpers
sultaniman Apr 2, 2024
f5479e4
Pass constructor arguments via kwargs and override values from config…
sultaniman Apr 2, 2024
e85345f
Add a placeholder for the day of week
sultaniman Apr 2, 2024
7cc7954
Lowercase day of week placeholder
sultaniman Apr 2, 2024
f9c8955
Use now.date() instead of now.today()
sultaniman Apr 2, 2024
247cbca
Small cleanup for extra_params constructor
sultaniman Apr 2, 2024
c1e6644
Use file format from extras
sultaniman Apr 2, 2024
e4609bd
Pass file extension to param callback
sultaniman Apr 2, 2024
d23f51f
Fix mypy errors
sultaniman Apr 2, 2024
ab5504d
Update docstrings
sultaniman Apr 2, 2024
c773c65
Fix current datetime resolution
sultaniman Apr 2, 2024
96f83e1
Cleanup code flow
sultaniman Apr 2, 2024
14beaef
Adjust tests
sultaniman Apr 2, 2024
3bf1e42
Simplify code
sultaniman Apr 3, 2024
39c2ad7
Log a message if any extra placeholder is not used
sultaniman Apr 3, 2024
ac1a7d2
Adjust docstring
sultaniman Apr 3, 2024
ae2f826
Do not print the message about unused placeholders
sultaniman Apr 3, 2024
bb792a8
Add more layout test samples
sultaniman Apr 3, 2024
9afbdcd
Fix mypy issues
sultaniman Apr 3, 2024
f3af8f5
Pass config to assert_file_matches
sultaniman Apr 3, 2024
bac2e6b
flake: Use pendulum datetime
sultaniman Apr 3, 2024
72c3f5a
Revert changes to factory
sultaniman Apr 3, 2024
6b7a008
Cleanup redundant code and rename extra_params to extra_placeholders
sultaniman Apr 3, 2024
cf37c66
Let fsspec create destination directory
sultaniman Apr 3, 2024
be5ca1d
Revert filesystem/factory docstrings
sultaniman Apr 3, 2024
33671b4
Set default current_datetime in filesystem factory
sultaniman Apr 3, 2024
fa295c6
Catch TypeError if extra placeholder callable has a signature mismatch
sultaniman Apr 3, 2024
a0cad82
Do not pass config to ExtraParams
sultaniman Apr 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions dlt/common/storages/configuration.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import os
from typing import TYPE_CHECKING, Any, Literal, Optional, Type, get_args, ClassVar, Dict, Union
from typing import Any, ClassVar, Dict, Literal, Optional, Type, get_args, Union
from urllib.parse import urlparse

from dlt.common.configuration import configspec, resolve_type
Expand All @@ -13,6 +13,7 @@
AzureCredentialsWithoutDefaults,
BaseConfiguration,
)

from dlt.common.typing import DictStrAny
from dlt.common.utils import digest128

Expand Down Expand Up @@ -74,7 +75,7 @@ class FilesystemConfiguration(BaseConfiguration):
"adl": Union[AzureCredentialsWithoutDefaults, AzureCredentials],
}

bucket_url: str = None
bucket_url: Optional[str] = None
sultaniman marked this conversation as resolved.
Show resolved Hide resolved

# should be a union of all possible credentials as found in PROTOCOL_CREDENTIALS
credentials: FileSystemCredentials = None
Expand Down
4 changes: 4 additions & 0 deletions dlt/common/storages/file_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,10 @@ def from_relative_path_to_wd(self, relative_path: str) -> str:
def get_file_name_from_file_path(file_path: str) -> str:
    """Return the final path component (the file name) of `file_path`."""
    _, tail = os.path.split(file_path)
    return tail

@staticmethod
def get_dir_name_from_file_path(file_path: str) -> str:
    """Return the directory part of `file_path` (everything before the file name)."""
    head, _ = os.path.split(file_path)
    return head

@staticmethod
def validate_file_name_component(name: str) -> None:
# Universal platform bans several characters allowed in POSIX ie. | < \ or "COM1" :)
Expand Down
31 changes: 27 additions & 4 deletions dlt/destinations/impl/filesystem/configuration.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,42 @@
import dataclasses
from typing import Final, Type, Optional

from typing import Callable, Dict, Final, Type, Optional, Union

from dlt.common.configuration import configspec, resolve_type
from dlt.common.destination.reference import (
CredentialsConfiguration,
DestinationClientStagingConfiguration,
)
from dlt.common.storages import FilesystemConfiguration
from pendulum.datetime import DateTime
from typing_extensions import TypeAlias

TCurrentDatetimeCallback: TypeAlias = Callable[[], DateTime]
"""A callback which takes no arguments and returns the current datetime"""

TDatetimeFormat: TypeAlias = str
"""Datetime format string"""

TLayoutParamCallback: TypeAlias = Callable[[str, str, str, str, str, DateTime], str]
"""A callback which returns the prepared string value of a layout placeholder.
It is called with the following arguments, in this order:
`schema name`, `table name`, `load_id`, `file_id`, `extension` and `current_datetime`.
"""


@configspec
class FilesystemDestinationClientConfiguration(FilesystemConfiguration, DestinationClientStagingConfiguration): # type: ignore[misc]
destination_type: Final[str] = dataclasses.field(default="filesystem", init=False, repr=False, compare=False) # type: ignore
class FilesystemDestinationClientConfiguration( # type: ignore[misc]
FilesystemConfiguration, DestinationClientStagingConfiguration
):
destination_type: Final[str] = dataclasses.field( # type: ignore
default="filesystem", init=False, repr=False, compare=False
)
current_datetime: Optional[Union[DateTime, TCurrentDatetimeCallback]] = None
datetime_format: Optional[TDatetimeFormat] = None
extra_placeholders: Optional[Dict[str, Union[str, TLayoutParamCallback]]] = None

@resolve_type("credentials")
def resolve_credentials_type(self) -> Type[CredentialsConfiguration]:
# use known credentials or empty credentials for unknown protocol
return self.PROTOCOL_CREDENTIALS.get(self.protocol) or Optional[CredentialsConfiguration] # type: ignore[return-value]
return (
self.PROTOCOL_CREDENTIALS.get(self.protocol) or Optional[CredentialsConfiguration] # type: ignore[return-value]
)
13 changes: 13 additions & 0 deletions dlt/destinations/impl/filesystem/factory.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import typing as t

import pendulum

from dlt.destinations.impl.filesystem.configuration import FilesystemDestinationClientConfiguration
from dlt.destinations.impl.filesystem import capabilities
from dlt.common.destination import Destination, DestinationCapabilitiesContext
Expand Down Expand Up @@ -55,3 +57,14 @@ def __init__(
environment=environment,
**kwargs,
)

def configuration(
    self, initial_config: FilesystemDestinationClientConfiguration
) -> FilesystemDestinationClientConfiguration:
    """Default `current_datetime` on the config, then delegate to the parent.

    Args:
        initial_config: Configuration to resolve. Its `current_datetime` is
            set in place to `pendulum.now()` when it was not provided.

    Returns:
        The result of the parent class' `configuration` for `initial_config`.
    """
    # When the caller did not provide current_datetime, fix it here so the
    # filesystem destination instance has a value to work with (the stated
    # intent is one value per destination instance).
    # Compare against None explicitly: "not provided" is the only case that
    # should trigger the default.
    if initial_config.current_datetime is None:
        initial_config.current_datetime = pendulum.now()

    return super().configuration(initial_config)
51 changes: 24 additions & 27 deletions dlt/destinations/impl/filesystem/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from dlt.common import logger
from dlt.common.schema import Schema, TSchemaTables, TTableSchema
from dlt.common.storages import FileStorage, ParsedLoadJobFileName, fsspec_from_config
from dlt.common.storages import FileStorage, fsspec_from_config
from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.destination.reference import (
NewLoadJob,
Expand All @@ -20,9 +20,14 @@

from dlt.destinations.job_impl import EmptyLoadJob
from dlt.destinations.impl.filesystem import capabilities
from dlt.destinations.impl.filesystem.configuration import FilesystemDestinationClientConfiguration
from dlt.destinations.impl.filesystem.configuration import (
FilesystemDestinationClientConfiguration,
)
from dlt.destinations.job_impl import NewReferenceJob
from dlt.destinations import path_utils
from dlt.destinations.impl.filesystem.layout import (
get_table_prefix_layout,
make_filename,
)


class LoadFilesystemJob(LoadJob):
Expand All @@ -38,32 +43,14 @@ def __init__(
file_name = FileStorage.get_file_name_from_file_path(local_path)
self.config = config
self.dataset_path = dataset_path
self.destination_file_name = LoadFilesystemJob.make_destination_filename(
config.layout, file_name, schema_name, load_id
)
self.destination_file_name = make_filename(config, file_name, schema_name, load_id)

super().__init__(file_name)
fs_client, _ = fsspec_from_config(config)
self.destination_file_name = LoadFilesystemJob.make_destination_filename(
config.layout, file_name, schema_name, load_id
)
self.destination_file_name = make_filename(config, file_name, schema_name, load_id)
item = self.make_remote_path()
fs_client.put_file(local_path, item)

@staticmethod
def make_destination_filename(
    layout: str, file_name: str, schema_name: str, load_id: str
) -> str:
    """Render `layout` into the destination file name for a load job file.

    The job file name is parsed with `ParsedLoadJobFileName.parse`, and its
    table name, file id and file format are passed to `path_utils.create_path`
    together with `schema_name` and `load_id`.
    """
    parsed = ParsedLoadJobFileName.parse(file_name)
    placeholders = {
        "schema_name": schema_name,
        "table_name": parsed.table_name,
        "load_id": load_id,
        "file_id": parsed.file_id,
        "ext": parsed.file_format,
    }
    return path_utils.create_path(layout, **placeholders)

def make_remote_path(self) -> str:
return (
f"{self.config.protocol}://{posixpath.join(self.dataset_path, self.destination_file_name)}"
Expand All @@ -81,7 +68,9 @@ def create_followup_jobs(self, final_state: TLoadJobState) -> List[NewLoadJob]:
jobs = super().create_followup_jobs(final_state)
if final_state == "completed":
ref_job = NewReferenceJob(
file_name=self.file_name(), status="running", remote_path=self.make_remote_path()
file_name=self.file_name(),
status="running",
remote_path=self.make_remote_path(),
)
jobs.append(ref_job)
return jobs
Expand All @@ -94,13 +83,17 @@ class FilesystemClient(JobClientBase, WithStagingDataset):
fs_client: AbstractFileSystem
fs_path: str

def __init__(self, schema: Schema, config: FilesystemDestinationClientConfiguration) -> None:
def __init__(
self,
schema: Schema,
config: FilesystemDestinationClientConfiguration,
) -> None:
super().__init__(schema, config)
self.fs_client, self.fs_path = fsspec_from_config(config)
self.config: FilesystemDestinationClientConfiguration = config
# verify files layout. we need {table_name} and only allow {schema_name} before it, otherwise tables
# cannot be replaced and we cannot initialize folders consistently
self.table_prefix_layout = path_utils.get_table_prefix_layout(config.layout)
self.table_prefix_layout = get_table_prefix_layout(config)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll comment on that in the next review; this is a bigger chunk. We want to support all paths in append-only mode and require the old layout only when we replace data (we can't guarantee replacement otherwise).

self._dataset_path = self.config.normalize_dataset_name(self.schema)

def drop_storage(self) -> None:
Expand Down Expand Up @@ -176,6 +169,7 @@ def update_stored_schema(
self.fs_client.makedirs(directory, exist_ok=True)
return expected_update

# FIXME: maybe have to fixup to support all PathParams and create_path updates
def _get_table_dirs(self, table_names: Iterable[str]) -> Set[str]:
"""Gets unique directories where table data is stored."""
table_dirs: Set[str] = set()
Expand Down Expand Up @@ -214,7 +208,10 @@ def __enter__(self) -> "FilesystemClient":
return self

def __exit__(
self, exc_type: Type[BaseException], exc_val: BaseException, exc_tb: TracebackType
self,
exc_type: Type[BaseException],
exc_val: BaseException,
exc_tb: TracebackType,
) -> None:
pass

Expand Down
Loading
Loading