Skip to content

Commit

Permalink
Format date variables and move get_table_prefix_layout to layout helpers
Browse files Browse the repository at this point in the history
  • Loading branch information
sultaniman committed Apr 2, 2024
1 parent 93d8802 commit ef7a2df
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 32 deletions.
1 change: 0 additions & 1 deletion dlt/destinations/impl/filesystem/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ def __init__(
datetime_format: strftime formatting for current_datetime
extra_params: custom layout parameters, all unknown parameters will be skipped,
values can be primitive types or callables which also should return a primitive type.
suffix_fn: a callback if specified will be called to generate a suffix string to final path
**kwargs: Additional arguments passed to the destination config
"""
# TODO: validate parameters
Expand Down
8 changes: 5 additions & 3 deletions dlt/destinations/impl/filesystem/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
FilesystemDestinationClientConfiguration,
)
from dlt.destinations.job_impl import NewReferenceJob
from dlt.destinations import path_utils
from dlt.destinations.impl.filesystem.layout import make_filename
from dlt.destinations.impl.filesystem.layout import (
get_table_prefix_layout,
make_filename,
)


class LoadFilesystemJob(LoadJob):
Expand Down Expand Up @@ -101,7 +103,7 @@ def __init__(
self.config: FilesystemDestinationClientConfiguration = config
# verify files layout. we need {table_name} and only allow {schema_name} before it, otherwise tables
# cannot be replaced and we cannot initialize folders consistently
self.table_prefix_layout = path_utils.get_table_prefix_layout(config.layout)
self.table_prefix_layout = get_table_prefix_layout(config)
self._dataset_path = self.config.normalize_dataset_name(self.schema)

def drop_storage(self) -> None:
Expand Down
111 changes: 83 additions & 28 deletions dlt/destinations/impl/filesystem/layout.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import re
from types import TracebackType
from typing import Any, Dict, List, Optional, Self, Set, Type
from typing import Any, Dict, List, Optional, Self, Sequence, Set, Type

import pendulum

from dlt.common.storages.load_package import ParsedLoadJobFileName
from dlt.destinations.exceptions import InvalidFilesystemLayout
from dlt.destinations.exceptions import CantExtractTablePrefix, InvalidFilesystemLayout
from dlt.destinations.impl.filesystem.configuration import (
FilesystemDestinationClientConfiguration,
)
Expand Down Expand Up @@ -33,16 +33,22 @@ class extra_params:
def __init__(
self,
config: FilesystemDestinationClientConfiguration,
job_info: ParsedLoadJobFileName,
schema_name: str,
load_id: str,
job_info: Optional[ParsedLoadJobFileName] = None,
schema_name: Optional[str] = None,
load_id: Optional[str] = None,
) -> None:
self.config = config
self.job_info = job_info
self.load_id = load_id
self.schema_name = schema_name
self._params = {}

self.table_name = None
self.file_id = None
if job_info:
self.table_name = job_info.table_name
self.file_id = job_info.file_id

def __enter__(self) -> Self:
return self

Expand All @@ -65,40 +71,48 @@ def params(self) -> Optional[Dict[str, Any]]:
* file id,
* current datetime
"""
if self._params:
return self._params

self._params = {
"load_id": self.load_id,
"file_id": self.job_info.file_id,
"ext": self.job_info.file_format,
"table_name": self.job_info.table_name,
}
if self.load_id:
self._params["load_id"] = self.load_id

if self.job_info:
self._params.update(
{
"file_id": self.job_info.file_id,
"ext": self.job_info.file_format,
"table_name": self.job_info.table_name,
}
)

now = self.config.current_datetime or pendulum.now()
for key, value in self.config.extra_params.items():
if callable(value):
self._params[key] = value(
self.schema_name,
self.job_info.table_name,
self.table_name,
self.load_id,
self.job_info.file_id,
self.file_id,
now,
)
else:
self._params[key] = value

self._params["year"] = now.year
self._params["month"] = now.month
self._params["day"] = now.day
self._params["hour"] = now.hour
self._params["minute"] = now.minute
# month, day, hour and minute padded with 0
self._params["month"] = now.format("MM")
# Day of month, padded with 0 (01-31)
self._params["day"] = now.format("DD")
# Hour in 24h format
self._params["hour"] = now.format("HH")
self._params["minute"] = now.format("mm")
# Abbreviated day of week (Mon, Tue, Wed, ...)
self._params["dow"] = now.format("ddd")
self._params["timestamp"] = int(now.timestamp())

# Format curr_date datetime according to given format
if self.config.datetime_format:
self._params["curr_date"] = now.format(self.config.datetime_format)
else:
self._params["curr_date"] = now.isoformat()
self._params["curr_date"] = str(now.today())

return self._params

Expand Down Expand Up @@ -169,11 +183,52 @@ def make_filename(
if "ext" not in placeholders:
path += f".{extras.params['ext']}"

if config.suffix:
suffix = config.suffix
if callable(suffix):
suffix = suffix(extras.params)
path += suffix
# import ipdb;ipdb.set_trace()

return path


def get_table_prefix_layout(
    config: FilesystemDestinationClientConfiguration,
    supported_prefix_placeholders: Sequence[str] = SUPPORTED_TABLE_NAME_PREFIX_PLACEHOLDERS,
) -> str:
    """Return the leading part of `config.layout` that locates a table.

    The returned fragment runs from the start of the layout up to and including
    the `{table_name}` placeholder plus the single character that follows it.
    That trailing character is expected to be a separator, so that tables whose
    names share a common start are not selected together (`{table_name}/`).

    Args:
        config: filesystem destination configuration; its `layout` string is inspected.
        supported_prefix_placeholders: placeholders that are allowed to appear
            before `{table_name}` in the layout.

    Returns:
        The layout prefix ending one character past `{table_name}`.

    Raises:
        CantExtractTablePrefix: if `{table_name}` is missing from the layout, if a
            placeholder outside `supported_prefix_placeholders` precedes it, or if
            `{table_name}` is immediately followed by another placeholder.
    """
    with extra_params(config) as extras, layout_helper(
        config.layout,
        extras.params,
    ) as layout:
        # fail if table name is not defined
        if "table_name" not in layout.placeholders:
            # report the layout string (not the helper object), like the other errors below
            raise CantExtractTablePrefix(config.layout, "{table_name} placeholder not found. ")

        table_name_index = layout.layout_placeholders.index("table_name")
        before_table_name = layout.layout_placeholders[:table_name_index]

        # fail if any other prefix is defined before table_name
        if any(p not in supported_prefix_placeholders for p in before_table_name):
            if not supported_prefix_placeholders:
                # wrap in a 1-tuple so `%` treats the sequence as a single value
                details = (
                    "No other placeholders are allowed before {table_name} but you have %s present. "
                    % (before_table_name,)
                )
            else:
                details = "Only %s are allowed before {table_name} but you have %s present. " % (
                    supported_prefix_placeholders,
                    before_table_name,
                )
            raise CantExtractTablePrefix(config.layout, details)

        # we include the char after the table_name here, this should be a separator not a new placeholder
        # this is to prevent selecting tables that have the same starting name -> {table_name}/
        table_name_token = "{table_name}"
        prefix_end = config.layout.index(table_name_token) + len(table_name_token) + 1
        prefix = config.layout[:prefix_end]

        if prefix[-1] == "{":
            raise CantExtractTablePrefix(
                config.layout, "A separator is required after a {table_name}. "
            )

        return prefix

0 comments on commit ef7a2df

Please sign in to comment.