Skip to content

Commit

Permalink
Implement filesystem layout helpers
Browse files Browse the repository at this point in the history
  • Loading branch information
sultaniman committed Mar 28, 2024
1 parent ad68db6 commit a6ddbb2
Show file tree
Hide file tree
Showing 4 changed files with 234 additions and 42 deletions.
17 changes: 8 additions & 9 deletions dlt/destinations/impl/filesystem/configuration.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import dataclasses

from typing import Callable, Dict, Final, Type, Optional, TypeAlias, Union
from typing import Any, Callable, Dict, Final, Type, Optional, TypeAlias, Union


from pendulum.datetime import DateTime
Expand All @@ -9,9 +9,7 @@
CredentialsConfiguration,
DestinationClientStagingConfiguration,
)
from dlt.common.schema.schema import Schema
from dlt.common.storages import FilesystemConfiguration
from dlt.destinations.path_utils import PathParams, check_layout

TCurrentDatetimeCallback: TypeAlias = Callable[[], DateTime]
"""A callback which should return current datetime"""
Expand All @@ -22,8 +20,10 @@
TDatetimeFormat: TypeAlias = Union[str, TDatetimeFormatterCallback]
"""Datetime format or formatter callback"""

TLayoutParamCallback: TypeAlias = Callable[[Schema, DateTime], str]
"""A callback which should return prepared string value for layout parameter value"""
TLayoutParamCallback: TypeAlias = Callable[[str, str, str, str, DateTime], str]
"""A callback which should return prepared string value for layout parameter value
schema name, table name, load_id, file_id and current_datetime will be passed
"""


@configspec
Expand All @@ -35,8 +35,8 @@ class FilesystemDestinationClientConfiguration(
) # type: ignore
current_datetime: Optional[Union[DateTime, TCurrentDatetimeCallback]] = None
datetime_format: Optional[TDatetimeFormat] = None
layout_params: Optional[Dict[str, Union[str, TLayoutParamCallback]]] = None
suffix: Optional[Union[str, Callable[[PathParams], str]]] = None
extra_params: Optional[Dict[str, Union[str, TLayoutParamCallback]]] = None
suffix: Optional[Union[str, Callable[[Dict[str, Any]], str]]] = None

@resolve_type("credentials")
def resolve_credentials_type(self) -> Type[CredentialsConfiguration]:
Expand All @@ -62,7 +62,6 @@ def on_resolved(self) -> None:
"didn't return any instance of pendulum.DateTime"
)


