
SCD2 support #1168

Merged

45 commits merged into devel from 828-scd2 on Apr 14, 2024

Commits (45)
720b115
format examples
Mar 31, 2024
115b4c9
add core functionality for scd2 merge strategy
Mar 31, 2024
37befbc
make scd2 validity column names configurable
Apr 1, 2024
7726d98
make alias descriptive
Apr 2, 2024
30bb2e0
add validity column name conflict checking
Apr 2, 2024
765d652
Merge branch 'devel' of https://github.com/dlt-hub/dlt into 828-scd2
Apr 7, 2024
8f4d4ce
extend write disposition with dictionary configuration option
Apr 7, 2024
396ec59
add default delete-insert merge strategy
Apr 7, 2024
c8d84d8
update write_disposition type hints
Apr 7, 2024
11748a6
extend tested destinations
Apr 7, 2024
e9c8f61
2nd time setup (#1202)
adrianbr Apr 9, 2024
1f399bc
remove obsolete deepcopy
Apr 9, 2024
c8f4173
Merge branch 'devel' of https://github.com/dlt-hub/dlt into 828-scd2
Apr 9, 2024
c99d612
Merge pull request #1200 from dlt-hub/devel
rudolfix Apr 9, 2024
0fa603b
add scd2 docs
Apr 9, 2024
c110aae
add write_disposition existence condition
Apr 9, 2024
55df900
add nullability hints to validity columns
Apr 9, 2024
6b24378
cache functions to limit schema lookups
Apr 10, 2024
4124d61
add row_hash_column_name config option
Apr 10, 2024
4236f20
default to default merge strategy
Apr 11, 2024
0e7f8c0
replace hardcoded column name with variable to fix test
Apr 11, 2024
93e7f45
fix doc snippets
Apr 11, 2024
36da1f2
compares records without order and with caps timestamps precision in …
rudolfix Apr 13, 2024
0d6919a
defines create load id, stores package state typed, allows package st…
rudolfix Apr 13, 2024
2195ebf
creates new package to normalize from extracted package so state is c…
rudolfix Apr 13, 2024
52f0d7b
bans direct pendulum import
rudolfix Apr 13, 2024
caa9ae7
uses timestamps with properly reduced precision in scd2
rudolfix Apr 13, 2024
64baf2d
selects newest state by load_id, not created_at. this will not affect…
rudolfix Apr 13, 2024
8cb24af
adds formating datetime literal to escape
rudolfix Apr 13, 2024
6039f1c
renames x-row-hash to x-row-version
rudolfix Apr 13, 2024
dee8e08
corrects json and pendulum imports
rudolfix Apr 13, 2024
e63ffe1
uses unique column in scd2 sql generation
rudolfix Apr 13, 2024
12bdf2b
renames arrow items literal
rudolfix Apr 13, 2024
c1614b9
adds limitations to docs
rudolfix Apr 13, 2024
a3f47fc
passes only complete columns to arrow normalize
rudolfix Apr 13, 2024
e1c53b8
renames mode to disposition
rudolfix Apr 13, 2024
c815792
Merge branch 'master' into 828-scd2
rudolfix Apr 13, 2024
900cf06
Merge branch 'devel' into 828-scd2
rudolfix Apr 13, 2024
6ed8000
saves parquet with timestamp precision corresponding to the destinati…
rudolfix Apr 13, 2024
4fde3cc
adds transform that computes hashes of tables
rudolfix Apr 13, 2024
7796b00
tests arrow/pandas + scd2
rudolfix Apr 13, 2024
feca9cd
allows scd2 columns to be added to arrow items
rudolfix Apr 13, 2024
e5c78fd
various renames
rudolfix Apr 13, 2024
d462a5b
uses generic caps when writing parquet if no destination context
rudolfix Apr 14, 2024
6944828
disables coercing timestamps in parquet arrow writer
rudolfix Apr 14, 2024
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -113,4 +113,4 @@ The dlt project is quickly growing, and we're excited to have you join our community

## License

DLT is released under the [Apache 2.0 License](LICENSE.txt).
`dlt` is released under the [Apache 2.0 License](LICENSE.txt).
2 changes: 1 addition & 1 deletion dlt/cli/_dlt.py
@@ -5,7 +5,7 @@
import click

from dlt.version import __version__
from dlt.common import json
from dlt.common.json import json
from dlt.common.schema import Schema
from dlt.common.typing import DictStrAny
from dlt.common.runners import Venv
2 changes: 1 addition & 1 deletion dlt/cli/config_toml_writer.py
@@ -4,7 +4,7 @@
from tomlkit.container import Container as TOMLContainer
from collections.abc import Sequence as C_Sequence

from dlt.common import pendulum
from dlt.common.pendulum import pendulum
from dlt.common.configuration.specs import (
BaseConfiguration,
is_base_configuration_inner_hint,
2 changes: 1 addition & 1 deletion dlt/cli/pipeline_command.py
@@ -3,7 +3,7 @@
import dlt
from dlt.cli.exceptions import CliCommandException

from dlt.common import json
from dlt.common.json import json
from dlt.common.pipeline import resource_state, get_dlt_pipelines_dir, TSourceState
from dlt.common.destination.reference import TDestinationReferenceArg
from dlt.common.runners import Venv
3 changes: 0 additions & 3 deletions dlt/cli/utils.py
@@ -1,11 +1,8 @@
import ast
import os
import tempfile
from typing import Callable

from dlt.common import git
from dlt.common.reflection.utils import set_ast_parents
from dlt.common.storages import FileStorage
from dlt.common.typing import TFun
from dlt.common.configuration import resolve_configuration
from dlt.common.configuration.specs import RunConfiguration
3 changes: 1 addition & 2 deletions dlt/common/configuration/providers/google_secrets.py
@@ -1,9 +1,8 @@
import base64
import string
import re
from typing import Tuple

from dlt.common import json
from dlt.common.json import json
from dlt.common.configuration.specs import GcpServiceAccountCredentials
from dlt.common.exceptions import MissingDependencyException
from .toml import VaultTomlProvider
2 changes: 1 addition & 1 deletion dlt/common/configuration/providers/toml.py
@@ -6,7 +6,7 @@
from tomlkit.container import Container as TOMLContainer
from typing import Any, Dict, Optional, Tuple, Type, Union

from dlt.common import pendulum
from dlt.common.pendulum import pendulum
from dlt.common.configuration.paths import get_dlt_settings_dir, get_dlt_data_dir
from dlt.common.configuration.utils import auto_cast
from dlt.common.configuration.specs import known_sections
3 changes: 1 addition & 2 deletions dlt/common/configuration/specs/azure_credentials.py
@@ -1,7 +1,6 @@
from typing import Optional, Dict, Any

from dlt.common import pendulum
from dlt.common.exceptions import MissingDependencyException
from dlt.common.pendulum import pendulum
from dlt.common.typing import TSecretStrValue
from dlt.common.configuration.specs import (
CredentialsConfiguration,
3 changes: 2 additions & 1 deletion dlt/common/configuration/specs/gcp_credentials.py
@@ -2,7 +2,8 @@
import sys
from typing import Any, ClassVar, Final, List, Tuple, Union, Dict

from dlt.common import json, pendulum
from dlt.common.json import json
from dlt.common.pendulum import pendulum
from dlt.common.configuration.specs.api_credentials import OAuth2Credentials
from dlt.common.configuration.specs.exceptions import (
InvalidGoogleNativeCredentialsType,
4 changes: 1 addition & 3 deletions dlt/common/configuration/utils.py
@@ -5,7 +5,7 @@
from typing import Any, Dict, Mapping, NamedTuple, Optional, Tuple, Type, Sequence
from collections.abc import Mapping as C_Mapping

from dlt.common import json
from dlt.common.json import json
from dlt.common.typing import AnyType, TAny
from dlt.common.data_types import coerce_value, py_type_to_sc_type
from dlt.common.configuration.providers import EnvironProvider
@@ -122,8 +122,6 @@ def log_traces(
default_value: Any,
traces: Sequence[LookupTrace],
) -> None:
from dlt.common import logger

# if logger.is_logging() and logger.log_level() == "DEBUG" and config:
# logger.debug(f"Field {key} with type {hint} in {type(config).__name__} {'NOT RESOLVED' if value is None else 'RESOLVED'}")
# print(f"Field {key} with type {hint} in {type(config).__name__} {'NOT RESOLVED' if value is None else 'RESOLVED'}")
6 changes: 3 additions & 3 deletions dlt/common/data_types/type_helpers.py
@@ -3,13 +3,13 @@
import dataclasses
import datetime # noqa: I251
from collections.abc import Mapping as C_Mapping, Sequence as C_Sequence
from typing import Any, Type, Literal, Union, cast
from typing import Any, Type, Union
from enum import Enum

from dlt.common import pendulum, json, Decimal, Wei
from dlt.common.json import custom_pua_remove, json
from dlt.common.json._simplejson import custom_encode as json_custom_encode
from dlt.common.arithmetics import InvalidOperation
from dlt.common.wei import Wei
from dlt.common.arithmetics import InvalidOperation, Decimal
from dlt.common.data_types.typing import TDataType
from dlt.common.time import (
ensure_pendulum_datetime,
16 changes: 16 additions & 0 deletions dlt/common/data_writers/escape.py
@@ -4,6 +4,8 @@
from datetime import date, datetime, time # noqa: I251

from dlt.common.json import json
from dlt.common.pendulum import pendulum
from dlt.common.time import reduce_pendulum_datetime_precision

# use regex to escape characters in single pass
SQL_ESCAPE_DICT = {"'": "''", "\\": "\\\\", "\n": "\\n", "\r": "\\r"}
@@ -152,3 +154,17 @@ def escape_databricks_literal(v: Any) -> Any:
return "NULL"

return str(v)


def format_datetime_literal(v: pendulum.DateTime, precision: int = 6, no_tz: bool = False) -> str:
"""Converts `v` to ISO string, optionally without timezone spec (in UTC) and with given `precision`"""
if no_tz:
v = v.in_timezone(tz="UTC").replace(tzinfo=None)
v = reduce_pendulum_datetime_precision(v, precision)
# yet another precision translation
timespec: str = "microseconds"
if precision < 6:
timespec = "milliseconds"
elif precision < 3:
timespec = "seconds"
return v.isoformat(sep=" ", timespec=timespec)
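The new `format_datetime_literal` above maps a numeric precision to an `isoformat` timespec. The same logic can be sketched with the stdlib `datetime` instead of pendulum; note the sketch checks `precision < 3` before `precision < 6`, so that a precision below 3 actually yields whole seconds:

```python
from datetime import datetime

def format_datetime_literal(v: datetime, precision: int = 6) -> str:
    # drop microsecond digits beyond the requested precision
    factor = 10 ** (6 - min(precision, 6))
    v = v.replace(microsecond=(v.microsecond // factor) * factor)
    # translate numeric precision into an isoformat timespec
    if precision < 3:
        timespec = "seconds"
    elif precision < 6:
        timespec = "milliseconds"
    else:
        timespec = "microseconds"
    return v.isoformat(sep=" ", timespec=timespec)
```

This is a re-sketch for illustration, not the PR's exact implementation, which also handles timezone stripping via pendulum.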
21 changes: 17 additions & 4 deletions dlt/common/data_writers/writers.py
@@ -17,7 +17,7 @@
TypeVar,
)

from dlt.common import json
from dlt.common.json import json
from dlt.common.configuration import configspec, known_sections, with_config
from dlt.common.configuration.specs import BaseConfiguration
from dlt.common.data_writers.exceptions import DataWriterNotFound, InvalidDataItem
@@ -176,6 +176,9 @@ def writer_spec(cls) -> FileWriterSpec:

class InsertValuesWriter(DataWriter):
def __init__(self, f: IO[Any], caps: DestinationCapabilitiesContext = None) -> None:
assert (
caps is not None
), "InsertValuesWriter requires destination capabilities to be present"
super().__init__(f, caps)
self._chunks_written = 0
self._headers_lookup: Dict[str, int] = None
@@ -272,7 +275,7 @@ def __init__(
coerce_timestamps: Optional[Literal["s", "ms", "us", "ns"]] = None,
allow_truncated_timestamps: bool = False,
) -> None:
super().__init__(f, caps)
super().__init__(f, caps or DestinationCapabilitiesContext.generic_capabilities("parquet"))
from dlt.common.libs.pyarrow import pyarrow

self.writer: Optional[pyarrow.parquet.ParquetWriter] = None
@@ -287,7 +290,15 @@ def __init__(
self.allow_truncated_timestamps = allow_truncated_timestamps

def _create_writer(self, schema: "pa.Schema") -> "pa.parquet.ParquetWriter":
from dlt.common.libs.pyarrow import pyarrow
from dlt.common.libs.pyarrow import pyarrow, get_py_arrow_timestamp

# if timestamps are not explicitly coerced, use destination resolution
# TODO: introduce maximum timestamp resolution, using timestamp_precision too aggressive
# if not self.coerce_timestamps:
# self.coerce_timestamps = get_py_arrow_timestamp(
# self._caps.timestamp_precision, "UTC"
# ).unit
# self.allow_truncated_timestamps = True

return pyarrow.parquet.ParquetWriter(
self._f,
@@ -331,7 +342,9 @@ def write_data(self, rows: Sequence[Any]) -> None:
for key in self.complex_indices:
for row in rows:
if (value := row.get(key)) is not None:
row[key] = json.dumps(value)
# TODO: make this configurable
if value is not None and not isinstance(value, str):
row[key] = json.dumps(value)

table = pyarrow.Table.from_pylist(rows, schema=self.schema)
# Write
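The guard added to `write_data` above skips values that are already strings before JSON-encoding complex columns. A standalone sketch of that behavior (the function name is assumed for illustration):

```python
import json

def serialize_complex_values(rows, complex_keys):
    # dump dict/list values of complex columns to JSON strings, leaving
    # values that arrive pre-serialized as strings untouched
    for row in rows:
        for key in complex_keys:
            value = row.get(key)
            if value is not None and not isinstance(value, str):
                row[key] = json.dumps(value)
    return rows
```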
7 changes: 7 additions & 0 deletions dlt/common/destination/reference.py
@@ -26,6 +26,7 @@

from dlt.common import logger
from dlt.common.schema import Schema, TTableSchema, TSchemaTables
from dlt.common.schema.typing import MERGE_STRATEGIES
from dlt.common.schema.exceptions import SchemaException
from dlt.common.schema.utils import (
get_write_disposition,
@@ -344,6 +345,12 @@ def _verify_schema(self) -> None:
table_name,
self.capabilities.max_identifier_length,
)
if table.get("write_disposition") == "merge":
if "x-merge-strategy" in table and table["x-merge-strategy"] not in MERGE_STRATEGIES: # type: ignore[typeddict-item]
raise SchemaException(
f'"{table["x-merge-strategy"]}" is not a valid merge strategy. ' # type: ignore[typeddict-item]
f"""Allowed values: {', '.join(['"' + s + '"' for s in MERGE_STRATEGIES])}."""
)
if has_column_with_prop(table, "hard_delete"):
if len(get_columns_names_with_prop(table, "hard_delete")) > 1:
raise SchemaException(
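The `_verify_schema` check above rejects unknown values of the `x-merge-strategy` hint. A plain-dict sketch of the same validation — the tuple of allowed strategies is an assumption based on this PR, which adds `scd2` alongside the default `delete-insert`:

```python
MERGE_STRATEGIES = ("delete-insert", "scd2")  # assumed allowed values

def verify_merge_strategy(table: dict) -> None:
    # only tables with merge write disposition carry a merge strategy hint
    if table.get("write_disposition") != "merge":
        return
    strategy = table.get("x-merge-strategy")
    if strategy is not None and strategy not in MERGE_STRATEGIES:
        allowed = ", ".join(f'"{s}"' for s in MERGE_STRATEGIES)
        raise ValueError(
            f'"{strategy}" is not a valid merge strategy. Allowed values: {allowed}.'
        )
```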
8 changes: 4 additions & 4 deletions dlt/common/json/__init__.py
@@ -12,7 +12,7 @@
except ImportError:
PydanticBaseModel = None # type: ignore[misc]

from dlt.common import pendulum
from dlt.common.pendulum import pendulum
from dlt.common.arithmetics import Decimal
from dlt.common.wei import Wei
from dlt.common.utils import map_nested_in_place
@@ -99,19 +99,19 @@ def _datetime_decoder(obj: str) -> datetime:
# Backwards compatibility for data encoded with previous dlt version
# fromisoformat does not support Z suffix (until py3.11)
obj = obj[:-1] + "+00:00"
return pendulum.DateTime.fromisoformat(obj) # type: ignore[attr-defined, no-any-return]
return pendulum.DateTime.fromisoformat(obj)


# define decoder for each prefix
DECODERS: List[Callable[[Any], Any]] = [
Decimal,
_datetime_decoder,
pendulum.Date.fromisoformat, # type: ignore[attr-defined]
pendulum.Date.fromisoformat,
UUID,
HexBytes,
base64.b64decode,
Wei,
pendulum.Time.fromisoformat, # type: ignore[attr-defined]
pendulum.Time.fromisoformat,
]
# how many decoders?
PUA_CHARACTER_MAX = len(DECODERS)
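The `DECODERS` list above is indexed by the offset of a Private Use Area tag character that dlt prefixes to each specially-encoded JSON string. A simplified dispatch sketch — both the `PUA_START` code point and the truncated decoder list are illustrative assumptions:

```python
from decimal import Decimal
from uuid import UUID

PUA_START = 0xF026  # assumed base code point for the type-tag characters

# position in the list == tag character's offset from PUA_START (truncated here)
DECODERS = [Decimal, UUID]

def decode_typed(obj: str):
    # the first character selects the decoder; untagged strings pass through
    code = ord(obj[0]) - PUA_START
    if 0 <= code < len(DECODERS):
        return DECODERS[code](obj[1:])
    return obj
```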
2 changes: 1 addition & 1 deletion dlt/common/libs/numpy.py
@@ -3,4 +3,4 @@
try:
import numpy
except ModuleNotFoundError:
raise MissingDependencyException("DLT Numpy Helpers", ["numpy"])
raise MissingDependencyException("dlt Numpy Helpers", ["numpy"])
2 changes: 1 addition & 1 deletion dlt/common/libs/pandas.py
@@ -4,7 +4,7 @@
try:
import pandas
except ModuleNotFoundError:
raise MissingDependencyException("DLT Pandas Helpers", ["pandas"])
raise MissingDependencyException("dlt Pandas Helpers", ["pandas"])


def pandas_to_arrow(df: pandas.DataFrame) -> Any:
10 changes: 5 additions & 5 deletions dlt/common/libs/pyarrow.py
@@ -15,7 +15,7 @@
)

from dlt import version
from dlt.common import pendulum
from dlt.common.pendulum import pendulum
from dlt.common.exceptions import MissingDependencyException
from dlt.common.schema.typing import DLT_NAME_PREFIX, TTableSchemaColumns

@@ -119,7 +119,7 @@ def get_pyarrow_int(precision: Optional[int]) -> Any:
return pyarrow.int64()


def _get_column_type_from_py_arrow(dtype: pyarrow.DataType) -> TColumnType:
def get_column_type_from_py_arrow(dtype: pyarrow.DataType) -> TColumnType:
"""Returns (data_type, precision, scale) tuple from pyarrow.DataType"""
if pyarrow.types.is_string(dtype) or pyarrow.types.is_large_string(dtype):
return dict(data_type="text")
@@ -226,14 +226,14 @@ def should_normalize_arrow_schema(
) -> Tuple[bool, Mapping[str, str], Dict[str, str], TTableSchemaColumns]:
rename_mapping = get_normalized_arrow_fields_mapping(schema, naming)
rev_mapping = {v: k for k, v in rename_mapping.items()}
dlt_table_prefix = naming.normalize_table_identifier(DLT_NAME_PREFIX)
dlt_tables = list(map(naming.normalize_table_identifier, ("_dlt_id", "_dlt_load_id")))

# remove all columns that are dlt columns but are not present in arrow schema. we do not want to add such columns
# that should happen in the normalizer
columns = {
name: column
for name, column in columns.items()
if not name.startswith(dlt_table_prefix) or name in rev_mapping
if name not in dlt_tables or name in rev_mapping
}

# check if nothing to rename
@@ -322,7 +322,7 @@ def py_arrow_to_table_schema_columns(schema: pyarrow.Schema) -> TTableSchemaColu
result[field.name] = {
"name": field.name,
"nullable": field.nullable,
**_get_column_type_from_py_arrow(field.type),
**get_column_type_from_py_arrow(field.type),
}
return result

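The change in `should_normalize_arrow_schema` above narrows the column filter from any `_dlt`-prefixed name to the explicit `_dlt_id`/`_dlt_load_id` pair, so dlt-added columns like the scd2 validity columns (and any user columns that merely share the prefix) survive. A sketch of the filtering step (function name assumed):

```python
def drop_missing_dlt_columns(columns, dlt_columns, rev_mapping):
    # keep a dlt-owned column only if the (renamed) arrow schema already has it;
    # absent dlt columns are added later by the normalizer, not here
    return {
        name: col
        for name, col in columns.items()
        if name not in dlt_columns or name in rev_mapping
    }
```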