Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable schema evolution for merge write disposition with delta table format #1742

Merged
merged 12 commits into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 53 additions & 2 deletions dlt/common/libs/deltalake.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
from dlt.common import logger
from dlt.common.libs.pyarrow import pyarrow as pa
from dlt.common.libs.pyarrow import cast_arrow_schema_types
from dlt.common.schema.typing import TWriteDisposition
from dlt.common.schema.typing import TWriteDisposition, TTableSchema
from dlt.common.schema.utils import get_first_column_name_with_prop, get_columns_names_with_prop
from dlt.common.exceptions import MissingDependencyException
from dlt.common.storages import FilesystemConfiguration
from dlt.common.utils import assert_min_pkg_version
from dlt.destinations.impl.filesystem.filesystem import FilesystemClient

try:
import deltalake
from deltalake import write_deltalake, DeltaTable
from deltalake.writer import try_get_deltatable
except ModuleNotFoundError:
Expand Down Expand Up @@ -74,7 +76,7 @@ def write_delta_table(
partition_by: Optional[Union[List[str], str]] = None,
storage_options: Optional[Dict[str, str]] = None,
) -> None:
"""Writes in-memory Arrow table to on-disk Delta table.
"""Writes in-memory Arrow data to on-disk Delta table.

Thin wrapper around `deltalake.write_deltalake`.
"""
Expand All @@ -93,6 +95,42 @@ def write_delta_table(
)


def merge_delta_table(
    table: DeltaTable,
    data: Union[pa.Table, pa.RecordBatchReader],
    schema: TTableSchema,
) -> None:
    """Merges in-memory Arrow data into on-disk Delta table.

    Args:
        table: Target Delta table the data is merged into.
        data: Source Arrow data (a `pyarrow.Table` or `RecordBatchReader`).
        schema: dlt table schema; must carry an `x-merge-strategy` hint.

    Raises:
        ValueError: If the schema's merge strategy is not supported.
    """
    strategy = schema["x-merge-strategy"]  # type: ignore[typeddict-item]
    if strategy == "upsert":
        # `DeltaTable.merge` does not support automatic schema evolution
        # https://github.com/delta-io/delta-rs/issues/2282
        _evolve_delta_table_schema(table, data.schema)

        if "parent" in schema:
            # child tables have no primary key; match on their unique column
            unique_column = get_first_column_name_with_prop(schema, "unique")
            predicate = f"target.{unique_column} = source.{unique_column}"
        else:
            primary_keys = get_columns_names_with_prop(schema, "primary_key")
            predicate = " AND ".join([f"target.{c} = source.{c}" for c in primary_keys])

        qry = (
            table.merge(
                source=ensure_delta_compatible_arrow_data(data),
                predicate=predicate,
                source_alias="source",
                target_alias="target",
            )
            .when_matched_update_all()
            .when_not_matched_insert_all()
        )

        qry.execute()
    else:
        # BUG FIX: the exception was previously constructed but never raised,
        # silently turning an unsupported strategy into a no-op
        raise ValueError(f'Merge strategy "{strategy}" not supported.')


def get_delta_tables(pipeline: Pipeline, *tables: str) -> Dict[str, DeltaTable]:
"""Returns Delta tables in `pipeline.default_schema` as `deltalake.DeltaTable` objects.

Expand Down Expand Up @@ -145,3 +183,16 @@ def _deltalake_storage_options(config: FilesystemConfiguration) -> Dict[str, str
+ ". dlt will use the values in `deltalake_storage_options`."
)
return {**creds, **extra_options}


def _evolve_delta_table_schema(delta_table: DeltaTable, arrow_schema: pa.Schema) -> None:
    """Evolves `delta_table` schema if different from `arrow_schema`.

    Adds column(s) to `delta_table` present in `arrow_schema` but not in
    `delta_table`. Existing columns are never modified or dropped.

    Args:
        delta_table: Table whose schema may be extended in place.
        arrow_schema: Arrow schema describing the incoming data.
    """
    # hoist the loop invariant: building a pyarrow dataset per field is wasteful
    existing_schema = delta_table.to_pyarrow_dataset().schema
    new_fields = [
        deltalake.Field.from_pyarrow(field)
        for field in ensure_delta_compatible_arrow_schema(arrow_schema)
        if field not in existing_schema
    ]
    delta_table.alter.add_columns(new_fields)
28 changes: 6 additions & 22 deletions dlt/destinations/impl/filesystem/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ def run(self) -> None:
from dlt.common.libs.deltalake import (
DeltaTable,
write_delta_table,
merge_delta_table,
ensure_delta_compatible_arrow_schema,
_deltalake_storage_options,
try_get_deltatable,
Expand Down Expand Up @@ -136,28 +137,11 @@ def run(self) -> None:

if self._load_table["write_disposition"] == "merge" and dt is not None:
assert self._load_table["x-merge-strategy"] in self._job_client.capabilities.supported_merge_strategies # type: ignore[typeddict-item]

if self._load_table["x-merge-strategy"] == "upsert": # type: ignore[typeddict-item]
if "parent" in self._load_table:
unique_column = get_first_column_name_with_prop(self._load_table, "unique")
predicate = f"target.{unique_column} = source.{unique_column}"
else:
primary_keys = get_columns_names_with_prop(self._load_table, "primary_key")
predicate = " AND ".join([f"target.{c} = source.{c}" for c in primary_keys])

qry = (
dt.merge(
source=arrow_rbr,
predicate=predicate,
source_alias="source",
target_alias="target",
)
.when_matched_update_all()
.when_not_matched_insert_all()
)

qry.execute()

merge_delta_table(
table=dt,
data=arrow_rbr,
schema=self._load_table,
)
else:
write_delta_table(
table_or_uri=dt_path if dt is None else dt,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from typing import List, Dict, Any, Generator
import dlt


# Define a dlt resource with write disposition to 'merge'
@dlt.resource(name="parent_with_children", write_disposition={"disposition": "merge"})
def data_source() -> Generator[List[Dict[str, Any]], None, None]:
Expand All @@ -44,13 +45,15 @@ def data_source() -> Generator[List[Dict[str, Any]], None, None]:

yield data


# Copy the parent's identifier down onto each of its child records
def add_parent_id(record: Dict[str, Any]) -> Dict[str, Any]:
    """Propagate the parent's "parent_id" value onto every child record.

    Mutates the dicts in `record["children"]` in place and returns the
    same `record` object for convenient chaining with `add_map`.
    """
    parent_id = record["parent_id"]
    for child_record in record["children"]:
        child_record["parent_id"] = parent_id
    return record


if __name__ == "__main__":
# Create and configure the dlt pipeline
pipeline = dlt.pipeline(
Expand All @@ -60,10 +63,6 @@ def add_parent_id(record: Dict[str, Any]) -> Dict[str, Any]:
)

# Run the pipeline
load_info = pipeline.run(
data_source()
.add_map(add_parent_id),
primary_key="parent_id"
)
load_info = pipeline.run(data_source().add_map(add_parent_id), primary_key="parent_id")
# Output the load information after pipeline execution
print(load_info)
Loading
Loading