Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable schema evolution for merge write disposition with delta table format #1742

Merged
merged 12 commits into from
Aug 27, 2024
Merged
94 changes: 45 additions & 49 deletions dlt/destinations/impl/filesystem/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,36 +56,36 @@ def __init__(
self._job_client: FilesystemClient = None

def run(self) -> None:
# pick local filesystem pathlib or posix for buckets
self.is_local_filesystem = self._job_client.config.protocol == "file"
self.pathlib = os.path if self.is_local_filesystem else posixpath

self.destination_file_name = path_utils.create_path(
self._job_client.config.layout,
self._file_name,
self._job_client.schema.name,
self._load_id,
current_datetime=self._job_client.config.current_datetime,
load_package_timestamp=dlt.current.load_package()["state"]["created_at"],
extra_placeholders=self._job_client.config.extra_placeholders,
)
self.__is_local_filesystem = self._job_client.config.protocol == "file"
# We would like to avoid failing for local filesystem where
# deeply nested directory will not exist before writing a file.
# It `auto_mkdir` is disabled by default in fsspec so we made some
# trade offs between different options and decided on this.
# remote_path = f"{client.config.protocol}://{posixpath.join(dataset_path, destination_file_name)}"
remote_path = self.make_remote_path()
if self.is_local_filesystem:
self._job_client.fs_client.makedirs(self.pathlib.dirname(remote_path), exist_ok=True)
if self.__is_local_filesystem:
# use os.path for local file name
self._job_client.fs_client.makedirs(os.path.dirname(remote_path), exist_ok=True)
self._job_client.fs_client.put_file(self._file_path, remote_path)

def make_remote_path(self) -> str:
"""Returns path on the remote filesystem to which copy the file, without scheme. For local filesystem a native path is used"""
destination_file_name = path_utils.create_path(
self._job_client.config.layout,
self._file_name,
self._job_client.schema.name,
self._load_id,
current_datetime=self._job_client.config.current_datetime,
load_package_timestamp=dlt.current.load_package()["state"]["created_at"],
extra_placeholders=self._job_client.config.extra_placeholders,
)
# pick local filesystem pathlib or posix for buckets
pathlib = os.path if self.__is_local_filesystem else posixpath
# path.join does not normalize separators and available
# normalization functions are very invasive and may string the trailing separator
return self.pathlib.join( # type: ignore[no-any-return]
return pathlib.join( # type: ignore[no-any-return]
self._job_client.dataset_path,
path_utils.normalize_path_sep(self.pathlib, self.destination_file_name),
path_utils.normalize_path_sep(pathlib, destination_file_name),
)

def make_remote_uri(self) -> str:
Expand All @@ -98,23 +98,20 @@ def metrics(self) -> Optional[LoadJobMetrics]:

class DeltaLoadFilesystemJob(FilesystemLoadJob):
def __init__(self, file_path: str) -> None:
from dlt.common.libs.pyarrow import pyarrow as pa
super().__init__(file_path=file_path)

super().__init__(
file_path=file_path,
)
# create Arrow dataset from Parquet files
file_paths = ReferenceFollowupJobRequest.resolve_references(self._file_path)
self.arrow_ds = pa.dataset.dataset(file_paths)
from dlt.common.libs.pyarrow import pyarrow as pa

self.file_paths = ReferenceFollowupJobRequest.resolve_references(self._file_path)
self.arrow_ds = pa.dataset.dataset(self.file_paths)

def make_remote_path(self) -> str:
# remote path is table dir - delta will create its file structure inside it
return self._job_client.get_table_dir(self.load_table_name)

def run(self) -> None:
# pick local filesystem pathlib or posix for buckets
# TODO: since we pass _job_client via run_managed and not set_env_vars it is hard
# to write a handler with those two line below only in FilesystemLoadJob
self.is_local_filesystem = self._job_client.config.protocol == "file"
self.pathlib = os.path if self.is_local_filesystem else posixpath
# `destination_file_name` is a folder path, not a file path
self.destination_file_name = self._job_client.get_table_dir(self.load_table_name)
logger.info(f"Will copy file(s) {self.file_paths} to delta table {self.make_remote_uri()}")

from dlt.common.libs.deltalake import write_delta_table, merge_delta_table

Expand All @@ -124,25 +121,24 @@ def run(self) -> None:
self._create_or_evolve_delta_table()
return

arrow_rbr = self.arrow_ds.scanner().to_reader() # RecordBatchReader

if self._load_table["write_disposition"] == "merge" and self._delta_table is not None:
assert self._load_table["x-merge-strategy"] in self._job_client.capabilities.supported_merge_strategies # type: ignore[typeddict-item]
merge_delta_table(
table=self._delta_table,
data=arrow_rbr,
schema=self._load_table,
)
else:
write_delta_table(
table_or_uri=(
self.make_remote_uri() if self._delta_table is None else self._delta_table
),
data=arrow_rbr,
write_disposition=self._load_table["write_disposition"],
partition_by=self._partition_columns,
storage_options=self._storage_options,
)
with self.arrow_ds.scanner().to_reader() as arrow_rbr: # RecordBatchReader
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious why you inserted a with context here. Is it because arrow_rbr gets exhausted and is effectively useless after the context?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it has a close method... so it has internal unmanaged resources that we should free ASAP. otherwise garbage collector does it way later

if self._load_table["write_disposition"] == "merge" and self._delta_table is not None:
assert self._load_table["x-merge-strategy"] in self._job_client.capabilities.supported_merge_strategies # type: ignore[typeddict-item]
merge_delta_table(
table=self._delta_table,
data=arrow_rbr,
schema=self._load_table,
)
else:
write_delta_table(
table_or_uri=(
self.make_remote_uri() if self._delta_table is None else self._delta_table
),
data=arrow_rbr,
write_disposition=self._load_table["write_disposition"],
partition_by=self._partition_columns,
storage_options=self._storage_options,
)

@property
def _storage_options(self) -> Dict[str, str]:
Expand Down