# Validate layout and layout params
check_layout(self.layout, self.layout_params)
# layout_helper(self.layout, self.extra_params).check_layout()
super().on_resolved()
6 changes: 3 additions & 3 deletions dlt/destinations/impl/filesystem/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def __init__(
environment: t.Optional[str] = None,
current_datetime: t.Optional[datetime] = None,
datetime_format: t.Optional[str] = None,
layout_params: t.Optional[t.Dict[str, t.Any]] = None,
extra_params: t.Optional[t.Dict[str, t.Any]] = None,
suffix_fn: t.Optional[t.Callable[[PathParams], str]] = None,
**kwargs: t.Any,
) -> None:
Expand All @@ -54,7 +54,7 @@ def __init__(
A dictionary with the credentials parameters can also be provided.
current_datetime: current datetime used instead of datetime generated by dlt.
datetime_format: strftime formatting for current_datetime
layout_params: custom layout parameters, all unknown parameters will be skipped,
extra_params: custom layout parameters, all unknown parameters will be skipped,
values can be primitive types or callables which also should return a primitive type.
suffix_fn: a callback if specified will be called to generate a suffix string to final path
**kwargs: Additional arguments passed to the destination config
Expand All @@ -66,7 +66,7 @@ def __init__(
destination_name=destination_name,
current_datetime=current_datetime,
datetime_format=datetime_format,
layout_params=layout_params,
extra_params=extra_params,
suffix_fn=suffix_fn,
environment=environment,
**kwargs,
Expand Down
74 changes: 44 additions & 30 deletions dlt/destinations/impl/filesystem/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import os
from types import TracebackType
from typing import ClassVar, List, Type, Iterable, Set, Iterator
from typing_extensions import deprecated
from fsspec import AbstractFileSystem
from contextlib import contextmanager

Expand All @@ -20,9 +21,12 @@

from dlt.destinations.job_impl import EmptyLoadJob
from dlt.destinations.impl.filesystem import capabilities
from dlt.destinations.impl.filesystem.configuration import FilesystemDestinationClientConfiguration
from dlt.destinations.impl.filesystem.configuration import (
FilesystemDestinationClientConfiguration,
)
from dlt.destinations.job_impl import NewReferenceJob
from dlt.destinations import path_utils
from dlt.destinations.impl.filesystem.layout import make_filename


class LoadFilesystemJob(LoadJob):
Expand All @@ -38,41 +42,41 @@ def __init__(
file_name = FileStorage.get_file_name_from_file_path(local_path)
self.config = config
self.dataset_path = dataset_path
self.destination_file_name = LoadFilesystemJob.make_destination_filename(
config.layout,
config.datetime_format,
config.layout_params,
file_name,
schema_name,
load_id,
)
self.destination_file_name = make_filename(config, file_name, schema_name, load_id)

super().__init__(file_name)
fs_client, _ = fsspec_from_config(config)
self.destination_file_name = LoadFilesystemJob.make_destination_filename(
config.layout,
config.datetime_format,
config.layout_params,
file_name,
schema_name,
load_id,
)
self.destination_file_name = make_filename(config, file_name, schema_name, load_id)

item = self.make_remote_path()
fs_client.put_file(local_path, item)

@staticmethod
@deprecated("Phase out in tests use layout.make_filename")
def make_destination_filename(
layout: str, file_name: str, schema_name: str, load_id: str
config: FilesystemDestinationClientConfiguration,
layout: str,
file_name: str,
schema_name: str,
load_id: str,
) -> str:
job_info = ParsedLoadJobFileName.parse(file_name)
return path_utils.create_path(
layout,
schema_name=schema_name,
table_name=job_info.table_name,
load_id=load_id,
file_id=job_info.file_id,
ext=job_info.file_format,
)
# job_info = ParsedLoadJobFileName.parse(file_name)
# layout_helper = PathLayout(
# PathParams(
# schema_name=schema_name,
# table_name=job_info.table_name,
# load_id=load_id,
# file_id=job_info.file_id,
# ext=job_info.file_format,
# current_date=config.current_datetime or ("NOW"),
# datetime_format=config.datetime_format,
# layout=config.layout,
# extra_params=config.layout_params,
# suffix=config.suffix,
# )
# )
# return layout_helper.create_path()
pass

def make_remote_path(self) -> str:
return (
Expand All @@ -91,7 +95,9 @@ def create_followup_jobs(self, final_state: TLoadJobState) -> List[NewLoadJob]:
jobs = super().create_followup_jobs(final_state)
if final_state == "completed":
ref_job = NewReferenceJob(
file_name=self.file_name(), status="running", remote_path=self.make_remote_path()
file_name=self.file_name(),
status="running",
remote_path=self.make_remote_path(),
)
jobs.append(ref_job)
return jobs
Expand All @@ -104,7 +110,11 @@ class FilesystemClient(JobClientBase, WithStagingDataset):
fs_client: AbstractFileSystem
fs_path: str

def __init__(self, schema: Schema, config: FilesystemDestinationClientConfiguration) -> None:
def __init__(
self,
schema: Schema,
config: FilesystemDestinationClientConfiguration,
) -> None:
super().__init__(schema, config)
self.fs_client, self.fs_path = fsspec_from_config(config)
self.config: FilesystemDestinationClientConfiguration = config
Expand Down Expand Up @@ -186,6 +196,7 @@ def update_stored_schema(
self.fs_client.makedirs(directory, exist_ok=True)
return expected_update

# FIXME: maybe have to fixup to support all PathParams and create_path updates
def _get_table_dirs(self, table_names: Iterable[str]) -> Set[str]:
"""Gets unique directories where table data is stored."""
table_dirs: Set[str] = set()
Expand Down Expand Up @@ -224,7 +235,10 @@ def __enter__(self) -> "FilesystemClient":
return self

def __exit__(
self, exc_type: Type[BaseException], exc_val: BaseException, exc_tb: TracebackType
self,
exc_type: Type[BaseException],
exc_val: BaseException,
exc_tb: TracebackType,
) -> None:
pass

Expand Down
179 changes: 179 additions & 0 deletions dlt/destinations/impl/filesystem/layout.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
import re
from types import TracebackType
from typing import Any, Dict, List, Optional, Self, Set, Type

import pendulum

from dlt.common.storages.load_package import ParsedLoadJobFileName
from dlt.destinations.exceptions import InvalidFilesystemLayout
from dlt.destinations.impl.filesystem.configuration import (
FilesystemDestinationClientConfiguration,
)

# TODO: ensure layout only has supported placeholders
SUPPORTED_PLACEHOLDERS = {
"schema_name",
"table_name",
"load_id",
"file_id",
"ext",
"timestamp",
"curr_date",
"year",
"month",
"day",
"hour",
"minute",
}

SUPPORTED_TABLE_NAME_PREFIX_PLACEHOLDERS = ("schema_name",)


class extra_params:
def __init__(
self,
config: FilesystemDestinationClientConfiguration,
job_info: ParsedLoadJobFileName,
schema_name: str,
load_id: str,
) -> None:
self.config = config
self.job_info = job_info
self.load_id = load_id
self.schema_name = schema_name
self._params = {}

def __enter__(self) -> Self:
return self

def __exit__(
self,
exc_type: Optional[Type[BaseException]] = None,
exc_value: Optional[BaseException] = None,
traceback: Optional[TracebackType] = None,
) -> None:
pass

@property
def params(self) -> Optional[Dict[str, Any]]:
"""Process extra params for layout
If any value is a callable then we call it with the following arguments
* schema name,
* table name,
* load id,
* file id,
* current datetime
"""
if self._params:
return self._params

self._params = {
"load_id": self.load_id,
"file_id": self.job_info.file_id,
"ext": self.job_info.file_format,
"table_name": self.job_info.table_name,
}
now = self.config.current_datetime or pendulum.now()
for key, value in self.config.extra_params.items():
if callable(value):
self._params[key] = value(
self.schema_name,
self.job_info.table_name,
self.load_id,
self.job_info.file_id,
now,
)
else:
self._params[key] = value

self._params["year"] = now.year
self._params["month"] = now.month
self._params["day"] = now.day
self._params["hour"] = now.hour
self._params["minute"] = now.minute
self._params["timestamp"] = int(now.timestamp())

# Format curr_date datetime according to given format
if self.config.datetime_format:
self._params["curr_date"] = now.format(self.config.datetime_format)
else:
self._params["curr_date"] = now.isoformat()

return self._params


class layout_helper:
def __init__(
self,
path_layout: str,
params: Dict[str, str],
allowed_placeholders: Optional[Set[str]] = SUPPORTED_PLACEHOLDERS,
) -> None:
self.params = params
self.allowed_placeholders = allowed_placeholders.copy()
self.layout_placeholders = re.findall(r"\{(.*?)\}", path_layout)

def __enter__(self) -> Self:
return self

def __exit__(
self,
exc_type: Optional[Type[BaseException]] = None,
exc_value: Optional[BaseException] = None,
traceback: Optional[TracebackType] = None,
) -> None:
pass

@property
def placeholders(self) -> List[str]:
self.check_layout()
return list(self.allowed_placeholders)

def check_layout(self) -> None:
# Build out the list of placeholder names
# which we will use to validate placeholders
# in a given config.layout template
if self.params:
for placeholder, _ in self.params.items():
self.allowed_placeholders.add(placeholder)

# now collect all unknown placeholders from config.layout template
invalid_placeholders = [
p for p in self.layout_placeholders if p not in self.allowed_placeholders
]
if invalid_placeholders:
raise InvalidFilesystemLayout(invalid_placeholders)


def make_filename(
config: FilesystemDestinationClientConfiguration,
file_name: str,
schema_name: str,
load_id: str,
) -> str:
job_info = ParsedLoadJobFileName.parse(file_name)
with extra_params(
config,
job_info,
schema_name,
load_id,
) as extras, layout_helper(
config.layout,
extras.params,
) as layout:
placeholders = layout.placeholders
path = config.layout.format(**extras.params)

# if extension is not defined, we append it at the end
if "ext" not in placeholders:
path += f".{extras.params['ext']}"

if config.suffix:
suffix = config.suffix
if callable(suffix):
suffix = suffix(extras.params)
path += suffix
# import ipdb;ipdb.set_trace()

return path

0 comments on commit a6ddbb2

Please sign in to comment.