Extend custom destination #1107

Merged: 18 commits, Mar 20, 2024
Changes from 3 commits
1 change: 1 addition & 0 deletions dlt/common/destination/capabilities.py
@@ -55,6 +55,7 @@ class DestinationCapabilitiesContext(ContainerInjectableContext):
insert_values_writer_type: str = "default"
supports_multiple_statements: bool = True
supports_clone_table: bool = False
"""Destination supports CREATE TABLE ... CLONE ... statements"""
max_table_nesting: Optional[int] = None  # destination can overwrite max table nesting

# do not allow to create default value, destination caps must be always explicitly inserted into container
21 changes: 21 additions & 0 deletions dlt/common/destination/reference.py
@@ -260,6 +260,27 @@ def create_followup_jobs(self, final_state: TLoadJobState) -> List[NewLoadJob]:
return []


class DoNothingJob(LoadJob):
"""The most lazy class of dlt"""

def __init__(self, file_path: str) -> None:
super().__init__(FileStorage.get_file_name_from_file_path(file_path))

def state(self) -> TLoadJobState:
# this job is always done
return "completed"

def exception(self) -> str:
# this part of code should be never reached
raise NotImplementedError()


class DoNothingFollowupJob(DoNothingJob, FollowupJob):
"""The second most lazy class of dlt"""

pass


class JobClientBase(ABC):
capabilities: ClassVar[DestinationCapabilitiesContext] = None

4 changes: 4 additions & 0 deletions dlt/destinations/decorators.py
@@ -23,6 +23,8 @@ def destination(
batch_size: int = 10,
name: str = None,
naming_convention: str = "direct",
Review comment (Collaborator):
Good! Please add this to our docs: the default settings are such that data comes to the sink without changing identifiers, un-nested, and with the dlt identifiers removed; and that it is a good fit for pushing data to queues and REST APIs (see the sketch after this diff).

skip_dlt_columns_and_tables: bool = True,
max_table_nesting: int = 0,
spec: Type[GenericDestinationClientConfiguration] = GenericDestinationClientConfiguration,
) -> Callable[
[Callable[Concatenate[Union[TDataItems, str], TTableSchema, TDestinationCallableParams], Any]],
@@ -49,6 +51,8 @@ def wrapper(
batch_size=batch_size,
destination_name=name,
naming_convention=naming_convention,
skip_dlt_columns_and_tables=skip_dlt_columns_and_tables,
max_table_nesting=max_table_nesting,
**kwargs, # type: ignore
)

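To illustrate the reviewer's point above: a minimal sketch of a custom destination that relies on these defaults to forward batches to a REST endpoint. The endpoint URL and the `requests` call are illustrative assumptions, not part of this PR.

```py
import dlt
import requests

from dlt.common.typing import TDataItems
from dlt.common.schema import TTableSchema

# hypothetical endpoint, used only for illustration
API_URL = "https://example.com/ingest"


# with the defaults (max_table_nesting=0, skip_dlt_columns_and_tables=True,
# naming_convention="direct") every batch arrives un-nested, with unchanged
# field names and without the dlt identifiers
@dlt.destination(batch_size=10, loader_file_format="jsonl", name="rest_sink")
def rest_sink(items: TDataItems, table: TTableSchema) -> None:
    # push the batch to a queue or REST API
    requests.post(API_URL, json={"table": table["name"], "rows": items}, timeout=10)
```
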
23 changes: 1 addition & 22 deletions dlt/destinations/impl/athena/athena.py
@@ -37,7 +37,7 @@
from dlt.common.schema.typing import TTableSchema, TColumnType, TWriteDisposition, TTableFormat
from dlt.common.schema.utils import table_schema_has_type, get_table_format
from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.destination.reference import LoadJob, FollowupJob
from dlt.common.destination.reference import LoadJob, DoNothingFollowupJob, DoNothingJob
from dlt.common.destination.reference import TLoadJobState, NewLoadJob, SupportsStagingDestination
from dlt.common.storages import FileStorage
from dlt.common.data_writers.escape import escape_bigquery_identifier
@@ -149,27 +149,6 @@ def __init__(self) -> None:
DLTAthenaFormatter._INSTANCE = self


class DoNothingJob(LoadJob):
"""The most lazy class of dlt"""

def __init__(self, file_path: str) -> None:
super().__init__(FileStorage.get_file_name_from_file_path(file_path))

def state(self) -> TLoadJobState:
# this job is always done
return "completed"

def exception(self) -> str:
# this part of code should be never reached
raise NotImplementedError()


class DoNothingFollowupJob(DoNothingJob, FollowupJob):
"""The second most lazy class of dlt"""

pass


class AthenaSQLClient(SqlClientBase[Connection]):
capabilities: ClassVar[DestinationCapabilitiesContext] = capabilities()
dbapi: ClassVar[DBApi] = pyathena
3 changes: 3 additions & 0 deletions dlt/destinations/impl/destination/__init__.py
@@ -1,14 +1,17 @@
from typing import Optional
from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.data_writers import TLoaderFileFormat


def capabilities(
preferred_loader_file_format: TLoaderFileFormat = "puae-jsonl",
naming_convention: str = "direct",
max_table_nesting: Optional[int] = 0,
) -> DestinationCapabilitiesContext:
caps = DestinationCapabilitiesContext.generic_capabilities(preferred_loader_file_format)
caps.supported_loader_file_formats = ["puae-jsonl", "parquet"]
caps.supports_ddl_transactions = False
caps.supports_transactions = False
caps.naming_convention = naming_convention
caps.max_table_nesting = max_table_nesting
return caps
2 changes: 2 additions & 0 deletions dlt/destinations/impl/destination/configuration.py
@@ -20,6 +20,8 @@ class GenericDestinationClientConfiguration(DestinationClientConfiguration):
destination_callable: Optional[Union[str, TDestinationCallable]] = None # noqa: A003
loader_file_format: TLoaderFileFormat = "puae-jsonl"
batch_size: int = 10
skip_dlt_columns_and_tables: bool = True
max_table_nesting: int = 0

if TYPE_CHECKING:

32 changes: 30 additions & 2 deletions dlt/destinations/impl/destination/destination.py
@@ -1,7 +1,9 @@
from abc import ABC, abstractmethod
from types import TracebackType
from typing import ClassVar, Dict, Optional, Type, Iterable, Iterable, cast, Dict
from typing import ClassVar, Dict, Optional, Type, Iterable, Iterable, cast, Dict, List
from copy import deepcopy

from dlt.common.destination.reference import LoadJob
from dlt.destinations.job_impl import EmptyLoadJob
from dlt.common.typing import TDataItems, AnyFun
from dlt.common import json
@@ -18,6 +20,7 @@
from dlt.common.destination.reference import (
TLoadJobState,
LoadJob,
DoNothingJob,
JobClientBase,
)

@@ -27,6 +30,8 @@
TDestinationCallable,
)

INTERNAL_MARKER = "_dlt"
Review comment (Collaborator):
You must use schema._dlt_tables_prefix (which may be normalized) to detect dlt identifiers. You may add such a method to the schema, but it will be slower to call a method. (A sketch follows this diff.)



class DestinationLoadJob(LoadJob, ABC):
def __init__(
@@ -37,6 +42,7 @@ def __init__(
schema: Schema,
destination_state: Dict[str, int],
destination_callable: TDestinationCallable,
skipped_columns: List[str],
) -> None:
super().__init__(FileStorage.get_file_name_from_file_path(file_path))
self._file_path = file_path
@@ -47,6 +53,7 @@ def __init__(
self._callable = destination_callable
self._state: TLoadJobState = "running"
self._storage_id = f"{self._parsed_file_name.table_name}.{self._parsed_file_name.file_id}"
self.skipped_columns = skipped_columns
try:
if self._config.batch_size == 0:
# on batch size zero we only call the callable with the filename
@@ -93,9 +100,14 @@ def run(self, start_index: int) -> Iterable[TDataItems]:
start_index % self._config.batch_size
) == 0, "Batch size was changed during processing of one load package"

# on record batches we cannot drop columns, we need to
# select the ones we want to keep
keep_columns = list(self._table["columns"].keys())
start_batch = start_index / self._config.batch_size
with pyarrow.parquet.ParquetFile(self._file_path) as reader:
for record_batch in reader.iter_batches(batch_size=self._config.batch_size):
for record_batch in reader.iter_batches(
batch_size=self._config.batch_size, columns=keep_columns
):
if start_batch > 0:
start_batch -= 1
continue
@@ -115,6 +127,9 @@ def run(self, start_index: int) -> Iterable[TDataItems]:
if start_index > 0:
start_index -= 1
continue
# skip internal columns
for column in self.skipped_columns:
item.pop(column, None)
current_batch.append(item)
if len(current_batch) == self._config.batch_size:
yield current_batch
@@ -150,6 +165,17 @@ def update_stored_schema(
return super().update_stored_schema(only_tables, expected_update)

def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob:
# skip internal tables and remove columns from schema if so configured
skipped_columns: List[str] = []
if self.config.skip_dlt_columns_and_tables:
if table["name"].startswith(INTERNAL_MARKER):
return DoNothingJob(file_path)
table = deepcopy(table)
for column in list(table["columns"].keys()):
if column.startswith(INTERNAL_MARKER):
table["columns"].pop(column)
skipped_columns.append(column)

# save our state in destination name scope
load_state = destination_state()
if file_path.endswith("parquet"):
@@ -160,6 +186,7 @@ def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) ->
self.schema,
load_state,
self.destination_callable,
skipped_columns,
)
if file_path.endswith("jsonl"):
return DestinationJsonlLoadJob(
@@ -169,6 +196,7 @@ def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) ->
self.schema,
load_state,
self.destination_callable,
skipped_columns,
)
return None

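One possible shape for the change the reviewer asks for above, as a sketch (not the code in this PR): read the dlt prefix from the schema instead of hardcoding `_dlt`, since the prefix may have been normalized.

```py
from dlt.common.schema import Schema
from dlt.common.schema.typing import TTableSchema


def is_dlt_identifier(schema: Schema, name: str) -> bool:
    # take the (possibly normalized) prefix from the schema
    # rather than comparing against a literal "_dlt"
    return name.startswith(schema._dlt_tables_prefix)


def should_skip(schema: Schema, table: TTableSchema) -> bool:
    # a dlt internal table such as _dlt_loads would map to a DoNothingJob
    return is_dlt_identifier(schema, table["name"])
```

In `start_file_load` this would replace the `INTERNAL_MARKER` comparisons for both table and column names.
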
5 changes: 3 additions & 2 deletions dlt/destinations/impl/destination/factory.py
@@ -36,8 +36,9 @@ class DestinationInfo(t.NamedTuple):
class destination(Destination[GenericDestinationClientConfiguration, "DestinationClient"]):
def capabilities(self) -> DestinationCapabilitiesContext:
return capabilities(
self.config_params.get("loader_file_format", "puae-jsonl"),
self.config_params.get("naming_convention", "direct"),
preferred_loader_file_format=self.config_params.get("loader_file_format", "puae-jsonl"),
naming_convention=self.config_params.get("naming_convention", "direct"),
max_table_nesting=self.config_params.get("max_table_nesting", None),
)

@property
13 changes: 12 additions & 1 deletion dlt/pipeline/pipeline.py
@@ -456,7 +456,18 @@ def normalize(
return None

# make sure destination capabilities are available
self._get_destination_capabilities()
caps = self._get_destination_capabilities()
Review comment (Collaborator, PR author):
@rudolfix I need some guidance on where to inject / overwrite the max_nesting_level coming from a destination. I realize this place is very likely not the right one, but I am not sure where and how to do it. Should I get the capabilities context in the relational normalizer and not persist this setting to the schema at all, or what is the best way?

Review comment (Collaborator):
The only thing you need to do is to fix NormalizersConfiguration:

def on_resolved(self) -> None:
    # get naming from capabilities if not present
    if self.naming is None:
        if self.destination_capabilities:
            self.naming = self.destination_capabilities.naming_convention

Detect the type of the json normalizer and apply the settings to it like the below; you can override existing settings. I think capabilities (if not None) should take precedence over the source settings. (A sketch of this override follows this diff.)

What happens later: when a new schema is created, this setting will be used; when a schema is loaded, it will not, but when we call update_normalizers it will, and we do that in the normalizer (in schema.clone). So it should work!

Review comment (Collaborator, PR author):
Since this is set on the nested json normalizer settings, I had to change a bit more, but not much. I hope it is OK to change the type from Mapping to dict in there.

if caps.max_table_nesting is not None:
# destination settings override normalizer settings in schema
from dlt.common.normalizers.json.relational import (
DataItemNormalizer as RelationalNormalizer,
)

RelationalNormalizer.update_normalizer_config(
self.default_schema, {"max_nesting": caps.max_table_nesting}
)
self._schema_storage.save_schema(self.default_schema)

# create default normalize config
normalize_config = NormalizeConfiguration(
workers=workers,
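A rough sketch of the override discussed in the comments above; the exact shape of the json normalizer settings dict is an assumption, not taken from this PR.

```py
from typing import Any, Dict, Optional

from dlt.common.destination import DestinationCapabilitiesContext


def apply_caps_to_json_normalizer(
    json_normalizer: Dict[str, Any],
    caps: Optional[DestinationCapabilitiesContext],
) -> Dict[str, Any]:
    # capabilities, if present, take precedence over the source/schema settings
    if caps is not None and caps.max_table_nesting is not None:
        json_normalizer.setdefault("config", {})["max_nesting"] = caps.max_table_nesting
    return json_normalizer
```

Hooked into `NormalizersConfiguration.on_resolved`, this would let a destination capability such as `max_table_nesting=0` win over whatever the source configured, which matches the precedence suggested above.
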
4 changes: 3 additions & 1 deletion docs/website/docs/dlt-ecosystem/destinations/destination.md
@@ -69,7 +69,7 @@ the sink from your pipeline constructor. Now you can run your pipeline and see t
The full signature of the destination decorator plus its function is the following:

```py
@dlt.destination(batch_size=10, loader_file_format="jsonl", name="my_sink", naming="direct")
@dlt.destination(batch_size=10, loader_file_format="jsonl", name="my_sink", naming_convention="direct", max_table_nesting=0, skip_dlt_columns_and_tables=True)
def sink(items: TDataItems, table: TTableSchema) -> None:
...
```
@@ -82,6 +82,8 @@ in any way you like.
this can be `jsonl` or `parquet`.
* The `name` parameter on the destination decorator defines the name of the destination that gets created by the destination decorator.
* The `naming_convention` parameter on the destination decorator controls how table and column names are normalized. The default is `direct`, which will keep all names the same.
* The `max_table_nesting` parameter on the destination decorator defines how deep the normalizer will go to normalize complex fields on your data to create subtables. It overwrites any settings on your `source` and defaults to 0, so no nested tables are created.
* The `skip_dlt_columns_and_tables` parameter on the destination decorator defines whether internal dlt tables and columns will be fed into the custom destination function. This is set to True by default.
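For comparison with the defaults, a sink that does want nested child tables and the dlt identifiers could override both parameters. This is a sketch using the same placeholder names as the signature above.

```py
import dlt
from dlt.common.typing import TDataItems
from dlt.common.schema import TTableSchema


@dlt.destination(
    batch_size=10,
    loader_file_format="jsonl",
    name="my_sink",
    naming_convention="direct",
    max_table_nesting=1,  # allow one level of child tables
    skip_dlt_columns_and_tables=False,  # also receive _dlt tables and columns
)
def sink(items: TDataItems, table: TTableSchema) -> None:
    ...
```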

#### Sink function