Skip to content

Commit

Permalink
Revert path_utils
Browse files Browse the repository at this point in the history
  • Loading branch information
sultaniman committed Mar 28, 2024
1 parent a6ddbb2 commit 07ffa19
Showing 1 changed file with 16 additions and 51 deletions.
67 changes: 16 additions & 51 deletions dlt/destinations/path_utils.py
Original file line number Diff line number Diff line change
@@ -1,80 +1,45 @@
# this can probably go some other place, but it is shared by destinations, so for now it is here
from dataclasses import dataclass
from typing import Dict, List, Optional, Sequence
from typing import List, Sequence

import pendulum
import re

from dlt.destinations.exceptions import InvalidFilesystemLayout, CantExtractTablePrefix

# TODO: ensure layout only has supported placeholders
SUPPORTED_PLACEHOLDERS = {
"schema_name",
"table_name",
"load_id",
"file_id",
"ext",
"curr_date",
"timestamp",
}
SUPPORTED_PLACEHOLDERS = {"schema_name", "table_name", "load_id", "file_id", "ext", "curr_date"}

SUPPORTED_TABLE_NAME_PREFIX_PLACEHOLDERS = ("schema_name",)


@dataclass
class PathParams:
layout: str
schema_name: str
table_name: str
load_id: str
file_id: str
ext: str
curr_date: Optional[pendulum.DateTime] = None
# Below here we expect prepared parameters
# so they are already resolved from filesystem
# configuration and parameters, all callbacks
# called and resulting data provided here
date_format: Optional[str] = None
layout_params: Optional[Dict[str, str]] = None
suffix: Optional[str] = None


def check_layout(layout: str, layout_params: Optional[Dict[str, str]] = None) -> List[str]:
def check_layout(layout: str) -> List[str]:
placeholders = get_placeholders(layout)
supported_placeholders = SUPPORTED_PLACEHOLDERS.copy()
if layout_params:
for placeholder, _ in layout_params.items():
supported_placeholders.add(placeholder)

invalid_placeholders = [p for p in placeholders if p not in supported_placeholders]
invalid_placeholders = [p for p in placeholders if p not in SUPPORTED_PLACEHOLDERS]
if invalid_placeholders:
raise InvalidFilesystemLayout(invalid_placeholders)

return placeholders


def get_placeholders(layout: str) -> List[str]:
return re.findall(r"\{(.*?)\}", layout)


def create_path(params: PathParams) -> str:
def create_path(
layout: str, schema_name: str, table_name: str, load_id: str, file_id: str, ext: str
) -> str:
"""create a filepath from the layout and our default params"""
placeholders = check_layout(params.layout, layout_params=params.layout_params)
path = params.layout.format(
schema_name=params.schema_name,
table_name=params.table_name,
load_id=params.load_id,
file_id=params.file_id,
ext=params.ext,
curr_date=params.curr_date.today(),
placeholders = check_layout(layout)
path = layout.format(
schema_name=schema_name,
table_name=table_name,
load_id=load_id,
file_id=file_id,
ext=ext,
curr_date=str(pendulum.today()),
)
# if extension is not defined, we append it at the end
if "ext" not in placeholders:
path += f".{params.ext}"

if params.suffix:
path += params.suffix

path += f".{ext}"
return path


Expand Down

0 comments on commit 07ffa19

Please sign in to comment.