Commit: new iceberg approach
sh-rp committed Oct 9, 2023
1 parent 92613ec commit 0924bc5
Showing 16 changed files with 178 additions and 139 deletions.
15 changes: 5 additions & 10 deletions dlt/common/destination/reference.py
@@ -10,6 +10,7 @@
 from dlt.common.schema import Schema, TTableSchema, TSchemaTables
 from dlt.common.schema.typing import TWriteDisposition
 from dlt.common.schema.exceptions import InvalidDatasetName
+from dlt.common.schema.utils import get_load_table
 from dlt.common.configuration import configspec
 from dlt.common.configuration.specs import BaseConfiguration, CredentialsConfiguration
 from dlt.common.configuration.accessors import config
@@ -244,13 +245,8 @@ def restore_file_load(self, file_path: str) -> LoadJob:
"""Finds and restores already started loading job identified by `file_path` if destination supports it."""
pass

def get_truncate_destination_table_dispositions(self) -> List[TWriteDisposition]:
# in the base job, all replace strategies are treated the same, see filesystem for example
return ["replace"]

def get_truncate_staging_destination_table_dispositions(self) -> List[TWriteDisposition]:
# some clients need to additionally be able to get the staging destination to truncate tables
return []
def table_needs_truncating(self, table: TTableSchema) -> bool:
return table["write_disposition"] == "replace"

def create_table_chain_completed_followup_jobs(self, table_chain: Sequence[TTableSchema]) -> List[NewLoadJob]:
"""Creates a list of followup jobs that should be executed after a table chain is completed"""
@@ -313,9 +309,8 @@ class WithStagingDataset(ABC):
"""Adds capability to use staging dataset and request it from the loader"""

@abstractmethod
def get_stage_dispositions(self) -> List[TWriteDisposition]:
"""Returns a list of write dispositions that require staging dataset"""
return []
def table_needs_staging(self, table: TTableSchema) -> bool:
return False

@abstractmethod
def with_staging_dataset(self)-> ContextManager["JobClientBase"]:
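Taken together, these hooks replace the old disposition lists with per-table questions. A hedged sketch of how a loader might drive them; the `client` and `schema` objects are assumed here and are not part of the diff:

    # sketch: decide which tables to truncate before initializing storage
    truncate_tables = [
        table["name"]
        for table in schema.data_tables()
        if client.table_needs_truncating(get_load_table(schema.tables, table["name"]))
    ]
    client.initialize_storage(truncate_tables=truncate_tables)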
5 changes: 5 additions & 0 deletions dlt/common/schema/exceptions.py
@@ -69,3 +69,8 @@ def __init__(self, schema_name: str, init_engine: int, from_engine: int, to_engi
         self.from_engine = from_engine
         self.to_engine = to_engine
         super().__init__(f"No engine upgrade path in schema {schema_name} from {init_engine} to {to_engine}, stopped at {from_engine}")
+
+
+class UnknownTableException(SchemaException):
+    def __init__(self, table_name: str) -> None:
+        self.table_name = table_name
+        super().__init__(f"Trying to access unknown table {table_name}.")
2 changes: 2 additions & 0 deletions dlt/common/schema/typing.py
@@ -24,6 +24,7 @@
 TColumnHint = Literal["not_null", "partition", "cluster", "primary_key", "foreign_key", "sort", "unique", "root_key", "merge_key"]
 """Known hints of a column used to declare hint regexes."""
 TWriteDisposition = Literal["skip", "append", "replace", "merge"]
+TTableFormat = Literal["iceberg"]
 TTypeDetections = Literal["timestamp", "iso_timestamp", "large_integer", "hexbytes_to_text", "wei_to_double"]
 TTypeDetectionFunc = Callable[[Type[Any], Any], Optional[TDataType]]
 TColumnNames = Union[str, Sequence[str]]
@@ -86,6 +87,7 @@ class TTableSchema(TypedDict, total=False):
     filters: Optional[TRowFilters]
     columns: TTableSchemaColumns
     resource: Optional[str]
+    table_format: Optional[TTableFormat]


 class TPartialTableSchema(TTableSchema):
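Because `TTableSchema` is declared with `total=False`, the new key is optional everywhere. A minimal sketch of a table dict carrying the hint (abbreviated; real tables hold more keys):

    table: TTableSchema = {
        "name": "events",
        "write_disposition": "replace",
        "table_format": "iceberg",
    }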
46 changes: 36 additions & 10 deletions dlt/common/schema/utils.py
@@ -15,10 +15,10 @@
 from dlt.common.validation import TCustomValidator, validate_dict, validate_dict_ignoring_xkeys
 from dlt.common.schema import detections
 from dlt.common.schema.typing import (COLUMN_HINTS, SCHEMA_ENGINE_VERSION, LOADS_TABLE_NAME, SIMPLE_REGEX_PREFIX, VERSION_TABLE_NAME, TColumnName, TPartialTableSchema, TSchemaTables, TSchemaUpdate,
-    TSimpleRegex, TStoredSchema, TTableSchema, TTableSchemaColumns, TColumnSchemaBase, TColumnSchema, TColumnProp,
+    TSimpleRegex, TStoredSchema, TTableSchema, TTableSchemaColumns, TColumnSchemaBase, TColumnSchema, TColumnProp, TTableFormat,
     TColumnHint, TTypeDetectionFunc, TTypeDetections, TWriteDisposition)
 from dlt.common.schema.exceptions import (CannotCoerceColumnException, ParentTableNotFoundException, SchemaEngineNoUpgradePathException, SchemaException,
-    TablePropertiesConflictException, InvalidSchemaName)
+    TablePropertiesConflictException, InvalidSchemaName, UnknownTableException)

 from dlt.common.normalizers.utils import import_normalizers
 from dlt.common.schema.typing import TAnySchemaColumns
@@ -493,18 +493,29 @@ def merge_schema_updates(schema_updates: Sequence[TSchemaUpdate]) -> TSchemaTabl
     return aggregated_update


-def get_write_disposition(tables: TSchemaTables, table_name: str) -> TWriteDisposition:
-    """Returns write disposition of a table if present. If not, looks up into parent table"""
+def get_inherited_table_hint(tables: TSchemaTables, table_name: str, table_hint_name: str, allow_none: bool = False) -> Any:
     table = tables[table_name]
-    w_d = table.get("write_disposition")
-    if w_d:
-        return w_d
+    hint = table.get(table_hint_name)
+    if hint:
+        return hint
+
     parent = table.get("parent")
     if parent:
-        return get_write_disposition(tables, parent)
-    raise ValueError(f"No write disposition found in the chain of tables for '{table_name}'.")
+        return get_inherited_table_hint(tables, parent, table_hint_name, allow_none)
+
+    if allow_none:
+        return None
+
+    raise ValueError(f"No table hint '{table_hint_name}' found in the chain of tables for '{table_name}'.")
+
+
+def get_write_disposition(tables: TSchemaTables, table_name: str) -> TWriteDisposition:
+    """Returns the write disposition of a table if present. If not, looks it up in the parent table"""
+    return get_inherited_table_hint(tables, table_name, "write_disposition", allow_none=False)
+
+
+def get_table_format(tables: TSchemaTables, table_name: str) -> TTableFormat:
+    return get_inherited_table_hint(tables, table_name, "table_format", allow_none=True)


 def table_schema_has_type(table: TTableSchema, _typ: TDataType) -> bool:
@@ -525,6 +536,18 @@ def get_top_level_table(tables: TSchemaTables, table_name: str) -> TTableSchema:
         return get_top_level_table(tables, parent)
     return table

+def get_load_table(tables: TSchemaTables, table_name: str) -> TTableSchema:
+    try:
+        # make a copy of the schema so modifications do not affect the original document
+        table = copy(tables[table_name])
+        # add write disposition if not specified - in child tables
+        if "write_disposition" not in table:
+            table["write_disposition"] = get_write_disposition(tables, table_name)
+        if "table_format" not in table:
+            table["table_format"] = get_table_format(tables, table_name)
+        return table
+    except KeyError:
+        raise UnknownTableException(table_name)

 def get_child_tables(tables: TSchemaTables, table_name: str) -> List[TTableSchema]:
     """Get child tables for table name and return a list of tables ordered by ancestry so the child tables are always after their parents"""
@@ -637,7 +660,8 @@ def new_table(
     write_disposition: TWriteDisposition = None,
     columns: Sequence[TColumnSchema] = None,
     validate_schema: bool = False,
-    resource: str = None
+    resource: str = None,
+    table_format: TTableFormat = None
 ) -> TTableSchema:

     table: TTableSchema = {
@@ -652,6 +676,8 @@ def new_table(
     # set write disposition only for root tables
     table["write_disposition"] = write_disposition or DEFAULT_WRITE_DISPOSITION
     table["resource"] = resource or table_name
+    if table_format:
+        table["table_format"] = table_format
     if validate_schema:
         validate_dict_ignoring_xkeys(
             spec=TColumnSchema,
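A hedged usage sketch of the extended factory (arguments other than `table_name` stay optional):

    # declare a root table that should be materialized as iceberg
    table = new_table("events", write_disposition="replace", table_format="iceberg")
    assert table["table_format"] == "iceberg"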
5 changes: 4 additions & 1 deletion dlt/common/storages/load_storage.py
@@ -237,8 +237,11 @@ def list_failed_jobs(self, load_id: str) -> Sequence[str]:
         return self.storage.list_folder_files(self._get_job_folder_path(load_id, LoadStorage.FAILED_JOBS_FOLDER))

     def list_jobs_for_table(self, load_id: str, table_name: str) -> Sequence[LoadJobInfo]:
+        return [job for job in self.list_all_jobs(load_id) if job.job_file_info.table_name == table_name]
+
+    def list_all_jobs(self, load_id: str) -> Sequence[LoadJobInfo]:
         info = self.get_load_package_info(load_id)
-        return [job for job in flatten_list_or_items(iter(info.jobs.values())) if job.job_file_info.table_name == table_name]  # type: ignore
+        return [job for job in flatten_list_or_items(iter(info.jobs.values()))]  # type: ignore

     def list_completed_failed_jobs(self, load_id: str) -> Sequence[str]:
         return self.storage.list_folder_files(self._get_job_folder_completed_path(load_id, LoadStorage.FAILED_JOBS_FOLDER))
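A short sketch of the resulting split; the populated `load_storage` and `load_id` are assumed:

    all_jobs = load_storage.list_all_jobs(load_id)
    events_jobs = load_storage.list_jobs_for_table(load_id, "events")
    assert len(events_jobs) <= len(all_jobs)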
25 changes: 11 additions & 14 deletions dlt/destinations/athena/athena.py
@@ -17,7 +17,7 @@
 from dlt.common.data_types import TDataType
 from dlt.common.schema import TColumnSchema, Schema
 from dlt.common.schema.typing import TTableSchema, TColumnType, TWriteDisposition
-from dlt.common.schema.utils import table_schema_has_type
+from dlt.common.schema.utils import table_schema_has_type, get_table_format
 from dlt.common.destination import DestinationCapabilitiesContext
 from dlt.common.destination.reference import LoadJob, FollowupJob
 from dlt.common.destination.reference import TLoadJobState, NewLoadJob
@@ -325,12 +325,13 @@ def _get_table_update_sql(self, table_name: str, new_columns: Sequence[TColumnSc

         # for the system tables we need to create empty iceberg tables to be able to run DELETE and UPDATE queries
         # or if we are in iceberg mode, we create iceberg tables for all tables
-        is_iceberg = self.schema.tables[table_name].get("write_disposition", None) == "skip"
+        is_iceberg = (self.schema.tables[table_name].get("write_disposition", None) == "skip") or (self._is_iceberg_table(self.schema.tables[table_name]) and not self.in_staging_mode)
         columns = ", ".join([self._get_column_def_sql(c) for c in new_columns])

         # this will fail if the table prefix is not properly defined
         table_prefix = self.table_prefix_layout.format(table_name=table_name)
         location = f"{bucket}/{dataset}/{table_prefix}"
+
         # use qualified table names
         qualified_table_name = self.sql_client.make_qualified_ddl_table_name(table_name)
         if is_iceberg and not generate_alter:
@@ -372,18 +373,14 @@ def _create_replace_followup_jobs(self, table_chain: Sequence[TTableSchema]) ->
         return super()._create_replace_followup_jobs(table_chain)

     def _is_iceberg_table(self, table: TTableSchema) -> bool:
-        return False
-
-    def get_stage_dispositions(self) -> List[TWriteDisposition]:
-        # in iceberg mode, we always use staging tables
-        # if self.iceberg_mode:
-        #     return ["append", "replace", "merge"]
-        return super().get_stage_dispositions()
-
-    def get_truncate_staging_destination_table_dispositions(self) -> List[TWriteDisposition]:
-        # if self.iceberg_mode:
-        #     return ["append", "replace", "merge"]
-        return []
+        table_format = get_table_format(self.schema.tables, table["name"])
+        return table_format == "iceberg"
+
+    def table_needs_staging(self, table: TTableSchema) -> bool:
+        # all iceberg tables need staging
+        if self._is_iceberg_table(table):
+            return True
+        return super().table_needs_staging(table)

     @staticmethod
     def is_dbapi_exception(ex: Exception) -> bool:
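Two behaviors fall out of this hunk: iceberg DDL is generated only for the final dataset (staging tables stay regular external tables, hence the `not self.in_staging_mode` guard), and every iceberg table is routed through staging. A hedged sketch; the `client` and the `events` table are assumed:

    table = client.schema.tables["events"]    # assumed to carry table_format == "iceberg"
    assert client._is_iceberg_table(table)
    assert client.table_needs_staging(table)  # iceberg always stages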
5 changes: 3 additions & 2 deletions dlt/destinations/bigquery/bigquery.py
@@ -12,9 +12,10 @@
 from dlt.common.storages.file_storage import FileStorage
 from dlt.common.schema import TColumnSchema, Schema, TTableSchemaColumns
 from dlt.common.schema.typing import TTableSchema, TColumnType
+from dlt.common.schema.exceptions import UnknownTableException

 from dlt.destinations.job_client_impl import SqlJobClientWithStaging
-from dlt.destinations.exceptions import DestinationSchemaWillNotUpdate, DestinationTransientException, LoadJobNotExistsException, LoadJobTerminalException, LoadJobUnknownTableException
+from dlt.destinations.exceptions import DestinationSchemaWillNotUpdate, DestinationTransientException, LoadJobNotExistsException, LoadJobTerminalException

 from dlt.destinations.bigquery import capabilities
 from dlt.destinations.bigquery.configuration import BigQueryClientConfiguration
@@ -220,7 +221,7 @@ def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) ->
             reason = BigQuerySqlClient._get_reason_from_errors(gace)
             if reason == "notFound":
                 # google.api_core.exceptions.NotFound: 404 - table not found
-                raise LoadJobUnknownTableException(table["name"], file_path)
+                raise UnknownTableException(table["name"])
             elif reason == "duplicate":
                 # google.api_core.exceptions.Conflict: 409 PUT - already exists
                 return self.restore_file_load(file_path)
6 changes: 0 additions & 6 deletions dlt/destinations/exceptions.py
@@ -63,12 +63,6 @@ def __init__(self, file_path: str, message: str) -> None:
super().__init__(f"Job with id/file name {file_path} encountered unrecoverable problem: {message}")


class LoadJobUnknownTableException(DestinationTerminalException):
def __init__(self, table_name: str, file_name: str) -> None:
self.table_name = table_name
super().__init__(f"Client does not know table {table_name} for load file {file_name}")


class LoadJobInvalidStateTransitionException(DestinationTerminalException):
def __init__(self, from_state: TLoadJobState, to_state: TLoadJobState) -> None:
self.from_state = from_state
29 changes: 21 additions & 8 deletions dlt/destinations/filesystem/filesystem.py
@@ -1,14 +1,15 @@
 import posixpath
 import os
 from types import TracebackType
-from typing import ClassVar, List, Type, Iterable, Set
+from typing import ClassVar, List, Type, Iterable, Set, Iterator
 from fsspec import AbstractFileSystem
+from contextlib import contextmanager

 from dlt.common import logger
 from dlt.common.schema import Schema, TSchemaTables, TTableSchema
 from dlt.common.storages import FileStorage, LoadStorage, filesystem_from_config
 from dlt.common.destination import DestinationCapabilitiesContext
-from dlt.common.destination.reference import NewLoadJob, TLoadJobState, LoadJob, JobClientBase, FollowupJob
+from dlt.common.destination.reference import NewLoadJob, TLoadJobState, LoadJob, JobClientBase, FollowupJob, WithStagingDataset

 from dlt.destinations.job_impl import EmptyLoadJob
 from dlt.destinations.filesystem import capabilities
@@ -68,7 +69,7 @@ def create_followup_jobs(self, next_state: str) -> List[NewLoadJob]:
         return jobs


-class FilesystemClient(JobClientBase):
+class FilesystemClient(JobClientBase, WithStagingDataset):
     """filesystem client storing jobs in memory"""

     capabilities: ClassVar[DestinationCapabilitiesContext] = capabilities()
@@ -82,16 +83,22 @@ def __init__(self, schema: Schema, config: FilesystemDestinationClientConfigurat
         # verify files layout. we need {table_name} and only allow {schema_name} before it, otherwise tables
         # cannot be replaced and we cannot initialize folders consistently
         self.table_prefix_layout = path_utils.get_table_prefix_layout(config.layout)

-    @property
-    def dataset_path(self) -> str:
-        ds_path = posixpath.join(self.fs_path, self.config.normalize_dataset_name(self.schema))
-        return ds_path
+        self.dataset_path = posixpath.join(self.fs_path, self.config.normalize_dataset_name(self.schema))

     def drop_storage(self) -> None:
         if self.is_storage_initialized():
             self.fs_client.rm(self.dataset_path, recursive=True)

+    @contextmanager
+    def with_staging_dataset(self) -> Iterator["FilesystemClient"]:
+        current_dataset_path = self.dataset_path
+        try:
+            self.dataset_path = posixpath.join(self.fs_path, self.config.normalize_dataset_name(self.schema)) + "_staging"
+            yield self
+        finally:
+            # restore previous dataset name
+            self.dataset_path = current_dataset_path

     def initialize_storage(self, truncate_tables: Iterable[str] = None) -> None:
         # clean up existing files for tables selected for truncating
         if truncate_tables and self.fs_client.isdir(self.dataset_path):
@@ -169,3 +176,9 @@ def __enter__(self) -> "FilesystemClient":

     def __exit__(self, exc_type: Type[BaseException], exc_val: BaseException, exc_tb: TracebackType) -> None:
         pass
+
+    def table_needs_staging(self, table: TTableSchema) -> bool:
+        # not so nice, how to do it better, collect this info from the main destination as before?
+        if table["table_format"] == "iceberg":
+            return True
+        return super().table_needs_staging(table)
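The staging dataset here is just a sibling folder with a `_staging` suffix. A hedged sketch of routing a job there; the `client`, the table, and the file path are assumed:

    table = get_load_table(client.schema.tables, "events")  # fills in table_format
    if client.table_needs_staging(table):
        with client.with_staging_dataset() as staging_client:
            # staging_client.dataset_path now ends with "_staging"
            staging_client.start_file_load(table, "/tmp/events.parquet", load_id="1696867200.0")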
20 changes: 9 additions & 11 deletions dlt/destinations/job_client_impl.py
@@ -140,10 +140,8 @@ def maybe_ddl_transaction(self) -> Iterator[None]:
         else:
             yield

-    def get_truncate_destination_table_dispositions(self) -> List[TWriteDisposition]:
-        if self.config.replace_strategy == "truncate-and-insert":
-            return ["replace"]
-        return []
+    def table_needs_truncating(self, table: TTableSchema) -> bool:
+        return table["write_disposition"] == "replace" and self.config.replace_strategy == "truncate-and-insert"

     def _create_append_followup_jobs(self, table_chain: Sequence[TTableSchema]) -> List[NewLoadJob]:
         return []
@@ -442,10 +440,10 @@ def with_staging_dataset(self)-> Iterator["SqlJobClientBase"]:
         finally:
             self.in_staging_mode = False

-    def get_stage_dispositions(self) -> List[TWriteDisposition]:
-        """Returns a list of dispositions that require staging tables to be populated"""
-        dispositions: List[TWriteDisposition] = ["merge"]
-        # if we have anything but the truncate-and-insert replace strategy, we need staging tables
-        if self.config.replace_strategy in ["insert-from-staging", "staging-optimized"]:
-            dispositions.append("replace")
-        return dispositions
+    def table_needs_staging(self, table: TTableSchema) -> bool:
+        if table["write_disposition"] == "merge":
+            return True
+        elif table["write_disposition"] == "replace" and (self.config.replace_strategy in ["insert-from-staging", "staging-optimized"]):
+            return True
+        return False
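The predicate makes the coupling to `replace_strategy` explicit. A hedged sketch of the decision for one table; `client` stands for any SQL job client with one of the three replace strategies configured:

    table = {"name": "events", "write_disposition": "replace"}

    # truncate-and-insert: truncate the final table in place, no staging dataset
    # insert-from-staging / staging-optimized: no in-place truncate, staging instead
    needs_truncate = client.table_needs_truncating(table)
    needs_staging = client.table_needs_staging(table)
    assert needs_truncate != needs_staging  # for "replace", exactly one path applies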
