Add Delta table support for filesystem destination #1382

Merged: 50 commits, Jun 6, 2024

Commits (the diff below shows changes from 12 of the 50 commits)
683b35c
add delta table support for filesystem destination
May 17, 2024
a650de7
Merge branch 'refs/heads/devel' into 978-filesystem-delta-table
May 18, 2024
d66cbb2
Merge branch 'devel' of https://github.com/dlt-hub/dlt into 978-files…
May 18, 2024
6e3dced
remove duplicate method definition
May 18, 2024
b241e8c
make property robust
May 18, 2024
10185df
exclude high-precision decimal columns
May 18, 2024
574215f
make delta imports conditional
May 18, 2024
ae03815
include pyarrow in deltalake dependency
May 18, 2024
88cbfcf
install extra deltalake dependency
May 18, 2024
b83ca8b
disable high precision decimal arrow test columns by default
May 19, 2024
b8d2967
include arrow max precision decimal column
May 19, 2024
7a38470
introduce directory job and refactor delta table code
jorritsandbrink May 26, 2024
418d8a8
refactor delta table load
jorritsandbrink May 29, 2024
fad4ff0
revert import changes
jorritsandbrink May 29, 2024
8134aab
Merge branch 'devel' of https://github.com/dlt-hub/dlt into 978-files…
jorritsandbrink May 30, 2024
91716df
add delta table format child table handling
jorritsandbrink May 30, 2024
8bdb93f
make table_format key lookups robust
jorritsandbrink May 30, 2024
0a32c44
write remote path to reference file
jorritsandbrink May 30, 2024
0fd7e3e
add supported table formats and file format adapter to destination ca…
jorritsandbrink May 31, 2024
e9282ea
remove jsonl and parquet from table formats
jorritsandbrink May 31, 2024
c87d68e
add object_store rust crate credentials handling
jorritsandbrink Jun 2, 2024
1e341cf
add deltalake_storage_options to filesystem config
jorritsandbrink Jun 2, 2024
2e04cff
move function to top level to prevent multiprocessing pickle error
jorritsandbrink Jun 3, 2024
0240c39
add new deltalake_storage_options filesystem config key to tests
jorritsandbrink Jun 3, 2024
f47de39
replace secrets with dummy values in test
jorritsandbrink Jun 3, 2024
1d9b968
reorganize object_store rust crate credentials tests
jorritsandbrink Jun 3, 2024
12e03ec
add delta table format docs
jorritsandbrink Jun 3, 2024
681ae48
move delta table logical delete logic to filesystem client
jorritsandbrink Jun 4, 2024
83745fa
rename pyarrow lib method names
jorritsandbrink Jun 4, 2024
72553d6
rename utils to delta_utils
jorritsandbrink Jun 4, 2024
3f41402
import pyarrow from dlt common libs
jorritsandbrink Jun 4, 2024
a49b23d
move delta lake utilities to module in dlt common libs
jorritsandbrink Jun 4, 2024
5b9071f
import delta lake utils early to assert dependencies availability
jorritsandbrink Jun 4, 2024
6f76587
handle file format adaptation at table level
jorritsandbrink Jun 4, 2024
bae5266
Merge branch 'devel' of https://github.com/dlt-hub/dlt into 978-files…
jorritsandbrink Jun 4, 2024
70fddc3
initialize file format variables
jorritsandbrink Jun 4, 2024
1a10d2d
split delta table format tests
jorritsandbrink Jun 4, 2024
e1e4772
handle table schema is None case
jorritsandbrink Jun 5, 2024
d25ebc4
add test for dynamic dispatching of delta tables
jorritsandbrink Jun 5, 2024
4420a36
mark core delta table test as essential
jorritsandbrink Jun 5, 2024
0bde8b9
simplify item normalizer dict key
jorritsandbrink Jun 5, 2024
86ab9ff
make list copy to prevent in place mutations
jorritsandbrink Jun 5, 2024
9a302dc
add extra deltalake dependency
jorritsandbrink Jun 5, 2024
6c3c8c7
only test deltalake lib on local filesystem
jorritsandbrink Jun 5, 2024
e17a54b
Merge branch 'devel' into 978-filesystem-delta-table
rudolfix Jun 5, 2024
bcfb418
properly evaluates lazy annotations
rudolfix Jun 5, 2024
8f7831f
uses base FilesystemConfiguration from common in libs
rudolfix Jun 5, 2024
e33af63
solves union type reordering due to caching and clash with delta-rs D…
rudolfix Jun 5, 2024
cdeefd2
creates a table with just root name to cache item normalizers properly
rudolfix Jun 5, 2024
a75a0f9
Merge branch 'devel' into 978-filesystem-delta-table
rudolfix Jun 5, 2024
2 changes: 1 addition & 1 deletion .github/workflows/test_destinations.yml
@@ -75,7 +75,7 @@ jobs:

- name: Install dependencies
# if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true'
run: poetry install --no-interaction -E redshift -E gs -E s3 -E az -E parquet -E duckdb -E cli --with sentry-sdk --with pipeline
run: poetry install --no-interaction -E redshift -E gs -E s3 -E az -E parquet -E duckdb -E cli --with sentry-sdk --with pipeline -E deltalake

- name: create secrets.toml
run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml
2 changes: 1 addition & 1 deletion .github/workflows/test_local_destinations.yml
@@ -90,7 +90,7 @@ jobs:
key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-local-destinations

- name: Install dependencies
run: poetry install --no-interaction -E postgres -E duckdb -E parquet -E filesystem -E cli -E weaviate --with sentry-sdk --with pipeline
run: poetry install --no-interaction -E postgres -E duckdb -E parquet -E filesystem -E cli -E weaviate --with sentry-sdk --with pipeline -E deltalake

- name: create secrets.toml
run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml
20 changes: 19 additions & 1 deletion dlt/common/destination/reference.py
@@ -214,6 +214,20 @@ def exception(self) -> str:
pass


class DirectoryLoadJob:
Collaborator Author:
Very minimal for now. Want to get some feedback before further polishing.

"""Job that loads a directory of files in a single transaction."""

def __init__(self, dir_name: str) -> None:
self._dir_name = dir_name

def dir_name(self) -> str:
"""Returns name of directory containing the job files."""
return self._dir_name

def job_id(self) -> str:
return "hacked_job_id"


class NewLoadJob(LoadJob):
"""Adds a trait that allows to save new job file"""

@@ -309,8 +323,12 @@ def restore_file_load(self, file_path: str) -> LoadJob:
"""Finds and restores already started loading job identified by `file_path` if destination supports it."""
pass

def can_do_logical_replace(self, table: TTableSchema) -> bool:
Collaborator Author:
Perhaps this can become a destination capability if we turn Delta into a full destination.

Collaborator:
IMO we do not need this at the highest abstraction level. It belongs only to the filesystem client, and there it is enough to override should_truncate_table_before_load.

I'm trying to keep the abstract classes as simple as possible. The two methods below are already a stretch (but I have no idea where else to move them).

"""Returns True if `replace` can be done without physically deleting data."""
return table["table_format"] == "delta"

def should_truncate_table_before_load(self, table: TTableSchema) -> bool:
return table["write_disposition"] == "replace"
return table["write_disposition"] == "replace" and not self.can_do_logical_replace(table)

def create_table_chain_completed_followup_jobs(
self, table_chain: Sequence[TTableSchema]
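
For illustration, a standalone sketch of the decision the two methods above encode (dict literals stand in for TTableSchema; the real method indexes table["table_format"] directly rather than using .get):

def can_do_logical_replace(table: dict) -> bool:
    return table.get("table_format") == "delta"

def should_truncate_table_before_load(table: dict) -> bool:
    return table["write_disposition"] == "replace" and not can_do_logical_replace(table)

# a replace into a delta table keeps the data files and records the deletion in the Delta log
assert not should_truncate_table_before_load({"write_disposition": "replace", "table_format": "delta"})
# a plain parquet replace still truncates before loading
assert should_truncate_table_before_load({"write_disposition": "replace", "table_format": "parquet"})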
21 changes: 21 additions & 0 deletions dlt/common/libs/pyarrow.py
@@ -37,6 +37,8 @@

TAnyArrowItem = Union[pyarrow.Table, pyarrow.RecordBatch]

ARROW_DECIMAL_MAX_PRECISION = 76
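
For illustration: 76 is the maximum precision of Arrow's decimal256 type, which is what the constant above refers to.

import pyarrow as pa

pa.decimal256(76, 0)    # the largest precision decimal256 accepts
# pa.decimal256(77, 0)  # would raise ValueError: precision above the decimal256 maximum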


def get_py_arrow_datatype(
column: TColumnType,
@@ -397,6 +399,25 @@ def pq_stream_with_new_columns(
yield tbl


def adjust_arrow_schema(
schema: pyarrow.Schema,
type_map: Dict[Callable[[pyarrow.DataType], bool], Callable[..., pyarrow.DataType]],
) -> pyarrow.Schema:
"""Returns adjusted Arrow schema.

Replaces data types for fields matching a type check in `type_map`.
Type check functions in `type_map` are assumed to be mutually exclusive, i.e.
a data type does not match more than one type check function.
"""
for i, e in enumerate(schema.types):
for type_check, cast_type in type_map.items():
if type_check(e):
adjusted_field = schema.field(i).with_type(cast_type)
schema = schema.set(i, adjusted_field)
break # if type matches type check, do not do other type checks
return schema
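
For illustration, not part of this diff: a sketch of adjust_arrow_schema downcasting decimal256 columns. The field names and precisions are made up; concrete DataType instances are passed as map values, which is what the with_type call above expects at runtime.

import pyarrow as pa

schema = pa.schema([pa.field("amount", pa.decimal256(76, 10)), pa.field("name", pa.string())])
new_schema = adjust_arrow_schema(schema, {pa.types.is_decimal256: pa.decimal128(38, 10)})
assert new_schema.field("amount").type == pa.decimal128(38, 10)  # matched the type check, replaced
assert new_schema.field("name").type == pa.string()              # no match, left untouched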


class NameNormalizationClash(ValueError):
def __init__(self, reason: str) -> None:
msg = f"Arrow column name clash after input data normalization. {reason}"
2 changes: 1 addition & 1 deletion dlt/common/schema/typing.py
@@ -64,7 +64,7 @@
"dedup_sort",
]
"""Known hints of a column used to declare hint regexes."""
TTableFormat = Literal["iceberg", "parquet", "jsonl"]
TTableFormat = Literal["iceberg", "parquet", "jsonl", "delta"]
Collaborator:
I think we should kick out "parquet" and "jsonl" from here. Athena uses jsonl but does it badly (and should simply use the file format instead). We can even comment out the part that creates jsonl tables in Athena:

elif table_format == "jsonl":
    sql.append(f"""CREATE EXTERNAL TABLE {qualified_table_name}
            ({columns})
            ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
            LOCATION '{location}';""")

Why: to have a clear distinction between file format and table format. I see three table formats now: iceberg, delta, and hive (or pyarrow dataset).

Collaborator Author:
Done.

TTypeDetections = Literal[
"timestamp", "iso_timestamp", "iso_date", "large_integer", "hexbytes_to_text", "wei_to_double"
]
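
For illustration, not part of this diff: a sketch of the new literal value used as a resource hint, assuming a filesystem destination with a configured bucket_url; the data is made up.

import dlt

@dlt.resource(table_format="delta")
def my_numbers():
    yield [{"n": 1}, {"n": 2}]

pipeline = dlt.pipeline(destination="filesystem")
pipeline.run(my_numbers())  # the filesystem destination stores this resource as a Delta table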
3 changes: 2 additions & 1 deletion dlt/common/storages/data_item_storage.py
@@ -1,10 +1,11 @@
from pathlib import Path
from typing import Dict, Any, List, Sequence
from typing import Dict, Any, List, Optional
from abc import ABC, abstractmethod

from dlt.common import logger
from dlt.common.schema import TTableSchemaColumns
from dlt.common.typing import StrAny, TDataItems
from dlt.common.storages.load_package import PackageStorage
from dlt.common.data_writers import (
BufferedDataWriter,
DataWriter,
87 changes: 62 additions & 25 deletions dlt/common/storages/load_package.py
@@ -22,6 +22,7 @@
Any,
Tuple,
TypedDict,
Union,
)
from typing_extensions import NotRequired

@@ -177,6 +178,15 @@ def __str__(self) -> str:
return self.job_id()


class ParsedLoadJobDirectoryName(NamedTuple):
Collaborator Author:
Also very minimal. Same as above.

table_name: str

@staticmethod
def parse(dir_name: str) -> "ParsedLoadJobDirectoryName":
table_name = Path(dir_name).name
return ParsedLoadJobDirectoryName(table_name=table_name)
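
For illustration, not part of this diff: parse takes the last path component as the table name ("new_jobs/items" is a made-up path).

assert ParsedLoadJobDirectoryName.parse("new_jobs/items").table_name == "items"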


class LoadJobInfo(NamedTuple):
state: TJobState
file_path: str
@@ -316,11 +326,18 @@ def __init__(self, storage: FileStorage, initial_state: TLoadPackageStatus) -> N
def get_package_path(self, load_id: str) -> str:
return load_id

def get_job_folder_path(self, load_id: str, folder: TJobState) -> str:
return os.path.join(self.get_package_path(load_id), folder)
def get_job_folder_path(
self, load_id: str, folder: TJobState, subfolder: Optional[str] = None
) -> str:
if subfolder is None:
return os.path.join(self.get_package_path(load_id), folder)
else:
return os.path.join(self.get_package_path(load_id), folder, subfolder)

def get_job_file_path(self, load_id: str, folder: TJobState, file_name: str) -> str:
return os.path.join(self.get_job_folder_path(load_id, folder), file_name)
def get_job_file_path(
self, load_id: str, folder: TJobState, file_name: str, subfolder: Optional[str] = None
) -> str:
return os.path.join(self.get_job_folder_path(load_id, folder, subfolder), file_name)
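
For illustration, not part of this diff: with a subfolder the job file lands one level deeper under the jobs folder. The load id and file name are made up; FileStorage and the temporary directory are only there to construct an instance.

import tempfile
from dlt.common.storages.file_storage import FileStorage
from dlt.common.storages.load_package import PackageStorage

storage = PackageStorage(FileStorage(tempfile.mkdtemp(), makedirs=True), "new")
path = storage.get_job_file_path("1716300000.1", "new_jobs", "items.abc123.0.parquet", subfolder="items")
# -> "1716300000.1/new_jobs/items/items.abc123.0.parquet"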

def list_packages(self) -> Sequence[str]:
"""Lists all load ids in storage, earliest first
@@ -331,11 +348,17 @@ def list_packages(self) -> Sequence[str]:
# start from the oldest packages
return sorted(loads)

def list_new_jobs(self, load_id: str) -> Sequence[str]:
new_jobs = self.storage.list_folder_files(
self.get_job_folder_path(load_id, PackageStorage.NEW_JOBS_FOLDER)
)
return new_jobs
def list_new_jobs(self, load_id: str, root_only: bool = False) -> Sequence[str]:
root_dir = self.get_job_folder_path(load_id, PackageStorage.NEW_JOBS_FOLDER)
if root_only:
return self.storage.list_folder_files(root_dir)
sub_dirs = self.storage.list_folder_dirs(root_dir)
dirs = [root_dir] + sub_dirs
return [file for dir_ in dirs for file in self.storage.list_folder_files(dir_)]

def list_new_dir_jobs(self, load_id: str) -> Sequence[str]:
root_dir = self.get_job_folder_path(load_id, PackageStorage.NEW_JOBS_FOLDER)
return self.storage.list_folder_dirs(root_dir)

def list_started_jobs(self, load_id: str) -> Sequence[str]:
return self.storage.list_folder_files(
@@ -382,17 +405,19 @@ def import_job(
"""Adds new job by moving the `job_file_path` into `new_jobs` of package `load_id`"""
self.storage.atomic_import(job_file_path, self.get_job_folder_path(load_id, job_state))

def start_job(self, load_id: str, file_name: str) -> str:
def start_job(self, load_id: str, job: Union["LoadJob", "DirectoryLoadJob"]) -> str: # type: ignore[name-defined] # noqa: F821
return self._move_job(
load_id, PackageStorage.NEW_JOBS_FOLDER, PackageStorage.STARTED_JOBS_FOLDER, file_name
load_id, PackageStorage.NEW_JOBS_FOLDER, PackageStorage.STARTED_JOBS_FOLDER, job
)

def fail_job(self, load_id: str, file_name: str, failed_message: Optional[str]) -> str:
def fail_job(
self, load_id: str, job: Union["LoadJob", "DirectoryLoadJob"], failed_message: Optional[str] # type: ignore[name-defined] # noqa: F821
) -> str:
# save the exception to failed jobs
if failed_message:
self.storage.save(
self.get_job_file_path(
load_id, PackageStorage.FAILED_JOBS_FOLDER, file_name + ".exception"
load_id, PackageStorage.FAILED_JOBS_FOLDER, job.file_name() + ".exception"
),
failed_message,
)
@@ -401,28 +426,30 @@ def fail_job(self, load_id: str, file_name: str, failed_message: Optional[str])
load_id,
PackageStorage.STARTED_JOBS_FOLDER,
PackageStorage.FAILED_JOBS_FOLDER,
file_name,
job.file_name(),
)

def retry_job(self, load_id: str, file_name: str) -> str:
def retry_job(self, load_id: str, job: Union["LoadJob", "DirectoryLoadJob"]) -> str: # type: ignore[name-defined] # noqa: F821
# when retrying job we must increase the retry count
source_fn = ParsedLoadJobFileName.parse(file_name)
source_fn = ParsedLoadJobFileName.parse(job.file_name())
dest_fn = source_fn.with_retry()
# move it directly to new file name
return self._move_job(
load_id,
PackageStorage.STARTED_JOBS_FOLDER,
PackageStorage.NEW_JOBS_FOLDER,
file_name,
job.file_name(),
dest_fn.file_name(),
)

def complete_job(self, load_id: str, file_name: str) -> str:
def complete_job(
self, load_id: str, job: Union["LoadJob", "DirectoryLoadJob"] # type: ignore[name-defined] # noqa: F821
) -> str:
return self._move_job(
load_id,
PackageStorage.STARTED_JOBS_FOLDER,
PackageStorage.COMPLETED_JOBS_FOLDER,
file_name,
job,
)

#
@@ -601,15 +628,25 @@ def _move_job(
load_id: str,
source_folder: TJobState,
dest_folder: TJobState,
file_name: str,
job: Union["LoadJob", "DirectoryLoadJob"], # type: ignore[name-defined] # noqa: F821
new_file_name: str = None,
) -> str:
# ensure we move file names, not paths
assert file_name == FileStorage.get_file_name_from_file_path(file_name)
from dlt.common.destination.reference import LoadJob, DirectoryLoadJob

load_path = self.get_package_path(load_id)
dest_path = os.path.join(load_path, dest_folder, new_file_name or file_name)
self.storage.atomic_rename(os.path.join(load_path, source_folder, file_name), dest_path)
# print(f"{join(load_path, source_folder, file_name)} -> {dest_path}")

if isinstance(job, LoadJob):
source_name = job.file_name()
# ensure we move file names, not paths
assert source_name == FileStorage.get_file_name_from_file_path(source_name)
dest_name = new_file_name or source_name
elif isinstance(job, DirectoryLoadJob):
source_name = job.dir_name()
dest_name = job.dir_name()

source_path = os.path.join(load_path, source_folder, source_name)
dest_path = os.path.join(load_path, dest_folder, dest_name)
self.storage.atomic_rename(source_path, dest_path)
return self.storage.make_full_path(dest_path)

def _load_schema(self, load_id: str) -> DictStrAny:
21 changes: 17 additions & 4 deletions dlt/common/storages/load_storage.py
@@ -34,11 +34,23 @@ def __init__(self, package_storage: PackageStorage, writer_spec: FileWriterSpec)
def _get_data_item_path_template(self, load_id: str, _: str, table_name: str) -> str:
# implements DataItemStorage._get_data_item_path_template
file_name = PackageStorage.build_job_file_name(table_name, "%s")
subfolder = self._get_data_item_subfolder(load_id, table_name)
file_path = self.package_storage.get_job_file_path(
load_id, PackageStorage.NEW_JOBS_FOLDER, file_name
load_id, PackageStorage.NEW_JOBS_FOLDER, file_name, subfolder
)
return self.package_storage.storage.make_full_path(file_path)

def _get_data_item_subfolder(self, load_id: str, table_name: str) -> Optional[str]:
"""Returns name of subfolder for `table_name`.

Returns None if subfolder is not used.
"""
subfolder = self.package_storage.get_job_folder_path(
load_id, PackageStorage.NEW_JOBS_FOLDER, table_name
)
subfolder_exists = self.package_storage.storage.has_folder(subfolder)
return table_name if subfolder_exists else None


class LoadStorage(VersionedStorage):
STORAGE_VERSION = "1.0.0"
@@ -95,19 +107,20 @@ def import_extracted_package(

def list_new_jobs(self, load_id: str) -> Sequence[str]:
"""Lists all jobs in new jobs folder of normalized package storage and checks if file formats are supported"""
new_jobs = self.normalized_packages.list_new_jobs(load_id)
file_jobs = self.normalized_packages.list_new_jobs(load_id, root_only=True)
dir_jobs = self.normalized_packages.list_new_dir_jobs(load_id)
# make sure all jobs have supported writers
wrong_job = next(
(
j
for j in new_jobs
for j in file_jobs
if ParsedLoadJobFileName.parse(j).file_format not in self.supported_job_file_formats
),
None,
)
if wrong_job is not None:
raise JobFileFormatUnsupported(load_id, self.supported_job_file_formats, wrong_job)
return new_jobs
return file_jobs + dir_jobs # type: ignore[no-any-return, operator]

def commit_new_load_package(self, load_id: str) -> None:
self.storage.rename_tree(