SCD2 support (#1168)

* format examples

* add core functionality for scd2 merge strategy

* make scd2 validity column names configurable

* make alias descriptive

* add validity column name conflict checking

* extend write disposition with dictionary configuration option

* add default delete-insert merge strategy

* update write_disposition type hints

* extend tested destinations

* 2nd time setup (#1202)

* remove obsolete deepcopy

* add scd2 docs

* add write_disposition existence condition

* add nullability hints to validity columns

* cache functions to limit schema lookups

* add row_hash_column_name config option

* default to default merge strategy

* replace hardcoded column name with variable to fix test

* fix doc snippets

* compares records without order and with caps timestamp precision in scd2 tests

* defines create load id, stores package state typed, allows package state to be passed on, uses load_id as created_at if possible

* creates new package to normalize from extracted package so state is carried on

* bans direct pendulum import

* uses timestamps with properly reduced precision in scd2

* selects newest state by load_id, not created_at. this will not affect execution as long as packages are processed in order

* adds formatting datetime literal to escape

* renames x-row-hash to x-row-version

* corrects json and pendulum imports

* uses unique column in scd2 sql generation

* renames arrow items literal

* adds limitations to docs

* passes only complete columns to arrow normalize

* renames mode to disposition

* saves parquet with timestamp precision corresponding to the destination and updates schema in the normalizer

* adds transform that computes hashes of tables

* tests arrow/pandas + scd2

* allows scd2 columns to be added to arrow items

* various renames

* uses generic caps when writing parquet if no destination context

* disables coercing timestamps in parquet arrow writer

---------

Co-authored-by: Jorrit Sandbrink <[email protected]>
Co-authored-by: adrianbr <[email protected]>
Co-authored-by: rudolfix <[email protected]>
4 people authored Apr 14, 2024
1 parent 1c01821 commit 05aa413
Showing 109 changed files with 1,663 additions and 314 deletions.
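
For orientation, a minimal sketch of how the dictionary-style write disposition added here is meant to be used (hedged: the resource name and data are illustrative; the keys follow the commit messages above and the scd2 docs added in this PR):

import dlt

# "merge" with the "scd2" strategy keeps full history: changed and deleted rows
# are retired by stamping the configurable validity columns instead of being overwritten
@dlt.resource(
    write_disposition={
        "disposition": "merge",
        "strategy": "scd2",
        "validity_column_names": ["valid_from", "valid_to"],  # optional override
    }
)
def dim_customers():
    yield [{"customer_key": 1, "name": "simon"}]

pipeline = dlt.pipeline("scd2_demo", destination="duckdb")
pipeline.run(dim_customers)
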
2 changes: 1 addition & 1 deletion README.md
@@ -113,4 +113,4 @@ The dlt project is quickly growing, and we're excited to have you join our community

## License

DLT is released under the [Apache 2.0 License](LICENSE.txt).
`dlt` is released under the [Apache 2.0 License](LICENSE.txt).
2 changes: 1 addition & 1 deletion dlt/cli/_dlt.py
@@ -5,7 +5,7 @@
import click

from dlt.version import __version__
from dlt.common import json
from dlt.common.json import json
from dlt.common.schema import Schema
from dlt.common.typing import DictStrAny
from dlt.common.runners import Venv
2 changes: 1 addition & 1 deletion dlt/cli/config_toml_writer.py
@@ -4,7 +4,7 @@
from tomlkit.container import Container as TOMLContainer
from collections.abc import Sequence as C_Sequence

from dlt.common import pendulum
from dlt.common.pendulum import pendulum
from dlt.common.configuration.specs import (
BaseConfiguration,
is_base_configuration_inner_hint,
2 changes: 1 addition & 1 deletion dlt/cli/pipeline_command.py
@@ -3,7 +3,7 @@
import dlt
from dlt.cli.exceptions import CliCommandException

from dlt.common import json
from dlt.common.json import json
from dlt.common.pipeline import resource_state, get_dlt_pipelines_dir, TSourceState
from dlt.common.destination.reference import TDestinationReferenceArg
from dlt.common.runners import Venv
3 changes: 0 additions & 3 deletions dlt/cli/utils.py
@@ -1,11 +1,8 @@
import ast
import os
import tempfile
from typing import Callable

from dlt.common import git
from dlt.common.reflection.utils import set_ast_parents
from dlt.common.storages import FileStorage
from dlt.common.typing import TFun
from dlt.common.configuration import resolve_configuration
from dlt.common.configuration.specs import RunConfiguration
3 changes: 1 addition & 2 deletions dlt/common/configuration/providers/google_secrets.py
@@ -1,9 +1,8 @@
import base64
import string
import re
from typing import Tuple

from dlt.common import json
from dlt.common.json import json
from dlt.common.configuration.specs import GcpServiceAccountCredentials
from dlt.common.exceptions import MissingDependencyException
from .toml import VaultTomlProvider
2 changes: 1 addition & 1 deletion dlt/common/configuration/providers/toml.py
@@ -6,7 +6,7 @@
from tomlkit.container import Container as TOMLContainer
from typing import Any, Dict, Optional, Tuple, Type, Union

from dlt.common import pendulum
from dlt.common.pendulum import pendulum
from dlt.common.configuration.paths import get_dlt_settings_dir, get_dlt_data_dir
from dlt.common.configuration.utils import auto_cast
from dlt.common.configuration.specs import known_sections
3 changes: 1 addition & 2 deletions dlt/common/configuration/specs/azure_credentials.py
@@ -1,7 +1,6 @@
from typing import Optional, Dict, Any

from dlt.common import pendulum
from dlt.common.exceptions import MissingDependencyException
from dlt.common.pendulum import pendulum
from dlt.common.typing import TSecretStrValue
from dlt.common.configuration.specs import (
CredentialsConfiguration,
3 changes: 2 additions & 1 deletion dlt/common/configuration/specs/gcp_credentials.py
@@ -2,7 +2,8 @@
import sys
from typing import Any, ClassVar, Final, List, Tuple, Union, Dict

from dlt.common import json, pendulum
from dlt.common.json import json
from dlt.common.pendulum import pendulum
from dlt.common.configuration.specs.api_credentials import OAuth2Credentials
from dlt.common.configuration.specs.exceptions import (
InvalidGoogleNativeCredentialsType,
4 changes: 1 addition & 3 deletions dlt/common/configuration/utils.py
@@ -5,7 +5,7 @@
from typing import Any, Dict, Mapping, NamedTuple, Optional, Tuple, Type, Sequence
from collections.abc import Mapping as C_Mapping

from dlt.common import json
from dlt.common.json import json
from dlt.common.typing import AnyType, TAny
from dlt.common.data_types import coerce_value, py_type_to_sc_type
from dlt.common.configuration.providers import EnvironProvider
@@ -122,8 +122,6 @@ def log_traces(
default_value: Any,
traces: Sequence[LookupTrace],
) -> None:
from dlt.common import logger

# if logger.is_logging() and logger.log_level() == "DEBUG" and config:
# logger.debug(f"Field {key} with type {hint} in {type(config).__name__} {'NOT RESOLVED' if value is None else 'RESOLVED'}")
# print(f"Field {key} with type {hint} in {type(config).__name__} {'NOT RESOLVED' if value is None else 'RESOLVED'}")
6 changes: 3 additions & 3 deletions dlt/common/data_types/type_helpers.py
@@ -3,13 +3,13 @@
import dataclasses
import datetime # noqa: I251
from collections.abc import Mapping as C_Mapping, Sequence as C_Sequence
from typing import Any, Type, Literal, Union, cast
from typing import Any, Type, Union
from enum import Enum

from dlt.common import pendulum, json, Decimal, Wei
from dlt.common.json import custom_pua_remove, json
from dlt.common.json._simplejson import custom_encode as json_custom_encode
from dlt.common.arithmetics import InvalidOperation
from dlt.common.wei import Wei
from dlt.common.arithmetics import InvalidOperation, Decimal
from dlt.common.data_types.typing import TDataType
from dlt.common.time import (
ensure_pendulum_datetime,
16 changes: 16 additions & 0 deletions dlt/common/data_writers/escape.py
@@ -4,6 +4,8 @@
from datetime import date, datetime, time # noqa: I251

from dlt.common.json import json
from dlt.common.pendulum import pendulum
from dlt.common.time import reduce_pendulum_datetime_precision

# use regex to escape characters in single pass
SQL_ESCAPE_DICT = {"'": "''", "\\": "\\\\", "\n": "\\n", "\r": "\\r"}
@@ -152,3 +154,17 @@ def escape_databricks_literal(v: Any) -> Any:
return "NULL"

return str(v)


def format_datetime_literal(v: pendulum.DateTime, precision: int = 6, no_tz: bool = False) -> str:
"""Converts `v` to ISO string, optionally without timezone spec (in UTC) and with given `precision`"""
if no_tz:
v = v.in_timezone(tz="UTC").replace(tzinfo=None)
v = reduce_pendulum_datetime_precision(v, precision)
# yet another precision translation
timespec: str = "microseconds"
if precision < 3:
timespec = "seconds"
elif precision < 6:
timespec = "milliseconds"
return v.isoformat(sep=" ", timespec=timespec)
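
A quick usage sketch for the new helper (expected outputs assume the corrected timespec branching above):

from dlt.common.data_writers.escape import format_datetime_literal
from dlt.common.pendulum import pendulum

ts = pendulum.datetime(2024, 4, 14, 12, 30, 45, 123456, tz="UTC")
print(format_datetime_literal(ts))                            # 2024-04-14 12:30:45.123456+00:00
print(format_datetime_literal(ts, precision=3))               # 2024-04-14 12:30:45.123+00:00
print(format_datetime_literal(ts, precision=0, no_tz=True))   # 2024-04-14 12:30:45
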
21 changes: 17 additions & 4 deletions dlt/common/data_writers/writers.py
@@ -17,7 +17,7 @@
TypeVar,
)

from dlt.common import json
from dlt.common.json import json
from dlt.common.configuration import configspec, known_sections, with_config
from dlt.common.configuration.specs import BaseConfiguration
from dlt.common.data_writers.exceptions import DataWriterNotFound, InvalidDataItem
@@ -176,6 +176,9 @@ def writer_spec(cls) -> FileWriterSpec:

class InsertValuesWriter(DataWriter):
def __init__(self, f: IO[Any], caps: DestinationCapabilitiesContext = None) -> None:
assert (
caps is not None
), "InsertValuesWriter requires destination capabilities to be present"
super().__init__(f, caps)
self._chunks_written = 0
self._headers_lookup: Dict[str, int] = None
@@ -272,7 +275,7 @@ def __init__(
coerce_timestamps: Optional[Literal["s", "ms", "us", "ns"]] = None,
allow_truncated_timestamps: bool = False,
) -> None:
super().__init__(f, caps)
super().__init__(f, caps or DestinationCapabilitiesContext.generic_capabilities("parquet"))
from dlt.common.libs.pyarrow import pyarrow

self.writer: Optional[pyarrow.parquet.ParquetWriter] = None
@@ -287,7 +290,15 @@ def __init__(
self.allow_truncated_timestamps = allow_truncated_timestamps

def _create_writer(self, schema: "pa.Schema") -> "pa.parquet.ParquetWriter":
from dlt.common.libs.pyarrow import pyarrow
from dlt.common.libs.pyarrow import pyarrow, get_py_arrow_timestamp

# if timestamps are not explicitly coerced, use destination resolution
# TODO: introduce maximum timestamp resolution, using timestamp_precision too aggressive
# if not self.coerce_timestamps:
# self.coerce_timestamps = get_py_arrow_timestamp(
# self._caps.timestamp_precision, "UTC"
# ).unit
# self.allow_truncated_timestamps = True

return pyarrow.parquet.ParquetWriter(
self._f,
@@ -331,7 +342,9 @@ def write_data(self, rows: Sequence[Any]) -> None:
for key in self.complex_indices:
for row in rows:
if (value := row.get(key)) is not None:
row[key] = json.dumps(value)
# TODO: make this configurable
if value is not None and not isinstance(value, str):
row[key] = json.dumps(value)

table = pyarrow.Table.from_pylist(rows, schema=self.schema)
# Write
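
The write_data change above serializes nested (complex) values to JSON strings before the arrow table is built, while values that already arrive as strings pass through untouched; in isolation the transformation looks roughly like this:

from dlt.common.json import json

rows = [
    {"id": 1, "payload": {"a": 1}},    # nested value: dumped to a JSON string
    {"id": 2, "payload": '{"b": 2}'},  # already a string: passed through as-is
]
for row in rows:
    if (value := row.get("payload")) is not None and not isinstance(value, str):
        row["payload"] = json.dumps(value)
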
7 changes: 7 additions & 0 deletions dlt/common/destination/reference.py
@@ -26,6 +26,7 @@

from dlt.common import logger
from dlt.common.schema import Schema, TTableSchema, TSchemaTables
from dlt.common.schema.typing import MERGE_STRATEGIES
from dlt.common.schema.exceptions import SchemaException
from dlt.common.schema.utils import (
get_write_disposition,
@@ -344,6 +345,12 @@ def _verify_schema(self) -> None:
table_name,
self.capabilities.max_identifier_length,
)
if table.get("write_disposition") == "merge":
if "x-merge-strategy" in table and table["x-merge-strategy"] not in MERGE_STRATEGIES: # type: ignore[typeddict-item]
raise SchemaException(
f'"{table["x-merge-strategy"]}" is not a valid merge strategy. ' # type: ignore[typeddict-item]
f"""Allowed values: {', '.join(['"' + s + '"' for s in MERGE_STRATEGIES])}."""
)
if has_column_with_prop(table, "hard_delete"):
if len(get_columns_names_with_prop(table, "hard_delete")) > 1:
raise SchemaException(
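
A hedged illustration of the new check in _verify_schema: on a "merge" table, an "x-merge-strategy" hint must be one of MERGE_STRATEGIES (at this commit likely "delete-insert" and "scd2"), otherwise a SchemaException is raised. The table dict below is hand-built for illustration:

from dlt.common.schema.typing import MERGE_STRATEGIES

table = {
    "name": "dim_customer",
    "write_disposition": "merge",
    "x-merge-strategy": "scd2",  # an unknown value such as "scd3" would fail verification
}
assert table["x-merge-strategy"] in MERGE_STRATEGIES
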
8 changes: 4 additions & 4 deletions dlt/common/json/__init__.py
@@ -12,7 +12,7 @@
except ImportError:
PydanticBaseModel = None # type: ignore[misc]

from dlt.common import pendulum
from dlt.common.pendulum import pendulum
from dlt.common.arithmetics import Decimal
from dlt.common.wei import Wei
from dlt.common.utils import map_nested_in_place
@@ -99,19 +99,19 @@ def _datetime_decoder(obj: str) -> datetime:
# Backwards compatibility for data encoded with previous dlt version
# fromisoformat does not support Z suffix (until py3.11)
obj = obj[:-1] + "+00:00"
return pendulum.DateTime.fromisoformat(obj) # type: ignore[attr-defined, no-any-return]
return pendulum.DateTime.fromisoformat(obj)


# define decoder for each prefix
DECODERS: List[Callable[[Any], Any]] = [
Decimal,
_datetime_decoder,
pendulum.Date.fromisoformat, # type: ignore[attr-defined]
pendulum.Date.fromisoformat,
UUID,
HexBytes,
base64.b64decode,
Wei,
pendulum.Time.fromisoformat, # type: ignore[attr-defined]
pendulum.Time.fromisoformat,
]
# how many decoders?
PUA_CHARACTER_MAX = len(DECODERS)
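
The decoder list pairs with dlt's typed JSON encoding, where each encoded value is prefixed with a PUA character whose offset selects the decoder. A small round-trip sketch, assuming the typed_dumps/typed_loads helpers exposed by dlt.common.json:

from decimal import Decimal

from dlt.common.json import json
from dlt.common.pendulum import pendulum

doc = {"amount": Decimal("1.23"), "at": pendulum.datetime(2024, 4, 14, tz="UTC")}
serialized = json.typed_dumps(doc)   # values carry PUA prefixes keyed to DECODERS
restored = json.typed_loads(serialized)
assert restored["amount"] == Decimal("1.23")  # comes back as Decimal, not float
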
2 changes: 1 addition & 1 deletion dlt/common/libs/numpy.py
@@ -3,4 +3,4 @@
try:
import numpy
except ModuleNotFoundError:
raise MissingDependencyException("DLT Numpy Helpers", ["numpy"])
raise MissingDependencyException("dlt Numpy Helpers", ["numpy"])
2 changes: 1 addition & 1 deletion dlt/common/libs/pandas.py
@@ -4,7 +4,7 @@
try:
import pandas
except ModuleNotFoundError:
raise MissingDependencyException("DLT Pandas Helpers", ["pandas"])
raise MissingDependencyException("dlt Pandas Helpers", ["pandas"])


def pandas_to_arrow(df: pandas.DataFrame) -> Any:
10 changes: 5 additions & 5 deletions dlt/common/libs/pyarrow.py
@@ -15,7 +15,7 @@
)

from dlt import version
from dlt.common import pendulum
from dlt.common.pendulum import pendulum
from dlt.common.exceptions import MissingDependencyException
from dlt.common.schema.typing import DLT_NAME_PREFIX, TTableSchemaColumns

@@ -119,7 +119,7 @@ def get_pyarrow_int(precision: Optional[int]) -> Any:
return pyarrow.int64()


def _get_column_type_from_py_arrow(dtype: pyarrow.DataType) -> TColumnType:
def get_column_type_from_py_arrow(dtype: pyarrow.DataType) -> TColumnType:
"""Returns (data_type, precision, scale) tuple from pyarrow.DataType"""
if pyarrow.types.is_string(dtype) or pyarrow.types.is_large_string(dtype):
return dict(data_type="text")
@@ -226,14 +226,14 @@ def should_normalize_arrow_schema(
) -> Tuple[bool, Mapping[str, str], Dict[str, str], TTableSchemaColumns]:
rename_mapping = get_normalized_arrow_fields_mapping(schema, naming)
rev_mapping = {v: k for k, v in rename_mapping.items()}
dlt_table_prefix = naming.normalize_table_identifier(DLT_NAME_PREFIX)
dlt_tables = list(map(naming.normalize_table_identifier, ("_dlt_id", "_dlt_load_id")))

# remove all columns that are dlt columns but are not present in arrow schema. we do not want to add such columns
# that should happen in the normalizer
columns = {
name: column
for name, column in columns.items()
if not name.startswith(dlt_table_prefix) or name in rev_mapping
if name not in dlt_tables or name in rev_mapping
}

# check if nothing to rename
@@ -322,7 +322,7 @@ def py_arrow_to_table_schema_columns(schema: pyarrow.Schema) -> TTableSchemaColumns:
result[field.name] = {
"name": field.name,
"nullable": field.nullable,
**_get_column_type_from_py_arrow(field.type),
**get_column_type_from_py_arrow(field.type),
}
return result
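
With get_column_type_from_py_arrow now public, here is a short sketch of the conversion it feeds (the expected output shape follows py_arrow_to_table_schema_columns above; data_type names are dlt's):

import pyarrow as pa

from dlt.common.libs.pyarrow import py_arrow_to_table_schema_columns

arrow_schema = pa.schema(
    [
        pa.field("id", pa.int64(), nullable=False),
        pa.field("valid_from", pa.timestamp("us", tz="UTC")),
    ]
)
columns = py_arrow_to_table_schema_columns(arrow_schema)
# e.g. {"id": {"name": "id", "nullable": False, "data_type": "bigint"}, ...}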
