moves type mappers into destination capabilities
rudolfix committed Sep 8, 2024
1 parent c2d35bf commit 1a8111d
Showing 51 changed files with 1,527 additions and 1,032 deletions.
1 change: 1 addition & 0 deletions dlt/common/data_writers/exceptions.py
@@ -1,4 +1,5 @@
from typing import NamedTuple, Sequence

from dlt.common.destination import TLoaderFileFormat
from dlt.common.exceptions import DltException

7 changes: 4 additions & 3 deletions dlt/common/data_writers/writers.py
@@ -32,10 +32,11 @@
from dlt.common.destination import (
DestinationCapabilitiesContext,
TLoaderFileFormat,
ALL_SUPPORTED_FILE_FORMATS,
LOADER_FILE_FORMATS,
)
from dlt.common.metrics import DataWriterMetrics
from dlt.common.schema.typing import TTableSchemaColumns
from dlt.common.schema.utils import is_nullable_column
from dlt.common.typing import StrAny


@@ -115,7 +116,7 @@ def item_format_from_file_extension(cls, extension: str) -> TDataItemFormat:
elif extension == "parquet":
return "arrow"
# those files may be imported by normalizer as is
elif extension in ALL_SUPPORTED_FILE_FORMATS:
elif extension in LOADER_FILE_FORMATS:
return "file"
else:
raise ValueError(f"Cannot figure out data item format for extension {extension}")
@@ -331,7 +332,7 @@ def write_header(self, columns_schema: TTableSchemaColumns) -> None:
self._caps,
self.timestamp_timezone,
),
nullable=schema_item.get("nullable", True),
nullable=is_nullable_column(schema_item),
)
for name, schema_item in columns_schema.items()
]
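The nullability check above now goes through the `is_nullable_column` helper imported from dlt.common.schema.utils. A tiny sketch of the behavior I understand it to have (a missing hint defaults to nullable); the column literals are made up:

from dlt.common.schema.utils import is_nullable_column

assert is_nullable_column({"name": "id", "data_type": "bigint"})  # no hint -> nullable
assert not is_nullable_column({"name": "id", "data_type": "bigint", "nullable": False})
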
6 changes: 4 additions & 2 deletions dlt/common/destination/__init__.py
@@ -2,15 +2,17 @@
DestinationCapabilitiesContext,
merge_caps_file_formats,
TLoaderFileFormat,
ALL_SUPPORTED_FILE_FORMATS,
LOADER_FILE_FORMATS,
)
from dlt.common.destination.reference import TDestinationReferenceArg, Destination, TDestination
from dlt.common.destination.typing import PreparedTableSchema

__all__ = [
"DestinationCapabilitiesContext",
"merge_caps_file_formats",
"TLoaderFileFormat",
"ALL_SUPPORTED_FILE_FORMATS",
"LOADER_FILE_FORMATS",
"PreparedTableSchema",
"TDestinationReferenceArg",
"Destination",
"TDestination",
70 changes: 68 additions & 2 deletions dlt/common/destination/capabilities.py
@@ -1,15 +1,20 @@
from abc import ABC, abstractmethod
from typing import (
Any,
Callable,
ClassVar,
Iterable,
Literal,
Optional,
Sequence,
Tuple,
Set,
Protocol,
Type,
get_args,
)
from dlt.common.data_types import TDataType
from dlt.common.exceptions import TerminalValueError
from dlt.common.normalizers.typing import TNamingConventionReferenceArg
from dlt.common.typing import TLoaderFileFormat
from dlt.common.configuration.utils import serialize_value
@@ -20,12 +25,19 @@
DestinationLoadingViaStagingNotSupported,
DestinationLoadingWithoutStagingNotSupported,
)
from dlt.common.destination.typing import PreparedTableSchema
from dlt.common.arithmetics import DEFAULT_NUMERIC_PRECISION, DEFAULT_NUMERIC_SCALE
from dlt.common.schema.typing import TTableSchema, TLoaderMergeStrategy, TTableFormat
from dlt.common.schema.typing import (
TColumnSchema,
TColumnType,
TTableSchema,
TLoaderMergeStrategy,
TTableFormat,
)
from dlt.common.wei import EVM_DECIMAL_PRECISION

TLoaderParallelismStrategy = Literal["parallel", "table-sequential", "sequential"]
ALL_SUPPORTED_FILE_FORMATS: Set[TLoaderFileFormat] = set(get_args(TLoaderFileFormat))
LOADER_FILE_FORMATS: Set[TLoaderFileFormat] = set(get_args(TLoaderFileFormat))


class LoaderFileFormatSelector(Protocol):
@@ -53,6 +65,56 @@ def __call__(
) -> Sequence["TLoaderMergeStrategy"]: ...


class DataTypeMapper(ABC):
def __init__(self, capabilities: "DestinationCapabilitiesContext") -> None:
"""Maps dlt data types into destination data types"""
self.capabilities = capabilities

@abstractmethod
def to_destination_type(self, column: TColumnSchema, table: PreparedTableSchema) -> str:
"""Gets destination data type for a particular `column` in prepared `table`"""
pass

@abstractmethod
def from_destination_type(
self, db_type: str, precision: Optional[int], scale: Optional[int]
) -> TColumnType:
"""Gets column type from db type"""
pass

@abstractmethod
def ensure_supported_type(
self,
column: TColumnSchema,
table: PreparedTableSchema,
loader_file_format: TLoaderFileFormat,
) -> None:
"""Makes sure that dlt type in `column` in prepared `table` is supported by the destination for a given file format"""
pass


class UnsupportedTypeMapper(DataTypeMapper):
"""Type Mapper that can't map any type"""

def to_destination_type(self, column: TColumnSchema, table: PreparedTableSchema) -> str:
raise NotImplementedError("No types are supported, use real type mapper")

def from_destination_type(
self, db_type: str, precision: Optional[int], scale: Optional[int]
) -> TColumnType:
raise NotImplementedError("No types are supported, use real type mapper")

def ensure_supported_type(
self,
column: TColumnSchema,
table: PreparedTableSchema,
loader_file_format: TLoaderFileFormat,
) -> None:
raise TerminalValueError(
"No types are supported, use real type mapper", column["data_type"]
)


@configspec
class DestinationCapabilitiesContext(ContainerInjectableContext):
"""Injectable destination capabilities required for many Pipeline stages ie. normalize"""
@@ -65,6 +127,7 @@ class DestinationCapabilitiesContext(ContainerInjectableContext):
loader_file_format_selector: LoaderFileFormatSelector = None
"""Callable that adapts `preferred_loader_file_format` and `supported_loader_file_formats` at runtime."""
supported_table_formats: Sequence[TTableFormat] = None
type_mapper: Optional[Type[DataTypeMapper]] = None
recommended_file_size: Optional[int] = None
"""Recommended file size in bytes when writing extract/load files"""
preferred_staging_file_format: Optional[TLoaderFileFormat] = None
@@ -157,6 +220,9 @@ def generic_capabilities(
caps.merge_strategies_selector = merge_strategies_selector
return caps

def get_type_mapper(self) -> DataTypeMapper:
return self.type_mapper(self)


def merge_caps_file_formats(
destination: str,
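Taken together, the pieces above let a destination declare type mapping as a capability: the context stores a DataTypeMapper class in `type_mapper` and `get_type_mapper()` instantiates it with the capabilities themselves. A minimal sketch of that wiring follows; SketchTypeMapper, its three-entry type table and the example column are illustrative assumptions, only the abstract interface and the capability field come from this commit:

from typing import Optional

from dlt.common.destination.capabilities import (
    DataTypeMapper,
    DestinationCapabilitiesContext,
)
from dlt.common.destination.typing import PreparedTableSchema
from dlt.common.exceptions import TerminalValueError
from dlt.common.schema.typing import TColumnSchema, TColumnType
from dlt.common.typing import TLoaderFileFormat


class SketchTypeMapper(DataTypeMapper):
    """Hypothetical mapper with a toy dlt-type -> db-type table"""

    TO_DB = {"text": "VARCHAR", "bigint": "BIGINT", "double": "DOUBLE"}
    FROM_DB = {v: k for k, v in TO_DB.items()}

    def to_destination_type(self, column: TColumnSchema, table: PreparedTableSchema) -> str:
        return self.TO_DB[column["data_type"]]

    def from_destination_type(
        self, db_type: str, precision: Optional[int], scale: Optional[int]
    ) -> TColumnType:
        return {"data_type": self.FROM_DB[db_type], "precision": precision, "scale": scale}

    def ensure_supported_type(
        self,
        column: TColumnSchema,
        table: PreparedTableSchema,
        loader_file_format: TLoaderFileFormat,
    ) -> None:
        # reject anything outside the toy table, mirroring UnsupportedTypeMapper above
        if column["data_type"] not in self.TO_DB:
            raise TerminalValueError("unsupported type", column["data_type"])


caps = DestinationCapabilitiesContext()
caps.type_mapper = SketchTypeMapper  # the capability stores the class ...
mapper = caps.get_type_mapper()  # ... which is instantiated with the capabilities themselves
table = {"name": "events", "columns": {}}  # minimal stand-in for a prepared table
print(mapper.to_destination_type({"name": "id", "data_type": "bigint"}, table))  # BIGINT

With the mapper carried by capabilities, destination-agnostic code such as the data type verification added in dlt/common/destination/utils.py below can probe type support without importing any concrete destination.
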
33 changes: 32 additions & 1 deletion dlt/common/destination/exceptions.py
@@ -1,4 +1,4 @@
from typing import Any, Iterable, List
from typing import Any, Iterable, List, Sequence

from dlt.common.exceptions import DltException, TerminalException, TransientException

@@ -102,6 +102,37 @@ def __init__(
)


class UnsupportedDataType(DestinationTerminalException):
def __init__(
self,
destination_type: str,
table_name: str,
column: str,
data_type: str,
file_format: str,
available_in_formats: Sequence[str],
more_info: str,
) -> None:
self.destination_type = destination_type
self.table_name = table_name
self.column = column
self.data_type = data_type
self.file_format = file_format
self.available_in_formats = available_in_formats
self.more_info = more_info
msg = (
f"Destination {destination_type} cannot load data type '{data_type}' from"
f" '{file_format}' files. The affected table is '{table_name}' column '{column}'."
)
if available_in_formats:
msg += f" Note: '{data_type}' can be loaded from {available_in_formats} formats(s)."
else:
msg += f" None of available file formats support '{data_type}' for this destination."
if more_info:
msg += " More info: " + more_info
super().__init__(msg)


class DestinationHasFailedJobs(DestinationTerminalException):
def __init__(self, destination_name: str, load_id: str, failed_jobs: List[Any]) -> None:
self.destination_name = destination_name
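For illustration, a sketch of the message this exception composes; the values are made up, only the constructor signature comes from this diff:

from dlt.common.destination.exceptions import UnsupportedDataType

exc = UnsupportedDataType(
    destination_type="dummy",
    table_name="events",
    column="valid_until",
    data_type="time",
    file_format="insert_values",
    available_in_formats=["parquet"],
    more_info="see the destination documentation",
)
print(exc)
# Destination dummy cannot load data type 'time' from 'insert_values' files.
# The affected table is 'events' column 'valid_until'. ...
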
26 changes: 15 additions & 11 deletions dlt/common/destination/reference.py
@@ -25,7 +25,8 @@

from dlt.common import logger, pendulum
from dlt.common.configuration.specs.base_configuration import extract_inner_hint
from dlt.common.destination.utils import verify_schema_capabilities
from dlt.common.destination.typing import PreparedTableSchema
from dlt.common.destination.utils import verify_schema_capabilities, verify_supported_data_types
from dlt.common.exceptions import TerminalValueError
from dlt.common.metrics import LoadJobMetrics
from dlt.common.normalizers.naming import NamingConvention
@@ -268,13 +269,6 @@ class DestinationClientDwhWithStagingConfiguration(DestinationClientDwhConfigura
"""If dlt should truncate the tables on staging destination before loading data."""


class PreparedTableSchema(_TTableSchemaBase, total=False):
"""Table schema with all hints prepared to be loaded"""

write_disposition: TWriteDisposition
_x_prepared: bool # needed for the type checker


TLoadJobState = Literal["ready", "running", "failed", "retry", "completed"]


@@ -459,7 +453,9 @@ def drop_storage(self) -> None:
"""Brings storage back into not initialized state. Typically data in storage is destroyed."""
pass

def verify_schema(self, only_tables: Iterable[str] = None) -> List[PreparedTableSchema]:
def verify_schema(
self, only_tables: Iterable[str] = None, new_jobs: Iterable[ParsedLoadJobFileName] = None
) -> List[PreparedTableSchema]:
"""Verifies schema before loading, returns a list of verified loaded tables."""
if exceptions := verify_schema_capabilities(
self.schema,
@@ -470,13 +466,21 @@ def verify_schema(self, only_tables: Iterable[str] = None) -> List[PreparedTable
for exception in exceptions:
logger.error(str(exception))
raise exceptions[0]
# verify all tables with data
return [

prepared_tables = [
self.prepare_load_table(table_name)
for table_name in set(
list(only_tables or []) + self.schema.data_table_names(seen_data_only=True)
)
]
verify_supported_data_types(
prepared_tables,
new_jobs,
self.capabilities,
self.config.destination_type,
warnings=False,
)
return prepared_tables

def update_stored_schema(
self,
8 changes: 8 additions & 0 deletions dlt/common/destination/typing.py
@@ -0,0 +1,8 @@
from dlt.common.schema.typing import _TTableSchemaBase, TWriteDisposition


class PreparedTableSchema(_TTableSchemaBase, total=False):
"""Table schema with all hints prepared to be loaded"""

write_disposition: TWriteDisposition
_x_prepared: bool # needed for the type checker
82 changes: 77 additions & 5 deletions dlt/common/destination/utils.py
@@ -1,20 +1,23 @@
from typing import List, Optional, Sequence
import contextlib
from typing import Dict, Iterable, List, Optional, Set

from dlt.common import logger
from dlt.common.configuration.inject import with_config
from dlt.common.destination.exceptions import (
DestinationCapabilitiesException,
IdentifierTooLongException,
)
from dlt.common.destination.typing import PreparedTableSchema
from dlt.common.destination.exceptions import UnsupportedDataType
from dlt.common.destination.capabilities import DestinationCapabilitiesContext, LOADER_FILE_FORMATS
from dlt.common.schema import Schema
from dlt.common.schema.exceptions import (
SchemaIdentifierNormalizationCollision,
)
from dlt.common.schema.typing import TLoaderMergeStrategy, TSchemaTables, TTableSchema
from dlt.common.schema.typing import TColumnType, TLoaderMergeStrategy, TSchemaTables, TTableSchema
from dlt.common.schema.utils import get_merge_strategy
from dlt.common.typing import ConfigValue, DictStrStr

from .capabilities import DestinationCapabilitiesContext
from dlt.common.storages import ParsedLoadJobFileName
from dlt.common.typing import ConfigValue, DictStrStr, TLoaderFileFormat


def verify_schema_capabilities(
@@ -113,6 +116,75 @@ def verify_schema_capabilities(
return exception_log


def column_type_to_str(column: TColumnType) -> str:
"""Converts column type to db-like type string"""
data_type: str = column["data_type"]
if (precision := column.get("precision")) and (scale := column.get("scale")):
data_type += f"({precision},{scale})"
elif precision:
data_type += f"({precision})"
return data_type


def verify_supported_data_types(
prepared_tables: Iterable[PreparedTableSchema],
new_jobs: Iterable[ParsedLoadJobFileName],
capabilities: DestinationCapabilitiesContext,
destination_type: str,
warnings: bool = True,
) -> List[Exception]:
exception_log: List[Exception] = []
# can't check types without type mapper
if capabilities.type_mapper is None:
return exception_log

type_mapper = capabilities.get_type_mapper()

# index available file formats
table_file_formats: Dict[str, Set[TLoaderFileFormat]] = {}
for parsed_file in new_jobs:
formats = table_file_formats.setdefault(parsed_file.table_name, set())
if parsed_file.file_format in LOADER_FILE_FORMATS:
formats.add(parsed_file.file_format) # type: ignore[arg-type]
# all file formats
all_file_formats = set(capabilities.supported_loader_file_formats or []) | set(
capabilities.supported_staging_file_formats or []
)

for table in prepared_tables:
# map types
for column in table["columns"].values():
try:
type_mapper.to_destination_type(column, table)
except Exception as ex:
# collect mapping exceptions
exception_log.append(ex)
# ensure if types can be loaded from file formats present in jobs
for format_ in table_file_formats.get(table["name"], []):
try:
type_mapper.ensure_supported_type(column, table, format_)
except ValueError as err:
# figure out where data type is supported
available_in_formats: List[TLoaderFileFormat] = []
for candidate_format in all_file_formats - set([format_]):
with contextlib.suppress(Exception):
type_mapper.ensure_supported_type(column, table, candidate_format)
available_in_formats.append(candidate_format)
exception_log.append(
UnsupportedDataType(
destination_type,
table["name"],
column["name"],
column_type_to_str(column),
format_,
available_in_formats,
err.args[0],
)
)

return exception_log


@with_config
def resolve_merge_strategy(
tables: TSchemaTables,
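verify_schema in dlt/common/destination/reference.py above delegates to this function, and the probe over `all_file_formats - set([format_])` is what fills `available_in_formats` in the UnsupportedDataType message. A small sketch under the assumption that a bare DestinationCapabilitiesContext() is constructible with defaults; the table literal is made up and UnsupportedTypeMapper (added in this commit) stands in for a real mapper:

from dlt.common.destination.capabilities import (
    DestinationCapabilitiesContext,
    UnsupportedTypeMapper,
)
from dlt.common.destination.utils import column_type_to_str, verify_supported_data_types

# column_type_to_str renders the db-like type string used in the error messages
print(column_type_to_str({"data_type": "decimal", "precision": 38, "scale": 9}))  # decimal(38,9)
print(column_type_to_str({"data_type": "text", "precision": 128}))  # text(128)
print(column_type_to_str({"data_type": "bigint"}))  # bigint

caps = DestinationCapabilitiesContext()
caps.type_mapper = UnsupportedTypeMapper  # placeholder mapper introduced in this commit
tables = [{"name": "events", "columns": {"id": {"name": "id", "data_type": "bigint"}}}]
# with no new jobs only plain type mapping is probed; failures are collected, not raised
errors = verify_supported_data_types(tables, [], caps, "dummy", warnings=False)
assert len(errors) == 1 and isinstance(errors[0], NotImplementedError)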